Merge remote-tracking branch 'gitea/main'

main
Bel LaPointe 2024-04-14 09:19:14 -06:00
commit c89a9a8ada
15 changed files with 571 additions and 86 deletions

38
ai.go
View File

@ -4,15 +4,49 @@ import (
"bytes"
"context"
"os"
"strings"
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
"github.com/nikolaydubina/llama2.go/llama2"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
)
type AI interface {
Do(context.Context, string) (string, error)
}
type AINoop struct {
}
func NewAINoop() AINoop {
return AINoop{}
}
func (ai AINoop) Do(ctx context.Context, prompt string) (string, error) {
return ":shrug:", nil
}
type AIOllama struct {
model string
url string
}
func NewAIOllama(url, model string) AIOllama {
return AIOllama{url: url, model: model}
}
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
llm, err := ollama.New(
ollama.WithModel(ai.model),
ollama.WithServerURL(ai.url),
)
if err != nil {
return "", err
}
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
}
type AILocal struct {
checkpointPath string
tokenizerPath string
@ -73,7 +107,7 @@ func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
runState := llama2.NewRunState(config)
promptTokens := vocab.Encode(prompt)
promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
out := bytes.NewBuffer(nil)
@ -131,5 +165,5 @@ func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
}
out.Write([]byte("\n"))
return string(out.Bytes()), nil
return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
}

View File

@ -13,18 +13,30 @@ import (
"time"
)
func TestAILocal(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Minute)
defer can()
func TestAINoop(t *testing.T) {
ai := NewAINoop()
testAI(t, ai)
}
func TestAIOllama(t *testing.T) {
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
testAI(t, ai)
}
func TestAILocal(t *testing.T) {
d := os.TempDir()
for k, u := range map[string]string{
"checkpoints": "https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin",
"tokenizer": "https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin",
checkpoints := "checkpoints"
tokenizer := "tokenizer"
for u, p := range map[string]*string{
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
} {
func() {
if _, err := os.Stat(path.Join(d, k)); os.IsNotExist(err) {
t.Logf("downloading %s from %s", u, k)
*p = path.Base(u)
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
t.Logf("downloading %s from %s", u, *p)
resp, err := http.Get(u)
if err != nil {
@ -32,7 +44,7 @@ func TestAILocal(t *testing.T) {
}
defer resp.Body.Close()
f, err := os.Create(path.Join(d, k))
f, err := os.Create(path.Join(d, *p))
if err != nil {
t.Fatal(err)
}
@ -40,7 +52,7 @@ func TestAILocal(t *testing.T) {
if _, err := io.Copy(f, resp.Body); err != nil {
f.Close()
os.Remove(path.Join(d, k))
os.Remove(path.Join(d, *p))
t.Fatal(err)
}
}
@ -48,17 +60,24 @@ func TestAILocal(t *testing.T) {
}
ai := NewAILocal(
path.Join(d, "checkpoints"),
path.Join(d, "tokenizer"),
path.Join(d, checkpoints),
path.Join(d, tokenizer),
0.9,
256,
0.9,
)
testAI(t, ai)
}
func testAI(t *testing.T, ai AI) {
ctx, can := context.WithTimeout(context.Background(), time.Minute)
defer can()
t.Run("mvp", func(t *testing.T) {
if result, err := ai.Do(ctx, "hello world"); err != nil {
t.Fatal(err)
} else if len(result) < 250 {
} else if len(result) < 3 {
t.Error(result)
} else {
t.Logf("%s", result)

View File

@ -21,9 +21,16 @@ type Config struct {
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
storage Storage
queue Queue
driver Driver
ai AI
}
func newConfig(ctx context.Context) (Config, error) {
@ -32,7 +39,10 @@ func newConfig(ctx context.Context) (Config, error) {
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
def := Config{
Port: 8080,
Port: 38080,
OllamaModel: "gemma:2b",
AssetPattern: `(dpg|svc|red)-[a-z0-9-]*`,
DatacenterPattern: `[a-z]{4}[a-z]*-[0-9]`,
}
var m map[string]any
@ -97,12 +107,20 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.driver = pg
}
if result.FillWithTestdata {
if err := FillWithTestdata(ctx, result.driver); err != nil {
if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern); err != nil {
return Config{}, err
}
}
result.storage = NewStorage(result.driver)
result.queue = NewQueue(result.driver)
if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
} else {
result.ai = NewAINoop()
}
return result, nil
}

View File

@ -23,7 +23,7 @@ type Driver interface {
Set(context.Context, string, string, []byte) error
}
func FillWithTestdata(ctx context.Context, driver Driver) error {
func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern string) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
@ -37,7 +37,7 @@ func FillWithTestdata(ctx context.Context, driver Driver) error {
if err != nil {
return err
}
m, err := ParseSlack(b)
m, err := ParseSlack(b, assetPattern, datacenterPattern)
if errors.Is(err, ErrIrrelevantMessage) {
continue
} else if err != nil {

View File

@ -17,7 +17,7 @@ func TestFillTestdata(t *testing.T) {
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram); err != nil {
if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern); err != nil {
t.Fatal(err)
}
n := 0

9
go.mod
View File

@ -6,7 +6,14 @@ require (
github.com/go-errors/errors v1.5.1
github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8
go.etcd.io/bbolt v1.3.9
)
require golang.org/x/sys v0.4.0 // indirect
require (
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/gage-technologies/mistral-go v1.0.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
golang.org/x/sys v0.16.0 // indirect
)

22
go.sum
View File

@ -1,20 +1,30 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A=
github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

178
main.go
View File

@ -11,6 +11,7 @@ import (
"net"
"net/http"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
@ -64,6 +65,9 @@ func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
@ -78,19 +82,82 @@ func newHandler(cfg Config) http.HandlerFunc {
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
}
}
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := cfg.storage.EventsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"events": events})
}
}
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
messages, err := cfg.storage.MessagesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"messages": messages})
}
}
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since := time.Unix(0, 0)
if sinceS := r.URL.Query().Get("since"); sinceS == "" {
} else if n, err := strconv.ParseInt(sinceS, 10, 64); err != nil {
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
} else {
since = time.Unix(n, 0)
}
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
@ -99,7 +166,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return
}
json.NewEncoder(w).Encode(map[string]any{"threads": threads})
encodeResponse(w, r, map[string]any{"threads": threads})
}
}
@ -117,7 +184,7 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
return
}
json.NewEncoder(w).Encode(map[string]any{"thread": map[string]any{"messages": messages}})
encodeResponse(w, r, map[string]any{"thread": messages})
}
}
@ -148,7 +215,7 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques
return
}
json.NewEncoder(w).Encode(map[string]any{"challenge": challenge.Challenge})
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
}
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
@ -179,7 +246,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return
}
m, err := ParseSlack(b)
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern)
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
@ -195,3 +262,96 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
log.Printf("ingested %v", m.ID)
}
}
func parseSince(s string) (time.Time, error) {
if s == "" {
return time.Unix(0, 0), nil
}
if n, err := strconv.ParseInt(s, 10, 64); err != nil {
} else {
return time.Unix(n, 0), nil
}
if t, err := time.Parse(time.RFC3339, s); err != nil {
} else {
return t, nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err != nil {
} else {
return t, nil
}
if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil {
} else {
return t, nil
}
return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
}
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
if strings.Contains(r.Header.Get("Accept"), "text/csv") {
return encodeCSVResponse(w, v)
}
if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
return encodeTSVResponse(w, v)
}
return encodeJSONResponse(w, v)
}
func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
return json.NewEncoder(w).Encode(v)
}
func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, "\t")
}
func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, ",")
}
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
var data map[string][]map[string]json.RawMessage
if err := json.Unmarshal(b, &data); err != nil {
return err
}
var objects []map[string]json.RawMessage
for k := range data {
objects = data[k]
}
fields := []string{}
for i := range objects {
for k := range objects[i] {
b, _ := json.Marshal(k)
fields = append(fields, string(b))
}
break
}
sort.Strings(fields)
w.Write([]byte(strings.Join(fields, delim)))
w.Write([]byte("\n"))
for _, object := range objects {
for j, field := range fields {
json.Unmarshal([]byte(field), &field)
if j > 0 {
w.Write([]byte(delim))
}
w.Write(object[field])
}
w.Write([]byte("\n"))
}
return nil
}

View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
@ -80,6 +81,75 @@ func TestRun(t *testing.T) {
}
})
t.Run("GET /api/v1/messages", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Messages []Message
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Messages) != 1 {
t.Fatal(result.Messages)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/eventnames", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
EventNames []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.EventNames[0] != "[Oregon-1] Wal Receive Count Alert" {
t.Fatal(result.EventNames)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/events", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Events []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.Events[0] != "11067" {
t.Fatal(result.Events)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/threads", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u))
if err != nil {
@ -98,6 +168,8 @@ func TestRun(t *testing.T) {
t.Fatal(err)
} else if result.Threads[0] != "1712911957.023359" {
t.Fatal(result.Threads)
} else {
t.Logf("%+v", result)
}
})
@ -114,16 +186,56 @@ func TestRun(t *testing.T) {
}
var result struct {
Thread struct {
Messages []Message
}
Thread []Message
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Thread.Messages) != 1 {
} else if len(result.Thread) != 1 {
t.Fatal(result.Thread)
} else {
t.Logf("%+v", result.Thread.Messages[0])
t.Logf("%+v", result)
}
})
t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "text/csv")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
dec := csv.NewReader(resp.Body)
var lastLine []string
for {
line, err := dec.Read()
if err == io.EOF {
break
} else if err != nil {
t.Error(err)
}
if lastLine == nil {
} else if len(lastLine) != len(line) {
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
}
t.Logf("%+v", line)
lastLine = line
}
if lastLine == nil {
t.Error("no lines found")
}
})
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"time"
)
@ -23,6 +24,7 @@ type Message struct {
Plaintext string
Asset string
Resolved bool
Datacenter string
}
func (m Message) Empty() bool {
@ -105,7 +107,10 @@ type (
slackAction struct{}
)
func ParseSlack(b []byte) (Message, error) {
func ParseSlack(b []byte, assetPattern, datacenterPattern string) (Message, error) {
asset := regexp.MustCompile(assetPattern)
datacenter := regexp.MustCompile(datacenterPattern)
s, err := parseSlack(b)
if err != nil {
return Message{}, err
@ -117,6 +122,12 @@ func ParseSlack(b []byte) (Message, error) {
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return Message{}, ErrIrrelevantMessage
}
var tagsField string
for _, field := range s.Event.Attachments[0].Fields {
if field.Title == "Tags" {
tagsField = field.Value
}
}
return Message{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
TS: s.TS,
@ -124,10 +135,11 @@ func ParseSlack(b []byte) (Message, error) {
Channel: s.Event.Channel,
Thread: s.Event.ID,
EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1],
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"),
Plaintext: s.Event.Attachments[0].Text,
Asset: "TODO",
Asset: asset.FindString(s.Event.Attachments[0].Text),
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
Datacenter: datacenter.FindString(tagsField),
}, nil
}
@ -140,10 +152,11 @@ func ParseSlack(b []byte) (Message, error) {
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")),
Channel: s.Event.Channel,
Thread: s.Event.ParentID,
EventName: "TODO",
Event: "TODO",
EventName: "",
Event: "",
Plaintext: s.Event.Text,
Asset: "TODO",
Asset: asset.FindString(s.Event.Text),
Datacenter: datacenter.FindString(s.Event.Text),
}, nil
}

View File

@ -7,6 +7,11 @@ import (
"testing"
)
var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
)
func TestParseSlackTestdata(t *testing.T) {
cases := map[string]struct {
slackMessage slackMessage
@ -39,10 +44,10 @@ func TestParseSlackTestdata(t *testing.T) {
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "TODO",
Event: "TODO",
EventName: "",
Event: "",
Plaintext: "I gotta do this",
Asset: "TODO",
Asset: "",
},
},
"opsgenie_alert.json": {
@ -74,9 +79,9 @@ func TestParseSlackTestdata(t *testing.T) {
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "Alertconfig Workflow Failed",
Event: "#11071",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "TODO",
Asset: "",
},
},
"opsgenie_alert_resolved.json": {
@ -108,9 +113,9 @@ func TestParseSlackTestdata(t *testing.T) {
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "#11069",
Event: "11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "TODO",
Asset: "",
Resolved: true,
},
},
@ -135,7 +140,7 @@ func TestParseSlackTestdata(t *testing.T) {
})
t.Run("ParseSlack", func(t *testing.T) {
got, err := ParseSlack(b)
got, err := ParseSlack(b, renderAssetPattern, renderDatacenterPattern)
if err != nil {
t.Fatal(err)
}

47
report.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"context"
_ "embed"
"io"
"text/template"
"time"
)
//go:embed report.tmpl
var reportTMPL string
func ReportSince(ctx context.Context, w io.Writer, s Storage, t time.Time) error {
tmpl, err := template.New("report").Parse(reportTMPL)
if err != nil {
return err
}
messages, err := s.MessagesSince(ctx, t)
if err != nil {
return err
}
threads, err := s.ThreadsSince(ctx, t)
if err != nil {
return err
}
eventNames, err := s.EventNamesSince(ctx, t)
if err != nil {
return err
}
events, err := s.EventsSince(ctx, t)
if err != nil {
return err
}
return tmpl.Execute(w, map[string]any{
"since": t.Format("2006-01-02"),
"messages": messages,
"threads": threads,
"events": events,
"eventNames": eventNames,
})
}

12
report.tmpl Normal file
View File

@ -0,0 +1,12 @@
<!DOCTYPE html>
<html>
<header>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
</header>
<body>
<h1>Report</h1>
</body>
<footer>
</footer>
</html>

32
report_test.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"bytes"
"context"
"os"
"path"
"testing"
"time"
)
func TestReport(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Minute)
defer can()
w := bytes.NewBuffer(nil)
db := NewRAM()
FillWithTestdata(ctx, db, renderAssetPattern, renderDatacenterPattern)
s := NewStorage(db)
if err := ReportSince(ctx, w, s, time.Now().Add(-1*time.Hour*24*365*20)); err != nil {
t.Fatal(err)
}
p := path.Join(os.TempDir(), "test_report.html")
if env := os.Getenv("TEST_REPORT_PATH"); env != "" {
p = env
}
os.WriteFile(p, w.Bytes(), os.ModePerm)
t.Log(p)
}

View File

@ -19,23 +19,39 @@ func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
messages, err := s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
}
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
}
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
}
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
messages, err := s.MessagesSince(ctx, t)
if err != nil {
return nil, err
}
threads := map[string]struct{}{}
values := map[string]struct{}{}
for _, m := range messages {
threads[m.Thread] = struct{}{}
values[fielder(m)] = struct{}{}
}
result := make([]string, 0, len(threads))
for k := range threads {
result := make([]string, 0, len(values))
for k := range values {
result = append(result, k)
}
sort.Strings(result)