Compare commits
16 Commits
8557ddc522
...
5785ea37ae
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5785ea37ae | ||
|
|
f27d416a5a | ||
|
|
81793876f8 | ||
|
|
6d81164161 | ||
|
|
20256bd6b4 | ||
|
|
e5e98e2890 | ||
|
|
4fb26ec775 | ||
|
|
782b9ec3cf | ||
|
|
9d7f69bd8a | ||
|
|
12de99da57 | ||
|
|
81fe8070ca | ||
|
|
79de56e236 | ||
|
|
f485b5ea88 | ||
|
|
894536d209 | ||
|
|
f8861a73b5 | ||
|
|
14de286415 |
@@ -4,14 +4,9 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
|
||||
|
||||
## TODO
|
||||
|
||||
- dedupe critical+noncritical
|
||||
- what SLO/SLI can I help benoit with
|
||||
- break into smaller goals
|
||||
- sell to the team
|
||||
- scott; like to keep state in incident.io and zendesk
|
||||
- @spoc -ignore, @spoc -s summary
|
||||
- limit rps from rpc/slackscrape
|
||||
- rpc/slackscrape to async
|
||||
- limit queue retries
|
||||
|
||||
```
|
||||
|
||||
139
ai.go
139
ai.go
@@ -1,13 +1,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
|
||||
"github.com/nikolaydubina/llama2.go/llama2"
|
||||
"github.com/tmc/langchaingo/llms"
|
||||
"github.com/tmc/langchaingo/llms/ollama"
|
||||
)
|
||||
@@ -37,133 +34,23 @@ func NewAIOllama(url, model string) AIOllama {
|
||||
}
|
||||
|
||||
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
|
||||
c := &http.Client{
|
||||
Timeout: time.Hour,
|
||||
Transport: &http.Transport{
|
||||
//DisableKeepAlives: true,
|
||||
IdleConnTimeout: time.Hour,
|
||||
ResponseHeaderTimeout: time.Hour,
|
||||
ExpectContinueTimeout: time.Hour,
|
||||
},
|
||||
}
|
||||
defer c.CloseIdleConnections()
|
||||
llm, err := ollama.New(
|
||||
ollama.WithModel(ai.model),
|
||||
ollama.WithServerURL(ai.url),
|
||||
ollama.WithHTTPClient(c),
|
||||
)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
|
||||
}
|
||||
|
||||
type AILocal struct {
|
||||
checkpointPath string
|
||||
tokenizerPath string
|
||||
temperature float64
|
||||
steps int
|
||||
topp float64
|
||||
}
|
||||
|
||||
func NewAILocal(
|
||||
checkpointPath string,
|
||||
tokenizerPath string,
|
||||
temperature float64,
|
||||
steps int,
|
||||
topp float64,
|
||||
) AILocal {
|
||||
return AILocal{
|
||||
checkpointPath: checkpointPath,
|
||||
tokenizerPath: tokenizerPath,
|
||||
temperature: temperature,
|
||||
steps: steps,
|
||||
topp: topp,
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/nikolaydubina/llama2.go/blob/master/main.go
|
||||
func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
|
||||
checkpointFile, err := os.OpenFile(ai.checkpointPath, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer checkpointFile.Close()
|
||||
|
||||
config, err := llama2.NewConfigFromCheckpoint(checkpointFile)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
isSharedWeights := config.VocabSize > 0
|
||||
if config.VocabSize < 0 {
|
||||
config.VocabSize = -config.VocabSize
|
||||
}
|
||||
|
||||
tokenizerFile, err := os.OpenFile(ai.tokenizerPath, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer tokenizerFile.Close()
|
||||
|
||||
vocab := llama2.NewVocabFromFile(config.VocabSize, tokenizerFile)
|
||||
|
||||
w := llama2.NewTransformerWeightsFromCheckpoint(config, checkpointFile, isSharedWeights)
|
||||
|
||||
// right now we cannot run for more than config.SeqLen steps
|
||||
steps := ai.steps
|
||||
if steps <= 0 || steps > config.SeqLen {
|
||||
steps = config.SeqLen
|
||||
}
|
||||
|
||||
runState := llama2.NewRunState(config)
|
||||
|
||||
promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
|
||||
|
||||
out := bytes.NewBuffer(nil)
|
||||
|
||||
// the current position we are in
|
||||
var token int = 1 // 1 = BOS token in llama-2 sentencepiece
|
||||
var pos = 0
|
||||
|
||||
for pos < steps {
|
||||
// forward the transformer to get logits for the next token
|
||||
llama2.Transformer(token, pos, config, runState, w)
|
||||
|
||||
var next int
|
||||
if pos < len(promptTokens) {
|
||||
next = promptTokens[pos]
|
||||
} else {
|
||||
// sample the next token
|
||||
if ai.temperature == 0 {
|
||||
// greedy argmax sampling
|
||||
next = nn.ArgMax(runState.Logits)
|
||||
} else {
|
||||
// apply the temperature to the logits
|
||||
for q := 0; q < config.VocabSize; q++ {
|
||||
runState.Logits[q] /= float32(ai.temperature)
|
||||
}
|
||||
// apply softmax to the logits to the probabilities for next token
|
||||
nn.SoftMax(runState.Logits)
|
||||
// we now want to sample from this distribution to get the next token
|
||||
if ai.topp <= 0 || ai.topp >= 1 {
|
||||
// simply sample from the predicted probability distribution
|
||||
next = nn.Sample(runState.Logits)
|
||||
} else {
|
||||
// top-p (nucleus) sampling, clamping the least likely tokens to zero
|
||||
next = nn.SampleTopP(runState.Logits, float32(ai.topp))
|
||||
}
|
||||
}
|
||||
}
|
||||
pos++
|
||||
|
||||
// data-dependent terminating condition: the BOS (1) token delimits sequences
|
||||
if next == 1 {
|
||||
break
|
||||
}
|
||||
|
||||
// following BOS (1) token, sentencepiece decoder strips any leading whitespace
|
||||
var tokenStr string
|
||||
if token == 1 && vocab.Words[next][0] == ' ' {
|
||||
tokenStr = vocab.Words[next][1:]
|
||||
} else {
|
||||
tokenStr = vocab.Words[next]
|
||||
}
|
||||
out.Write([]byte(tokenStr))
|
||||
|
||||
// advance forward
|
||||
token = next
|
||||
}
|
||||
out.Write([]byte("\n"))
|
||||
|
||||
return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
|
||||
}
|
||||
|
||||
106
ai_test.go
106
ai_test.go
@@ -4,11 +4,6 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -22,53 +17,7 @@ func TestAINoop(t *testing.T) {
|
||||
|
||||
func TestAIOllama(t *testing.T) {
|
||||
t.Parallel()
|
||||
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
||||
|
||||
testAI(t, ai)
|
||||
}
|
||||
|
||||
func TestAILocal(t *testing.T) {
|
||||
t.Parallel()
|
||||
d := os.TempDir()
|
||||
checkpoints := "checkpoints"
|
||||
tokenizer := "tokenizer"
|
||||
for u, p := range map[string]*string{
|
||||
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
|
||||
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
|
||||
} {
|
||||
func() {
|
||||
*p = path.Base(u)
|
||||
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
|
||||
t.Logf("downloading %s from %s", u, *p)
|
||||
|
||||
resp, err := http.Get(u)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
f, err := os.Create(path.Join(d, *p))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err := io.Copy(f, resp.Body); err != nil {
|
||||
f.Close()
|
||||
os.Remove(path.Join(d, *p))
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
ai := NewAILocal(
|
||||
path.Join(d, checkpoints),
|
||||
path.Join(d, tokenizer),
|
||||
0.9,
|
||||
256,
|
||||
0.9,
|
||||
)
|
||||
ai := NewAIOllama("http://localhost:11434", "llama3")
|
||||
|
||||
testAI(t, ai)
|
||||
}
|
||||
@@ -78,7 +27,7 @@ func testAI(t *testing.T, ai AI) {
|
||||
defer can()
|
||||
|
||||
t.Run("mvp", func(t *testing.T) {
|
||||
if result, err := ai.Do(ctx, "hello world"); err != nil {
|
||||
if result, err := ai.Do(ctx, "Tell me a fun fact."); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(result) < 3 {
|
||||
t.Error(result)
|
||||
@@ -87,32 +36,33 @@ func testAI(t *testing.T, ai AI) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("simulation", func(t *testing.T) {
|
||||
d := NewRAM()
|
||||
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
||||
s := NewStorage(d)
|
||||
/*
|
||||
t.Run("simulation", func(t *testing.T) {
|
||||
d := NewRAM()
|
||||
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
||||
s := NewStorage(d)
|
||||
|
||||
threads, err := s.Threads(ctx)
|
||||
if err != nil || len(threads) < 1 {
|
||||
t.Fatal(err)
|
||||
}
|
||||
threads, err := s.Threads(ctx)
|
||||
if err != nil || len(threads) < 1 {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
thread, err := s.Thread(ctx, threads[0])
|
||||
if err != nil || len(thread) < 1 {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
input := fmt.Sprintf(`
|
||||
Summarize the following forum converstion.
|
||||
---
|
||||
%s
|
||||
`, thread[0].Plaintext)
|
||||
t.Logf("\n\t%s", input)
|
||||
result, err := ai.Do(ctx, input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
||||
})
|
||||
thread, err := s.Thread(ctx, threads[0])
|
||||
if err != nil || len(thread) < 1 {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
input := fmt.Sprintf(`
|
||||
Summarize the following forum converstion.
|
||||
---
|
||||
%s
|
||||
`, thread[0].Plaintext)
|
||||
t.Logf("\n\t%s", input)
|
||||
result, err := ai.Do(ctx, input)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
||||
})
|
||||
*/
|
||||
}
|
||||
|
||||
30
config.go
30
config.go
@@ -23,10 +23,10 @@ type Config struct {
|
||||
BasicAuthUser string
|
||||
BasicAuthPassword string
|
||||
FillWithTestdata bool
|
||||
OllamaURL string
|
||||
OllamaUrl string
|
||||
OllamaModel string
|
||||
LocalCheckpoint string
|
||||
LocalTokenizer string
|
||||
RecapPromptIntro string
|
||||
RecapPrompt string
|
||||
AssetPattern string
|
||||
DatacenterPattern string
|
||||
EventNamePattern string
|
||||
@@ -34,7 +34,9 @@ type Config struct {
|
||||
storage Storage
|
||||
ai AI
|
||||
slackToModelPipeline Pipeline
|
||||
slackScrapePipeline Pipeline
|
||||
modelToPersistencePipeline Pipeline
|
||||
persistenceToRecapPipeline Pipeline
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -50,10 +52,12 @@ func newConfig(ctx context.Context) (Config, error) {
|
||||
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
|
||||
def := Config{
|
||||
Port: 38080,
|
||||
OllamaModel: "gemma:2b",
|
||||
OllamaModel: "llama3",
|
||||
AssetPattern: renderAssetPattern,
|
||||
DatacenterPattern: renderDatacenterPattern,
|
||||
EventNamePattern: renderEventNamePattern,
|
||||
RecapPromptIntro: "A Slack thread begin with the following original post.",
|
||||
RecapPrompt: "Summarize all of the following Slack thread responses in 1 sentence without any leading text.",
|
||||
}
|
||||
|
||||
var m map[string]any
|
||||
@@ -130,10 +134,8 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
}
|
||||
result.storage = storage
|
||||
|
||||
if result.OllamaURL != "" {
|
||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
||||
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
||||
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
|
||||
if result.OllamaUrl != "" {
|
||||
result.ai = NewAIOllama(result.OllamaUrl, result.OllamaModel)
|
||||
} else {
|
||||
result.ai = NewAINoop()
|
||||
}
|
||||
@@ -150,5 +152,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
}
|
||||
result.modelToPersistencePipeline = modelToPersistencePipeline
|
||||
|
||||
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.slackScrapePipeline = slackScrapePipeline
|
||||
|
||||
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.persistenceToRecapPipeline = persistenceToRecapPipeline
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
@@ -35,7 +36,12 @@ func NewTestDriver(t *testing.T, optionalP ...string) Driver {
|
||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||
engine := "sqlite"
|
||||
if conn == "" {
|
||||
conn = ":memory:"
|
||||
f, err := os.CreateTemp(os.TempDir(), "spoc-bot-vr-undef-*.db")
|
||||
if err != nil {
|
||||
return Driver{}, err
|
||||
}
|
||||
f.Close()
|
||||
conn = f.Name()
|
||||
} else {
|
||||
if u, err := url.Parse(conn); err != nil {
|
||||
return Driver{}, err
|
||||
|
||||
5
go.mod
5
go.mod
@@ -6,9 +6,9 @@ require (
|
||||
github.com/glebarez/go-sqlite v1.21.2
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/nikolaydubina/llama2.go v0.7.1
|
||||
github.com/tmc/langchaingo v0.1.8
|
||||
gotest.tools/v3 v3.5.1
|
||||
golang.org/x/time v0.5.0
|
||||
gotest.tools v2.2.0+incompatible
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -20,7 +20,6 @@ require (
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
gotest.tools v2.2.0+incompatible // indirect
|
||||
modernc.org/libc v1.22.5 // indirect
|
||||
modernc.org/mathutil v1.5.0 // indirect
|
||||
modernc.org/memory v1.5.0 // indirect
|
||||
|
||||
6
go.sum
6
go.sum
@@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||
@@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||
|
||||
108
main.go
108
main.go
@@ -39,6 +39,8 @@ func run(ctx context.Context, cfg Config) error {
|
||||
case err := <-processPipelines(ctx,
|
||||
cfg.slackToModelPipeline,
|
||||
cfg.modelToPersistencePipeline,
|
||||
cfg.slackScrapePipeline,
|
||||
cfg.persistenceToRecapPipeline,
|
||||
):
|
||||
return err
|
||||
case err := <-listenAndServe(ctx, cfg):
|
||||
@@ -88,6 +90,7 @@ func newHandler(cfg Config) http.HandlerFunc {
|
||||
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
|
||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if cfg.Debug {
|
||||
@@ -106,90 +109,45 @@ func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{"version": Version})
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
event := r.URL.Query().Get("id")
|
||||
b, _ := json.Marshal(ModelIDs{Event: event})
|
||||
if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"event": event})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
channel := r.Header.Get("slack-channel")
|
||||
token := r.Header.Get("slack-oauth-token")
|
||||
|
||||
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
|
||||
|
||||
httpc := http.Client{Timeout: time.Second}
|
||||
get := func(url string) ([]byte, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req = req.WithContext(r.Context())
|
||||
|
||||
resp, err := httpc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
return io.ReadAll(resp.Body)
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
n := 0
|
||||
|
||||
for len(urls) > 0 {
|
||||
url := urls[0]
|
||||
urls = urls[1:]
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
body, err := get(url)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
var page struct {
|
||||
Messages []json.RawMessage
|
||||
}
|
||||
if err := json.Unmarshal(body, &page); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
if cfg.Debug {
|
||||
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
||||
}
|
||||
b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
n += 1
|
||||
}
|
||||
if !strings.Contains(url, "ts=") {
|
||||
var peek struct {
|
||||
ThreadTS string `json:"thread_ts"`
|
||||
}
|
||||
json.Unmarshal(messageJSON, &peek)
|
||||
if peek.ThreadTS != "" {
|
||||
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
job, _ := json.Marshal(SlackScrape{
|
||||
Latest: time.Now().Unix(),
|
||||
Oldest: since.Unix(),
|
||||
ThreadTS: "",
|
||||
Channel: r.Header.Get("slack-channel"),
|
||||
Token: r.Header.Get("slack-oauth-token"),
|
||||
})
|
||||
if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
44
main_test.go
44
main_test.go
@@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
|
||||
return int(port)
|
||||
}()
|
||||
u := fmt.Sprintf("http://localhost:%d", port)
|
||||
var err error
|
||||
|
||||
cfg := Config{}
|
||||
cfg.DatacenterPattern = renderDatacenterPattern
|
||||
@@ -44,10 +45,16 @@ func TestRun(t *testing.T) {
|
||||
cfg.Port = port
|
||||
cfg.driver = NewTestDriver(t)
|
||||
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
||||
cfg.ai = NewAINoop()
|
||||
cfg.SlackToken = "redacted"
|
||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
||||
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
|
||||
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
||||
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||
@@ -83,4 +90,41 @@ func TestRun(t *testing.T) {
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
|
||||
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
}
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
t.Fatal("timed out waiting for recap")
|
||||
}
|
||||
|
||||
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
|
||||
if thread.Recap == "" {
|
||||
t.Error(thread.Recap)
|
||||
}
|
||||
t.Log(thread.Recap)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ type Thread struct {
|
||||
TS uint64
|
||||
Channel string
|
||||
EventID string
|
||||
Recap string
|
||||
}
|
||||
|
||||
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
||||
|
||||
@@ -26,15 +26,14 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer = NewNoopQueue()
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newModelToPersistenceProcess(cfg.storage),
|
||||
process: newModelToPersistenceProcess(cfg, cfg.storage),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||
func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
|
||||
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||
var m Models
|
||||
if err := json.Unmarshal(models, &m); err != nil {
|
||||
@@ -56,7 +55,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("persisted models")
|
||||
if cfg.Debug {
|
||||
log.Printf("persisted models")
|
||||
}
|
||||
return json.Marshal(ModelIDs{
|
||||
Event: m.Event.ID,
|
||||
Thread: m.Thread.ID,
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
||||
|
||||
d := NewTestDriver(t)
|
||||
s, _ := NewStorage(ctx, d)
|
||||
process := newModelToPersistenceProcess(s)
|
||||
process := newModelToPersistenceProcess(Config{}, s)
|
||||
|
||||
_, _ = ctx, process
|
||||
|
||||
|
||||
82
queue.go
82
queue.go
@@ -20,14 +20,14 @@ func NewNoopQueue() Queue {
|
||||
|
||||
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||
if _, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS queue (
|
||||
id TEXT PRIMARY KEY,
|
||||
topic TEXT NOT NULL,
|
||||
updated INTEGER NOT NULL,
|
||||
reservation TEXT,
|
||||
payload TEXT
|
||||
);
|
||||
`); err != nil {
|
||||
CREATE TABLE IF NOT EXISTS queue (
|
||||
id TEXT PRIMARY KEY,
|
||||
topic TEXT NOT NULL,
|
||||
updated INTEGER NOT NULL,
|
||||
reservation TEXT,
|
||||
payload TEXT
|
||||
);
|
||||
`); err != nil {
|
||||
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
||||
}
|
||||
return Queue{topic: topic, driver: driver}, nil
|
||||
@@ -37,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
||||
if q.driver.DB == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := q.driver.ExecContext(ctx, `
|
||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
||||
`,
|
||||
result, err := q.driver.ExecContext(ctx, `
|
||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
|
||||
`,
|
||||
uuid.New().String(),
|
||||
q.topic,
|
||||
time.Now().Unix(),
|
||||
b,
|
||||
uuid.New().String(),
|
||||
)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n, err := result.RowsAffected(); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||
@@ -71,22 +79,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||
reservation := []byte(uuid.New().String())
|
||||
var payload []byte
|
||||
if result, err := q.driver.ExecContext(ctx, `
|
||||
UPDATE queue
|
||||
SET
|
||||
updated = $1, reservation = $2
|
||||
WHERE
|
||||
id IN (
|
||||
SELECT id
|
||||
FROM queue
|
||||
WHERE
|
||||
topic = $3
|
||||
AND (
|
||||
reservation IS NULL
|
||||
OR $4 - updated > 60
|
||||
)
|
||||
LIMIT 1
|
||||
)
|
||||
`, now, reservation, q.topic, now); err != nil {
|
||||
UPDATE queue
|
||||
SET
|
||||
updated = $1, reservation = $2
|
||||
WHERE
|
||||
id IN (
|
||||
SELECT id
|
||||
FROM queue
|
||||
WHERE
|
||||
topic = $3
|
||||
AND (
|
||||
reservation IS NULL
|
||||
OR $4 - updated > 60
|
||||
)
|
||||
LIMIT 1
|
||||
)
|
||||
`, now, reservation, q.topic, now); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
||||
} else if n, err := result.RowsAffected(); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||
@@ -95,11 +103,11 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||
}
|
||||
|
||||
row := q.driver.QueryRowContext(ctx, `
|
||||
SELECT payload
|
||||
FROM queue
|
||||
WHERE reservation=$1
|
||||
LIMIT 1
|
||||
`, reservation)
|
||||
SELECT payload
|
||||
FROM queue
|
||||
WHERE reservation=$1
|
||||
LIMIT 1
|
||||
`, reservation)
|
||||
if err := row.Err(); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
||||
@@ -118,9 +126,9 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
||||
return nil
|
||||
}
|
||||
result, err := q.driver.ExecContext(ctx, `
|
||||
DELETE FROM queue
|
||||
WHERE reservation=$1
|
||||
`, reservation)
|
||||
DELETE FROM queue
|
||||
WHERE reservation=$1
|
||||
`, reservation)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
84
recap.go
Normal file
84
recap.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type PersistenceToRecap struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer := NewNoopQueue()
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newPersistenceToRecapProcess(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newPersistenceToRecapProcess(cfg Config) processFunc {
|
||||
return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
|
||||
var m ModelIDs
|
||||
if err := json.Unmarshal(modelIDs, &m); err != nil {
|
||||
return nil, fmt.Errorf("received non model ids payload: %w", err)
|
||||
}
|
||||
|
||||
if m.Event == "" {
|
||||
} else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil {
|
||||
return nil, err
|
||||
} else if !event.Resolved {
|
||||
} else if err := func() error {
|
||||
threads, err := cfg.storage.GetEventThreads(ctx, event.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, thread := range threads {
|
||||
messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(messages) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
prompt := []string{
|
||||
cfg.RecapPromptIntro,
|
||||
"---",
|
||||
messages[0].Plaintext,
|
||||
"---",
|
||||
cfg.RecapPrompt,
|
||||
"---",
|
||||
}
|
||||
for _, message := range messages[1:] {
|
||||
prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
|
||||
}
|
||||
|
||||
recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
thread.Recap = recap
|
||||
if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Println("recapped", thread.ID)
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cfg.Debug {
|
||||
log.Printf("persisted recap")
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
50
recap_test.go
Normal file
50
recap_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
func TestNewPersistenceToRecapProcess(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
s, _ := NewStorage(ctx, d)
|
||||
|
||||
cfg := Config{
|
||||
driver: d,
|
||||
storage: s,
|
||||
ai: NewAINoop(),
|
||||
Debug: true,
|
||||
}
|
||||
|
||||
proc := newPersistenceToRecapProcess(cfg)
|
||||
|
||||
if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(ModelIDs{Event: "Event"})
|
||||
if _, err := proc(ctx, b); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if thread, err := s.GetThread(ctx, "Thread"); err != nil {
|
||||
t.Error(err)
|
||||
} else if thread.Recap == "" {
|
||||
t.Error("no recap:", thread.Recap)
|
||||
} else {
|
||||
t.Logf("%+v", thread)
|
||||
}
|
||||
}
|
||||
4
slack.go
4
slack.go
@@ -77,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||
}
|
||||
|
||||
log.Printf("parsed slack message into models")
|
||||
if cfg.Debug {
|
||||
log.Printf("parsed slack message into models")
|
||||
}
|
||||
return json.Marshal(Models{
|
||||
Event: event,
|
||||
Message: message,
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
|
||||
"",
|
||||
"",
|
||||
"Datastores Non-Critical",
|
||||
false,
|
||||
true,
|
||||
),
|
||||
Message: model.NewMessage(
|
||||
"1712927439.728409/1712927439",
|
||||
@@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "F4511E",
|
||||
Color: "2ecc71",
|
||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
@@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
Author: "Opsgenie for Alert Management",
|
||||
Team: "Datastores Non-Critical",
|
||||
Resolved: true,
|
||||
},
|
||||
},
|
||||
"opsgenie_alert_resolved.json": {
|
||||
|
||||
149
slackscrape.go
Normal file
149
slackscrape.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// SlackScrape describes one unit of Slack history scraping work: a page of
// a channel's history, or — when ThreadTS is set — the replies of a single
// thread in that channel.
type SlackScrape struct {
	Latest   int64  // upper timestamp bound sent as "latest"; presumably 0 means "from now" — TODO confirm against Slack API
	Oldest   int64  // lower timestamp bound sent as "oldest"; omitted from the request when 0
	ThreadTS string // when non-empty, scrape conversations.replies for this thread instead of channel history
	Channel  string // Slack channel ID passed as the "channel" query parameter
	Token    string // Slack bearer token sent in the Authorization header
}
|
||||
|
||||
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: cfg.slackScrapePipeline.reader,
|
||||
process: newSlackScrapeProcess(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newSlackScrapeProcess(cfg Config) processFunc {
|
||||
limiter := rate.NewLimiter(0.5, 1)
|
||||
return func(ctx context.Context, jobb []byte) ([]byte, error) {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var job SlackScrape
|
||||
if err := json.Unmarshal(jobb, &job); err != nil {
|
||||
return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
|
||||
}
|
||||
|
||||
u := url.URL{
|
||||
Scheme: "https",
|
||||
Host: "slack.com",
|
||||
Path: "/api/conversations.history",
|
||||
}
|
||||
q := url.Values{}
|
||||
q.Set("channel", job.Channel)
|
||||
q.Set("latest", strconv.FormatInt(job.Latest, 10))
|
||||
q.Set("limit", "999")
|
||||
q.Set("inclusive", "true")
|
||||
if job.ThreadTS != "" {
|
||||
u.Path = "/api/conversations.replies"
|
||||
q.Set("ts", job.ThreadTS)
|
||||
}
|
||||
if job.Oldest != 0 {
|
||||
q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
|
||||
}
|
||||
u.RawQuery = q.Encode()
|
||||
url := u.String()
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+job.Token)
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
httpc := http.Client{Timeout: time.Second}
|
||||
resp, err := httpc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var page struct {
|
||||
Messages []json.RawMessage
|
||||
}
|
||||
if err := json.Unmarshal(body, &page); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newLatest := float64(job.Latest)
|
||||
for _, messageJSON := range page.Messages {
|
||||
if cfg.Debug {
|
||||
log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
|
||||
}
|
||||
b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var peekTS struct {
|
||||
TS float64 `json:"ts,string"`
|
||||
}
|
||||
if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
|
||||
newLatest = peekTS.TS
|
||||
}
|
||||
if job.ThreadTS == "" {
|
||||
var peek struct {
|
||||
ThreadTS string `json:"thread_ts"`
|
||||
}
|
||||
json.Unmarshal(messageJSON, &peek)
|
||||
if peek.ThreadTS != "" {
|
||||
clone := job
|
||||
clone.ThreadTS = peek.ThreadTS
|
||||
clone.Oldest = 0
|
||||
b, _ := json.Marshal(clone)
|
||||
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(page.Messages) == 999 {
|
||||
clone := job
|
||||
clone.Latest = int64(newLatest)
|
||||
b, _ := json.Marshal(clone)
|
||||
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
|
||||
}
|
||||
|
||||
log.Printf("scraped %v from %s", len(page.Messages), url)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
102
storage.go
102
storage.go
@@ -67,10 +67,112 @@ func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error)
|
||||
return v, err
|
||||
}
|
||||
|
||||
// GetEventThreads returns every thread whose EventID matches ID, ordered by
// ascending timestamp.
func (s Storage) GetEventThreads(ctx context.Context, ID string) ([]model.Thread, error) {
	return s.selectThreadsWhere(ctx, "EventID = $1", ID)
}
|
||||
|
||||
// GetThreadMessages returns every message whose ThreadID matches ID, ordered
// by ascending timestamp.
func (s Storage) GetThreadMessages(ctx context.Context, ID string) ([]model.Message, error) {
	return s.selectMessagesWhere(ctx, "ThreadID = $1", ID)
}
|
||||
|
||||
// UpsertThread inserts or updates the given thread in the "threads" table via
// the generic upsert helper.
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
	return s.upsert(ctx, "threads", thread)
}
|
||||
|
||||
func (s Storage) selectThreadsWhere(ctx context.Context, clause string, args ...any) ([]model.Thread, error) {
|
||||
keys, _, _, _, err := keysArgsKeyargsValues(model.Thread{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args2 := make([]any, len(args))
|
||||
for i := range args {
|
||||
args2[i], _ = json.Marshal(args[i])
|
||||
}
|
||||
scanTargets := make([]any, len(keys))
|
||||
|
||||
q := fmt.Sprintf(`
|
||||
SELECT %s FROM threads WHERE %s
|
||||
ORDER BY TS ASC
|
||||
`, strings.Join(keys, ", "), clause)
|
||||
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []model.Thread
|
||||
for rows.Next() {
|
||||
for i := range scanTargets {
|
||||
scanTargets[i] = &[]byte{}
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanTargets...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := map[string]json.RawMessage{}
|
||||
for i, k := range keys {
|
||||
m[k] = *scanTargets[i].(*[]byte)
|
||||
}
|
||||
b, _ := json.Marshal(m)
|
||||
|
||||
var one model.Thread
|
||||
if err := json.Unmarshal(b, &one); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, one)
|
||||
}
|
||||
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s Storage) selectMessagesWhere(ctx context.Context, clause string, args ...any) ([]model.Message, error) {
|
||||
keys, _, _, _, err := keysArgsKeyargsValues(model.Message{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args2 := make([]any, len(args))
|
||||
for i := range args {
|
||||
args2[i], _ = json.Marshal(args[i])
|
||||
}
|
||||
scanTargets := make([]any, len(keys))
|
||||
|
||||
q := fmt.Sprintf(`
|
||||
SELECT %s FROM messages WHERE %s
|
||||
ORDER BY TS ASC
|
||||
`, strings.Join(keys, ", "), clause)
|
||||
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []model.Message
|
||||
for rows.Next() {
|
||||
for i := range scanTargets {
|
||||
scanTargets[i] = &[]byte{}
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanTargets...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := map[string]json.RawMessage{}
|
||||
for i, k := range keys {
|
||||
m[k] = *scanTargets[i].(*[]byte)
|
||||
}
|
||||
b, _ := json.Marshal(m)
|
||||
|
||||
var one model.Message
|
||||
if err := json.Unmarshal(b, &one); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, one)
|
||||
}
|
||||
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||
if questions := strings.Count(clause, "$"); questions != len(args) {
|
||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||
|
||||
@@ -2,6 +2,8 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -85,4 +87,64 @@ func TestStorage(t *testing.T) {
|
||||
t.Fatal("unexpected result from get:", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get thread messages", func(t *testing.T) {
|
||||
thread := fmt.Sprintf("thread-%d", rand.Int())
|
||||
m := model.NewMessage(
|
||||
"ID",
|
||||
1,
|
||||
"Author",
|
||||
"Plaintext",
|
||||
thread,
|
||||
)
|
||||
|
||||
if err := s.UpsertMessage(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on insert:", err)
|
||||
} else if m2, err := s.GetMessage(ctx, m.ID); err != nil {
|
||||
t.Fatal("unexpected error on upsert-get:", err)
|
||||
} else if m2 != m {
|
||||
t.Errorf("expected %+v but got %+v", m, m2)
|
||||
}
|
||||
|
||||
msgs, err := s.GetThreadMessages(ctx, thread)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(msgs) != 1 {
|
||||
t.Fatal(msgs)
|
||||
} else if msgs[0].ThreadID != m.ThreadID {
|
||||
t.Fatal(msgs[0].ThreadID)
|
||||
} else if msgs[0] != m {
|
||||
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get event threads", func(t *testing.T) {
|
||||
event := fmt.Sprintf("event-%d", rand.Int())
|
||||
m := model.NewThread(
|
||||
"ID",
|
||||
"URL",
|
||||
1,
|
||||
"Channel",
|
||||
event,
|
||||
)
|
||||
|
||||
if err := s.UpsertThread(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on insert:", err)
|
||||
} else if m2, err := s.GetThread(ctx, m.ID); err != nil {
|
||||
t.Fatal("unexpected error on upsert-get:", err)
|
||||
} else if m2 != m {
|
||||
t.Errorf("expected %+v but got %+v", m, m2)
|
||||
}
|
||||
|
||||
msgs, err := s.GetEventThreads(ctx, event)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(msgs) != 1 {
|
||||
t.Fatal(msgs)
|
||||
} else if msgs[0].EventID != m.EventID {
|
||||
t.Fatal(msgs[0].EventID)
|
||||
} else if msgs[0] != m {
|
||||
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
3
testdata/slack_events/opsgenie_alert.json
vendored
3
testdata/slack_events/opsgenie_alert.json
vendored
@@ -29,7 +29,8 @@
|
||||
"attachments": [
|
||||
{
|
||||
"id": 1,
|
||||
"color": "F4511E",
|
||||
"realcolor": "F4511E",
|
||||
"color": "2ecc71",
|
||||
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
|
||||
Reference in New Issue
Block a user