Compare commits

..

16 Commits

Author SHA1 Message Date
Bel LaPointe
5785ea37ae better recap prompts by doing an intro with the OP 2024-04-19 14:01:24 -06:00
Bel LaPointe
f27d416a5a get recap prompt from $RECAP_PROMPT 2024-04-19 13:32:12 -06:00
Bel LaPointe
81793876f8 readme 2024-04-19 13:24:48 -06:00
Bel LaPointe
6d81164161 impl PersistenceToRecap pipeline where each resolved event gets an ai recap of each of its threads that have messages persisted under the thread as a Recap column 2024-04-19 13:19:14 -06:00
Bel LaPointe
20256bd6b4 try and raise ollama timeout 2024-04-19 13:18:12 -06:00
Bel LaPointe
e5e98e2890 reenable queue new_persistence 2024-04-19 12:44:58 -06:00
Bel LaPointe
4fb26ec775 fix sqlite :memory: dont actually work 2024-04-19 12:42:36 -06:00
Bel LaPointe
782b9ec3cf if no therads or no messages then no ai 2024-04-19 12:39:50 -06:00
Bel LaPointe
9d7f69bd8a default ollama model to llama3 2024-04-19 12:36:25 -06:00
Bel LaPointe
12de99da57 impl GET /api/v1/rpc/aievent?id=123 2024-04-19 12:34:49 -06:00
Bel LaPointe
81fe8070ca impl storage GetEventThreads 2024-04-19 12:25:40 -06:00
Bel LaPointe
79de56e236 impl storage.GetThreadMessages 2024-04-19 12:21:31 -06:00
Bel LaPointe
f485b5ea88 from OLLAMA_U_R_L to OLLAMA_URL 2024-04-19 11:54:09 -06:00
Bel LaPointe
894536d209 oops drop bad log 2024-04-18 15:05:25 -06:00
Bel LaPointe
f8861a73b5 async slack scrape goes up to ?since 2024-04-18 14:56:33 -06:00
Bel LaPointe
14de286415 go test -tags=ai -v -run=AI works with ollama which is cool and fast with llama3 2024-04-18 14:08:28 -06:00
21 changed files with 658 additions and 346 deletions

View File

@@ -4,14 +4,9 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
## TODO
- dedupe critical+noncritical
- what SLO/SLI can I help benoit with
- break into smaller goals
- sell to the team
- scott; like to keep state in incident.io and zendesk
- @spoc -ignore, @spoc -s summary
- limit rps from rpc/slackscrape
- rpc/slackscrape to async
- limit queue retries
```

139
ai.go
View File

@@ -1,13 +1,10 @@
package main
import (
"bytes"
"context"
"os"
"strings"
"net/http"
"time"
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
"github.com/nikolaydubina/llama2.go/llama2"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
)
@@ -37,133 +34,23 @@ func NewAIOllama(url, model string) AIOllama {
}
// Do sends prompt to the configured Ollama server and returns the model's
// single-shot completion.
//
// A fresh http.Client with hour-long timeouts is built per call because
// local model generation can be very slow (see the "try and raise ollama
// timeout" commit); idle connections are closed when the call returns.
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
	c := &http.Client{
		Timeout: time.Hour,
		Transport: &http.Transport{
			//DisableKeepAlives: true,
			IdleConnTimeout:       time.Hour,
			ResponseHeaderTimeout: time.Hour,
			ExpectContinueTimeout: time.Hour,
		},
	}
	defer c.CloseIdleConnections()
	// a new langchaingo client per call; construction is cheap relative to
	// generation, so nothing is cached across calls
	llm, err := ollama.New(
		ollama.WithModel(ai.model),
		ollama.WithServerURL(ai.url),
		ollama.WithHTTPClient(c),
	)
	if err != nil {
		return "", err
	}
	return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
}
// AILocal runs llama2.go inference fully in-process from model files on
// disk, with no external server dependency.
type AILocal struct {
	checkpointPath string  // path to the llama2 model checkpoint file
	tokenizerPath  string  // path to the sentencepiece tokenizer file
	temperature    float64 // sampling temperature; 0 selects greedy argmax
	steps          int     // max tokens to generate; clamped to the model's SeqLen
	topp           float64 // nucleus (top-p) threshold; outside (0,1) falls back to plain sampling
}
// NewAILocal builds an AILocal that loads the given llama2 checkpoint and
// tokenizer files and samples with the supplied temperature, step limit,
// and nucleus (top-p) threshold.
func NewAILocal(
	checkpointPath string,
	tokenizerPath string,
	temperature float64,
	steps int,
	topp float64,
) AILocal {
	var ai AILocal
	ai.checkpointPath = checkpointPath
	ai.tokenizerPath = tokenizerPath
	ai.temperature = temperature
	ai.steps = steps
	ai.topp = topp
	return ai
}
// https://github.com/nikolaydubina/llama2.go/blob/master/main.go
// Do runs llama2 inference locally: it loads the checkpoint and tokenizer
// from disk on every call, force-feeds the prompt tokens through the
// transformer, then samples up to ai.steps tokens and returns the decoded
// text with newlines restored.
//
// NOTE(review): ctx is accepted but never consulted, so a long generation
// cannot be canceled — confirm whether that is acceptable.
func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
	checkpointFile, err := os.OpenFile(ai.checkpointPath, os.O_RDONLY, 0)
	if err != nil {
		return "", err
	}
	defer checkpointFile.Close()
	config, err := llama2.NewConfigFromCheckpoint(checkpointFile)
	if err != nil {
		return "", err
	}
	// a negative vocab size in the checkpoint signals unshared weights
	isSharedWeights := config.VocabSize > 0
	if config.VocabSize < 0 {
		config.VocabSize = -config.VocabSize
	}
	tokenizerFile, err := os.OpenFile(ai.tokenizerPath, os.O_RDONLY, 0)
	if err != nil {
		return "", err
	}
	defer tokenizerFile.Close()
	vocab := llama2.NewVocabFromFile(config.VocabSize, tokenizerFile)
	w := llama2.NewTransformerWeightsFromCheckpoint(config, checkpointFile, isSharedWeights)
	// right now we cannot run for more than config.SeqLen steps
	steps := ai.steps
	if steps <= 0 || steps > config.SeqLen {
		steps = config.SeqLen
	}
	runState := llama2.NewRunState(config)
	// newlines survive tokenization as the sentencepiece byte token <0x0A>
	// and are converted back at the end
	promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
	out := bytes.NewBuffer(nil)
	// the current position we are in
	var token int = 1 // 1 = BOS token in llama-2 sentencepiece
	var pos = 0
	for pos < steps {
		// forward the transformer to get logits for the next token
		llama2.Transformer(token, pos, config, runState, w)
		var next int
		if pos < len(promptTokens) {
			// still consuming the prompt: feed its tokens verbatim
			next = promptTokens[pos]
		} else {
			// sample the next token
			if ai.temperature == 0 {
				// greedy argmax sampling
				next = nn.ArgMax(runState.Logits)
			} else {
				// apply the temperature to the logits
				for q := 0; q < config.VocabSize; q++ {
					runState.Logits[q] /= float32(ai.temperature)
				}
				// apply softmax to the logits to the probabilities for next token
				nn.SoftMax(runState.Logits)
				// we now want to sample from this distribution to get the next token
				if ai.topp <= 0 || ai.topp >= 1 {
					// simply sample from the predicted probability distribution
					next = nn.Sample(runState.Logits)
				} else {
					// top-p (nucleus) sampling, clamping the least likely tokens to zero
					next = nn.SampleTopP(runState.Logits, float32(ai.topp))
				}
			}
		}
		pos++
		// data-dependent terminating condition: the BOS (1) token delimits sequences
		if next == 1 {
			break
		}
		// following BOS (1) token, sentencepiece decoder strips any leading whitespace
		var tokenStr string
		if token == 1 && vocab.Words[next][0] == ' ' {
			tokenStr = vocab.Words[next][1:]
		} else {
			tokenStr = vocab.Words[next]
		}
		out.Write([]byte(tokenStr))
		// advance forward
		token = next
	}
	out.Write([]byte("\n"))
	return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
}

View File

@@ -4,11 +4,6 @@ package main
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path"
"testing"
"time"
)
@@ -22,53 +17,7 @@ func TestAINoop(t *testing.T) {
// TestAIOllama runs the shared testAI suite against a live Ollama server.
// It assumes one is listening on localhost:11434 with the gemma:2b model
// available (run via `go test -tags=ai` per the commit notes) — confirm
// before relying on it in CI.
func TestAIOllama(t *testing.T) {
	t.Parallel()
	ai := NewAIOllama("http://localhost:11434", "gemma:2b")
	testAI(t, ai)
}
func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir()
checkpoints := "checkpoints"
tokenizer := "tokenizer"
for u, p := range map[string]*string{
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
} {
func() {
*p = path.Base(u)
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
t.Logf("downloading %s from %s", u, *p)
resp, err := http.Get(u)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
f, err := os.Create(path.Join(d, *p))
if err != nil {
t.Fatal(err)
}
defer f.Close()
if _, err := io.Copy(f, resp.Body); err != nil {
f.Close()
os.Remove(path.Join(d, *p))
t.Fatal(err)
}
}
}()
}
ai := NewAILocal(
path.Join(d, checkpoints),
path.Join(d, tokenizer),
0.9,
256,
0.9,
)
ai := NewAIOllama("http://localhost:11434", "llama3")
testAI(t, ai)
}
@@ -78,7 +27,7 @@ func testAI(t *testing.T, ai AI) {
defer can()
t.Run("mvp", func(t *testing.T) {
if result, err := ai.Do(ctx, "hello world"); err != nil {
if result, err := ai.Do(ctx, "Tell me a fun fact."); err != nil {
t.Fatal(err)
} else if len(result) < 3 {
t.Error(result)
@@ -87,6 +36,7 @@ func testAI(t *testing.T, ai AI) {
}
})
/*
t.Run("simulation", func(t *testing.T) {
d := NewRAM()
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
@@ -114,5 +64,5 @@ func testAI(t *testing.T, ai AI) {
}
t.Logf("\n\t%s\n->\n\t%s", input, result)
})
*/
}

View File

@@ -23,10 +23,10 @@ type Config struct {
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaUrl string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
RecapPromptIntro string
RecapPrompt string
AssetPattern string
DatacenterPattern string
EventNamePattern string
@@ -34,7 +34,9 @@ type Config struct {
storage Storage
ai AI
slackToModelPipeline Pipeline
slackScrapePipeline Pipeline
modelToPersistencePipeline Pipeline
persistenceToRecapPipeline Pipeline
}
var (
@@ -50,10 +52,12 @@ func newConfig(ctx context.Context) (Config, error) {
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
def := Config{
Port: 38080,
OllamaModel: "gemma:2b",
OllamaModel: "llama3",
AssetPattern: renderAssetPattern,
DatacenterPattern: renderDatacenterPattern,
EventNamePattern: renderEventNamePattern,
RecapPromptIntro: "A Slack thread begin with the following original post.",
RecapPrompt: "Summarize all of the following Slack thread responses in 1 sentence without any leading text.",
}
var m map[string]any
@@ -130,10 +134,8 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.storage = storage
if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
if result.OllamaUrl != "" {
result.ai = NewAIOllama(result.OllamaUrl, result.OllamaModel)
} else {
result.ai = NewAINoop()
}
@@ -150,5 +152,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.modelToPersistencePipeline = modelToPersistencePipeline
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackScrapePipeline = slackScrapePipeline
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToRecapPipeline = persistenceToRecapPipeline
return result, nil
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/url"
"os"
"path"
"testing"
@@ -35,7 +36,12 @@ func NewTestDriver(t *testing.T, optionalP ...string) Driver {
func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {
conn = ":memory:"
f, err := os.CreateTemp(os.TempDir(), "spoc-bot-vr-undef-*.db")
if err != nil {
return Driver{}, err
}
f.Close()
conn = f.Name()
} else {
if u, err := url.Parse(conn); err != nil {
return Driver{}, err

5
go.mod
View File

@@ -6,9 +6,9 @@ require (
github.com/glebarez/go-sqlite v1.21.2
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8
gotest.tools/v3 v3.5.1
golang.org/x/time v0.5.0
gotest.tools v2.2.0+incompatible
)
require (
@@ -20,7 +20,6 @@ require (
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect

6
go.sum
View File

@@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
@@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=

100
main.go
View File

@@ -39,6 +39,8 @@ func run(ctx context.Context, cfg Config) error {
case err := <-processPipelines(ctx,
cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline,
cfg.slackScrapePipeline,
cfg.persistenceToRecapPipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
@@ -88,6 +90,7 @@ func newHandler(cfg Config) http.HandlerFunc {
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
@@ -106,90 +109,45 @@ func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
json.NewEncoder(w).Encode(map[string]any{"version": Version})
}
// newHandlerGetAPIV1RPCRecapEvent returns the handler for
// GET /api/v1/rpc/recapevent?id=<event>: it enqueues the event id onto the
// persistence-to-recap pipeline and echoes the accepted id as JSON.
//
// Recap generation itself happens asynchronously in the pipeline worker;
// this endpoint only queues the job.
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !basicAuth(cfg, w, r) {
			return
		}
		event := r.URL.Query().Get("id")
		if event == "" {
			// the recap worker treats an empty event id as a no-op, so
			// reject the request up front instead of queueing dead work
			http.Error(w, "missing required query parameter: id", http.StatusBadRequest)
			return
		}
		b, err := json.Marshal(ModelIDs{Event: event})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		json.NewEncoder(w).Encode(map[string]any{"event": event})
	}
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
httpc := http.Client{Timeout: time.Second}
get := func(url string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token)
req = req.WithContext(r.Context())
resp, err := httpc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
return io.ReadAll(resp.Body)
}
n := 0
for len(urls) > 0 {
url := urls[0]
urls = urls[1:]
select {
case <-r.Context().Done():
case <-time.After(time.Second):
}
body, err := get(url)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var page struct {
Messages []json.RawMessage
}
if err := json.Unmarshal(body, &page); err != nil {
job, _ := json.Marshal(SlackScrape{
Latest: time.Now().Unix(),
Oldest: since.Unix(),
ThreadTS: "",
Channel: r.Header.Get("slack-channel"),
Token: r.Header.Get("slack-oauth-token"),
})
if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON)
}
b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
errs = append(errs, err)
} else {
n += 1
}
if !strings.Contains(url, "ts=") {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
}
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
}
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
}
}

View File

@@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
return int(port)
}()
u := fmt.Sprintf("http://localhost:%d", port)
var err error
cfg := Config{}
cfg.DatacenterPattern = renderDatacenterPattern
@@ -44,10 +45,16 @@ func TestRun(t *testing.T) {
cfg.Port = port
cfg.driver = NewTestDriver(t)
cfg.storage, _ = NewStorage(ctx, cfg.driver)
cfg.ai = NewAINoop()
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
if err != nil {
t.Fatal(err)
}
go func() {
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
@@ -83,4 +90,41 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
})
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
for ctx.Err() == nil {
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
break
}
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 100):
}
}
if err := ctx.Err(); err != nil {
t.Fatal("timed out waiting for recap")
}
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
if thread.Recap == "" {
t.Error(thread.Recap)
}
t.Log(thread.Recap)
})
}

View File

@@ -8,6 +8,7 @@ type Thread struct {
TS uint64
Channel string
EventID string
Recap string
}
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {

View File

@@ -26,15 +26,14 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
if err != nil {
return Pipeline{}, err
}
writer = NewNoopQueue()
return Pipeline{
writer: writer,
reader: reader,
process: newModelToPersistenceProcess(cfg.storage),
process: newModelToPersistenceProcess(cfg, cfg.storage),
}, nil
}
func newModelToPersistenceProcess(storage Storage) processFunc {
func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
return func(ctx context.Context, models []byte) ([]byte, error) {
var m Models
if err := json.Unmarshal(models, &m); err != nil {
@@ -56,7 +55,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
return nil, fmt.Errorf("failed to persist message: %w", err)
}
if cfg.Debug {
log.Printf("persisted models")
}
return json.Marshal(ModelIDs{
Event: m.Event.ID,
Thread: m.Thread.ID,

View File

@@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
d := NewTestDriver(t)
s, _ := NewStorage(ctx, d)
process := newModelToPersistenceProcess(s)
process := newModelToPersistenceProcess(Config{}, s)
_, _ = ctx, process

View File

@@ -37,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
if q.driver.DB == nil {
return nil
}
_, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
result, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
`,
uuid.New().String(),
q.topic,
time.Now().Unix(),
b,
uuid.New().String(),
)
if err != nil {
return err
}
if n, err := result.RowsAffected(); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
}
return nil
}
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {

84
recap.go Normal file
View File

@@ -0,0 +1,84 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
)
// PersistenceToRecap wraps the persistence-to-recap pipeline.
// NOTE(review): nothing in this change references this type —
// NewPersistenceToRecapPipeline returns a bare Pipeline; confirm whether it
// can be deleted.
type PersistenceToRecap struct {
	pipeline Pipeline
}
// NewPersistenceToRecapPipeline wires the recap worker: it consumes
// model-id payloads from the shared "new_persistence" queue and writes
// nothing downstream (recaps are persisted as a side effect of processing).
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	q, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	p := Pipeline{
		reader:  q,
		writer:  NewNoopQueue(),
		process: newPersistenceToRecapProcess(cfg),
	}
	return p, nil
}
// newPersistenceToRecapProcess returns the recap worker. For each
// "new_persistence" payload that names a resolved event, it summarizes
// every one of the event's threads with the configured AI and persists the
// result in the thread's Recap column.
//
// Payloads without an event id, and events that are not yet resolved, are
// acknowledged without doing any work.
func newPersistenceToRecapProcess(cfg Config) processFunc {
	return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
		var m ModelIDs
		if err := json.Unmarshal(modelIDs, &m); err != nil {
			return nil, fmt.Errorf("received non model ids payload: %w", err)
		}
		if m.Event != "" {
			event, err := cfg.storage.GetEvent(ctx, m.Event)
			if err != nil {
				return nil, err
			}
			if event.Resolved {
				if err := recapEventThreads(ctx, cfg, event.ID); err != nil {
					return nil, err
				}
			}
		}
		if cfg.Debug {
			log.Printf("persisted recap")
		}
		return nil, nil
	}
}

// recapEventThreads generates and persists an AI recap for every thread of
// the given event that has at least one reply beyond the root message.
func recapEventThreads(ctx context.Context, cfg Config, eventID string) error {
	threads, err := cfg.storage.GetEventThreads(ctx, eventID)
	if err != nil {
		return err
	}
	for _, thread := range threads {
		messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
		if err != nil {
			return err
		}
		if len(messages) < 2 {
			// a lone root message has nothing worth summarizing
			continue
		}
		// prompt layout: intro, the original post, the recap instruction,
		// then each reply as "author\ntext", all joined by blank lines
		prompt := []string{
			cfg.RecapPromptIntro,
			"---",
			messages[0].Plaintext,
			"---",
			cfg.RecapPrompt,
			"---",
		}
		for _, message := range messages[1:] {
			prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
		}
		recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
		if err != nil {
			return err
		}
		thread.Recap = recap
		if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
			return err
		}
		log.Println("recapped", thread.ID)
	}
	return nil
}

50
recap_test.go Normal file
View File

@@ -0,0 +1,50 @@
package main
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
// TestNewPersistenceToRecapProcess drives the recap processor end-to-end
// against a real test driver: it seeds a resolved event with one thread
// holding a root message plus one reply, feeds the processor the event id,
// and expects a non-empty Recap to be persisted on the thread.
func TestNewPersistenceToRecapProcess(t *testing.T) {
	ctx, can := context.WithTimeout(context.Background(), time.Second*10)
	defer can()
	d := NewTestDriver(t)
	s, _ := NewStorage(ctx, d)
	cfg := Config{
		driver:  d,
		storage: s,
		ai:      NewAINoop(), // assumes the noop AI yields non-empty recap text — confirm
		Debug:   true,
	}
	proc := newPersistenceToRecapProcess(cfg)
	// seed a resolved event -> thread -> root + one reply; two messages so
	// the processor's "fewer than 2 messages" skip does not fire
	if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
		t.Fatal(err)
	}
	b, _ := json.Marshal(ModelIDs{Event: "Event"})
	if _, err := proc(ctx, b); err != nil {
		t.Error(err)
	}
	if thread, err := s.GetThread(ctx, "Thread"); err != nil {
		t.Error(err)
	} else if thread.Recap == "" {
		t.Error("no recap:", thread.Recap)
	} else {
		t.Logf("%+v", thread)
	}
}

View File

@@ -77,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
}
if cfg.Debug {
log.Printf("parsed slack message into models")
}
return json.Marshal(Models{
Event: event,
Message: message,

View File

@@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
"",
"",
"Datastores Non-Critical",
false,
true,
),
Message: model.NewMessage(
"1712927439.728409/1712927439",
@@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Color: "2ecc71",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
@@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical",
Resolved: true,
},
},
"opsgenie_alert_resolved.json": {

149
slackscrape.go Normal file
View File

@@ -0,0 +1,149 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"time"
"golang.org/x/time/rate"
)
// SlackScrape is one queued scrape job: a page of conversations.history
// for Channel (or conversations.replies when ThreadTS is set) bounded by
// Oldest/Latest unix timestamps, authorized with Token.
type SlackScrape struct {
	Latest   int64  // upper bound (unix seconds) for this page
	Oldest   int64  // lower bound; 0 means no "oldest" parameter is sent
	ThreadTS string // when non-empty, scrape this thread's replies instead of channel history
	Channel  string // Slack channel id
	Token    string // Slack OAuth token; NOTE(review): serialized into queue payloads at rest — confirm that is acceptable
}
// NewSlackScrapePipeline wires the scrape worker: SlackScrape jobs are read
// from the "slack_channels_to_scrape" queue and the writer targets
// "new_persistence".
//
// The reader is deliberately assigned onto the local cfg copy rather than a
// plain local variable: newSlackScrapeProcess captures cfg and re-enqueues
// fanout jobs through cfg.slackScrapePipeline.reader, which must be the
// same queue this pipeline reads from.
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	return Pipeline{
		writer:  writer,
		reader:  cfg.slackScrapePipeline.reader,
		process: newSlackScrapeProcess(cfg),
	}, nil
}
// newSlackScrapeProcess returns the worker for SlackScrape jobs: it fetches
// one page of Slack channel history (or thread replies), forwards every
// message to the slack-to-model pipeline, and re-enqueues follow-up jobs
// for discovered threads and for further pages.
func newSlackScrapeProcess(cfg Config) processFunc {
	// at most one Slack API call every two seconds, shared across all jobs
	// handled by this process
	limiter := rate.NewLimiter(0.5, 1)
	return func(ctx context.Context, jobb []byte) ([]byte, error) {
		if err := limiter.Wait(ctx); err != nil {
			return nil, err
		}
		var job SlackScrape
		if err := json.Unmarshal(jobb, &job); err != nil {
			return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
		}
		u := url.URL{
			Scheme: "https",
			Host:   "slack.com",
			Path:   "/api/conversations.history",
		}
		q := url.Values{}
		q.Set("channel", job.Channel)
		q.Set("latest", strconv.FormatInt(job.Latest, 10))
		q.Set("limit", "999")
		q.Set("inclusive", "true")
		if job.ThreadTS != "" {
			// thread jobs hit conversations.replies for that thread instead
			u.Path = "/api/conversations.replies"
			q.Set("ts", job.ThreadTS)
		}
		if job.Oldest != 0 {
			q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
		}
		u.RawQuery = q.Encode()
		// NOTE(review): this local shadows the imported net/url package;
		// consider renaming it (e.g. endpoint)
		url := u.String()
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+job.Token)
		req = req.WithContext(ctx)
		httpc := http.Client{Timeout: time.Second}
		resp, err := httpc.Do(req)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		// drain whatever remains before Close so the connection can be reused
		defer io.Copy(io.Discard, resp.Body)
		if resp.StatusCode != http.StatusOK {
			b, _ := io.ReadAll(resp.Body)
			return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
		}
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, err
		}
		var page struct {
			Messages []json.RawMessage
		}
		if err := json.Unmarshal(body, &page); err != nil {
			return nil, err
		}
		// track the smallest ts seen on this page; it becomes the "latest"
		// cursor for the next, older page (assumes Slack returns newest
		// messages first — TODO confirm against the API docs)
		newLatest := float64(job.Latest)
		for _, messageJSON := range page.Messages {
			if cfg.Debug {
				log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
			}
			b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
			if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
				return nil, err
			}
			var peekTS struct {
				TS float64 `json:"ts,string"`
			}
			if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
				newLatest = peekTS.TS
			}
			if job.ThreadTS == "" {
				// only the top-level history pass fans out per-thread jobs,
				// so reply pages never recursively re-fan-out
				var peek struct {
					ThreadTS string `json:"thread_ts"`
				}
				json.Unmarshal(messageJSON, &peek)
				if peek.ThreadTS != "" {
					clone := job
					clone.ThreadTS = peek.ThreadTS
					clone.Oldest = 0
					b, _ := json.Marshal(clone)
					if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
						return nil, err
					}
					log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
				}
			}
		}
		// a completely full page (limit=999) suggests more history remains:
		// requeue the same job with the older cursor
		if len(page.Messages) == 999 {
			clone := job
			clone.Latest = int64(newLatest)
			b, _ := json.Marshal(clone)
			if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
				return nil, err
			}
			log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
		}
		log.Printf("scraped %v from %s", len(page.Messages), url)
		return nil, nil
	}
}

View File

@@ -67,10 +67,112 @@ func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error)
return v, err
}
// GetEventThreads returns every thread whose EventID matches ID, ordered by
// thread timestamp ascending (see selectThreadsWhere).
func (s Storage) GetEventThreads(ctx context.Context, ID string) ([]model.Thread, error) {
	return s.selectThreadsWhere(ctx, "EventID = $1", ID)
}
// GetThreadMessages returns every message whose ThreadID matches ID,
// ordered by message timestamp ascending (see selectMessagesWhere).
func (s Storage) GetThreadMessages(ctx context.Context, ID string) ([]model.Message, error) {
	return s.selectMessagesWhere(ctx, "ThreadID = $1", ID)
}
// UpsertThread inserts or updates the given thread in the threads table.
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
	return s.upsert(ctx, "threads", thread)
}
// selectThreadsWhere selects threads matching clause (with $n placeholders
// bound to args), ordered by TS ascending.
//
// Column values round-trip through JSON: args are json.Marshal'ed before
// binding, every column is scanned as raw bytes, and each row is rebuilt by
// unmarshaling a column-name -> raw-JSON map into model.Thread — so struct
// field names must match column names.
//
// NOTE(review): near-duplicate of selectMessagesWhere; a shared generic
// helper could collapse the two.
func (s Storage) selectThreadsWhere(ctx context.Context, clause string, args ...any) ([]model.Thread, error) {
	keys, _, _, _, err := keysArgsKeyargsValues(model.Thread{})
	if err != nil {
		return nil, err
	}
	// bind JSON-encoded args to match the stored representation
	args2 := make([]any, len(args))
	for i := range args {
		args2[i], _ = json.Marshal(args[i])
	}
	scanTargets := make([]any, len(keys))
	q := fmt.Sprintf(`
	SELECT %s FROM threads WHERE %s
	ORDER BY TS ASC
	`, strings.Join(keys, ", "), clause)
	rows, err := s.driver.QueryContext(ctx, q, args2...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var result []model.Thread
	for rows.Next() {
		// fresh *[]byte targets per row so columns never alias each other
		for i := range scanTargets {
			scanTargets[i] = &[]byte{}
		}
		if err := rows.Scan(scanTargets...); err != nil {
			return nil, err
		}
		m := map[string]json.RawMessage{}
		for i, k := range keys {
			m[k] = *scanTargets[i].(*[]byte)
		}
		b, _ := json.Marshal(m)
		var one model.Thread
		if err := json.Unmarshal(b, &one); err != nil {
			return nil, err
		}
		result = append(result, one)
	}
	return result, rows.Err()
}
// selectMessagesWhere selects messages matching clause (with $n
// placeholders bound to args), ordered by TS ascending.
//
// Column values round-trip through JSON exactly as in selectThreadsWhere:
// args are json.Marshal'ed before binding, columns are scanned as raw
// bytes, and rows are rebuilt by unmarshaling a column-name -> raw-JSON map
// into model.Message.
//
// NOTE(review): near-duplicate of selectThreadsWhere; a shared generic
// helper could collapse the two.
func (s Storage) selectMessagesWhere(ctx context.Context, clause string, args ...any) ([]model.Message, error) {
	keys, _, _, _, err := keysArgsKeyargsValues(model.Message{})
	if err != nil {
		return nil, err
	}
	// bind JSON-encoded args to match the stored representation
	args2 := make([]any, len(args))
	for i := range args {
		args2[i], _ = json.Marshal(args[i])
	}
	scanTargets := make([]any, len(keys))
	q := fmt.Sprintf(`
	SELECT %s FROM messages WHERE %s
	ORDER BY TS ASC
	`, strings.Join(keys, ", "), clause)
	rows, err := s.driver.QueryContext(ctx, q, args2...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var result []model.Message
	for rows.Next() {
		// fresh *[]byte targets per row so columns never alias each other
		for i := range scanTargets {
			scanTargets[i] = &[]byte{}
		}
		if err := rows.Scan(scanTargets...); err != nil {
			return nil, err
		}
		m := map[string]json.RawMessage{}
		for i, k := range keys {
			m[k] = *scanTargets[i].(*[]byte)
		}
		b, _ := json.Marshal(m)
		var one model.Message
		if err := json.Unmarshal(b, &one); err != nil {
			return nil, err
		}
		result = append(result, one)
	}
	return result, rows.Err()
}
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
if questions := strings.Count(clause, "$"); questions != len(args) {
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))

View File

@@ -2,6 +2,8 @@ package main
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
@@ -85,4 +87,64 @@ func TestStorage(t *testing.T) {
t.Fatal("unexpected result from get:", got)
}
})
t.Run("get thread messages", func(t *testing.T) {
thread := fmt.Sprintf("thread-%d", rand.Int())
m := model.NewMessage(
"ID",
1,
"Author",
"Plaintext",
thread,
)
if err := s.UpsertMessage(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if m2, err := s.GetMessage(ctx, m.ID); err != nil {
t.Fatal("unexpected error on upsert-get:", err)
} else if m2 != m {
t.Errorf("expected %+v but got %+v", m, m2)
}
msgs, err := s.GetThreadMessages(ctx, thread)
if err != nil {
t.Fatal(err)
} else if len(msgs) != 1 {
t.Fatal(msgs)
} else if msgs[0].ThreadID != m.ThreadID {
t.Fatal(msgs[0].ThreadID)
} else if msgs[0] != m {
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
}
})
t.Run("get event threads", func(t *testing.T) {
event := fmt.Sprintf("event-%d", rand.Int())
m := model.NewThread(
"ID",
"URL",
1,
"Channel",
event,
)
if err := s.UpsertThread(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if m2, err := s.GetThread(ctx, m.ID); err != nil {
t.Fatal("unexpected error on upsert-get:", err)
} else if m2 != m {
t.Errorf("expected %+v but got %+v", m, m2)
}
msgs, err := s.GetEventThreads(ctx, event)
if err != nil {
t.Fatal(err)
} else if len(msgs) != 1 {
t.Fatal(msgs)
} else if msgs[0].EventID != m.EventID {
t.Fatal(msgs[0].EventID)
} else if msgs[0] != m {
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
}
})
}

View File

@@ -29,7 +29,8 @@
"attachments": [
{
"id": 1,
"color": "F4511E",
"realcolor": "F4511E",
"color": "2ecc71",
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",