Compare commits
8 Commits
d9d91193dd
...
8557ddc522
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8557ddc522 | ||
|
|
1e43c2a14e | ||
|
|
04c574ffec | ||
|
|
b2f64037e2 | ||
|
|
fbd151f9ef | ||
|
|
5f21098fdc | ||
|
|
7c2d663401 | ||
|
|
95b0394199 |
@@ -38,7 +38,7 @@ type Config struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
|
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]|ip-[0-9]+-[0-9]+-[0-9]+-[0-9]+\.[a-z]+-[a-z]+-[0-9]+\.compute\.internal`
|
||||||
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
||||||
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
|
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
|
||||||
)
|
)
|
||||||
|
|||||||
43
main.go
43
main.go
@@ -85,6 +85,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
|||||||
func newHandler(cfg Config) http.HandlerFunc {
|
func newHandler(cfg Config) http.HandlerFunc {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
|
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||||
|
|
||||||
@@ -99,6 +100,12 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var Version = "undef"
|
||||||
|
|
||||||
|
func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{"version": Version})
|
||||||
|
}
|
||||||
|
|
||||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if !basicAuth(cfg, w, r) {
|
if !basicAuth(cfg, w, r) {
|
||||||
@@ -159,7 +166,8 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
|||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
||||||
}
|
}
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
} else {
|
} else {
|
||||||
n += 1
|
n += 1
|
||||||
@@ -195,12 +203,13 @@ func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
|
|||||||
|
|
||||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||||
if cfg.InitializeSlack {
|
if cfg.InitializeSlack {
|
||||||
return handlerPostAPIV1EventsSlackInitialize
|
return handlerPostAPIV1EventsSlackInitialize(cfg)
|
||||||
}
|
}
|
||||||
return _newHandlerPostAPIV1EventsSlack(cfg)
|
return _newHandlerPostAPIV1EventsSlack(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
|
func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
b, _ := io.ReadAll(r.Body)
|
b, _ := io.ReadAll(r.Body)
|
||||||
var challenge struct {
|
var challenge struct {
|
||||||
Token string
|
Token string
|
||||||
@@ -211,14 +220,32 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques
|
|||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
cfg.driver.ExecContext(r.Context(), `
|
||||||
|
CREATE TABLE
|
||||||
|
IF NOT EXISTS
|
||||||
|
initialization (
|
||||||
|
label TEXT,
|
||||||
|
token TEXT,
|
||||||
|
updated TIMESTAMP
|
||||||
|
)
|
||||||
|
`)
|
||||||
|
if _, err := cfg.driver.ExecContext(r.Context(), `
|
||||||
|
INSERT
|
||||||
|
INTO initialization (label, token, updated)
|
||||||
|
VALUES ('slack_events_webhook_token', $1, $2)
|
||||||
|
`, challenge.Token, time.Now().UTC()); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("stashed new slack initialization token", challenge.Token)
|
||||||
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
b, _ := io.ReadAll(r.Body)
|
body, _ := io.ReadAll(r.Body)
|
||||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
||||||
|
|
||||||
var allowList struct {
|
var allowList struct {
|
||||||
Token string
|
Token string
|
||||||
@@ -226,7 +253,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
Channel string
|
Channel string
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(b, &allowList); err != nil {
|
if err := json.Unmarshal(body, &allowList); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
} else if allowList.Token != cfg.SlackToken {
|
} else if allowList.Token != cfg.SlackToken {
|
||||||
@@ -243,7 +270,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil {
|
||||||
log.Printf("failed to ingest: %v", err)
|
log.Printf("failed to ingest: %v", err)
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|||||||
3
queue.go
3
queue.go
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -101,7 +102,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
`, reservation)
|
`, reservation)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||||
} else if err := row.Scan(&payload); err != nil {
|
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
||||||
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
33
slack.go
33
slack.go
@@ -61,14 +61,7 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
cfg.DatacenterPattern: &s.Datacenter,
|
cfg.DatacenterPattern: &s.Datacenter,
|
||||||
cfg.EventNamePattern: &s.EventName,
|
cfg.EventNamePattern: &s.EventName,
|
||||||
} {
|
} {
|
||||||
r := regexp.MustCompile(pattern)
|
*ptr = withPattern(pattern, *ptr)
|
||||||
parsed := r.FindString(*ptr)
|
|
||||||
for i, name := range r.SubexpNames() {
|
|
||||||
if i > 0 && name != "" {
|
|
||||||
parsed = r.FindStringSubmatch(*ptr)[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*ptr = parsed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
event := model.Event{}
|
event := model.Event{}
|
||||||
@@ -93,6 +86,17 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func withPattern(pattern string, given string) string {
|
||||||
|
r := regexp.MustCompile(pattern)
|
||||||
|
parsed := r.FindString(given)
|
||||||
|
for i, name := range r.SubexpNames() {
|
||||||
|
if i > 0 && name != "" {
|
||||||
|
parsed = r.FindStringSubmatch(given)[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parsed
|
||||||
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
parsedSlackMessage struct {
|
parsedSlackMessage struct {
|
||||||
ID string
|
ID string
|
||||||
@@ -227,6 +231,11 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func _parseSlack(b []byte) (slackMessage, error) {
|
func _parseSlack(b []byte) (slackMessage, error) {
|
||||||
|
var wrapper ChannelWrapper
|
||||||
|
if err := json.Unmarshal(b, &wrapper); err == nil && len(wrapper.V) > 0 {
|
||||||
|
b = wrapper.V
|
||||||
|
}
|
||||||
|
|
||||||
var result slackMessage
|
var result slackMessage
|
||||||
err := json.Unmarshal(b, &result)
|
err := json.Unmarshal(b, &result)
|
||||||
switch result.Type {
|
switch result.Type {
|
||||||
@@ -247,6 +256,9 @@ func _parseSlack(b []byte) (slackMessage, error) {
|
|||||||
}
|
}
|
||||||
result.Event.PreviousMessage = nil
|
result.Event.PreviousMessage = nil
|
||||||
}
|
}
|
||||||
|
if wrapper.Channel != "" {
|
||||||
|
result.Event.Channel = wrapper.Channel
|
||||||
|
}
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -257,3 +269,8 @@ func (this slackEvent) Empty() bool {
|
|||||||
func (this parsedSlackMessage) Time() time.Time {
|
func (this parsedSlackMessage) Time() time.Time {
|
||||||
return time.Unix(int64(this.TS), 0)
|
return time.Unix(int64(this.TS), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ChannelWrapper struct {
|
||||||
|
Channel string
|
||||||
|
V json.RawMessage
|
||||||
|
}
|
||||||
|
|||||||
@@ -249,3 +249,54 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWrappedSlack(t *testing.T) {
|
||||||
|
b, _ := os.ReadFile("testdata/slack_events/human_thread_message_from_opsgenie_alert.json")
|
||||||
|
b2, _ := json.Marshal(ChannelWrapper{Channel: "X", V: json.RawMessage(b)})
|
||||||
|
|
||||||
|
if got, err := _parseSlack(b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got2, err := _parseSlack(b2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got2.Event.Channel != "X" {
|
||||||
|
t.Error(got2.Event.Channel)
|
||||||
|
} else if got2.Event.ParentID == "" {
|
||||||
|
t.Error(got2.Event)
|
||||||
|
} else if got.Event.ParentID != got2.Event.ParentID {
|
||||||
|
t.Error(got, got2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWithPattern(t *testing.T) {
|
||||||
|
cases := map[string]struct {
|
||||||
|
given string
|
||||||
|
pattern string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
"pods unavailable on node": {
|
||||||
|
given: `pods are unavailable on node ip-12-345-67-890.xx-yyyyy-1.compute.internal.`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `ip-12-345-67-890.xx-yyyyy-1.compute.internal`,
|
||||||
|
},
|
||||||
|
"redis err": {
|
||||||
|
given: `Redis instance red-abc123 is emitting Some error repeatedly`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `red-abc123`,
|
||||||
|
},
|
||||||
|
"pg err": {
|
||||||
|
given: `db dpg-xyz123 is in a pinch`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `dpg-xyz123`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, d := range cases {
|
||||||
|
c := d
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
got := withPattern(c.pattern, c.given)
|
||||||
|
if got != c.want {
|
||||||
|
t.Errorf("withPattern(%q, %q) expected %q but got %q", c.pattern, c.given, c.want, got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user