Compare commits
10 Commits
02331752fe
...
d5b84da8f5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5b84da8f5 | ||
|
|
b6526b8360 | ||
|
|
c0977a6b7a | ||
|
|
feb48ee4bc | ||
|
|
88f6746c85 | ||
|
|
5817dce70e | ||
|
|
04c7a5c9e1 | ||
|
|
83c8fccb78 | ||
|
|
cb44dfd49d | ||
|
|
6ca1f83727 |
@@ -19,6 +19,8 @@ type Config struct {
|
||||
SlackToken string
|
||||
SlackChannels string
|
||||
PostgresConn string
|
||||
BasicAuthUser string
|
||||
BasicAuthPassword string
|
||||
storage Storage
|
||||
queue Queue
|
||||
driver Driver
|
||||
|
||||
31
driver.go
31
driver.go
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -22,6 +23,33 @@ type Driver interface {
|
||||
Set(context.Context, string, string, []byte) error
|
||||
}
|
||||
|
||||
func FillWithTestdata(ctx context.Context, driver Driver) error {
|
||||
d := "./testdata/slack_events"
|
||||
entries, err := os.ReadDir(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
b, err := os.ReadFile(path.Join(d, entry.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m, err := ParseSlack(b)
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
continue
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := driver.Set(nil, "m", m.ID, m.Serialize()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Postgres struct {
|
||||
db *sql.DB
|
||||
}
|
||||
@@ -199,6 +227,9 @@ func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
|
||||
ram.m[ns] = map[string][]byte{}
|
||||
}
|
||||
ram.m[ns][id] = v
|
||||
if v == nil {
|
||||
delete(ram.m[ns], id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -28,6 +28,24 @@ func TestDriverRAM(t *testing.T) {
|
||||
testDriver(t, NewRAM())
|
||||
}
|
||||
|
||||
func TestFillTestdata(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||
defer can()
|
||||
|
||||
ram := NewRAM()
|
||||
if err := FillWithTestdata(ctx, ram); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
n := 0
|
||||
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
|
||||
n += 1
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(n)
|
||||
}
|
||||
|
||||
func TestDriverBBolt(t *testing.T) {
|
||||
testDriver(t, NewTestDBIn(t.TempDir()))
|
||||
}
|
||||
@@ -76,6 +94,12 @@ func testDriver(t *testing.T, d Driver) {
|
||||
t.Error("cannot set from full:", err)
|
||||
}
|
||||
|
||||
if err := d.ForEach(ctx, "m", func(string, []byte) error {
|
||||
return errors.New("should have no hits")
|
||||
}); err != nil {
|
||||
t.Error("failed to forEach empty:", err)
|
||||
}
|
||||
|
||||
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
||||
t.Error("cannot get from deleted:", err)
|
||||
} else if b != nil {
|
||||
|
||||
20
main.go
20
main.go
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@@ -51,6 +52,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
||||
errc := make(chan error)
|
||||
go func() {
|
||||
defer close(errc)
|
||||
log.Printf("listening on %s", s.Addr)
|
||||
errc <- s.ListenAndServe()
|
||||
}()
|
||||
|
||||
@@ -61,6 +63,7 @@ func newHandler(cfg Config) http.HandlerFunc {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Message(cfg)))
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if cfg.Debug {
|
||||
@@ -73,6 +76,17 @@ func newHandler(cfg Config) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Message(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
|
||||
http.Error(w, "shoo", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
if cfg.InitializeSlack {
|
||||
return handlerPostAPIV1EventsSlackInitialize
|
||||
@@ -117,14 +131,18 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
}
|
||||
|
||||
m, err := ParseSlack(b)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
return
|
||||
} else if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||
log.Printf("failed to ingest %+v: %v", m, err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
log.Printf("ingested %+v", m)
|
||||
}
|
||||
}
|
||||
|
||||
34
message.go
34
message.go
@@ -5,6 +5,11 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
@@ -17,12 +22,17 @@ type Message struct {
|
||||
Event string
|
||||
Plaintext string
|
||||
Asset string
|
||||
Resolved bool
|
||||
}
|
||||
|
||||
func (m Message) Empty() bool {
|
||||
return m == (Message{})
|
||||
}
|
||||
|
||||
func (m Message) Time() time.Time {
|
||||
return time.Unix(int64(m.TS), 0)
|
||||
}
|
||||
|
||||
func (m Message) Serialize() []byte {
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
@@ -54,6 +64,9 @@ type (
|
||||
slackEvent struct {
|
||||
ID string `json:"event_ts"`
|
||||
Channel string
|
||||
// rewrites
|
||||
Nested *slackEvent `json:"message"`
|
||||
PreviousMessage *slackEvent `json:"previous_message"`
|
||||
// human
|
||||
ParentID string `json:"thread_ts"`
|
||||
Text string
|
||||
@@ -100,9 +113,9 @@ func ParseSlack(b []byte) (Message, error) {
|
||||
|
||||
if s.Event.Bot.Name != "" {
|
||||
if len(s.Event.Attachments) == 0 {
|
||||
return Message{}, errors.New("bot message has no attachments")
|
||||
return Message{}, ErrIrrelevantMessage
|
||||
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
|
||||
return Message{}, errors.New("bot message attachment is not Firing")
|
||||
return Message{}, ErrIrrelevantMessage
|
||||
}
|
||||
return Message{
|
||||
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
|
||||
@@ -114,6 +127,7 @@ func ParseSlack(b []byte) (Message, error) {
|
||||
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
|
||||
Plaintext: s.Event.Attachments[0].Text,
|
||||
Asset: "TODO",
|
||||
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -133,5 +147,21 @@ func ParseSlack(b []byte) (Message, error) {
|
||||
func parseSlack(b []byte) (slackMessage, error) {
|
||||
var result slackMessage
|
||||
err := json.Unmarshal(b, &result)
|
||||
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
||||
result.Event.Blocks = result.Event.Nested.Blocks
|
||||
result.Event.Bot = result.Event.Nested.Bot
|
||||
result.Event.Attachments = result.Event.Nested.Attachments
|
||||
result.Event.Nested = nil
|
||||
}
|
||||
if result.Event.PreviousMessage != nil {
|
||||
if result.Event.PreviousMessage.ID != "" {
|
||||
result.Event.ID = result.Event.PreviousMessage.ID
|
||||
}
|
||||
result.Event.PreviousMessage = nil
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (this slackEvent) Empty() bool {
|
||||
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
|
||||
}
|
||||
|
||||
@@ -79,6 +79,41 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Asset: "TODO",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert_resolved.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712916339,
|
||||
Event: slackEvent{
|
||||
ID: "1712916339.000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Bot: slackBot{
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "2ecc71",
|
||||
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
{Value: "P3", Title: "Priority"},
|
||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||
},
|
||||
Actions: []slackAction{},
|
||||
}},
|
||||
},
|
||||
},
|
||||
message: Message{
|
||||
ID: "1712916339.000300/1712916339",
|
||||
TS: 1712916339,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712916339.000300",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "#11069",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "TODO",
|
||||
Resolved: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, d := range cases {
|
||||
@@ -107,6 +142,9 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
if got != want.message {
|
||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
||||
}
|
||||
if time := got.Time(); time.Unix() != int64(got.TS) {
|
||||
t.Error("not unix time", got.TS, time)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,10 +11,7 @@ func TestQueue(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
db := NewTestDBIn(t.TempDir())
|
||||
defer db.Close()
|
||||
|
||||
q := NewQueue(db)
|
||||
q := NewQueue(NewRAM())
|
||||
|
||||
for i := 0; i < 39; i++ {
|
||||
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
|
||||
|
||||
50
storage.go
50
storage.go
@@ -3,6 +3,8 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -17,12 +19,56 @@ func NewStorage(driver Driver) Storage {
|
||||
return Storage{driver: driver}
|
||||
}
|
||||
|
||||
func (s Storage) Threads(ctx context.Context) ([]string, error) {
|
||||
return s.ThreadsSince(ctx, time.Unix(0, 0))
|
||||
}
|
||||
|
||||
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
|
||||
messages, err := s.messagesWhere(ctx, func(m Message) bool {
|
||||
return !t.After(m.Time())
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
threads := map[string]struct{}{}
|
||||
for _, m := range messages {
|
||||
threads[m.Thread] = struct{}{}
|
||||
}
|
||||
result := make([]string, 0, len(threads))
|
||||
for k := range threads {
|
||||
result = append(result, k)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
|
||||
return s.messagesWhere(ctx, func(m Message) bool {
|
||||
return m.Thread == thread
|
||||
})
|
||||
}
|
||||
|
||||
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
|
||||
result := make([]Message, 0)
|
||||
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
|
||||
m := MustDeserialize(v)
|
||||
if !where(m) {
|
||||
return nil
|
||||
}
|
||||
result = append(result, m)
|
||||
return nil
|
||||
})
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return result[i].TS < result[j].TS
|
||||
})
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s Storage) Upsert(ctx context.Context, m Message) error {
|
||||
return s.driver.Set(ctx, "storage", m.ID, m.Serialize())
|
||||
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
|
||||
}
|
||||
|
||||
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
|
||||
b, err := s.driver.Get(ctx, "storage", id)
|
||||
b, err := s.driver.Get(ctx, "m", id)
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
|
||||
@@ -2,18 +2,51 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
//func newStorageFromTestdata(t *testing.T) {
|
||||
|
||||
func TestStorage(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
||||
defer can()
|
||||
|
||||
db := NewTestDBIn(t.TempDir())
|
||||
defer db.Close()
|
||||
t.Run("Threads", func(t *testing.T) {
|
||||
s := NewStorage(NewRAM())
|
||||
|
||||
s := NewStorage(db)
|
||||
mX1 := Message{ID: "1", Thread: "X", TS: 1}
|
||||
mX2 := Message{ID: "2", Thread: "X", TS: 2}
|
||||
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
|
||||
|
||||
for _, m := range []Message{mX1, mX2, mY1} {
|
||||
if err := s.Upsert(ctx, m); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
if threads, err := s.Threads(ctx); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(threads) != 2 {
|
||||
t.Error(threads)
|
||||
} else if !slices.Contains(threads, "X") {
|
||||
t.Error(threads, "X")
|
||||
} else if !slices.Contains(threads, "Y") {
|
||||
t.Error(threads, "Y")
|
||||
}
|
||||
|
||||
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(threads) != 1 {
|
||||
t.Error(threads)
|
||||
} else if threads[0] != "Y" {
|
||||
t.Error(threads[0])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Get Upsert", func(t *testing.T) {
|
||||
s := NewStorage(NewRAM())
|
||||
|
||||
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
|
||||
t.Error("failed to get 404", err)
|
||||
@@ -33,4 +66,5 @@ func TestStorage(t *testing.T) {
|
||||
} else if m != m2 {
|
||||
t.Error(m2)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user