diff --git a/ai.go b/ai.go index 5c17116..4116b5f 100644 --- a/ai.go +++ b/ai.go @@ -4,15 +4,49 @@ import ( "bytes" "context" "os" + "strings" nn "github.com/nikolaydubina/llama2.go/exp/nnfast" "github.com/nikolaydubina/llama2.go/llama2" + "github.com/tmc/langchaingo/llms" + "github.com/tmc/langchaingo/llms/ollama" ) type AI interface { Do(context.Context, string) (string, error) } +type AINoop struct { +} + +func NewAINoop() AINoop { + return AINoop{} +} + +func (ai AINoop) Do(ctx context.Context, prompt string) (string, error) { + return ":shrug:", nil +} + +type AIOllama struct { + model string + url string +} + +func NewAIOllama(url, model string) AIOllama { + return AIOllama{url: url, model: model} +} + +func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) { + llm, err := ollama.New( + ollama.WithModel(ai.model), + ollama.WithServerURL(ai.url), + ) + if err != nil { + return "", err + } + return llms.GenerateFromSinglePrompt(ctx, llm, prompt) +} + type AILocal struct { checkpointPath string tokenizerPath string @@ -73,7 +107,7 @@ func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) { runState := llama2.NewRunState(config) - promptTokens := vocab.Encode(prompt) + promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>")) out := bytes.NewBuffer(nil) @@ -131,5 +165,5 @@ func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) { } out.Write([]byte("\n")) - return string(out.Bytes()), nil + return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil } diff --git a/ai_test.go b/ai_test.go index 8da2b79..b7dde8f 100644 --- a/ai_test.go +++ b/ai_test.go @@ -13,18 +13,30 @@ import ( "time" ) -func TestAILocal(t *testing.T) { - ctx, can := context.WithTimeout(context.Background(), time.Minute) - defer can() +func TestAINoop(t *testing.T) { + ai := NewAINoop() + testAI(t, ai) +} + +func TestAIOllama(t *testing.T) { + ai := NewAIOllama("http://localhost:11434", "gemma:2b") + + testAI(t, ai) +} + +func TestAILocal(t *testing.T) { d := os.TempDir() - for k, u := range map[string]string{ - "checkpoints": "https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin", - "tokenizer": "https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin", + checkpoints := "checkpoints" + tokenizer := "tokenizer" + for u, p := range map[string]*string{ + "https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints, + "https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer, } { func() { - if _, err := os.Stat(path.Join(d, k)); os.IsNotExist(err) { - t.Logf("downloading %s from %s", u, k) + *p = path.Base(u) + if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) { + t.Logf("downloading %s from %s", u, *p) resp, err := http.Get(u) if err != nil { @@ -32,7 +44,7 @@ func TestAILocal(t *testing.T) { } defer resp.Body.Close() - f, err := os.Create(path.Join(d, k)) + f, err := os.Create(path.Join(d, *p)) if err != nil { t.Fatal(err) } @@ -40,7 +52,7 @@ func TestAILocal(t *testing.T) { if _, err := io.Copy(f, resp.Body); err != nil { f.Close() - os.Remove(path.Join(d, k)) + os.Remove(path.Join(d, *p)) t.Fatal(err) } } @@ -48,17 +60,24 @@ func TestAILocal(t *testing.T) { } ai := NewAILocal( - path.Join(d, "checkpoints"), - path.Join(d, "tokenizer"), + path.Join(d, checkpoints), + path.Join(d, tokenizer), 0.9, 256, 0.9, ) + testAI(t, ai) +} + +func testAI(t *testing.T, ai AI) { + ctx, can := context.WithTimeout(context.Background(), time.Minute) + defer can() + t.Run("mvp", func(t *testing.T) { if result, err := ai.Do(ctx, "hello world"); err != nil { t.Fatal(err) - } else if len(result) < 250 { + } else if len(result) < 3 { t.Error(result) } else { t.Logf("%s", result) diff --git a/config.go b/config.go index d383f8f..fe16ae6 100644 --- a/config.go +++ b/config.go @@ -21,9 +21,16 @@ type Config struct { BasicAuthUser string BasicAuthPassword string FillWithTestdata bool + OllamaURL string + OllamaModel string + LocalCheckpoint string + LocalTokenizer string + AssetPattern string + DatacenterPattern string storage Storage queue Queue driver Driver + ai AI } func newConfig(ctx context.Context) (Config, error) { @@ -32,7 +39,10 @@ func newConfig(ctx context.Context) (Config, error) { func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) { def := Config{ - Port: 8080, + Port: 38080, + OllamaModel: "gemma:2b", + AssetPattern: `(dpg|svc|red)-[a-z0-9-]*`, + DatacenterPattern: `[a-z]{4}[a-z]*-[0-9]`, } var m map[string]any @@ -97,12 +107,20 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, result.driver = pg } if result.FillWithTestdata { - if err := FillWithTestdata(ctx, result.driver); err != nil { + if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern); err != nil { return Config{}, err } } result.storage = NewStorage(result.driver) result.queue = NewQueue(result.driver) + if result.OllamaURL != "" { + result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel) + } else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" { + result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9) + } else { + result.ai = NewAINoop() + } + return result, nil } diff --git a/driver.go b/driver.go index bb10b27..c9a3360 100644 --- a/driver.go +++ b/driver.go @@ -23,7 +23,7 @@ type Driver interface { Set(context.Context, string, string, []byte) error } -func FillWithTestdata(ctx context.Context, driver Driver) error { +func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern string) error { d := "./testdata/slack_events" entries, err := os.ReadDir(d) if err != nil { @@ -37,7 +37,7 @@ func FillWithTestdata(ctx context.Context, driver Driver) error { if err != nil { return err } - m, err := ParseSlack(b) + m, err := ParseSlack(b, assetPattern, datacenterPattern) if errors.Is(err, ErrIrrelevantMessage) { continue } else if err != nil { diff --git a/driver_test.go b/driver_test.go index 25abd9f..86bb2fe 100644 --- a/driver_test.go +++ b/driver_test.go @@ -17,7 +17,7 @@ func TestFillTestdata(t *testing.T) { defer can() ram := NewRAM() - if err := FillWithTestdata(ctx, ram); err != nil { + if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern); err != nil { t.Fatal(err) } n := 0 diff --git a/go.mod b/go.mod index f48b54b..9c3fb93 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,14 @@ require ( github.com/go-errors/errors v1.5.1 github.com/lib/pq v1.10.9 github.com/nikolaydubina/llama2.go v0.7.1 + github.com/tmc/langchaingo v0.1.8 go.etcd.io/bbolt v1.3.9 ) -require golang.org/x/sys v0.4.0 // indirect +require ( + github.com/dlclark/regexp2 v1.10.0 // indirect + github.com/gage-technologies/mistral-go v1.0.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pkoukk/tiktoken-go v0.1.6 // indirect + golang.org/x/sys v0.16.0 // indirect +) diff --git a/go.sum b/go.sum index a496fba..b238cea 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,30 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= +github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A= +github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE= github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM= +github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= +github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA= +github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 4b218e9..cf98020 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "os/signal" + "sort" "strconv" "strings" "syscall" @@ -64,6 +65,9 @@ func newHandler(cfg Config) http.HandlerFunc { mux := http.NewServeMux() mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) + mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) + mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) + mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) @@ -78,19 +82,82 @@ func newHandler(cfg Config) http.HandlerFunc { } } +func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"eventNames": eventNames}) + } +} + +func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + events, err := cfg.storage.EventsSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"events": events}) + } +} + +func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + messages, err := cfg.storage.MessagesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"messages": messages}) + } +} + func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } - since := time.Unix(0, 0) - if sinceS := r.URL.Query().Get("since"); sinceS == "" { - } else if n, err := strconv.ParseInt(sinceS, 10, 64); err != nil { + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return - } else { - since = time.Unix(n, 0) } threads, err := cfg.storage.ThreadsSince(r.Context(), since) @@ -99,7 +166,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { return } - json.NewEncoder(w).Encode(map[string]any{"threads": threads}) + encodeResponse(w, r, map[string]any{"threads": threads}) } } @@ -117,7 +184,7 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { return } - json.NewEncoder(w).Encode(map[string]any{"thread": map[string]any{"messages": messages}}) + encodeResponse(w, r, map[string]any{"thread": messages}) } } @@ -148,7 +215,7 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques return } - json.NewEncoder(w).Encode(map[string]any{"challenge": challenge.Challenge}) + encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) } func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { @@ -179,7 +246,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { return } - m, err := ParseSlack(b) + m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern) if errors.Is(err, ErrIrrelevantMessage) { return } else if err != nil { @@ -195,3 +262,96 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { log.Printf("ingested %v", m.ID) } } + +func parseSince(s string) (time.Time, error) { + if s == "" { + return time.Unix(0, 0), nil + } + + if n, err := strconv.ParseInt(s, 10, 64); err != nil { + } else { + return time.Unix(n, 0), nil + } + + if t, err := time.Parse(time.RFC3339, s); err != nil { + } else { + return t, nil + } + + if t, err := time.Parse(time.RFC3339Nano, s); err != nil { + } else { + return t, nil + } + + if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { + } else { + return t, nil + } + + return time.Time{}, fmt.Errorf("failed to parse since=%q", s) +} + +func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { + if strings.Contains(r.Header.Get("Accept"), "text/csv") { + return encodeCSVResponse(w, v) + } + if strings.Contains(r.Header.Get("Accept"), "text/tsv") { + return encodeTSVResponse(w, v) + } + return encodeJSONResponse(w, v) +} + +func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { + return json.NewEncoder(w).Encode(v) +} + +func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, "\t") +} + +func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, ",") +} + +func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + + var data map[string][]map[string]json.RawMessage + if err := json.Unmarshal(b, &data); err != nil { + return err + } + + var objects []map[string]json.RawMessage + for k := range data { + objects = data[k] + } + + fields := []string{} + for i := range objects { + for k := range objects[i] { + b, _ := json.Marshal(k) + fields = append(fields, string(b)) + } + break + } + sort.Strings(fields) + + w.Write([]byte(strings.Join(fields, delim))) + w.Write([]byte("\n")) + + for _, object := range objects { + for j, field := range fields { + json.Unmarshal([]byte(field), &field) + if j > 0 { + w.Write([]byte(delim)) + } + w.Write(object[field]) + } + w.Write([]byte("\n")) + } + + return nil +} diff --git a/main_test.go b/main_test.go index 79fcdde..9d3c67f 100644 --- a/main_test.go +++ b/main_test.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "encoding/csv" "encoding/json" "fmt" "io" @@ -80,6 +81,75 @@ func TestRun(t *testing.T) { } }) + t.Run("GET /api/v1/messages", func(t *testing.T) { + resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + t.Fatalf("(%d) %s", resp.StatusCode, b) + } + var result struct { + Messages []Message + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + t.Fatal(err) + } else if len(result.Messages) != 1 { + t.Fatal(result.Messages) + } else { + t.Logf("%+v", result) + } + }) + + t.Run("GET /api/v1/eventnames", func(t *testing.T) { + resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + t.Fatalf("(%d) %s", resp.StatusCode, b) + } + var result struct { + EventNames []string + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + t.Fatal(err) + } else if result.EventNames[0] != "[Oregon-1] Wal Receive Count Alert" { + t.Fatal(result.EventNames) + } else { + t.Logf("%+v", result) + } + }) + + t.Run("GET /api/v1/events", func(t *testing.T) { + resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + t.Fatalf("(%d) %s", resp.StatusCode, b) + } + var result struct { + Events []string + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + t.Fatal(err) + } else if result.Events[0] != "11067" { + t.Fatal(result.Events) + } else { + t.Logf("%+v", result) + } + }) + t.Run("GET /api/v1/threads", func(t *testing.T) { resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u)) if err != nil { @@ -98,6 +168,8 @@ func TestRun(t *testing.T) { t.Fatal(err) } else if result.Threads[0] != "1712911957.023359" { t.Fatal(result.Threads) + } else { + t.Logf("%+v", result) } }) @@ -114,16 +186,56 @@ func TestRun(t *testing.T) { } var result struct { - Thread struct { - Messages []Message - } + Thread []Message } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { t.Fatal(err) - } else if len(result.Thread.Messages) != 1 { + } else if len(result.Thread) != 1 { t.Fatal(result.Thread) } else { - t.Logf("%+v", result.Thread.Messages[0]) + t.Logf("%+v", result) } }) + + t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Accept", "text/csv") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + t.Fatalf("(%d) %s", resp.StatusCode, b) + } + + dec := csv.NewReader(resp.Body) + var lastLine []string + for { + line, err := dec.Read() + if err == io.EOF { + break + } else if err != nil { + t.Error(err) + } + + if lastLine == nil { + } else if len(lastLine) != len(line) { + t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line)) + } + + t.Logf("%+v", line) + lastLine = line + } + if lastLine == nil { + t.Error("no lines found") + } + + }) } diff --git a/message.go b/message.go index c5b93dc..edb824f 100644 --- a/message.go +++ b/message.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "regexp" "strings" "time" ) @@ -13,16 +14,17 @@ var ( ) type Message struct { - ID string - TS uint64 - Source string - Channel string - Thread string - EventName string - Event string - Plaintext string - Asset string - Resolved bool + ID string + TS uint64 + Source string + Channel string + Thread string + EventName string + Event string + Plaintext string + Asset string + Resolved bool + Datacenter string } func (m Message) Empty() bool { @@ -105,7 +107,10 @@ type ( slackAction struct{} ) -func ParseSlack(b []byte) (Message, error) { +func ParseSlack(b []byte, assetPattern, datacenterPattern string) (Message, error) { + asset := regexp.MustCompile(assetPattern) + datacenter := regexp.MustCompile(datacenterPattern) + s, err := parseSlack(b) if err != nil { return Message{}, err @@ -117,17 +122,24 @@ func ParseSlack(b []byte) (Message, error) { } else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") { return Message{}, ErrIrrelevantMessage } + var tagsField string + for _, field := range s.Event.Attachments[0].Fields { + if field.Title == "Tags" { + tagsField = field.Value + } + } return Message{ - ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS), - TS: s.TS, - Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")), - Channel: s.Event.Channel, - Thread: s.Event.ID, - EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1], - Event: strings.Split(s.Event.Attachments[0].Title, ":")[0], - Plaintext: s.Event.Attachments[0].Text, - Asset: "TODO", - Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"), + ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS), + TS: s.TS, + Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")), + Channel: s.Event.Channel, + Thread: s.Event.ID, + EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1], + Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"), + Plaintext: s.Event.Attachments[0].Text, + Asset: asset.FindString(s.Event.Attachments[0].Text), + Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"), + Datacenter: datacenter.FindString(tagsField), }, nil } @@ -135,15 +147,16 @@ func ParseSlack(b []byte) (Message, error) { return Message{}, ErrIrrelevantMessage } return Message{ - ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS), - TS: s.TS, - Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")), - Channel: s.Event.Channel, - Thread: s.Event.ParentID, - EventName: "TODO", - Event: "TODO", - Plaintext: s.Event.Text, - Asset: "TODO", + ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS), + TS: s.TS, + Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")), + Channel: s.Event.Channel, + Thread: s.Event.ParentID, + EventName: "", + Event: "", + Plaintext: s.Event.Text, + Asset: asset.FindString(s.Event.Text), + Datacenter: datacenter.FindString(s.Event.Text), }, nil } diff --git a/message_test.go b/message_test.go index c965fef..28ac0b8 100644 --- a/message_test.go +++ b/message_test.go @@ -7,6 +7,11 @@ import ( "testing" ) +var ( + renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]` + renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]` +) + func TestParseSlackTestdata(t *testing.T) { cases := map[string]struct { slackMessage slackMessage @@ -39,10 +44,10 @@ func TestParseSlackTestdata(t *testing.T) { Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409", Channel: "C06U1DDBBU4", Thread: "1712927439.728409", - EventName: "TODO", - Event: "TODO", + EventName: "", + Event: "", Plaintext: "I gotta do this", - Asset: "TODO", + Asset: "", }, }, "opsgenie_alert.json": { @@ -74,9 +79,9 @@ func TestParseSlackTestdata(t *testing.T) { Channel: "C06U1DDBBU4", Thread: "1712927439.728409", EventName: "Alertconfig Workflow Failed", - Event: "#11071", + Event: "11071", Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", - Asset: "TODO", + Asset: "", }, }, "opsgenie_alert_resolved.json": { @@ -108,9 +113,9 @@ func TestParseSlackTestdata(t *testing.T) { Channel: "C06U1DDBBU4", Thread: "1712916339.000300", EventName: "Alertconfig Workflow Failed", - Event: "#11069", + Event: "11069", Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", - Asset: "TODO", + Asset: "", Resolved: true, }, }, @@ -135,7 +140,7 @@ func TestParseSlackTestdata(t *testing.T) { }) t.Run("ParseSlack", func(t *testing.T) { - got, err := ParseSlack(b) + got, err := ParseSlack(b, renderAssetPattern, renderDatacenterPattern) if err != nil { t.Fatal(err) } diff --git a/report.go b/report.go new file mode 100644 index 0000000..861d4bb --- /dev/null +++ b/report.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + _ "embed" + "io" + "text/template" + "time" +) + +//go:embed report.tmpl +var reportTMPL string + +func ReportSince(ctx context.Context, w io.Writer, s Storage, t time.Time) error { + tmpl, err := template.New("report").Parse(reportTMPL) + if err != nil { + return err + } + + messages, err := s.MessagesSince(ctx, t) + if err != nil { + return err + } + + threads, err := s.ThreadsSince(ctx, t) + if err != nil { + return err + } + + eventNames, err := s.EventNamesSince(ctx, t) + if err != nil { + return err + } + + events, err := s.EventsSince(ctx, t) + if err != nil { + return err + } + + return tmpl.Execute(w, map[string]any{ + "since": t.Format("2006-01-02"), + "messages": messages, + "threads": threads, + "events": events, + "eventNames": eventNames, + }) +} diff --git a/report.tmpl b/report.tmpl new file mode 100644 index 0000000..b679543 --- /dev/null +++ b/report.tmpl @@ -0,0 +1,12 @@ + + +
+ + +
+ +

Report

+ + + diff --git a/report_test.go b/report_test.go new file mode 100644 index 0000000..7ab64da --- /dev/null +++ b/report_test.go @@ -0,0 +1,32 @@ +package main + +import ( + "bytes" + "context" + "os" + "path" + "testing" + "time" +) + +func TestReport(t *testing.T) { + ctx, can := context.WithTimeout(context.Background(), time.Minute) + defer can() + + w := bytes.NewBuffer(nil) + + db := NewRAM() + FillWithTestdata(ctx, db, renderAssetPattern, renderDatacenterPattern) + s := NewStorage(db) + + if err := ReportSince(ctx, w, s, time.Now().Add(-1*time.Hour*24*365*20)); err != nil { + t.Fatal(err) + } + + p := path.Join(os.TempDir(), "test_report.html") + if env := os.Getenv("TEST_REPORT_PATH"); env != "" { + p = env + } + os.WriteFile(p, w.Bytes(), os.ModePerm) + t.Log(p) +} diff --git a/storage.go b/storage.go index 5442a3c..19aa577 100644 --- a/storage.go +++ b/storage.go @@ -19,23 +19,39 @@ func NewStorage(driver Driver) Storage { return Storage{driver: driver} } +func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) { + return s.messagesWhere(ctx, func(m Message) bool { + return !t.After(m.Time()) + }) +} + func (s Storage) Threads(ctx context.Context) ([]string, error) { return s.ThreadsSince(ctx, time.Unix(0, 0)) } func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) { - messages, err := s.messagesWhere(ctx, func(m Message) bool { - return !t.After(m.Time()) - }) + return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread }) +} + +func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) { + return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName }) +} + +func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) { + return s.fieldsSince(ctx, t, func(m Message) string { return m.Event }) +} + +func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) { + messages, err := s.MessagesSince(ctx, t) if err != nil { return nil, err } - threads := map[string]struct{}{} + values := map[string]struct{}{} for _, m := range messages { - threads[m.Thread] = struct{}{} + values[fielder(m)] = struct{}{} } - result := make([]string, 0, len(threads)) - for k := range threads { + result := make([]string, 0, len(values)) + for k := range values { result = append(result, k) } sort.Strings(result)