Compare commits

...

8 Commits

Author SHA1 Message Date
Bel LaPointe
8557ddc522 boo 2024-04-17 17:32:07 -06:00
Bel LaPointe
1e43c2a14e split 2024-04-17 17:28:48 -06:00
Bel LaPointe
04c574ffec f it 2024-04-17 16:58:06 -06:00
Bel LaPointe
b2f64037e2 GET /api/v1/version 2024-04-17 16:23:15 -06:00
Bel LaPointe
fbd151f9ef when initializing slack, stash token in driver 2024-04-17 16:07:43 -06:00
Bel LaPointe
5f21098fdc test and update asset pattern to catch ip addresses 2024-04-17 03:58:43 -06:00
Bel LaPointe
7c2d663401 do not cry to me 2024-04-16 15:08:21 -06:00
Bel LaPointe
95b0394199 slack can parse optional channel wrapper for scrape 2024-04-16 15:03:24 -06:00
5 changed files with 124 additions and 28 deletions

View File

@@ -38,7 +38,7 @@ type Config struct {
} }
var ( var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]` renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]|ip-[0-9]+-[0-9]+-[0-9]+-[0-9]+\.[a-z]+-[a-z]+-[0-9]+\.compute\.internal`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]` renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)` renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
) )

43
main.go
View File

@@ -85,6 +85,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc { func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
@@ -99,6 +100,12 @@ func newHandler(cfg Config) http.HandlerFunc {
} }
} }
var Version = "undef"
func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
json.NewEncoder(w).Encode(map[string]any{"version": Version})
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) { if !basicAuth(cfg, w, r) {
@@ -159,7 +166,8 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
if cfg.Debug { if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON) log.Printf("rpc/scrapeslack => %s", messageJSON)
} }
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
errs = append(errs, err) errs = append(errs, err)
} else { } else {
n += 1 n += 1
@@ -195,12 +203,13 @@ func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack { if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize return handlerPostAPIV1EventsSlackInitialize(cfg)
} }
return _newHandlerPostAPIV1EventsSlack(cfg) return _newHandlerPostAPIV1EventsSlack(cfg)
} }
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body) b, _ := io.ReadAll(r.Body)
var challenge struct { var challenge struct {
Token string Token string
@@ -211,14 +220,32 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
cfg.driver.ExecContext(r.Context(), `
CREATE TABLE
IF NOT EXISTS
initialization (
label TEXT,
token TEXT,
updated TIMESTAMP
)
`)
if _, err := cfg.driver.ExecContext(r.Context(), `
INSERT
INTO initialization (label, token, updated)
VALUES ('slack_events_webhook_token', $1, $2)
`, challenge.Token, time.Now().UTC()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Println("stashed new slack initialization token", challenge.Token)
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
} }
}
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body) body, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b)) r.Body = io.NopCloser(bytes.NewReader(body))
var allowList struct { var allowList struct {
Token string Token string
@@ -226,7 +253,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
Channel string Channel string
} }
} }
if err := json.Unmarshal(b, &allowList); err != nil { if err := json.Unmarshal(body, &allowList); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} else if allowList.Token != cfg.SlackToken { } else if allowList.Token != cfg.SlackToken {
@@ -243,7 +270,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return return
} }
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil { if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil {
log.Printf("failed to ingest: %v", err) log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -101,7 +102,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
`, reservation) `, reservation)
if err := row.Err(); err != nil { if err := row.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err) return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
} else if err := row.Scan(&payload); err != nil { } else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err) return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
} }

View File

@@ -61,14 +61,7 @@ func newSlackToModelProcess(cfg Config) processFunc {
cfg.DatacenterPattern: &s.Datacenter, cfg.DatacenterPattern: &s.Datacenter,
cfg.EventNamePattern: &s.EventName, cfg.EventNamePattern: &s.EventName,
} { } {
r := regexp.MustCompile(pattern) *ptr = withPattern(pattern, *ptr)
parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
} }
event := model.Event{} event := model.Event{}
@@ -93,6 +86,17 @@ func newSlackToModelProcess(cfg Config) processFunc {
} }
} }
func withPattern(pattern string, given string) string {
r := regexp.MustCompile(pattern)
parsed := r.FindString(given)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(given)[i]
}
}
return parsed
}
type ( type (
parsedSlackMessage struct { parsedSlackMessage struct {
ID string ID string
@@ -227,6 +231,11 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
} }
func _parseSlack(b []byte) (slackMessage, error) { func _parseSlack(b []byte) (slackMessage, error) {
var wrapper ChannelWrapper
if err := json.Unmarshal(b, &wrapper); err == nil && len(wrapper.V) > 0 {
b = wrapper.V
}
var result slackMessage var result slackMessage
err := json.Unmarshal(b, &result) err := json.Unmarshal(b, &result)
switch result.Type { switch result.Type {
@@ -247,6 +256,9 @@ func _parseSlack(b []byte) (slackMessage, error) {
} }
result.Event.PreviousMessage = nil result.Event.PreviousMessage = nil
} }
if wrapper.Channel != "" {
result.Event.Channel = wrapper.Channel
}
return result, err return result, err
} }
@@ -257,3 +269,8 @@ func (this slackEvent) Empty() bool {
func (this parsedSlackMessage) Time() time.Time { func (this parsedSlackMessage) Time() time.Time {
return time.Unix(int64(this.TS), 0) return time.Unix(int64(this.TS), 0)
} }
type ChannelWrapper struct {
Channel string
V json.RawMessage
}

View File

@@ -249,3 +249,54 @@ func TestParseSlackTestdata(t *testing.T) {
}) })
} }
} }
func TestWrappedSlack(t *testing.T) {
b, _ := os.ReadFile("testdata/slack_events/human_thread_message_from_opsgenie_alert.json")
b2, _ := json.Marshal(ChannelWrapper{Channel: "X", V: json.RawMessage(b)})
if got, err := _parseSlack(b); err != nil {
t.Fatal(err)
} else if got2, err := _parseSlack(b2); err != nil {
t.Fatal(err)
} else if got2.Event.Channel != "X" {
t.Error(got2.Event.Channel)
} else if got2.Event.ParentID == "" {
t.Error(got2.Event)
} else if got.Event.ParentID != got2.Event.ParentID {
t.Error(got, got2)
}
}
func TestWithPattern(t *testing.T) {
cases := map[string]struct {
given string
pattern string
want string
}{
"pods unavailable on node": {
given: `pods are unavailable on node ip-12-345-67-890.xx-yyyyy-1.compute.internal.`,
pattern: renderAssetPattern,
want: `ip-12-345-67-890.xx-yyyyy-1.compute.internal`,
},
"redis err": {
given: `Redis instance red-abc123 is emitting Some error repeatedly`,
pattern: renderAssetPattern,
want: `red-abc123`,
},
"pg err": {
given: `db dpg-xyz123 is in a pinch`,
pattern: renderAssetPattern,
want: `dpg-xyz123`,
},
}
for name, d := range cases {
c := d
t.Run(name, func(t *testing.T) {
got := withPattern(c.pattern, c.given)
if got != c.want {
t.Errorf("withPattern(%q, %q) expected %q but got %q", c.pattern, c.given, c.want, got)
}
})
}
}