From 6d811641617c25ce2f767d7a4e4c416f43571850 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Fri, 19 Apr 2024 13:19:14 -0600 Subject: [PATCH] impl PersistenceToRecap pipeline where each resolved event gets an ai recap of each of its threads that have messages persisted under the thread as a Recap column --- config.go | 7 ++ main.go | 44 ++----------- main_test.go | 48 ++++++++------ model/thread.go | 1 + recap.go | 78 +++++++++++++++++++++++ recap_test.go | 50 +++++++++++++++ slack_test.go | 5 +- testdata/slack_events/opsgenie_alert.json | 3 +- 8 files changed, 177 insertions(+), 59 deletions(-) create mode 100644 recap.go create mode 100644 recap_test.go diff --git a/config.go b/config.go index c5cadbb..049c105 100644 --- a/config.go +++ b/config.go @@ -34,6 +34,7 @@ type Config struct { slackToModelPipeline Pipeline slackScrapePipeline Pipeline modelToPersistencePipeline Pipeline + persistenceToRecapPipeline Pipeline } var ( @@ -153,5 +154,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, } result.slackScrapePipeline = slackScrapePipeline + persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result) + if err != nil { + return Config{}, err + } + result.persistenceToRecapPipeline = persistenceToRecapPipeline + return result, nil } diff --git a/main.go b/main.go index f8da0b2..e0b2d64 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,7 @@ func run(ctx context.Context, cfg Config) error { cfg.slackToModelPipeline, cfg.modelToPersistencePipeline, cfg.slackScrapePipeline, + cfg.persistenceToRecapPipeline, ): return err case err := <-listenAndServe(ctx, cfg): @@ -89,7 +90,7 @@ func newHandler(cfg Config) http.HandlerFunc { mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version)) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) - mux.Handle("GET /api/v1/rpc/aievent", http.HandlerFunc(newHandlerGetAPIV1RPCAIEvent(cfg))) + mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg))) return func(w http.ResponseWriter, r *http.Request) { if cfg.Debug { @@ -108,52 +109,19 @@ func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) { json.NewEncoder(w).Encode(map[string]any{"version": Version}) } -func newHandlerGetAPIV1RPCAIEvent(cfg Config) http.HandlerFunc { +func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } event := r.URL.Query().Get("id") - threadSummaries := []string{} - if err := func(ctx context.Context) error { - threads, err := cfg.storage.GetEventThreads(ctx, event) - if err != nil { - return err - } - if len(threads) == 0 { - return nil - } - - for _, thread := range threads { - prompt := fmt.Sprintf("Summarize the Slack thread in 1 sentence. List any suggested follow ups.\n\n---\n\n") - - messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID) - if err != nil { - return err - } - if len(messages) == 0 { - continue - } - - for _, message := range messages { - prompt += fmt.Sprintf("%s\n%s\n\n", message.Author, message.Plaintext) - } - - summary, err := cfg.ai.Do(ctx, prompt) - if err != nil { - return err - } - threadSummaries = append(threadSummaries, summary) - } - - return nil - }(r.Context()); err != nil { + b, _ := json.Marshal(ModelIDs{Event: event}) + if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - - json.NewEncoder(w).Encode(map[string]any{"threads": threadSummaries}) + json.NewEncoder(w).Encode(map[string]any{"event": event}) } } diff --git a/main_test.go b/main_test.go index 6fba337..40865da 100644 --- a/main_test.go +++ b/main_test.go @@ -36,6 +36,7 @@ func TestRun(t *testing.T) { return int(port) }() u := fmt.Sprintf("http://localhost:%d", port) + var err error cfg := Config{} cfg.DatacenterPattern = renderDatacenterPattern @@ -44,12 +45,16 @@ func TestRun(t *testing.T) { cfg.Port = port cfg.driver = NewTestDriver(t) cfg.storage, _ = NewStorage(ctx, cfg.driver) + cfg.ai = NewAINoop() cfg.SlackToken = "redacted" cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg) cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg) cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg) - cfg.ai = NewAINoop() + cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg) + if err != nil { + t.Fatal(err) + } go func() { if err := run(ctx, cfg); err != nil && ctx.Err() == nil { @@ -86,33 +91,40 @@ func TestRun(t *testing.T) { } }) - t.Run("GET /api/v1/rpc/aievent", func(t *testing.T) { - b, err := os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert_3.json")) + t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) { + b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json")) if err != nil { t.Fatal(err) } - resp, err := http.Post(fmt.Sprintf("%s/api/v1/events/slack", u), "application/json", bytes.NewReader(b)) + if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil { + t.Fatal(err) + } + + b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json")) if err != nil { t.Fatal(err) } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - - resp, err = http.Get(fmt.Sprintf("%s/api/v1/rpc/aievent?id=%s", u, "11067")) - if err != nil { + if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil { t.Fatal(err) } - defer resp.Body.Close() - b, _ = io.ReadAll(resp.Body) - if resp.StatusCode != http.StatusOK { - t.Fatalf("(%d) %s", resp.StatusCode, b) + for ctx.Err() == nil { + if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" { + break + } + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond * 100): + } + } + if err := ctx.Err(); err != nil { + t.Fatal("timed out waiting for recap") } - t.Logf("%s", b) + thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409") + if thread.Recap == "" { + t.Error(thread.Recap) + } + t.Log(thread.Recap) }) } diff --git a/model/thread.go b/model/thread.go index 74574e8..ab8bd35 100644 --- a/model/thread.go +++ b/model/thread.go @@ -8,6 +8,7 @@ type Thread struct { TS uint64 Channel string EventID string + Recap string } func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread { diff --git a/recap.go b/recap.go new file mode 100644 index 0000000..afe3e2b --- /dev/null +++ b/recap.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" +) + +type PersistenceToRecap struct { + pipeline Pipeline +} + +func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) { + reader, err := NewQueue(ctx, "new_persistence", cfg.driver) + if err != nil { + return Pipeline{}, err + } + writer := NewNoopQueue() + return Pipeline{ + writer: writer, + reader: reader, + process: newPersistenceToRecapProcess(cfg), + }, nil +} + +func newPersistenceToRecapProcess(cfg Config) processFunc { + return func(ctx context.Context, modelIDs []byte) ([]byte, error) { + var m ModelIDs + if err := json.Unmarshal(modelIDs, &m); err != nil { + return nil, fmt.Errorf("received non model ids payload: %w", err) + } + + if m.Event == "" { + } else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil { + return nil, err + } else if !event.Resolved { + } else if err := func() error { + threads, err := cfg.storage.GetEventThreads(ctx, event.ID) + if err != nil { + return err + } + for _, thread := range threads { + messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID) + if err != nil { + return err + } else if len(messages) < 2 { + continue + } + + prompt := []string{"Summarize the Slack thread in 1 sentence. List any suggested follow ups."} + prompt = append(prompt, "---") + for _, message := range messages { + prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext)) + } + + recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n")) + if err != nil { + return err + } + thread.Recap = recap + if err := cfg.storage.UpsertThread(ctx, thread); err != nil { + return err + } + log.Println("recapped", thread.ID) + } + return nil + }(); err != nil { + return nil, err + } + + if cfg.Debug { + log.Printf("persisted recap") + } + return nil, nil + } +} diff --git a/recap_test.go b/recap_test.go new file mode 100644 index 0000000..518a28e --- /dev/null +++ b/recap_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/breel-render/spoc-bot-vr/model" +) + +func TestNewPersistenceToRecapProcess(t *testing.T) { + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + + d := NewTestDriver(t) + s, _ := NewStorage(ctx, d) + + cfg := Config{ + driver: d, + storage: s, + ai: NewAINoop(), + Debug: true, + } + + proc := newPersistenceToRecapProcess(cfg) + + if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil { + t.Fatal(err) + } else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil { + t.Fatal(err) + } else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil { + t.Fatal(err) + } else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil { + t.Fatal(err) + } + + b, _ := json.Marshal(ModelIDs{Event: "Event"}) + if _, err := proc(ctx, b); err != nil { + t.Error(err) + } + + if thread, err := s.GetThread(ctx, "Thread"); err != nil { + t.Error(err) + } else if thread.Recap == "" { + t.Error("no recap:", thread.Recap) + } else { + t.Logf("%+v", thread) + } +} diff --git a/slack_test.go b/slack_test.go index 9a4c0d4..daa0257 100644 --- a/slack_test.go +++ b/slack_test.go @@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) { "", "", "Datastores Non-Critical", - false, + true, ), Message: model.NewMessage( "1712927439.728409/1712927439", @@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) { Name: "Opsgenie for Alert Management", }, Attachments: []slackAttachment{{ - Color: "F4511E", + Color: "2ecc71", Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed", Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", Fields: []slackField{ @@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) { Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Author: "Opsgenie for Alert Management", Team: "Datastores Non-Critical", + Resolved: true, }, }, "opsgenie_alert_resolved.json": { diff --git a/testdata/slack_events/opsgenie_alert.json b/testdata/slack_events/opsgenie_alert.json index c11d14e..00cbe1f 100644 --- a/testdata/slack_events/opsgenie_alert.json +++ b/testdata/slack_events/opsgenie_alert.json @@ -29,7 +29,8 @@ "attachments": [ { "id": 1, - "color": "F4511E", + "realcolor": "F4511E", + "color": "2ecc71", "fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" \nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", "text": "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", "title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",