package main import ( "context" "encoding/json" "fmt" "log" "strings" ) type PersistenceToRecap struct { pipeline Pipeline } func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) { reader, err := NewQueue(ctx, "new_persistence", cfg.driver) if err != nil { return Pipeline{}, err } writer := NewNoopQueue() return Pipeline{ writer: writer, reader: reader, process: newPersistenceToRecapProcess(cfg), }, nil } func newPersistenceToRecapProcess(cfg Config) processFunc { return func(ctx context.Context, modelIDs []byte) ([]byte, error) { var m ModelIDs if err := json.Unmarshal(modelIDs, &m); err != nil { return nil, fmt.Errorf("received non model ids payload: %w", err) } if m.Event == "" { } else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil { return nil, err } else if !event.Resolved { } else if err := func() error { threads, err := cfg.storage.GetEventThreads(ctx, event.ID) if err != nil { return err } for _, thread := range threads { messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID) if err != nil { return err } else if len(messages) < 2 { continue } prompt := []string{ cfg.RecapPromptIntro, "---", messages[0].Plaintext, "---", cfg.RecapPrompt, "---", } for _, message := range messages[1:] { prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext)) } recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n")) if err != nil { return err } thread.Recap = recap if err := cfg.storage.UpsertThread(ctx, thread); err != nil { return err } log.Println("recapped", thread.ID) if cfg.Debug { log.Printf("Recapped %q as %q from %q/%q and %+v", thread.ID, thread.Recap, cfg.RecapPromptIntro, cfg.RecapPrompt, messages) } } return nil }(); err != nil { return nil, err } if cfg.Debug { log.Printf("persisted recap") } return nil, nil } }