Compare commits

...

10 Commits

Author SHA1 Message Date
Bel LaPointe
d5b84da8f5 move FillWithTestdata from RAM method to function 2024-04-12 13:35:27 -06:00
Bel LaPointe
b6526b8360 impl NewRam().Fill() to load testdata 2024-04-12 13:33:06 -06:00
Bel LaPointe
c0977a6b7a return ErrIrrelevantMessage and log http server ignored messages 2024-04-12 13:32:53 -06:00
Bel LaPointe
feb48ee4bc Fix alert resolved via bot messages 2024-04-12 13:25:04 -06:00
Bel LaPointe
88f6746c85 add storage.Threads, storage.ThreadsSince, storage.Thread 2024-04-12 13:02:18 -06:00
Bel LaPointe
5817dce70e swap to RAM for test driver 2024-04-12 13:02:06 -06:00
Bel LaPointe
04c7a5c9e1 add message.Time() 2024-04-12 13:01:42 -06:00
Bel LaPointe
83c8fccb78 test RAM driver cant delete 2024-04-12 13:01:27 -06:00
Bel LaPointe
cb44dfd49d fix RAM driver cant delete 2024-04-12 13:00:56 -06:00
Bel LaPointe
6ca1f83727 stub handling GET /.../messages 2024-04-12 11:23:30 -06:00
9 changed files with 256 additions and 36 deletions

View File

@@ -13,15 +13,17 @@ import (
)
type Config struct {
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels string
PostgresConn string
storage Storage
queue Queue
driver Driver
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels string
PostgresConn string
BasicAuthUser string
BasicAuthPassword string
storage Storage
queue Queue
driver Driver
}
func newConfig(ctx context.Context) (Config, error) {

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"
@@ -22,6 +23,33 @@ type Driver interface {
Set(context.Context, string, string, []byte) error
}
func FillWithTestdata(ctx context.Context, driver Driver) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
continue
}
b, err := os.ReadFile(path.Join(d, entry.Name()))
if err != nil {
return err
}
m, err := ParseSlack(b)
if errors.Is(err, ErrIrrelevantMessage) {
continue
} else if err != nil {
return err
}
if err := driver.Set(nil, "m", m.ID, m.Serialize()); err != nil {
return err
}
}
return nil
}
type Postgres struct {
db *sql.DB
}
@@ -199,6 +227,9 @@ func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.m[ns] = map[string][]byte{}
}
ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil
}

View File

@@ -28,6 +28,24 @@ func TestDriverRAM(t *testing.T) {
testDriver(t, NewRAM())
}
func TestFillTestdata(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram); err != nil {
t.Fatal(err)
}
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir()))
}
@@ -76,6 +94,12 @@ func testDriver(t *testing.T, d Driver) {
t.Error("cannot set from full:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {

20
main.go
View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -51,6 +52,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
errc := make(chan error)
go func() {
defer close(errc)
log.Printf("listening on %s", s.Addr)
errc <- s.ListenAndServe()
}()
@@ -61,6 +63,7 @@ func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Message(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
@@ -73,6 +76,17 @@ func newHandler(cfg Config) http.HandlerFunc {
}
}
func newHandlerGetAPIV1Message(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize
@@ -117,14 +131,18 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
}
m, err := ParseSlack(b)
if err != nil {
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("ingested %+v", m)
}
}

View File

@@ -5,6 +5,11 @@ import (
"errors"
"fmt"
"strings"
"time"
)
var (
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
)
type Message struct {
@@ -17,12 +22,17 @@ type Message struct {
Event string
Plaintext string
Asset string
Resolved bool
}
func (m Message) Empty() bool {
return m == (Message{})
}
func (m Message) Time() time.Time {
return time.Unix(int64(m.TS), 0)
}
func (m Message) Serialize() []byte {
b, err := json.Marshal(m)
if err != nil {
@@ -54,6 +64,9 @@ type (
slackEvent struct {
ID string `json:"event_ts"`
Channel string
// rewrites
Nested *slackEvent `json:"message"`
PreviousMessage *slackEvent `json:"previous_message"`
// human
ParentID string `json:"thread_ts"`
Text string
@@ -100,9 +113,9 @@ func ParseSlack(b []byte) (Message, error) {
if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 {
return Message{}, errors.New("bot message has no attachments")
return Message{}, ErrIrrelevantMessage
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return Message{}, errors.New("bot message attachment is not Firing")
return Message{}, ErrIrrelevantMessage
}
return Message{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
@@ -114,6 +127,7 @@ func ParseSlack(b []byte) (Message, error) {
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
Plaintext: s.Event.Attachments[0].Text,
Asset: "TODO",
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
}, nil
}
@@ -133,5 +147,21 @@ func ParseSlack(b []byte) (Message, error) {
func parseSlack(b []byte) (slackMessage, error) {
var result slackMessage
err := json.Unmarshal(b, &result)
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot
result.Event.Attachments = result.Event.Nested.Attachments
result.Event.Nested = nil
}
if result.Event.PreviousMessage != nil {
if result.Event.PreviousMessage.ID != "" {
result.Event.ID = result.Event.PreviousMessage.ID
}
result.Event.PreviousMessage = nil
}
return result, err
}
func (this slackEvent) Empty() bool {
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
}

View File

@@ -79,6 +79,41 @@ func TestParseSlackTestdata(t *testing.T) {
Asset: "TODO",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: Message{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "#11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "TODO",
Resolved: true,
},
},
}
for name, d := range cases {
@@ -107,6 +142,9 @@ func TestParseSlackTestdata(t *testing.T) {
if got != want.message {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}

View File

@@ -11,10 +11,7 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
db := NewTestDBIn(t.TempDir())
defer db.Close()
q := NewQueue(db)
q := NewQueue(NewRAM())
for i := 0; i < 39; i++ {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {

View File

@@ -3,6 +3,8 @@ package main
import (
"context"
"errors"
"sort"
"time"
)
var (
@@ -17,12 +19,56 @@ func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
messages, err := s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
if err != nil {
return nil, err
}
threads := map[string]struct{}{}
for _, m := range messages {
threads[m.Thread] = struct{}{}
}
result := make([]string, 0, len(threads))
for k := range threads {
result = append(result, k)
}
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "storage", m.ID, m.Serialize())
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "storage", id)
b, err := s.driver.Get(ctx, "m", id)
if err != nil {
return Message{}, err
}

View File

@@ -2,35 +2,69 @@ package main
import (
"context"
"slices"
"testing"
"time"
)
//func newStorageFromTestdata(t *testing.T) {
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second)
defer can()
db := NewTestDBIn(t.TempDir())
defer db.Close()
t.Run("Threads", func(t *testing.T) {
s := NewStorage(NewRAM())
s := NewStorage(db)
mX1 := Message{ID: "1", Thread: "X", TS: 1}
mX2 := Message{ID: "2", Thread: "X", TS: 2}
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
t.Error("failed to get 404", err)
}
for _, m := range []Message{mX1, mX2, mY1} {
if err := s.Upsert(ctx, m); err != nil {
t.Fatal(err)
}
}
m := Message{
ID: "id",
TS: 1,
}
if threads, err := s.Threads(ctx); err != nil {
t.Error(err)
} else if len(threads) != 2 {
t.Error(threads)
} else if !slices.Contains(threads, "X") {
t.Error(threads, "X")
} else if !slices.Contains(threads, "Y") {
t.Error(threads, "Y")
}
if err := s.Upsert(ctx, m); err != nil {
t.Error("failed to upsert", err)
}
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
t.Error(err)
} else if len(threads) != 1 {
t.Error(threads)
} else if threads[0] != "Y" {
t.Error(threads[0])
}
})
if m2, err := s.Get(ctx, "id"); err != nil {
t.Error("failed to get", err)
} else if m != m2 {
t.Error(m2)
}
t.Run("Get Upsert", func(t *testing.T) {
s := NewStorage(NewRAM())
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
t.Error("failed to get 404", err)
}
m := Message{
ID: "id",
TS: 1,
}
if err := s.Upsert(ctx, m); err != nil {
t.Error("failed to upsert", err)
}
if m2, err := s.Get(ctx, "id"); err != nil {
t.Error("failed to get", err)
} else if m != m2 {
t.Error(m2)
}
})
}