impl PersistenceToRecap pipeline where each resolved event gets an ai recap of each of its threads that have messages persisted under the thread as a Recap column

main
Bel LaPointe 2024-04-19 13:19:14 -06:00
parent 20256bd6b4
commit 6d81164161
8 changed files with 177 additions and 59 deletions

View File

@ -34,6 +34,7 @@ type Config struct {
slackToModelPipeline Pipeline
slackScrapePipeline Pipeline
modelToPersistencePipeline Pipeline
persistenceToRecapPipeline Pipeline
}
var (
@ -153,5 +154,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.slackScrapePipeline = slackScrapePipeline
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToRecapPipeline = persistenceToRecapPipeline
return result, nil
}

44
main.go
View File

@ -40,6 +40,7 @@ func run(ctx context.Context, cfg Config) error {
cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline,
cfg.slackScrapePipeline,
cfg.persistenceToRecapPipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
@ -89,7 +90,7 @@ func newHandler(cfg Config) http.HandlerFunc {
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
mux.Handle("GET /api/v1/rpc/aievent", http.HandlerFunc(newHandlerGetAPIV1RPCAIEvent(cfg)))
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
@ -108,52 +109,19 @@ func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
json.NewEncoder(w).Encode(map[string]any{"version": Version})
}
func newHandlerGetAPIV1RPCAIEvent(cfg Config) http.HandlerFunc {
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
event := r.URL.Query().Get("id")
threadSummaries := []string{}
if err := func(ctx context.Context) error {
threads, err := cfg.storage.GetEventThreads(ctx, event)
if err != nil {
return err
}
if len(threads) == 0 {
return nil
}
for _, thread := range threads {
prompt := fmt.Sprintf("Summarize the Slack thread in 1 sentence. List any suggested follow ups.\n\n---\n\n")
messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
if err != nil {
return err
}
if len(messages) == 0 {
continue
}
for _, message := range messages {
prompt += fmt.Sprintf("%s\n%s\n\n", message.Author, message.Plaintext)
}
summary, err := cfg.ai.Do(ctx, prompt)
if err != nil {
return err
}
threadSummaries = append(threadSummaries, summary)
}
return nil
}(r.Context()); err != nil {
b, _ := json.Marshal(ModelIDs{Event: event})
if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"threads": threadSummaries})
json.NewEncoder(w).Encode(map[string]any{"event": event})
}
}

View File

@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
return int(port)
}()
u := fmt.Sprintf("http://localhost:%d", port)
var err error
cfg := Config{}
cfg.DatacenterPattern = renderDatacenterPattern
@ -44,12 +45,16 @@ func TestRun(t *testing.T) {
cfg.Port = port
cfg.driver = NewTestDriver(t)
cfg.storage, _ = NewStorage(ctx, cfg.driver)
cfg.ai = NewAINoop()
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
cfg.ai = NewAINoop()
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
if err != nil {
t.Fatal(err)
}
go func() {
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
@ -86,33 +91,40 @@ func TestRun(t *testing.T) {
}
})
t.Run("GET /api/v1/rpc/aievent", func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert_3.json"))
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
resp, err := http.Post(fmt.Sprintf("%s/api/v1/events/slack", u), "application/json", bytes.NewReader(b))
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
resp, err = http.Get(fmt.Sprintf("%s/api/v1/rpc/aievent?id=%s", u, "11067"))
if err != nil {
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
b, _ = io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Fatalf("(%d) %s", resp.StatusCode, b)
for ctx.Err() == nil {
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
break
}
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 100):
}
}
if err := ctx.Err(); err != nil {
t.Fatal("timed out waiting for recap")
}
t.Logf("%s", b)
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
if thread.Recap == "" {
t.Error(thread.Recap)
}
t.Log(thread.Recap)
})
}

View File

@ -8,6 +8,7 @@ type Thread struct {
TS uint64
Channel string
EventID string
Recap string
}
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {

78
recap.go Normal file
View File

@ -0,0 +1,78 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
)
type PersistenceToRecap struct {
pipeline Pipeline
}
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer := NewNoopQueue()
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToRecapProcess(cfg),
}, nil
}
func newPersistenceToRecapProcess(cfg Config) processFunc {
return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
var m ModelIDs
if err := json.Unmarshal(modelIDs, &m); err != nil {
return nil, fmt.Errorf("received non model ids payload: %w", err)
}
if m.Event == "" {
} else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil {
return nil, err
} else if !event.Resolved {
} else if err := func() error {
threads, err := cfg.storage.GetEventThreads(ctx, event.ID)
if err != nil {
return err
}
for _, thread := range threads {
messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
if err != nil {
return err
} else if len(messages) < 2 {
continue
}
prompt := []string{"Summarize the Slack thread in 1 sentence. List any suggested follow ups."}
prompt = append(prompt, "---")
for _, message := range messages {
prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
}
recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
if err != nil {
return err
}
thread.Recap = recap
if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
return err
}
log.Println("recapped", thread.ID)
}
return nil
}(); err != nil {
return nil, err
}
if cfg.Debug {
log.Printf("persisted recap")
}
return nil, nil
}
}

50
recap_test.go Normal file
View File

@ -0,0 +1,50 @@
package main
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
func TestNewPersistenceToRecapProcess(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
s, _ := NewStorage(ctx, d)
cfg := Config{
driver: d,
storage: s,
ai: NewAINoop(),
Debug: true,
}
proc := newPersistenceToRecapProcess(cfg)
if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
t.Fatal(err)
} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
t.Fatal(err)
} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
t.Fatal(err)
} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
t.Fatal(err)
}
b, _ := json.Marshal(ModelIDs{Event: "Event"})
if _, err := proc(ctx, b); err != nil {
t.Error(err)
}
if thread, err := s.GetThread(ctx, "Thread"); err != nil {
t.Error(err)
} else if thread.Recap == "" {
t.Error("no recap:", thread.Recap)
} else {
t.Logf("%+v", thread)
}
}

View File

@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
"",
"",
"Datastores Non-Critical",
false,
true,
),
Message: model.NewMessage(
"1712927439.728409/1712927439",
@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Color: "2ecc71",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical",
Resolved: true,
},
},
"opsgenie_alert_resolved.json": {

View File

@ -29,7 +29,8 @@
"attachments": [
{
"id": 1,
"color": "F4511E",
"realcolor": "F4511E",
"color": "2ecc71",
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",