Compare commits
16 Commits
8557ddc522
...
5785ea37ae
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5785ea37ae | ||
|
|
f27d416a5a | ||
|
|
81793876f8 | ||
|
|
6d81164161 | ||
|
|
20256bd6b4 | ||
|
|
e5e98e2890 | ||
|
|
4fb26ec775 | ||
|
|
782b9ec3cf | ||
|
|
9d7f69bd8a | ||
|
|
12de99da57 | ||
|
|
81fe8070ca | ||
|
|
79de56e236 | ||
|
|
f485b5ea88 | ||
|
|
894536d209 | ||
|
|
f8861a73b5 | ||
|
|
14de286415 |
@@ -4,14 +4,9 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
|
|||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- dedupe critical+noncritical
|
|
||||||
- what SLO/SLI can I help benoit with
|
- what SLO/SLI can I help benoit with
|
||||||
- break into smaller goals
|
|
||||||
- sell to the team
|
|
||||||
- scott; like to keep state in incident.io and zendesk
|
- scott; like to keep state in incident.io and zendesk
|
||||||
- @spoc -ignore, @spoc -s summary
|
- @spoc -ignore, @spoc -s summary
|
||||||
- limit rps from rpc/slackscrape
|
|
||||||
- rpc/slackscrape to async
|
|
||||||
- limit queue retries
|
- limit queue retries
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
139
ai.go
139
ai.go
@@ -1,13 +1,10 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"net/http"
|
||||||
"strings"
|
"time"
|
||||||
|
|
||||||
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
|
|
||||||
"github.com/nikolaydubina/llama2.go/llama2"
|
|
||||||
"github.com/tmc/langchaingo/llms"
|
"github.com/tmc/langchaingo/llms"
|
||||||
"github.com/tmc/langchaingo/llms/ollama"
|
"github.com/tmc/langchaingo/llms/ollama"
|
||||||
)
|
)
|
||||||
@@ -37,133 +34,23 @@ func NewAIOllama(url, model string) AIOllama {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
|
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
|
||||||
|
c := &http.Client{
|
||||||
|
Timeout: time.Hour,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
//DisableKeepAlives: true,
|
||||||
|
IdleConnTimeout: time.Hour,
|
||||||
|
ResponseHeaderTimeout: time.Hour,
|
||||||
|
ExpectContinueTimeout: time.Hour,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
defer c.CloseIdleConnections()
|
||||||
llm, err := ollama.New(
|
llm, err := ollama.New(
|
||||||
ollama.WithModel(ai.model),
|
ollama.WithModel(ai.model),
|
||||||
ollama.WithServerURL(ai.url),
|
ollama.WithServerURL(ai.url),
|
||||||
|
ollama.WithHTTPClient(c),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
|
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
|
||||||
}
|
}
|
||||||
|
|
||||||
type AILocal struct {
|
|
||||||
checkpointPath string
|
|
||||||
tokenizerPath string
|
|
||||||
temperature float64
|
|
||||||
steps int
|
|
||||||
topp float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAILocal(
|
|
||||||
checkpointPath string,
|
|
||||||
tokenizerPath string,
|
|
||||||
temperature float64,
|
|
||||||
steps int,
|
|
||||||
topp float64,
|
|
||||||
) AILocal {
|
|
||||||
return AILocal{
|
|
||||||
checkpointPath: checkpointPath,
|
|
||||||
tokenizerPath: tokenizerPath,
|
|
||||||
temperature: temperature,
|
|
||||||
steps: steps,
|
|
||||||
topp: topp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://github.com/nikolaydubina/llama2.go/blob/master/main.go
|
|
||||||
func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
|
|
||||||
checkpointFile, err := os.OpenFile(ai.checkpointPath, os.O_RDONLY, 0)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer checkpointFile.Close()
|
|
||||||
|
|
||||||
config, err := llama2.NewConfigFromCheckpoint(checkpointFile)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
isSharedWeights := config.VocabSize > 0
|
|
||||||
if config.VocabSize < 0 {
|
|
||||||
config.VocabSize = -config.VocabSize
|
|
||||||
}
|
|
||||||
|
|
||||||
tokenizerFile, err := os.OpenFile(ai.tokenizerPath, os.O_RDONLY, 0)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer tokenizerFile.Close()
|
|
||||||
|
|
||||||
vocab := llama2.NewVocabFromFile(config.VocabSize, tokenizerFile)
|
|
||||||
|
|
||||||
w := llama2.NewTransformerWeightsFromCheckpoint(config, checkpointFile, isSharedWeights)
|
|
||||||
|
|
||||||
// right now we cannot run for more than config.SeqLen steps
|
|
||||||
steps := ai.steps
|
|
||||||
if steps <= 0 || steps > config.SeqLen {
|
|
||||||
steps = config.SeqLen
|
|
||||||
}
|
|
||||||
|
|
||||||
runState := llama2.NewRunState(config)
|
|
||||||
|
|
||||||
promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
|
|
||||||
|
|
||||||
out := bytes.NewBuffer(nil)
|
|
||||||
|
|
||||||
// the current position we are in
|
|
||||||
var token int = 1 // 1 = BOS token in llama-2 sentencepiece
|
|
||||||
var pos = 0
|
|
||||||
|
|
||||||
for pos < steps {
|
|
||||||
// forward the transformer to get logits for the next token
|
|
||||||
llama2.Transformer(token, pos, config, runState, w)
|
|
||||||
|
|
||||||
var next int
|
|
||||||
if pos < len(promptTokens) {
|
|
||||||
next = promptTokens[pos]
|
|
||||||
} else {
|
|
||||||
// sample the next token
|
|
||||||
if ai.temperature == 0 {
|
|
||||||
// greedy argmax sampling
|
|
||||||
next = nn.ArgMax(runState.Logits)
|
|
||||||
} else {
|
|
||||||
// apply the temperature to the logits
|
|
||||||
for q := 0; q < config.VocabSize; q++ {
|
|
||||||
runState.Logits[q] /= float32(ai.temperature)
|
|
||||||
}
|
|
||||||
// apply softmax to the logits to the probabilities for next token
|
|
||||||
nn.SoftMax(runState.Logits)
|
|
||||||
// we now want to sample from this distribution to get the next token
|
|
||||||
if ai.topp <= 0 || ai.topp >= 1 {
|
|
||||||
// simply sample from the predicted probability distribution
|
|
||||||
next = nn.Sample(runState.Logits)
|
|
||||||
} else {
|
|
||||||
// top-p (nucleus) sampling, clamping the least likely tokens to zero
|
|
||||||
next = nn.SampleTopP(runState.Logits, float32(ai.topp))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pos++
|
|
||||||
|
|
||||||
// data-dependent terminating condition: the BOS (1) token delimits sequences
|
|
||||||
if next == 1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// following BOS (1) token, sentencepiece decoder strips any leading whitespace
|
|
||||||
var tokenStr string
|
|
||||||
if token == 1 && vocab.Words[next][0] == ' ' {
|
|
||||||
tokenStr = vocab.Words[next][1:]
|
|
||||||
} else {
|
|
||||||
tokenStr = vocab.Words[next]
|
|
||||||
}
|
|
||||||
out.Write([]byte(tokenStr))
|
|
||||||
|
|
||||||
// advance forward
|
|
||||||
token = next
|
|
||||||
}
|
|
||||||
out.Write([]byte("\n"))
|
|
||||||
|
|
||||||
return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
|
|
||||||
}
|
|
||||||
|
|||||||
106
ai_test.go
106
ai_test.go
@@ -4,11 +4,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -22,53 +17,7 @@ func TestAINoop(t *testing.T) {
|
|||||||
|
|
||||||
func TestAIOllama(t *testing.T) {
|
func TestAIOllama(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
ai := NewAIOllama("http://localhost:11434", "llama3")
|
||||||
|
|
||||||
testAI(t, ai)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAILocal(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
d := os.TempDir()
|
|
||||||
checkpoints := "checkpoints"
|
|
||||||
tokenizer := "tokenizer"
|
|
||||||
for u, p := range map[string]*string{
|
|
||||||
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
|
|
||||||
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
|
|
||||||
} {
|
|
||||||
func() {
|
|
||||||
*p = path.Base(u)
|
|
||||||
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
|
|
||||||
t.Logf("downloading %s from %s", u, *p)
|
|
||||||
|
|
||||||
resp, err := http.Get(u)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
f, err := os.Create(path.Join(d, *p))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
if _, err := io.Copy(f, resp.Body); err != nil {
|
|
||||||
f.Close()
|
|
||||||
os.Remove(path.Join(d, *p))
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
ai := NewAILocal(
|
|
||||||
path.Join(d, checkpoints),
|
|
||||||
path.Join(d, tokenizer),
|
|
||||||
0.9,
|
|
||||||
256,
|
|
||||||
0.9,
|
|
||||||
)
|
|
||||||
|
|
||||||
testAI(t, ai)
|
testAI(t, ai)
|
||||||
}
|
}
|
||||||
@@ -78,7 +27,7 @@ func testAI(t *testing.T, ai AI) {
|
|||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
t.Run("mvp", func(t *testing.T) {
|
t.Run("mvp", func(t *testing.T) {
|
||||||
if result, err := ai.Do(ctx, "hello world"); err != nil {
|
if result, err := ai.Do(ctx, "Tell me a fun fact."); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if len(result) < 3 {
|
} else if len(result) < 3 {
|
||||||
t.Error(result)
|
t.Error(result)
|
||||||
@@ -87,32 +36,33 @@ func testAI(t *testing.T, ai AI) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("simulation", func(t *testing.T) {
|
/*
|
||||||
d := NewRAM()
|
t.Run("simulation", func(t *testing.T) {
|
||||||
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
d := NewRAM()
|
||||||
s := NewStorage(d)
|
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
||||||
|
s := NewStorage(d)
|
||||||
|
|
||||||
threads, err := s.Threads(ctx)
|
threads, err := s.Threads(ctx)
|
||||||
if err != nil || len(threads) < 1 {
|
if err != nil || len(threads) < 1 {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
thread, err := s.Thread(ctx, threads[0])
|
thread, err := s.Thread(ctx, threads[0])
|
||||||
if err != nil || len(thread) < 1 {
|
if err != nil || len(thread) < 1 {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
input := fmt.Sprintf(`
|
|
||||||
Summarize the following forum converstion.
|
|
||||||
---
|
|
||||||
%s
|
|
||||||
`, thread[0].Plaintext)
|
|
||||||
t.Logf("\n\t%s", input)
|
|
||||||
result, err := ai.Do(ctx, input)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
|
||||||
})
|
|
||||||
|
|
||||||
|
input := fmt.Sprintf(`
|
||||||
|
Summarize the following forum converstion.
|
||||||
|
---
|
||||||
|
%s
|
||||||
|
`, thread[0].Plaintext)
|
||||||
|
t.Logf("\n\t%s", input)
|
||||||
|
result, err := ai.Do(ctx, input)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
||||||
|
})
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|||||||
30
config.go
30
config.go
@@ -23,10 +23,10 @@ type Config struct {
|
|||||||
BasicAuthUser string
|
BasicAuthUser string
|
||||||
BasicAuthPassword string
|
BasicAuthPassword string
|
||||||
FillWithTestdata bool
|
FillWithTestdata bool
|
||||||
OllamaURL string
|
OllamaUrl string
|
||||||
OllamaModel string
|
OllamaModel string
|
||||||
LocalCheckpoint string
|
RecapPromptIntro string
|
||||||
LocalTokenizer string
|
RecapPrompt string
|
||||||
AssetPattern string
|
AssetPattern string
|
||||||
DatacenterPattern string
|
DatacenterPattern string
|
||||||
EventNamePattern string
|
EventNamePattern string
|
||||||
@@ -34,7 +34,9 @@ type Config struct {
|
|||||||
storage Storage
|
storage Storage
|
||||||
ai AI
|
ai AI
|
||||||
slackToModelPipeline Pipeline
|
slackToModelPipeline Pipeline
|
||||||
|
slackScrapePipeline Pipeline
|
||||||
modelToPersistencePipeline Pipeline
|
modelToPersistencePipeline Pipeline
|
||||||
|
persistenceToRecapPipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -50,10 +52,12 @@ func newConfig(ctx context.Context) (Config, error) {
|
|||||||
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
|
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
|
||||||
def := Config{
|
def := Config{
|
||||||
Port: 38080,
|
Port: 38080,
|
||||||
OllamaModel: "gemma:2b",
|
OllamaModel: "llama3",
|
||||||
AssetPattern: renderAssetPattern,
|
AssetPattern: renderAssetPattern,
|
||||||
DatacenterPattern: renderDatacenterPattern,
|
DatacenterPattern: renderDatacenterPattern,
|
||||||
EventNamePattern: renderEventNamePattern,
|
EventNamePattern: renderEventNamePattern,
|
||||||
|
RecapPromptIntro: "A Slack thread begin with the following original post.",
|
||||||
|
RecapPrompt: "Summarize all of the following Slack thread responses in 1 sentence without any leading text.",
|
||||||
}
|
}
|
||||||
|
|
||||||
var m map[string]any
|
var m map[string]any
|
||||||
@@ -130,10 +134,8 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
}
|
}
|
||||||
result.storage = storage
|
result.storage = storage
|
||||||
|
|
||||||
if result.OllamaURL != "" {
|
if result.OllamaUrl != "" {
|
||||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
result.ai = NewAIOllama(result.OllamaUrl, result.OllamaModel)
|
||||||
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
|
||||||
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
|
|
||||||
} else {
|
} else {
|
||||||
result.ai = NewAINoop()
|
result.ai = NewAINoop()
|
||||||
}
|
}
|
||||||
@@ -150,5 +152,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
}
|
}
|
||||||
result.modelToPersistencePipeline = modelToPersistencePipeline
|
result.modelToPersistencePipeline = modelToPersistencePipeline
|
||||||
|
|
||||||
|
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.slackScrapePipeline = slackScrapePipeline
|
||||||
|
|
||||||
|
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.persistenceToRecapPipeline = persistenceToRecapPipeline
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@@ -35,7 +36,12 @@ func NewTestDriver(t *testing.T, optionalP ...string) Driver {
|
|||||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||||
engine := "sqlite"
|
engine := "sqlite"
|
||||||
if conn == "" {
|
if conn == "" {
|
||||||
conn = ":memory:"
|
f, err := os.CreateTemp(os.TempDir(), "spoc-bot-vr-undef-*.db")
|
||||||
|
if err != nil {
|
||||||
|
return Driver{}, err
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
conn = f.Name()
|
||||||
} else {
|
} else {
|
||||||
if u, err := url.Parse(conn); err != nil {
|
if u, err := url.Parse(conn); err != nil {
|
||||||
return Driver{}, err
|
return Driver{}, err
|
||||||
|
|||||||
5
go.mod
5
go.mod
@@ -6,9 +6,9 @@ require (
|
|||||||
github.com/glebarez/go-sqlite v1.21.2
|
github.com/glebarez/go-sqlite v1.21.2
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/lib/pq v1.10.9
|
github.com/lib/pq v1.10.9
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1
|
|
||||||
github.com/tmc/langchaingo v0.1.8
|
github.com/tmc/langchaingo v0.1.8
|
||||||
gotest.tools/v3 v3.5.1
|
golang.org/x/time v0.5.0
|
||||||
|
gotest.tools v2.2.0+incompatible
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@@ -20,7 +20,6 @@ require (
|
|||||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
golang.org/x/sys v0.16.0 // indirect
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
gotest.tools v2.2.0+incompatible // indirect
|
|
||||||
modernc.org/libc v1.22.5 // indirect
|
modernc.org/libc v1.22.5 // indirect
|
||||||
modernc.org/mathutil v1.5.0 // indirect
|
modernc.org/mathutil v1.5.0 // indirect
|
||||||
modernc.org/memory v1.5.0 // indirect
|
modernc.org/memory v1.5.0 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
|||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
||||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||||
@@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
|
|||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||||
|
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
|
||||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
|
||||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||||
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||||
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||||
|
|||||||
108
main.go
108
main.go
@@ -39,6 +39,8 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
case err := <-processPipelines(ctx,
|
case err := <-processPipelines(ctx,
|
||||||
cfg.slackToModelPipeline,
|
cfg.slackToModelPipeline,
|
||||||
cfg.modelToPersistencePipeline,
|
cfg.modelToPersistencePipeline,
|
||||||
|
cfg.slackScrapePipeline,
|
||||||
|
cfg.persistenceToRecapPipeline,
|
||||||
):
|
):
|
||||||
return err
|
return err
|
||||||
case err := <-listenAndServe(ctx, cfg):
|
case err := <-listenAndServe(ctx, cfg):
|
||||||
@@ -88,6 +90,7 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
|
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||||
|
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
@@ -106,90 +109,45 @@ func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
|
|||||||
json.NewEncoder(w).Encode(map[string]any{"version": Version})
|
json.NewEncoder(w).Encode(map[string]any{"version": Version})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !basicAuth(cfg, w, r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
event := r.URL.Query().Get("id")
|
||||||
|
b, _ := json.Marshal(ModelIDs{Event: event})
|
||||||
|
if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{"event": event})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if !basicAuth(cfg, w, r) {
|
if !basicAuth(cfg, w, r) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
channel := r.Header.Get("slack-channel")
|
|
||||||
token := r.Header.Get("slack-oauth-token")
|
|
||||||
|
|
||||||
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
|
since, err := parseSince(r.URL.Query().Get("since"))
|
||||||
|
if err != nil {
|
||||||
httpc := http.Client{Timeout: time.Second}
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
get := func(url string) ([]byte, error) {
|
return
|
||||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.Header.Set("Authorization", "Bearer "+token)
|
|
||||||
req = req.WithContext(r.Context())
|
|
||||||
|
|
||||||
resp, err := httpc.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
defer io.Copy(io.Discard, resp.Body)
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
return io.ReadAll(resp.Body)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n := 0
|
job, _ := json.Marshal(SlackScrape{
|
||||||
|
Latest: time.Now().Unix(),
|
||||||
for len(urls) > 0 {
|
Oldest: since.Unix(),
|
||||||
url := urls[0]
|
ThreadTS: "",
|
||||||
urls = urls[1:]
|
Channel: r.Header.Get("slack-channel"),
|
||||||
select {
|
Token: r.Header.Get("slack-oauth-token"),
|
||||||
case <-r.Context().Done():
|
})
|
||||||
case <-time.After(time.Second):
|
if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
|
||||||
}
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
body, err := get(url)
|
return
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var page struct {
|
|
||||||
Messages []json.RawMessage
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(body, &page); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
errs := []error{}
|
|
||||||
for _, messageJSON := range page.Messages {
|
|
||||||
if cfg.Debug {
|
|
||||||
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
|
||||||
}
|
|
||||||
b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
|
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
} else {
|
|
||||||
n += 1
|
|
||||||
}
|
|
||||||
if !strings.Contains(url, "ts=") {
|
|
||||||
var peek struct {
|
|
||||||
ThreadTS string `json:"thread_ts"`
|
|
||||||
}
|
|
||||||
json.Unmarshal(messageJSON, &peek)
|
|
||||||
if peek.ThreadTS != "" {
|
|
||||||
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errs) > 0 {
|
|
||||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
44
main_test.go
44
main_test.go
@@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
|
|||||||
return int(port)
|
return int(port)
|
||||||
}()
|
}()
|
||||||
u := fmt.Sprintf("http://localhost:%d", port)
|
u := fmt.Sprintf("http://localhost:%d", port)
|
||||||
|
var err error
|
||||||
|
|
||||||
cfg := Config{}
|
cfg := Config{}
|
||||||
cfg.DatacenterPattern = renderDatacenterPattern
|
cfg.DatacenterPattern = renderDatacenterPattern
|
||||||
@@ -44,10 +45,16 @@ func TestRun(t *testing.T) {
|
|||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver = NewTestDriver(t)
|
cfg.driver = NewTestDriver(t)
|
||||||
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
||||||
|
cfg.ai = NewAINoop()
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
||||||
|
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
|
||||||
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
||||||
|
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||||
@@ -83,4 +90,41 @@ func TestRun(t *testing.T) {
|
|||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
|
||||||
|
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
t.Fatal("timed out waiting for recap")
|
||||||
|
}
|
||||||
|
|
||||||
|
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
|
||||||
|
if thread.Recap == "" {
|
||||||
|
t.Error(thread.Recap)
|
||||||
|
}
|
||||||
|
t.Log(thread.Recap)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ type Thread struct {
|
|||||||
TS uint64
|
TS uint64
|
||||||
Channel string
|
Channel string
|
||||||
EventID string
|
EventID string
|
||||||
|
Recap string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
||||||
|
|||||||
@@ -26,15 +26,14 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer = NewNoopQueue()
|
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newModelToPersistenceProcess(cfg.storage),
|
process: newModelToPersistenceProcess(cfg, cfg.storage),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newModelToPersistenceProcess(storage Storage) processFunc {
|
func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
|
||||||
return func(ctx context.Context, models []byte) ([]byte, error) {
|
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||||
var m Models
|
var m Models
|
||||||
if err := json.Unmarshal(models, &m); err != nil {
|
if err := json.Unmarshal(models, &m); err != nil {
|
||||||
@@ -56,7 +55,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
|
|||||||
return nil, fmt.Errorf("failed to persist message: %w", err)
|
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("persisted models")
|
if cfg.Debug {
|
||||||
|
log.Printf("persisted models")
|
||||||
|
}
|
||||||
return json.Marshal(ModelIDs{
|
return json.Marshal(ModelIDs{
|
||||||
Event: m.Event.ID,
|
Event: m.Event.ID,
|
||||||
Thread: m.Thread.ID,
|
Thread: m.Thread.ID,
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
|||||||
|
|
||||||
d := NewTestDriver(t)
|
d := NewTestDriver(t)
|
||||||
s, _ := NewStorage(ctx, d)
|
s, _ := NewStorage(ctx, d)
|
||||||
process := newModelToPersistenceProcess(s)
|
process := newModelToPersistenceProcess(Config{}, s)
|
||||||
|
|
||||||
_, _ = ctx, process
|
_, _ = ctx, process
|
||||||
|
|
||||||
|
|||||||
82
queue.go
82
queue.go
@@ -20,14 +20,14 @@ func NewNoopQueue() Queue {
|
|||||||
|
|
||||||
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||||
if _, err := driver.ExecContext(ctx, `
|
if _, err := driver.ExecContext(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS queue (
|
CREATE TABLE IF NOT EXISTS queue (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
topic TEXT NOT NULL,
|
topic TEXT NOT NULL,
|
||||||
updated INTEGER NOT NULL,
|
updated INTEGER NOT NULL,
|
||||||
reservation TEXT,
|
reservation TEXT,
|
||||||
payload TEXT
|
payload TEXT
|
||||||
);
|
);
|
||||||
`); err != nil {
|
`); err != nil {
|
||||||
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
||||||
}
|
}
|
||||||
return Queue{topic: topic, driver: driver}, nil
|
return Queue{topic: topic, driver: driver}, nil
|
||||||
@@ -37,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
|||||||
if q.driver.DB == nil {
|
if q.driver.DB == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
result, err := q.driver.ExecContext(ctx, `
|
||||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
|
||||||
`,
|
`,
|
||||||
|
uuid.New().String(),
|
||||||
q.topic,
|
q.topic,
|
||||||
time.Now().Unix(),
|
time.Now().Unix(),
|
||||||
b,
|
b,
|
||||||
uuid.New().String(),
|
|
||||||
)
|
)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if n != 1 {
|
||||||
|
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||||
@@ -71,22 +79,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
reservation := []byte(uuid.New().String())
|
reservation := []byte(uuid.New().String())
|
||||||
var payload []byte
|
var payload []byte
|
||||||
if result, err := q.driver.ExecContext(ctx, `
|
if result, err := q.driver.ExecContext(ctx, `
|
||||||
UPDATE queue
|
UPDATE queue
|
||||||
SET
|
SET
|
||||||
updated = $1, reservation = $2
|
updated = $1, reservation = $2
|
||||||
WHERE
|
WHERE
|
||||||
id IN (
|
id IN (
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE
|
WHERE
|
||||||
topic = $3
|
topic = $3
|
||||||
AND (
|
AND (
|
||||||
reservation IS NULL
|
reservation IS NULL
|
||||||
OR $4 - updated > 60
|
OR $4 - updated > 60
|
||||||
)
|
)
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
`, now, reservation, q.topic, now); err != nil {
|
`, now, reservation, q.topic, now); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
||||||
} else if n, err := result.RowsAffected(); err != nil {
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||||
@@ -95,11 +103,11 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
row := q.driver.QueryRowContext(ctx, `
|
row := q.driver.QueryRowContext(ctx, `
|
||||||
SELECT payload
|
SELECT payload
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE reservation=$1
|
WHERE reservation=$1
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`, reservation)
|
`, reservation)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||||
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
||||||
@@ -118,9 +126,9 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
result, err := q.driver.ExecContext(ctx, `
|
result, err := q.driver.ExecContext(ctx, `
|
||||||
DELETE FROM queue
|
DELETE FROM queue
|
||||||
WHERE reservation=$1
|
WHERE reservation=$1
|
||||||
`, reservation)
|
`, reservation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
84
recap.go
Normal file
84
recap.go
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PersistenceToRecap struct {
|
||||||
|
pipeline Pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
reader, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
writer := NewNoopQueue()
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: newPersistenceToRecapProcess(cfg),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPersistenceToRecapProcess(cfg Config) processFunc {
|
||||||
|
return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
|
||||||
|
var m ModelIDs
|
||||||
|
if err := json.Unmarshal(modelIDs, &m); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non model ids payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Event == "" {
|
||||||
|
} else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if !event.Resolved {
|
||||||
|
} else if err := func() error {
|
||||||
|
threads, err := cfg.storage.GetEventThreads(ctx, event.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, thread := range threads {
|
||||||
|
messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if len(messages) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
prompt := []string{
|
||||||
|
cfg.RecapPromptIntro,
|
||||||
|
"---",
|
||||||
|
messages[0].Plaintext,
|
||||||
|
"---",
|
||||||
|
cfg.RecapPrompt,
|
||||||
|
"---",
|
||||||
|
}
|
||||||
|
for _, message := range messages[1:] {
|
||||||
|
prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
|
||||||
|
}
|
||||||
|
|
||||||
|
recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
thread.Recap = recap
|
||||||
|
if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Println("recapped", thread.ID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("persisted recap")
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
50
recap_test.go
Normal file
50
recap_test.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewPersistenceToRecapProcess(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
d := NewTestDriver(t)
|
||||||
|
s, _ := NewStorage(ctx, d)
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
driver: d,
|
||||||
|
storage: s,
|
||||||
|
ai: NewAINoop(),
|
||||||
|
Debug: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
proc := newPersistenceToRecapProcess(cfg)
|
||||||
|
|
||||||
|
if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, _ := json.Marshal(ModelIDs{Event: "Event"})
|
||||||
|
if _, err := proc(ctx, b); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if thread, err := s.GetThread(ctx, "Thread"); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if thread.Recap == "" {
|
||||||
|
t.Error("no recap:", thread.Recap)
|
||||||
|
} else {
|
||||||
|
t.Logf("%+v", thread)
|
||||||
|
}
|
||||||
|
}
|
||||||
4
slack.go
4
slack.go
@@ -77,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("parsed slack message into models")
|
if cfg.Debug {
|
||||||
|
log.Printf("parsed slack message into models")
|
||||||
|
}
|
||||||
return json.Marshal(Models{
|
return json.Marshal(Models{
|
||||||
Event: event,
|
Event: event,
|
||||||
Message: message,
|
Message: message,
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
|
|||||||
"",
|
"",
|
||||||
"",
|
"",
|
||||||
"Datastores Non-Critical",
|
"Datastores Non-Critical",
|
||||||
false,
|
true,
|
||||||
),
|
),
|
||||||
Message: model.NewMessage(
|
Message: model.NewMessage(
|
||||||
"1712927439.728409/1712927439",
|
"1712927439.728409/1712927439",
|
||||||
@@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Name: "Opsgenie for Alert Management",
|
Name: "Opsgenie for Alert Management",
|
||||||
},
|
},
|
||||||
Attachments: []slackAttachment{{
|
Attachments: []slackAttachment{{
|
||||||
Color: "F4511E",
|
Color: "2ecc71",
|
||||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Fields: []slackField{
|
Fields: []slackField{
|
||||||
@@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
Author: "Opsgenie for Alert Management",
|
Author: "Opsgenie for Alert Management",
|
||||||
Team: "Datastores Non-Critical",
|
Team: "Datastores Non-Critical",
|
||||||
|
Resolved: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"opsgenie_alert_resolved.json": {
|
"opsgenie_alert_resolved.json": {
|
||||||
|
|||||||
149
slackscrape.go
Normal file
149
slackscrape.go
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SlackScrape struct {
|
||||||
|
Latest int64
|
||||||
|
Oldest int64
|
||||||
|
ThreadTS string
|
||||||
|
Channel string
|
||||||
|
Token string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: cfg.slackScrapePipeline.reader,
|
||||||
|
process: newSlackScrapeProcess(cfg),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSlackScrapeProcess(cfg Config) processFunc {
|
||||||
|
limiter := rate.NewLimiter(0.5, 1)
|
||||||
|
return func(ctx context.Context, jobb []byte) ([]byte, error) {
|
||||||
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var job SlackScrape
|
||||||
|
if err := json.Unmarshal(jobb, &job); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: "slack.com",
|
||||||
|
Path: "/api/conversations.history",
|
||||||
|
}
|
||||||
|
q := url.Values{}
|
||||||
|
q.Set("channel", job.Channel)
|
||||||
|
q.Set("latest", strconv.FormatInt(job.Latest, 10))
|
||||||
|
q.Set("limit", "999")
|
||||||
|
q.Set("inclusive", "true")
|
||||||
|
if job.ThreadTS != "" {
|
||||||
|
u.Path = "/api/conversations.replies"
|
||||||
|
q.Set("ts", job.ThreadTS)
|
||||||
|
}
|
||||||
|
if job.Oldest != 0 {
|
||||||
|
q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
|
||||||
|
}
|
||||||
|
u.RawQuery = q.Encode()
|
||||||
|
url := u.String()
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Authorization", "Bearer "+job.Token)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
httpc := http.Client{Timeout: time.Second}
|
||||||
|
resp, err := httpc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
defer io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var page struct {
|
||||||
|
Messages []json.RawMessage
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &page); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newLatest := float64(job.Latest)
|
||||||
|
for _, messageJSON := range page.Messages {
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var peekTS struct {
|
||||||
|
TS float64 `json:"ts,string"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
|
||||||
|
newLatest = peekTS.TS
|
||||||
|
}
|
||||||
|
if job.ThreadTS == "" {
|
||||||
|
var peek struct {
|
||||||
|
ThreadTS string `json:"thread_ts"`
|
||||||
|
}
|
||||||
|
json.Unmarshal(messageJSON, &peek)
|
||||||
|
if peek.ThreadTS != "" {
|
||||||
|
clone := job
|
||||||
|
clone.ThreadTS = peek.ThreadTS
|
||||||
|
clone.Oldest = 0
|
||||||
|
b, _ := json.Marshal(clone)
|
||||||
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(page.Messages) == 999 {
|
||||||
|
clone := job
|
||||||
|
clone.Latest = int64(newLatest)
|
||||||
|
b, _ := json.Marshal(clone)
|
||||||
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("scraped %v from %s", len(page.Messages), url)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
102
storage.go
102
storage.go
@@ -67,10 +67,112 @@ func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error)
|
|||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetEventThreads(ctx context.Context, ID string) ([]model.Thread, error) {
|
||||||
|
return s.selectThreadsWhere(ctx, "EventID = $1", ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetThreadMessages(ctx context.Context, ID string) ([]model.Message, error) {
|
||||||
|
return s.selectMessagesWhere(ctx, "ThreadID = $1", ID)
|
||||||
|
}
|
||||||
|
|
||||||
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
||||||
return s.upsert(ctx, "threads", thread)
|
return s.upsert(ctx, "threads", thread)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) selectThreadsWhere(ctx context.Context, clause string, args ...any) ([]model.Thread, error) {
|
||||||
|
keys, _, _, _, err := keysArgsKeyargsValues(model.Thread{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args2 := make([]any, len(args))
|
||||||
|
for i := range args {
|
||||||
|
args2[i], _ = json.Marshal(args[i])
|
||||||
|
}
|
||||||
|
scanTargets := make([]any, len(keys))
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
SELECT %s FROM threads WHERE %s
|
||||||
|
ORDER BY TS ASC
|
||||||
|
`, strings.Join(keys, ", "), clause)
|
||||||
|
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []model.Thread
|
||||||
|
for rows.Next() {
|
||||||
|
for i := range scanTargets {
|
||||||
|
scanTargets[i] = &[]byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanTargets...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := map[string]json.RawMessage{}
|
||||||
|
for i, k := range keys {
|
||||||
|
m[k] = *scanTargets[i].(*[]byte)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
|
||||||
|
var one model.Thread
|
||||||
|
if err := json.Unmarshal(b, &one); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, one)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) selectMessagesWhere(ctx context.Context, clause string, args ...any) ([]model.Message, error) {
|
||||||
|
keys, _, _, _, err := keysArgsKeyargsValues(model.Message{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args2 := make([]any, len(args))
|
||||||
|
for i := range args {
|
||||||
|
args2[i], _ = json.Marshal(args[i])
|
||||||
|
}
|
||||||
|
scanTargets := make([]any, len(keys))
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
SELECT %s FROM messages WHERE %s
|
||||||
|
ORDER BY TS ASC
|
||||||
|
`, strings.Join(keys, ", "), clause)
|
||||||
|
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []model.Message
|
||||||
|
for rows.Next() {
|
||||||
|
for i := range scanTargets {
|
||||||
|
scanTargets[i] = &[]byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanTargets...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := map[string]json.RawMessage{}
|
||||||
|
for i, k := range keys {
|
||||||
|
m[k] = *scanTargets[i].(*[]byte)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
|
||||||
|
var one model.Message
|
||||||
|
if err := json.Unmarshal(b, &one); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, one)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||||
if questions := strings.Count(clause, "$"); questions != len(args) {
|
if questions := strings.Count(clause, "$"); questions != len(args) {
|
||||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -85,4 +87,64 @@ func TestStorage(t *testing.T) {
|
|||||||
t.Fatal("unexpected result from get:", got)
|
t.Fatal("unexpected result from get:", got)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("get thread messages", func(t *testing.T) {
|
||||||
|
thread := fmt.Sprintf("thread-%d", rand.Int())
|
||||||
|
m := model.NewMessage(
|
||||||
|
"ID",
|
||||||
|
1,
|
||||||
|
"Author",
|
||||||
|
"Plaintext",
|
||||||
|
thread,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertMessage(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if m2, err := s.GetMessage(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on upsert-get:", err)
|
||||||
|
} else if m2 != m {
|
||||||
|
t.Errorf("expected %+v but got %+v", m, m2)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := s.GetThreadMessages(ctx, thread)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(msgs) != 1 {
|
||||||
|
t.Fatal(msgs)
|
||||||
|
} else if msgs[0].ThreadID != m.ThreadID {
|
||||||
|
t.Fatal(msgs[0].ThreadID)
|
||||||
|
} else if msgs[0] != m {
|
||||||
|
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("get event threads", func(t *testing.T) {
|
||||||
|
event := fmt.Sprintf("event-%d", rand.Int())
|
||||||
|
m := model.NewThread(
|
||||||
|
"ID",
|
||||||
|
"URL",
|
||||||
|
1,
|
||||||
|
"Channel",
|
||||||
|
event,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertThread(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if m2, err := s.GetThread(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on upsert-get:", err)
|
||||||
|
} else if m2 != m {
|
||||||
|
t.Errorf("expected %+v but got %+v", m, m2)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := s.GetEventThreads(ctx, event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(msgs) != 1 {
|
||||||
|
t.Fatal(msgs)
|
||||||
|
} else if msgs[0].EventID != m.EventID {
|
||||||
|
t.Fatal(msgs[0].EventID)
|
||||||
|
} else if msgs[0] != m {
|
||||||
|
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
3
testdata/slack_events/opsgenie_alert.json
vendored
3
testdata/slack_events/opsgenie_alert.json
vendored
@@ -29,7 +29,8 @@
|
|||||||
"attachments": [
|
"attachments": [
|
||||||
{
|
{
|
||||||
"id": 1,
|
"id": 1,
|
||||||
"color": "F4511E",
|
"realcolor": "F4511E",
|
||||||
|
"color": "2ecc71",
|
||||||
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
|||||||
Reference in New Issue
Block a user