Compare commits

...

10 Commits

Author SHA1 Message Date
Bel LaPointe
d5b84da8f5 move FillWithTestdata from RAM method to function 2024-04-12 13:35:27 -06:00
Bel LaPointe
b6526b8360 impl NewRam().Fill() to load testdata 2024-04-12 13:33:06 -06:00
Bel LaPointe
c0977a6b7a return ErrIrrelevantMessage and log http server ignored messages 2024-04-12 13:32:53 -06:00
Bel LaPointe
feb48ee4bc Fix alert resolved via bot messages 2024-04-12 13:25:04 -06:00
Bel LaPointe
88f6746c85 add storage.Threads, storage.ThreadsSince, storage.Thread 2024-04-12 13:02:18 -06:00
Bel LaPointe
5817dce70e swap to RAM for test driver 2024-04-12 13:02:06 -06:00
Bel LaPointe
04c7a5c9e1 add message.Time() 2024-04-12 13:01:42 -06:00
Bel LaPointe
83c8fccb78 test RAM driver cant delete 2024-04-12 13:01:27 -06:00
Bel LaPointe
cb44dfd49d fix RAM driver cant delete 2024-04-12 13:00:56 -06:00
Bel LaPointe
6ca1f83727 stub handling GET /.../messages 2024-04-12 11:23:30 -06:00
9 changed files with 256 additions and 36 deletions

View File

@@ -19,6 +19,8 @@ type Config struct {
SlackToken string
SlackChannels string
PostgresConn string
BasicAuthUser string
BasicAuthPassword string
storage Storage
queue Queue
driver Driver

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"
@@ -22,6 +23,33 @@ type Driver interface {
Set(context.Context, string, string, []byte) error
}
func FillWithTestdata(ctx context.Context, driver Driver) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
continue
}
b, err := os.ReadFile(path.Join(d, entry.Name()))
if err != nil {
return err
}
m, err := ParseSlack(b)
if errors.Is(err, ErrIrrelevantMessage) {
continue
} else if err != nil {
return err
}
if err := driver.Set(nil, "m", m.ID, m.Serialize()); err != nil {
return err
}
}
return nil
}
type Postgres struct {
db *sql.DB
}
@@ -199,6 +227,9 @@ func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.m[ns] = map[string][]byte{}
}
ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil
}

View File

@@ -28,6 +28,24 @@ func TestDriverRAM(t *testing.T) {
testDriver(t, NewRAM())
}
func TestFillTestdata(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram); err != nil {
t.Fatal(err)
}
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir()))
}
@@ -76,6 +94,12 @@ func testDriver(t *testing.T, d Driver) {
t.Error("cannot set from full:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {

20
main.go
View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -51,6 +52,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
errc := make(chan error)
go func() {
defer close(errc)
log.Printf("listening on %s", s.Addr)
errc <- s.ListenAndServe()
}()
@@ -61,6 +63,7 @@ func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Message(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
@@ -73,6 +76,17 @@ func newHandler(cfg Config) http.HandlerFunc {
}
}
func newHandlerGetAPIV1Message(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize
@@ -117,14 +131,18 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
}
m, err := ParseSlack(b)
if err != nil {
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("ingested %+v", m)
}
}

View File

@@ -5,6 +5,11 @@ import (
"errors"
"fmt"
"strings"
"time"
)
var (
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
)
type Message struct {
@@ -17,12 +22,17 @@ type Message struct {
Event string
Plaintext string
Asset string
Resolved bool
}
func (m Message) Empty() bool {
return m == (Message{})
}
func (m Message) Time() time.Time {
return time.Unix(int64(m.TS), 0)
}
func (m Message) Serialize() []byte {
b, err := json.Marshal(m)
if err != nil {
@@ -54,6 +64,9 @@ type (
slackEvent struct {
ID string `json:"event_ts"`
Channel string
// rewrites
Nested *slackEvent `json:"message"`
PreviousMessage *slackEvent `json:"previous_message"`
// human
ParentID string `json:"thread_ts"`
Text string
@@ -100,9 +113,9 @@ func ParseSlack(b []byte) (Message, error) {
if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 {
return Message{}, errors.New("bot message has no attachments")
return Message{}, ErrIrrelevantMessage
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return Message{}, errors.New("bot message attachment is not Firing")
return Message{}, ErrIrrelevantMessage
}
return Message{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
@@ -114,6 +127,7 @@ func ParseSlack(b []byte) (Message, error) {
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
Plaintext: s.Event.Attachments[0].Text,
Asset: "TODO",
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
}, nil
}
@@ -133,5 +147,21 @@ func ParseSlack(b []byte) (Message, error) {
func parseSlack(b []byte) (slackMessage, error) {
var result slackMessage
err := json.Unmarshal(b, &result)
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot
result.Event.Attachments = result.Event.Nested.Attachments
result.Event.Nested = nil
}
if result.Event.PreviousMessage != nil {
if result.Event.PreviousMessage.ID != "" {
result.Event.ID = result.Event.PreviousMessage.ID
}
result.Event.PreviousMessage = nil
}
return result, err
}
func (this slackEvent) Empty() bool {
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
}

View File

@@ -79,6 +79,41 @@ func TestParseSlackTestdata(t *testing.T) {
Asset: "TODO",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: Message{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "#11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "TODO",
Resolved: true,
},
},
}
for name, d := range cases {
@@ -107,6 +142,9 @@ func TestParseSlackTestdata(t *testing.T) {
if got != want.message {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}

View File

@@ -11,10 +11,7 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
db := NewTestDBIn(t.TempDir())
defer db.Close()
q := NewQueue(db)
q := NewQueue(NewRAM())
for i := 0; i < 39; i++ {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {

View File

@@ -3,6 +3,8 @@ package main
import (
"context"
"errors"
"sort"
"time"
)
var (
@@ -17,12 +19,56 @@ func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
messages, err := s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
if err != nil {
return nil, err
}
threads := map[string]struct{}{}
for _, m := range messages {
threads[m.Thread] = struct{}{}
}
result := make([]string, 0, len(threads))
for k := range threads {
result = append(result, k)
}
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "storage", m.ID, m.Serialize())
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "storage", id)
b, err := s.driver.Get(ctx, "m", id)
if err != nil {
return Message{}, err
}

View File

@@ -2,18 +2,51 @@ package main
import (
"context"
"slices"
"testing"
"time"
)
//func newStorageFromTestdata(t *testing.T) {
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second)
defer can()
db := NewTestDBIn(t.TempDir())
defer db.Close()
t.Run("Threads", func(t *testing.T) {
s := NewStorage(NewRAM())
s := NewStorage(db)
mX1 := Message{ID: "1", Thread: "X", TS: 1}
mX2 := Message{ID: "2", Thread: "X", TS: 2}
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
for _, m := range []Message{mX1, mX2, mY1} {
if err := s.Upsert(ctx, m); err != nil {
t.Fatal(err)
}
}
if threads, err := s.Threads(ctx); err != nil {
t.Error(err)
} else if len(threads) != 2 {
t.Error(threads)
} else if !slices.Contains(threads, "X") {
t.Error(threads, "X")
} else if !slices.Contains(threads, "Y") {
t.Error(threads, "Y")
}
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
t.Error(err)
} else if len(threads) != 1 {
t.Error(threads)
} else if threads[0] != "Y" {
t.Error(threads[0])
}
})
t.Run("Get Upsert", func(t *testing.T) {
s := NewStorage(NewRAM())
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
t.Error("failed to get 404", err)
@@ -33,4 +66,5 @@ func TestStorage(t *testing.T) {
} else if m != m2 {
t.Error(m2)
}
})
}