Compare commits

...

10 Commits

Author SHA1 Message Date
Bel LaPointe
d5b84da8f5 move FillWithTestdata from RAM method to function 2024-04-12 13:35:27 -06:00
Bel LaPointe
b6526b8360 impl NewRam().Fill() to load testdata 2024-04-12 13:33:06 -06:00
Bel LaPointe
c0977a6b7a return ErrIrrelevantMessage and log http server ignored messages 2024-04-12 13:32:53 -06:00
Bel LaPointe
feb48ee4bc Fix alert resolved via bot messages 2024-04-12 13:25:04 -06:00
Bel LaPointe
88f6746c85 add storage.Threads, storage.ThreadsSince, storage.Thread 2024-04-12 13:02:18 -06:00
Bel LaPointe
5817dce70e swap to RAM for test driver 2024-04-12 13:02:06 -06:00
Bel LaPointe
04c7a5c9e1 add message.Time() 2024-04-12 13:01:42 -06:00
Bel LaPointe
83c8fccb78 test RAM driver cant delete 2024-04-12 13:01:27 -06:00
Bel LaPointe
cb44dfd49d fix RAM driver cant delete 2024-04-12 13:00:56 -06:00
Bel LaPointe
6ca1f83727 stub handling GET /.../messages 2024-04-12 11:23:30 -06:00
9 changed files with 256 additions and 36 deletions

View File

@@ -13,15 +13,17 @@ import (
) )
type Config struct { type Config struct {
Port int Port int
Debug bool Debug bool
InitializeSlack bool InitializeSlack bool
SlackToken string SlackToken string
SlackChannels string SlackChannels string
PostgresConn string PostgresConn string
storage Storage BasicAuthUser string
queue Queue BasicAuthPassword string
driver Driver storage Storage
queue Queue
driver Driver
} }
func newConfig(ctx context.Context) (Config, error) { func newConfig(ctx context.Context) (Config, error) {

View File

@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"path" "path"
"sync" "sync"
"time" "time"
@@ -22,6 +23,33 @@ type Driver interface {
Set(context.Context, string, string, []byte) error Set(context.Context, string, string, []byte) error
} }
func FillWithTestdata(ctx context.Context, driver Driver) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
continue
}
b, err := os.ReadFile(path.Join(d, entry.Name()))
if err != nil {
return err
}
m, err := ParseSlack(b)
if errors.Is(err, ErrIrrelevantMessage) {
continue
} else if err != nil {
return err
}
if err := driver.Set(nil, "m", m.ID, m.Serialize()); err != nil {
return err
}
}
return nil
}
type Postgres struct { type Postgres struct {
db *sql.DB db *sql.DB
} }
@@ -199,6 +227,9 @@ func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.m[ns] = map[string][]byte{} ram.m[ns] = map[string][]byte{}
} }
ram.m[ns][id] = v ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil return nil
} }

View File

@@ -28,6 +28,24 @@ func TestDriverRAM(t *testing.T) {
testDriver(t, NewRAM()) testDriver(t, NewRAM())
} }
func TestFillTestdata(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram); err != nil {
t.Fatal(err)
}
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) { func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir())) testDriver(t, NewTestDBIn(t.TempDir()))
} }
@@ -76,6 +94,12 @@ func testDriver(t *testing.T, d Driver) {
t.Error("cannot set from full:", err) t.Error("cannot set from full:", err)
} }
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil { if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err) t.Error("cannot get from deleted:", err)
} else if b != nil { } else if b != nil {

20
main.go
View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"log" "log"
@@ -51,6 +52,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
errc := make(chan error) errc := make(chan error)
go func() { go func() {
defer close(errc) defer close(errc)
log.Printf("listening on %s", s.Addr)
errc <- s.ListenAndServe() errc <- s.ListenAndServe()
}() }()
@@ -61,6 +63,7 @@ func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Message(cfg)))
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug { if cfg.Debug {
@@ -73,6 +76,17 @@ func newHandler(cfg Config) http.HandlerFunc {
} }
} }
func newHandlerGetAPIV1Message(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack { if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize return handlerPostAPIV1EventsSlackInitialize
@@ -117,14 +131,18 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
} }
m, err := ParseSlack(b) m, err := ParseSlack(b)
if err != nil { if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
if err := cfg.storage.Upsert(r.Context(), m); err != nil { if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
log.Printf("ingested %+v", m)
} }
} }

View File

@@ -5,6 +5,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"time"
)
var (
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
) )
type Message struct { type Message struct {
@@ -17,12 +22,17 @@ type Message struct {
Event string Event string
Plaintext string Plaintext string
Asset string Asset string
Resolved bool
} }
func (m Message) Empty() bool { func (m Message) Empty() bool {
return m == (Message{}) return m == (Message{})
} }
func (m Message) Time() time.Time {
return time.Unix(int64(m.TS), 0)
}
func (m Message) Serialize() []byte { func (m Message) Serialize() []byte {
b, err := json.Marshal(m) b, err := json.Marshal(m)
if err != nil { if err != nil {
@@ -54,6 +64,9 @@ type (
slackEvent struct { slackEvent struct {
ID string `json:"event_ts"` ID string `json:"event_ts"`
Channel string Channel string
// rewrites
Nested *slackEvent `json:"message"`
PreviousMessage *slackEvent `json:"previous_message"`
// human // human
ParentID string `json:"thread_ts"` ParentID string `json:"thread_ts"`
Text string Text string
@@ -100,9 +113,9 @@ func ParseSlack(b []byte) (Message, error) {
if s.Event.Bot.Name != "" { if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 { if len(s.Event.Attachments) == 0 {
return Message{}, errors.New("bot message has no attachments") return Message{}, ErrIrrelevantMessage
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") { } else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return Message{}, errors.New("bot message attachment is not Firing") return Message{}, ErrIrrelevantMessage
} }
return Message{ return Message{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS), ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
@@ -114,6 +127,7 @@ func ParseSlack(b []byte) (Message, error) {
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0], Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
Plaintext: s.Event.Attachments[0].Text, Plaintext: s.Event.Attachments[0].Text,
Asset: "TODO", Asset: "TODO",
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
}, nil }, nil
} }
@@ -133,5 +147,21 @@ func ParseSlack(b []byte) (Message, error) {
func parseSlack(b []byte) (slackMessage, error) { func parseSlack(b []byte) (slackMessage, error) {
var result slackMessage var result slackMessage
err := json.Unmarshal(b, &result) err := json.Unmarshal(b, &result)
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot
result.Event.Attachments = result.Event.Nested.Attachments
result.Event.Nested = nil
}
if result.Event.PreviousMessage != nil {
if result.Event.PreviousMessage.ID != "" {
result.Event.ID = result.Event.PreviousMessage.ID
}
result.Event.PreviousMessage = nil
}
return result, err return result, err
} }
func (this slackEvent) Empty() bool {
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
}

View File

@@ -79,6 +79,41 @@ func TestParseSlackTestdata(t *testing.T) {
Asset: "TODO", Asset: "TODO",
}, },
}, },
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: Message{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "#11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "TODO",
Resolved: true,
},
},
} }
for name, d := range cases { for name, d := range cases {
@@ -107,6 +142,9 @@ func TestParseSlackTestdata(t *testing.T) {
if got != want.message { if got != want.message {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got) t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
} }
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
}) })
}) })
} }

View File

@@ -11,10 +11,7 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
db := NewTestDBIn(t.TempDir()) q := NewQueue(NewRAM())
defer db.Close()
q := NewQueue(db)
for i := 0; i < 39; i++ { for i := 0; i < 39; i++ {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil { if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {

View File

@@ -3,6 +3,8 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"sort"
"time"
) )
var ( var (
@@ -17,12 +19,56 @@ func NewStorage(driver Driver) Storage {
return Storage{driver: driver} return Storage{driver: driver}
} }
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
messages, err := s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
if err != nil {
return nil, err
}
threads := map[string]struct{}{}
for _, m := range messages {
threads[m.Thread] = struct{}{}
}
result := make([]string, 0, len(threads))
for k := range threads {
result = append(result, k)
}
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error { func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "storage", m.ID, m.Serialize()) return s.driver.Set(ctx, "m", m.ID, m.Serialize())
} }
func (s Storage) Get(ctx context.Context, id string) (Message, error) { func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "storage", id) b, err := s.driver.Get(ctx, "m", id)
if err != nil { if err != nil {
return Message{}, err return Message{}, err
} }

View File

@@ -2,35 +2,69 @@ package main
import ( import (
"context" "context"
"slices"
"testing" "testing"
"time" "time"
) )
//func newStorageFromTestdata(t *testing.T) {
func TestStorage(t *testing.T) { func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second) ctx, can := context.WithTimeout(context.Background(), time.Second)
defer can() defer can()
db := NewTestDBIn(t.TempDir()) t.Run("Threads", func(t *testing.T) {
defer db.Close() s := NewStorage(NewRAM())
s := NewStorage(db) mX1 := Message{ID: "1", Thread: "X", TS: 1}
mX2 := Message{ID: "2", Thread: "X", TS: 2}
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
if _, err := s.Get(ctx, "id"); err != ErrNotFound { for _, m := range []Message{mX1, mX2, mY1} {
t.Error("failed to get 404", err) if err := s.Upsert(ctx, m); err != nil {
} t.Fatal(err)
}
}
m := Message{ if threads, err := s.Threads(ctx); err != nil {
ID: "id", t.Error(err)
TS: 1, } else if len(threads) != 2 {
} t.Error(threads)
} else if !slices.Contains(threads, "X") {
t.Error(threads, "X")
} else if !slices.Contains(threads, "Y") {
t.Error(threads, "Y")
}
if err := s.Upsert(ctx, m); err != nil { if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
t.Error("failed to upsert", err) t.Error(err)
} } else if len(threads) != 1 {
t.Error(threads)
} else if threads[0] != "Y" {
t.Error(threads[0])
}
})
if m2, err := s.Get(ctx, "id"); err != nil { t.Run("Get Upsert", func(t *testing.T) {
t.Error("failed to get", err) s := NewStorage(NewRAM())
} else if m != m2 {
t.Error(m2) if _, err := s.Get(ctx, "id"); err != ErrNotFound {
} t.Error("failed to get 404", err)
}
m := Message{
ID: "id",
TS: 1,
}
if err := s.Upsert(ctx, m); err != nil {
t.Error("failed to upsert", err)
}
if m2, err := s.Get(ctx, "id"); err != nil {
t.Error("failed to get", err)
} else if m != m2 {
t.Error(m2)
}
})
} }