Compare commits
32 Commits
d9d91193dd
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
31fe916000 | ||
|
|
d31042e971 | ||
|
|
2e7d58fd13 | ||
|
|
93672e67a6 | ||
|
|
1b06f727fd | ||
|
|
3ae62390cf | ||
|
|
5cfb89bc64 | ||
|
|
b554be6282 | ||
|
|
5785ea37ae | ||
|
|
f27d416a5a | ||
|
|
81793876f8 | ||
|
|
6d81164161 | ||
|
|
20256bd6b4 | ||
|
|
e5e98e2890 | ||
|
|
4fb26ec775 | ||
|
|
782b9ec3cf | ||
|
|
9d7f69bd8a | ||
|
|
12de99da57 | ||
|
|
81fe8070ca | ||
|
|
79de56e236 | ||
|
|
f485b5ea88 | ||
|
|
894536d209 | ||
|
|
f8861a73b5 | ||
|
|
14de286415 | ||
|
|
8557ddc522 | ||
|
|
1e43c2a14e | ||
|
|
04c574ffec | ||
|
|
b2f64037e2 | ||
|
|
fbd151f9ef | ||
|
|
5f21098fdc | ||
|
|
7c2d663401 | ||
|
|
95b0394199 |
@@ -4,14 +4,9 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
|
|||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- dedupe critical+noncritical
|
|
||||||
- what SLO/SLI can I help benoit with
|
- what SLO/SLI can I help benoit with
|
||||||
- break into smaller goals
|
|
||||||
- sell to the team
|
|
||||||
- scott; like to keep state in incident.io and zendesk
|
- scott; like to keep state in incident.io and zendesk
|
||||||
- @spoc -ignore, @spoc -s summary
|
- @spoc -ignore, @spoc -s summary
|
||||||
- limit rps from rpc/slackscrape
|
|
||||||
- rpc/slackscrape to async
|
|
||||||
- limit queue retries
|
- limit queue retries
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
139
ai.go
139
ai.go
@@ -1,13 +1,10 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"net/http"
|
||||||
"strings"
|
"time"
|
||||||
|
|
||||||
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
|
|
||||||
"github.com/nikolaydubina/llama2.go/llama2"
|
|
||||||
"github.com/tmc/langchaingo/llms"
|
"github.com/tmc/langchaingo/llms"
|
||||||
"github.com/tmc/langchaingo/llms/ollama"
|
"github.com/tmc/langchaingo/llms/ollama"
|
||||||
)
|
)
|
||||||
@@ -37,133 +34,23 @@ func NewAIOllama(url, model string) AIOllama {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
|
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
|
||||||
|
c := &http.Client{
|
||||||
|
Timeout: time.Hour,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
//DisableKeepAlives: true,
|
||||||
|
IdleConnTimeout: time.Hour,
|
||||||
|
ResponseHeaderTimeout: time.Hour,
|
||||||
|
ExpectContinueTimeout: time.Hour,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
defer c.CloseIdleConnections()
|
||||||
llm, err := ollama.New(
|
llm, err := ollama.New(
|
||||||
ollama.WithModel(ai.model),
|
ollama.WithModel(ai.model),
|
||||||
ollama.WithServerURL(ai.url),
|
ollama.WithServerURL(ai.url),
|
||||||
|
ollama.WithHTTPClient(c),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
|
return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
|
||||||
}
|
}
|
||||||
|
|
||||||
type AILocal struct {
|
|
||||||
checkpointPath string
|
|
||||||
tokenizerPath string
|
|
||||||
temperature float64
|
|
||||||
steps int
|
|
||||||
topp float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAILocal(
|
|
||||||
checkpointPath string,
|
|
||||||
tokenizerPath string,
|
|
||||||
temperature float64,
|
|
||||||
steps int,
|
|
||||||
topp float64,
|
|
||||||
) AILocal {
|
|
||||||
return AILocal{
|
|
||||||
checkpointPath: checkpointPath,
|
|
||||||
tokenizerPath: tokenizerPath,
|
|
||||||
temperature: temperature,
|
|
||||||
steps: steps,
|
|
||||||
topp: topp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://github.com/nikolaydubina/llama2.go/blob/master/main.go
|
|
||||||
func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
|
|
||||||
checkpointFile, err := os.OpenFile(ai.checkpointPath, os.O_RDONLY, 0)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer checkpointFile.Close()
|
|
||||||
|
|
||||||
config, err := llama2.NewConfigFromCheckpoint(checkpointFile)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
isSharedWeights := config.VocabSize > 0
|
|
||||||
if config.VocabSize < 0 {
|
|
||||||
config.VocabSize = -config.VocabSize
|
|
||||||
}
|
|
||||||
|
|
||||||
tokenizerFile, err := os.OpenFile(ai.tokenizerPath, os.O_RDONLY, 0)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer tokenizerFile.Close()
|
|
||||||
|
|
||||||
vocab := llama2.NewVocabFromFile(config.VocabSize, tokenizerFile)
|
|
||||||
|
|
||||||
w := llama2.NewTransformerWeightsFromCheckpoint(config, checkpointFile, isSharedWeights)
|
|
||||||
|
|
||||||
// right now we cannot run for more than config.SeqLen steps
|
|
||||||
steps := ai.steps
|
|
||||||
if steps <= 0 || steps > config.SeqLen {
|
|
||||||
steps = config.SeqLen
|
|
||||||
}
|
|
||||||
|
|
||||||
runState := llama2.NewRunState(config)
|
|
||||||
|
|
||||||
promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
|
|
||||||
|
|
||||||
out := bytes.NewBuffer(nil)
|
|
||||||
|
|
||||||
// the current position we are in
|
|
||||||
var token int = 1 // 1 = BOS token in llama-2 sentencepiece
|
|
||||||
var pos = 0
|
|
||||||
|
|
||||||
for pos < steps {
|
|
||||||
// forward the transformer to get logits for the next token
|
|
||||||
llama2.Transformer(token, pos, config, runState, w)
|
|
||||||
|
|
||||||
var next int
|
|
||||||
if pos < len(promptTokens) {
|
|
||||||
next = promptTokens[pos]
|
|
||||||
} else {
|
|
||||||
// sample the next token
|
|
||||||
if ai.temperature == 0 {
|
|
||||||
// greedy argmax sampling
|
|
||||||
next = nn.ArgMax(runState.Logits)
|
|
||||||
} else {
|
|
||||||
// apply the temperature to the logits
|
|
||||||
for q := 0; q < config.VocabSize; q++ {
|
|
||||||
runState.Logits[q] /= float32(ai.temperature)
|
|
||||||
}
|
|
||||||
// apply softmax to the logits to the probabilities for next token
|
|
||||||
nn.SoftMax(runState.Logits)
|
|
||||||
// we now want to sample from this distribution to get the next token
|
|
||||||
if ai.topp <= 0 || ai.topp >= 1 {
|
|
||||||
// simply sample from the predicted probability distribution
|
|
||||||
next = nn.Sample(runState.Logits)
|
|
||||||
} else {
|
|
||||||
// top-p (nucleus) sampling, clamping the least likely tokens to zero
|
|
||||||
next = nn.SampleTopP(runState.Logits, float32(ai.topp))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pos++
|
|
||||||
|
|
||||||
// data-dependent terminating condition: the BOS (1) token delimits sequences
|
|
||||||
if next == 1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// following BOS (1) token, sentencepiece decoder strips any leading whitespace
|
|
||||||
var tokenStr string
|
|
||||||
if token == 1 && vocab.Words[next][0] == ' ' {
|
|
||||||
tokenStr = vocab.Words[next][1:]
|
|
||||||
} else {
|
|
||||||
tokenStr = vocab.Words[next]
|
|
||||||
}
|
|
||||||
out.Write([]byte(tokenStr))
|
|
||||||
|
|
||||||
// advance forward
|
|
||||||
token = next
|
|
||||||
}
|
|
||||||
out.Write([]byte("\n"))
|
|
||||||
|
|
||||||
return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
|
|
||||||
}
|
|
||||||
|
|||||||
58
ai_test.go
58
ai_test.go
@@ -4,11 +4,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -22,53 +17,7 @@ func TestAINoop(t *testing.T) {
|
|||||||
|
|
||||||
func TestAIOllama(t *testing.T) {
|
func TestAIOllama(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
ai := NewAIOllama("http://localhost:11434", "llama3")
|
||||||
|
|
||||||
testAI(t, ai)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAILocal(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
d := os.TempDir()
|
|
||||||
checkpoints := "checkpoints"
|
|
||||||
tokenizer := "tokenizer"
|
|
||||||
for u, p := range map[string]*string{
|
|
||||||
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
|
|
||||||
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
|
|
||||||
} {
|
|
||||||
func() {
|
|
||||||
*p = path.Base(u)
|
|
||||||
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
|
|
||||||
t.Logf("downloading %s from %s", u, *p)
|
|
||||||
|
|
||||||
resp, err := http.Get(u)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
f, err := os.Create(path.Join(d, *p))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
if _, err := io.Copy(f, resp.Body); err != nil {
|
|
||||||
f.Close()
|
|
||||||
os.Remove(path.Join(d, *p))
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
ai := NewAILocal(
|
|
||||||
path.Join(d, checkpoints),
|
|
||||||
path.Join(d, tokenizer),
|
|
||||||
0.9,
|
|
||||||
256,
|
|
||||||
0.9,
|
|
||||||
)
|
|
||||||
|
|
||||||
testAI(t, ai)
|
testAI(t, ai)
|
||||||
}
|
}
|
||||||
@@ -78,7 +27,7 @@ func testAI(t *testing.T, ai AI) {
|
|||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
t.Run("mvp", func(t *testing.T) {
|
t.Run("mvp", func(t *testing.T) {
|
||||||
if result, err := ai.Do(ctx, "hello world"); err != nil {
|
if result, err := ai.Do(ctx, "Tell me a fun fact."); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if len(result) < 3 {
|
} else if len(result) < 3 {
|
||||||
t.Error(result)
|
t.Error(result)
|
||||||
@@ -87,6 +36,7 @@ func testAI(t *testing.T, ai AI) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
/*
|
||||||
t.Run("simulation", func(t *testing.T) {
|
t.Run("simulation", func(t *testing.T) {
|
||||||
d := NewRAM()
|
d := NewRAM()
|
||||||
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
||||||
@@ -114,5 +64,5 @@ func testAI(t *testing.T, ai AI) {
|
|||||||
}
|
}
|
||||||
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
t.Logf("\n\t%s\n->\n\t%s", input, result)
|
||||||
})
|
})
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|||||||
32
config.go
32
config.go
@@ -23,10 +23,10 @@ type Config struct {
|
|||||||
BasicAuthUser string
|
BasicAuthUser string
|
||||||
BasicAuthPassword string
|
BasicAuthPassword string
|
||||||
FillWithTestdata bool
|
FillWithTestdata bool
|
||||||
OllamaURL string
|
OllamaUrl string
|
||||||
OllamaModel string
|
OllamaModel string
|
||||||
LocalCheckpoint string
|
RecapPromptIntro string
|
||||||
LocalTokenizer string
|
RecapPrompt string
|
||||||
AssetPattern string
|
AssetPattern string
|
||||||
DatacenterPattern string
|
DatacenterPattern string
|
||||||
EventNamePattern string
|
EventNamePattern string
|
||||||
@@ -34,11 +34,13 @@ type Config struct {
|
|||||||
storage Storage
|
storage Storage
|
||||||
ai AI
|
ai AI
|
||||||
slackToModelPipeline Pipeline
|
slackToModelPipeline Pipeline
|
||||||
|
slackScrapePipeline Pipeline
|
||||||
modelToPersistencePipeline Pipeline
|
modelToPersistencePipeline Pipeline
|
||||||
|
persistenceToRecapPipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
|
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]|ip-[0-9]+-[0-9]+-[0-9]+-[0-9]+\.[a-z]+-[a-z]+-[0-9]+\.compute\.internal`
|
||||||
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
||||||
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
|
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
|
||||||
)
|
)
|
||||||
@@ -50,10 +52,12 @@ func newConfig(ctx context.Context) (Config, error) {
|
|||||||
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
|
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
|
||||||
def := Config{
|
def := Config{
|
||||||
Port: 38080,
|
Port: 38080,
|
||||||
OllamaModel: "gemma:2b",
|
OllamaModel: "llama3",
|
||||||
AssetPattern: renderAssetPattern,
|
AssetPattern: renderAssetPattern,
|
||||||
DatacenterPattern: renderDatacenterPattern,
|
DatacenterPattern: renderDatacenterPattern,
|
||||||
EventNamePattern: renderEventNamePattern,
|
EventNamePattern: renderEventNamePattern,
|
||||||
|
RecapPromptIntro: "A Slack thread began with the following original post.",
|
||||||
|
RecapPrompt: "What is the summary of the responses to the Slack thread consisting of the following messages? Limit the summary to one sentence. Do not include any leading text. Be as brief as possible. No context is needed.",
|
||||||
}
|
}
|
||||||
|
|
||||||
var m map[string]any
|
var m map[string]any
|
||||||
@@ -130,10 +134,8 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
}
|
}
|
||||||
result.storage = storage
|
result.storage = storage
|
||||||
|
|
||||||
if result.OllamaURL != "" {
|
if result.OllamaUrl != "" {
|
||||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
result.ai = NewAIOllama(result.OllamaUrl, result.OllamaModel)
|
||||||
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
|
||||||
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
|
|
||||||
} else {
|
} else {
|
||||||
result.ai = NewAINoop()
|
result.ai = NewAINoop()
|
||||||
}
|
}
|
||||||
@@ -150,5 +152,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
}
|
}
|
||||||
result.modelToPersistencePipeline = modelToPersistencePipeline
|
result.modelToPersistencePipeline = modelToPersistencePipeline
|
||||||
|
|
||||||
|
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.slackScrapePipeline = slackScrapePipeline
|
||||||
|
|
||||||
|
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.persistenceToRecapPipeline = persistenceToRecapPipeline
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@@ -35,7 +36,12 @@ func NewTestDriver(t *testing.T, optionalP ...string) Driver {
|
|||||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||||
engine := "sqlite"
|
engine := "sqlite"
|
||||||
if conn == "" {
|
if conn == "" {
|
||||||
conn = ":memory:"
|
f, err := os.CreateTemp(os.TempDir(), "spoc-bot-vr-undef-*.db")
|
||||||
|
if err != nil {
|
||||||
|
return Driver{}, err
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
conn = f.Name()
|
||||||
} else {
|
} else {
|
||||||
if u, err := url.Parse(conn); err != nil {
|
if u, err := url.Parse(conn); err != nil {
|
||||||
return Driver{}, err
|
return Driver{}, err
|
||||||
|
|||||||
5
go.mod
5
go.mod
@@ -6,9 +6,9 @@ require (
|
|||||||
github.com/glebarez/go-sqlite v1.21.2
|
github.com/glebarez/go-sqlite v1.21.2
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/lib/pq v1.10.9
|
github.com/lib/pq v1.10.9
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1
|
|
||||||
github.com/tmc/langchaingo v0.1.8
|
github.com/tmc/langchaingo v0.1.8
|
||||||
gotest.tools/v3 v3.5.1
|
golang.org/x/time v0.5.0
|
||||||
|
gotest.tools v2.2.0+incompatible
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@@ -20,7 +20,6 @@ require (
|
|||||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
golang.org/x/sys v0.16.0 // indirect
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
gotest.tools v2.2.0+incompatible // indirect
|
|
||||||
modernc.org/libc v1.22.5 // indirect
|
modernc.org/libc v1.22.5 // indirect
|
||||||
modernc.org/mathutil v1.5.0 // indirect
|
modernc.org/mathutil v1.5.0 // indirect
|
||||||
modernc.org/memory v1.5.0 // indirect
|
modernc.org/memory v1.5.0 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
|||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
||||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||||
@@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
|
|||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||||
|
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
|
||||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
|
||||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||||
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||||
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||||
|
|||||||
139
main.go
139
main.go
@@ -39,6 +39,8 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
case err := <-processPipelines(ctx,
|
case err := <-processPipelines(ctx,
|
||||||
cfg.slackToModelPipeline,
|
cfg.slackToModelPipeline,
|
||||||
cfg.modelToPersistencePipeline,
|
cfg.modelToPersistencePipeline,
|
||||||
|
cfg.slackScrapePipeline,
|
||||||
|
cfg.persistenceToRecapPipeline,
|
||||||
):
|
):
|
||||||
return err
|
return err
|
||||||
case err := <-listenAndServe(ctx, cfg):
|
case err := <-listenAndServe(ctx, cfg):
|
||||||
@@ -85,8 +87,10 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
|||||||
func newHandler(cfg Config) http.HandlerFunc {
|
func newHandler(cfg Config) http.HandlerFunc {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
|
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||||
|
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
@@ -99,89 +103,51 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var Version = "undef"
|
||||||
|
|
||||||
|
func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{"version": Version})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !basicAuth(cfg, w, r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
event := r.URL.Query().Get("id")
|
||||||
|
b, _ := json.Marshal(ModelIDs{Event: event})
|
||||||
|
if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{"event": event})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if !basicAuth(cfg, w, r) {
|
if !basicAuth(cfg, w, r) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
channel := r.Header.Get("slack-channel")
|
|
||||||
token := r.Header.Get("slack-oauth-token")
|
|
||||||
|
|
||||||
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
|
since, err := parseSince(r.URL.Query().Get("since"))
|
||||||
|
|
||||||
httpc := http.Client{Timeout: time.Second}
|
|
||||||
get := func(url string) ([]byte, error) {
|
|
||||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
}
|
|
||||||
req.Header.Set("Authorization", "Bearer "+token)
|
|
||||||
req = req.WithContext(r.Context())
|
|
||||||
|
|
||||||
resp, err := httpc.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
defer io.Copy(io.Discard, resp.Body)
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
return io.ReadAll(resp.Body)
|
|
||||||
}
|
|
||||||
|
|
||||||
n := 0
|
|
||||||
|
|
||||||
for len(urls) > 0 {
|
|
||||||
url := urls[0]
|
|
||||||
urls = urls[1:]
|
|
||||||
select {
|
|
||||||
case <-r.Context().Done():
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
}
|
|
||||||
body, err := get(url)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var page struct {
|
job, _ := json.Marshal(SlackScrape{
|
||||||
Messages []json.RawMessage
|
Latest: time.Now().Unix(),
|
||||||
}
|
Oldest: since.Unix(),
|
||||||
if err := json.Unmarshal(body, &page); err != nil {
|
ThreadTS: "",
|
||||||
|
Channel: r.Header.Get("slack-channel"),
|
||||||
|
Token: r.Header.Get("slack-oauth-token"),
|
||||||
|
})
|
||||||
|
if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
errs := []error{}
|
|
||||||
for _, messageJSON := range page.Messages {
|
|
||||||
if cfg.Debug {
|
|
||||||
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
|
||||||
}
|
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
} else {
|
|
||||||
n += 1
|
|
||||||
}
|
|
||||||
if !strings.Contains(url, "ts=") {
|
|
||||||
var peek struct {
|
|
||||||
ThreadTS string `json:"thread_ts"`
|
|
||||||
}
|
|
||||||
json.Unmarshal(messageJSON, &peek)
|
|
||||||
if peek.ThreadTS != "" {
|
|
||||||
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errs) > 0 {
|
|
||||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -195,12 +161,13 @@ func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
|
|||||||
|
|
||||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||||
if cfg.InitializeSlack {
|
if cfg.InitializeSlack {
|
||||||
return handlerPostAPIV1EventsSlackInitialize
|
return handlerPostAPIV1EventsSlackInitialize(cfg)
|
||||||
}
|
}
|
||||||
return _newHandlerPostAPIV1EventsSlack(cfg)
|
return _newHandlerPostAPIV1EventsSlack(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
|
func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
b, _ := io.ReadAll(r.Body)
|
b, _ := io.ReadAll(r.Body)
|
||||||
var challenge struct {
|
var challenge struct {
|
||||||
Token string
|
Token string
|
||||||
@@ -211,14 +178,32 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques
|
|||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
cfg.driver.ExecContext(r.Context(), `
|
||||||
|
CREATE TABLE
|
||||||
|
IF NOT EXISTS
|
||||||
|
initialization (
|
||||||
|
label TEXT,
|
||||||
|
token TEXT,
|
||||||
|
updated TIMESTAMP
|
||||||
|
)
|
||||||
|
`)
|
||||||
|
if _, err := cfg.driver.ExecContext(r.Context(), `
|
||||||
|
INSERT
|
||||||
|
INTO initialization (label, token, updated)
|
||||||
|
VALUES ('slack_events_webhook_token', $1, $2)
|
||||||
|
`, challenge.Token, time.Now().UTC()); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Println("stashed new slack initialization token", challenge.Token)
|
||||||
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
b, _ := io.ReadAll(r.Body)
|
body, _ := io.ReadAll(r.Body)
|
||||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
||||||
|
|
||||||
var allowList struct {
|
var allowList struct {
|
||||||
Token string
|
Token string
|
||||||
@@ -226,7 +211,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
Channel string
|
Channel string
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(b, &allowList); err != nil {
|
if err := json.Unmarshal(body, &allowList); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
} else if allowList.Token != cfg.SlackToken {
|
} else if allowList.Token != cfg.SlackToken {
|
||||||
@@ -243,7 +228,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil {
|
||||||
log.Printf("failed to ingest: %v", err)
|
log.Printf("failed to ingest: %v", err)
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|||||||
44
main_test.go
44
main_test.go
@@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
|
|||||||
return int(port)
|
return int(port)
|
||||||
}()
|
}()
|
||||||
u := fmt.Sprintf("http://localhost:%d", port)
|
u := fmt.Sprintf("http://localhost:%d", port)
|
||||||
|
var err error
|
||||||
|
|
||||||
cfg := Config{}
|
cfg := Config{}
|
||||||
cfg.DatacenterPattern = renderDatacenterPattern
|
cfg.DatacenterPattern = renderDatacenterPattern
|
||||||
@@ -44,10 +45,16 @@ func TestRun(t *testing.T) {
|
|||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver = NewTestDriver(t)
|
cfg.driver = NewTestDriver(t)
|
||||||
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
||||||
|
cfg.ai = NewAINoop()
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
||||||
|
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
|
||||||
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
||||||
|
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||||
@@ -83,4 +90,41 @@ func TestRun(t *testing.T) {
|
|||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
|
||||||
|
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
t.Fatal("timed out waiting for recap")
|
||||||
|
}
|
||||||
|
|
||||||
|
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
|
||||||
|
if thread.Recap == "" {
|
||||||
|
t.Error(thread.Recap)
|
||||||
|
}
|
||||||
|
t.Log(thread.Recap)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ type Thread struct {
|
|||||||
TS uint64
|
TS uint64
|
||||||
Channel string
|
Channel string
|
||||||
EventID string
|
EventID string
|
||||||
|
Recap string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
||||||
|
|||||||
@@ -26,15 +26,14 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer = NewNoopQueue()
|
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newModelToPersistenceProcess(cfg.storage),
|
process: newModelToPersistenceProcess(cfg, cfg.storage),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newModelToPersistenceProcess(storage Storage) processFunc {
|
func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
|
||||||
return func(ctx context.Context, models []byte) ([]byte, error) {
|
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||||
var m Models
|
var m Models
|
||||||
if err := json.Unmarshal(models, &m); err != nil {
|
if err := json.Unmarshal(models, &m); err != nil {
|
||||||
@@ -56,7 +55,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
|
|||||||
return nil, fmt.Errorf("failed to persist message: %w", err)
|
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.Debug {
|
||||||
log.Printf("persisted models")
|
log.Printf("persisted models")
|
||||||
|
}
|
||||||
return json.Marshal(ModelIDs{
|
return json.Marshal(ModelIDs{
|
||||||
Event: m.Event.ID,
|
Event: m.Event.ID,
|
||||||
Thread: m.Thread.ID,
|
Thread: m.Thread.ID,
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
|||||||
|
|
||||||
d := NewTestDriver(t)
|
d := NewTestDriver(t)
|
||||||
s, _ := NewStorage(ctx, d)
|
s, _ := NewStorage(ctx, d)
|
||||||
process := newModelToPersistenceProcess(s)
|
process := newModelToPersistenceProcess(Config{}, s)
|
||||||
|
|
||||||
_, _ = ctx, process
|
_, _ = ctx, process
|
||||||
|
|
||||||
|
|||||||
19
queue.go
19
queue.go
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -36,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
|||||||
if q.driver.DB == nil {
|
if q.driver.DB == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
result, err := q.driver.ExecContext(ctx, `
|
||||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
|
||||||
`,
|
`,
|
||||||
|
uuid.New().String(),
|
||||||
q.topic,
|
q.topic,
|
||||||
time.Now().Unix(),
|
time.Now().Unix(),
|
||||||
b,
|
b,
|
||||||
uuid.New().String(),
|
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if n != 1 {
|
||||||
|
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||||
@@ -81,7 +90,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
topic = $3
|
topic = $3
|
||||||
AND (
|
AND (
|
||||||
reservation IS NULL
|
reservation IS NULL
|
||||||
OR $4 - updated > 60
|
OR $4 - updated > 600
|
||||||
)
|
)
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
@@ -101,7 +110,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
`, reservation)
|
`, reservation)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||||
} else if err := row.Scan(&payload); err != nil {
|
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
||||||
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
87
recap.go
Normal file
87
recap.go
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PersistenceToRecap struct {
|
||||||
|
pipeline Pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
reader, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
writer := NewNoopQueue()
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: newPersistenceToRecapProcess(cfg),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPersistenceToRecapProcess(cfg Config) processFunc {
|
||||||
|
return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
|
||||||
|
var m ModelIDs
|
||||||
|
if err := json.Unmarshal(modelIDs, &m); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non model ids payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Event == "" {
|
||||||
|
} else if event, err := cfg.storage.GetEvent(ctx, m.Event); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if !event.Resolved {
|
||||||
|
} else if err := func() error {
|
||||||
|
threads, err := cfg.storage.GetEventThreads(ctx, event.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, thread := range threads {
|
||||||
|
messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if len(messages) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
prompt := []string{
|
||||||
|
cfg.RecapPromptIntro,
|
||||||
|
"---",
|
||||||
|
messages[0].Plaintext,
|
||||||
|
"---",
|
||||||
|
cfg.RecapPrompt,
|
||||||
|
"---",
|
||||||
|
}
|
||||||
|
for _, message := range messages[1:] {
|
||||||
|
prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
|
||||||
|
}
|
||||||
|
|
||||||
|
recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
thread.Recap = recap
|
||||||
|
if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Println("recapped", thread.ID)
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("Recapped %q as %q from %q/%q and %+v", thread.ID, thread.Recap, cfg.RecapPromptIntro, cfg.RecapPrompt, messages)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("persisted recap")
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
50
recap_test.go
Normal file
50
recap_test.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewPersistenceToRecapProcess(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
d := NewTestDriver(t)
|
||||||
|
s, _ := NewStorage(ctx, d)
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
driver: d,
|
||||||
|
storage: s,
|
||||||
|
ai: NewAINoop(),
|
||||||
|
Debug: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
proc := newPersistenceToRecapProcess(cfg)
|
||||||
|
|
||||||
|
if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, _ := json.Marshal(ModelIDs{Event: "Event"})
|
||||||
|
if _, err := proc(ctx, b); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if thread, err := s.GetThread(ctx, "Thread"); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if thread.Recap == "" {
|
||||||
|
t.Error("no recap:", thread.Recap)
|
||||||
|
} else {
|
||||||
|
t.Logf("%+v", thread)
|
||||||
|
}
|
||||||
|
}
|
||||||
35
slack.go
35
slack.go
@@ -61,14 +61,7 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
cfg.DatacenterPattern: &s.Datacenter,
|
cfg.DatacenterPattern: &s.Datacenter,
|
||||||
cfg.EventNamePattern: &s.EventName,
|
cfg.EventNamePattern: &s.EventName,
|
||||||
} {
|
} {
|
||||||
r := regexp.MustCompile(pattern)
|
*ptr = withPattern(pattern, *ptr)
|
||||||
parsed := r.FindString(*ptr)
|
|
||||||
for i, name := range r.SubexpNames() {
|
|
||||||
if i > 0 && name != "" {
|
|
||||||
parsed = r.FindStringSubmatch(*ptr)[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*ptr = parsed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
event := model.Event{}
|
event := model.Event{}
|
||||||
@@ -84,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.Debug {
|
||||||
log.Printf("parsed slack message into models")
|
log.Printf("parsed slack message into models")
|
||||||
|
}
|
||||||
return json.Marshal(Models{
|
return json.Marshal(Models{
|
||||||
Event: event,
|
Event: event,
|
||||||
Message: message,
|
Message: message,
|
||||||
@@ -93,6 +88,17 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func withPattern(pattern string, given string) string {
|
||||||
|
r := regexp.MustCompile(pattern)
|
||||||
|
parsed := r.FindString(given)
|
||||||
|
for i, name := range r.SubexpNames() {
|
||||||
|
if i > 0 && name != "" {
|
||||||
|
parsed = r.FindStringSubmatch(given)[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parsed
|
||||||
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
parsedSlackMessage struct {
|
parsedSlackMessage struct {
|
||||||
ID string
|
ID string
|
||||||
@@ -227,6 +233,11 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func _parseSlack(b []byte) (slackMessage, error) {
|
func _parseSlack(b []byte) (slackMessage, error) {
|
||||||
|
var wrapper ChannelWrapper
|
||||||
|
if err := json.Unmarshal(b, &wrapper); err == nil && len(wrapper.V) > 0 {
|
||||||
|
b = wrapper.V
|
||||||
|
}
|
||||||
|
|
||||||
var result slackMessage
|
var result slackMessage
|
||||||
err := json.Unmarshal(b, &result)
|
err := json.Unmarshal(b, &result)
|
||||||
switch result.Type {
|
switch result.Type {
|
||||||
@@ -247,6 +258,9 @@ func _parseSlack(b []byte) (slackMessage, error) {
|
|||||||
}
|
}
|
||||||
result.Event.PreviousMessage = nil
|
result.Event.PreviousMessage = nil
|
||||||
}
|
}
|
||||||
|
if wrapper.Channel != "" {
|
||||||
|
result.Event.Channel = wrapper.Channel
|
||||||
|
}
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -257,3 +271,8 @@ func (this slackEvent) Empty() bool {
|
|||||||
func (this parsedSlackMessage) Time() time.Time {
|
func (this parsedSlackMessage) Time() time.Time {
|
||||||
return time.Unix(int64(this.TS), 0)
|
return time.Unix(int64(this.TS), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ChannelWrapper struct {
|
||||||
|
Channel string
|
||||||
|
V json.RawMessage
|
||||||
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
|
|||||||
"",
|
"",
|
||||||
"",
|
"",
|
||||||
"Datastores Non-Critical",
|
"Datastores Non-Critical",
|
||||||
false,
|
true,
|
||||||
),
|
),
|
||||||
Message: model.NewMessage(
|
Message: model.NewMessage(
|
||||||
"1712927439.728409/1712927439",
|
"1712927439.728409/1712927439",
|
||||||
@@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Name: "Opsgenie for Alert Management",
|
Name: "Opsgenie for Alert Management",
|
||||||
},
|
},
|
||||||
Attachments: []slackAttachment{{
|
Attachments: []slackAttachment{{
|
||||||
Color: "F4511E",
|
Color: "2ecc71",
|
||||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Fields: []slackField{
|
Fields: []slackField{
|
||||||
@@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
Author: "Opsgenie for Alert Management",
|
Author: "Opsgenie for Alert Management",
|
||||||
Team: "Datastores Non-Critical",
|
Team: "Datastores Non-Critical",
|
||||||
|
Resolved: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"opsgenie_alert_resolved.json": {
|
"opsgenie_alert_resolved.json": {
|
||||||
@@ -249,3 +250,54 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWrappedSlack(t *testing.T) {
|
||||||
|
b, _ := os.ReadFile("testdata/slack_events/human_thread_message_from_opsgenie_alert.json")
|
||||||
|
b2, _ := json.Marshal(ChannelWrapper{Channel: "X", V: json.RawMessage(b)})
|
||||||
|
|
||||||
|
if got, err := _parseSlack(b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got2, err := _parseSlack(b2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if got2.Event.Channel != "X" {
|
||||||
|
t.Error(got2.Event.Channel)
|
||||||
|
} else if got2.Event.ParentID == "" {
|
||||||
|
t.Error(got2.Event)
|
||||||
|
} else if got.Event.ParentID != got2.Event.ParentID {
|
||||||
|
t.Error(got, got2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWithPattern(t *testing.T) {
|
||||||
|
cases := map[string]struct {
|
||||||
|
given string
|
||||||
|
pattern string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
"pods unavailable on node": {
|
||||||
|
given: `pods are unavailable on node ip-12-345-67-890.xx-yyyyy-1.compute.internal.`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `ip-12-345-67-890.xx-yyyyy-1.compute.internal`,
|
||||||
|
},
|
||||||
|
"redis err": {
|
||||||
|
given: `Redis instance red-abc123 is emitting Some error repeatedly`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `red-abc123`,
|
||||||
|
},
|
||||||
|
"pg err": {
|
||||||
|
given: `db dpg-xyz123 is in a pinch`,
|
||||||
|
pattern: renderAssetPattern,
|
||||||
|
want: `dpg-xyz123`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, d := range cases {
|
||||||
|
c := d
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
got := withPattern(c.pattern, c.given)
|
||||||
|
if got != c.want {
|
||||||
|
t.Errorf("withPattern(%q, %q) expected %q but got %q", c.pattern, c.given, c.want, got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
149
slackscrape.go
Normal file
149
slackscrape.go
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SlackScrape struct {
|
||||||
|
Latest int64
|
||||||
|
Oldest int64
|
||||||
|
ThreadTS string
|
||||||
|
Channel string
|
||||||
|
Token string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: cfg.slackScrapePipeline.reader,
|
||||||
|
process: newSlackScrapeProcess(cfg),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSlackScrapeProcess(cfg Config) processFunc {
|
||||||
|
limiter := rate.NewLimiter(0.5, 1)
|
||||||
|
return func(ctx context.Context, jobb []byte) ([]byte, error) {
|
||||||
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var job SlackScrape
|
||||||
|
if err := json.Unmarshal(jobb, &job); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: "slack.com",
|
||||||
|
Path: "/api/conversations.history",
|
||||||
|
}
|
||||||
|
q := url.Values{}
|
||||||
|
q.Set("channel", job.Channel)
|
||||||
|
q.Set("latest", strconv.FormatInt(job.Latest, 10))
|
||||||
|
q.Set("limit", "999")
|
||||||
|
q.Set("inclusive", "true")
|
||||||
|
if job.ThreadTS != "" {
|
||||||
|
u.Path = "/api/conversations.replies"
|
||||||
|
q.Set("ts", job.ThreadTS)
|
||||||
|
}
|
||||||
|
if job.Oldest != 0 {
|
||||||
|
q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
|
||||||
|
}
|
||||||
|
u.RawQuery = q.Encode()
|
||||||
|
url := u.String()
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Authorization", "Bearer "+job.Token)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
httpc := http.Client{Timeout: time.Second}
|
||||||
|
resp, err := httpc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
defer io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var page struct {
|
||||||
|
Messages []json.RawMessage
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &page); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newLatest := float64(job.Latest)
|
||||||
|
for _, messageJSON := range page.Messages {
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var peekTS struct {
|
||||||
|
TS float64 `json:"ts,string"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
|
||||||
|
newLatest = peekTS.TS
|
||||||
|
}
|
||||||
|
if job.ThreadTS == "" {
|
||||||
|
var peek struct {
|
||||||
|
ThreadTS string `json:"thread_ts"`
|
||||||
|
}
|
||||||
|
json.Unmarshal(messageJSON, &peek)
|
||||||
|
if peek.ThreadTS != "" {
|
||||||
|
clone := job
|
||||||
|
clone.ThreadTS = peek.ThreadTS
|
||||||
|
clone.Oldest = 0
|
||||||
|
b, _ := json.Marshal(clone)
|
||||||
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(page.Messages) == 999 {
|
||||||
|
clone := job
|
||||||
|
clone.Latest = int64(newLatest)
|
||||||
|
b, _ := json.Marshal(clone)
|
||||||
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("scraped %v from %s", len(page.Messages), url)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
102
storage.go
102
storage.go
@@ -67,10 +67,112 @@ func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error)
|
|||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetEventThreads(ctx context.Context, ID string) ([]model.Thread, error) {
|
||||||
|
return s.selectThreadsWhere(ctx, "EventID = $1", ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetThreadMessages(ctx context.Context, ID string) ([]model.Message, error) {
|
||||||
|
return s.selectMessagesWhere(ctx, "ThreadID = $1", ID)
|
||||||
|
}
|
||||||
|
|
||||||
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
||||||
return s.upsert(ctx, "threads", thread)
|
return s.upsert(ctx, "threads", thread)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) selectThreadsWhere(ctx context.Context, clause string, args ...any) ([]model.Thread, error) {
|
||||||
|
keys, _, _, _, err := keysArgsKeyargsValues(model.Thread{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args2 := make([]any, len(args))
|
||||||
|
for i := range args {
|
||||||
|
args2[i], _ = json.Marshal(args[i])
|
||||||
|
}
|
||||||
|
scanTargets := make([]any, len(keys))
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
SELECT %s FROM threads WHERE %s
|
||||||
|
ORDER BY TS ASC
|
||||||
|
`, strings.Join(keys, ", "), clause)
|
||||||
|
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []model.Thread
|
||||||
|
for rows.Next() {
|
||||||
|
for i := range scanTargets {
|
||||||
|
scanTargets[i] = &[]byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanTargets...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := map[string]json.RawMessage{}
|
||||||
|
for i, k := range keys {
|
||||||
|
m[k] = *scanTargets[i].(*[]byte)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
|
||||||
|
var one model.Thread
|
||||||
|
if err := json.Unmarshal(b, &one); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, one)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) selectMessagesWhere(ctx context.Context, clause string, args ...any) ([]model.Message, error) {
|
||||||
|
keys, _, _, _, err := keysArgsKeyargsValues(model.Message{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args2 := make([]any, len(args))
|
||||||
|
for i := range args {
|
||||||
|
args2[i], _ = json.Marshal(args[i])
|
||||||
|
}
|
||||||
|
scanTargets := make([]any, len(keys))
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
SELECT %s FROM messages WHERE %s
|
||||||
|
ORDER BY TS ASC
|
||||||
|
`, strings.Join(keys, ", "), clause)
|
||||||
|
rows, err := s.driver.QueryContext(ctx, q, args2...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []model.Message
|
||||||
|
for rows.Next() {
|
||||||
|
for i := range scanTargets {
|
||||||
|
scanTargets[i] = &[]byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanTargets...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := map[string]json.RawMessage{}
|
||||||
|
for i, k := range keys {
|
||||||
|
m[k] = *scanTargets[i].(*[]byte)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
|
||||||
|
var one model.Message
|
||||||
|
if err := json.Unmarshal(b, &one); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, one)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||||
if questions := strings.Count(clause, "$"); questions != len(args) {
|
if questions := strings.Count(clause, "$"); questions != len(args) {
|
||||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -85,4 +87,64 @@ func TestStorage(t *testing.T) {
|
|||||||
t.Fatal("unexpected result from get:", got)
|
t.Fatal("unexpected result from get:", got)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("get thread messages", func(t *testing.T) {
|
||||||
|
thread := fmt.Sprintf("thread-%d", rand.Int())
|
||||||
|
m := model.NewMessage(
|
||||||
|
"ID",
|
||||||
|
1,
|
||||||
|
"Author",
|
||||||
|
"Plaintext",
|
||||||
|
thread,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertMessage(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if m2, err := s.GetMessage(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on upsert-get:", err)
|
||||||
|
} else if m2 != m {
|
||||||
|
t.Errorf("expected %+v but got %+v", m, m2)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := s.GetThreadMessages(ctx, thread)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(msgs) != 1 {
|
||||||
|
t.Fatal(msgs)
|
||||||
|
} else if msgs[0].ThreadID != m.ThreadID {
|
||||||
|
t.Fatal(msgs[0].ThreadID)
|
||||||
|
} else if msgs[0] != m {
|
||||||
|
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("get event threads", func(t *testing.T) {
|
||||||
|
event := fmt.Sprintf("event-%d", rand.Int())
|
||||||
|
m := model.NewThread(
|
||||||
|
"ID",
|
||||||
|
"URL",
|
||||||
|
1,
|
||||||
|
"Channel",
|
||||||
|
event,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertThread(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if m2, err := s.GetThread(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on upsert-get:", err)
|
||||||
|
} else if m2 != m {
|
||||||
|
t.Errorf("expected %+v but got %+v", m, m2)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := s.GetEventThreads(ctx, event)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(msgs) != 1 {
|
||||||
|
t.Fatal(msgs)
|
||||||
|
} else if msgs[0].EventID != m.EventID {
|
||||||
|
t.Fatal(msgs[0].EventID)
|
||||||
|
} else if msgs[0] != m {
|
||||||
|
t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
3
testdata/slack_events/opsgenie_alert.json
vendored
3
testdata/slack_events/opsgenie_alert.json
vendored
@@ -29,7 +29,8 @@
|
|||||||
"attachments": [
|
"attachments": [
|
||||||
{
|
{
|
||||||
"id": 1,
|
"id": 1,
|
||||||
"color": "F4511E",
|
"realcolor": "F4511E",
|
||||||
|
"color": "2ecc71",
|
||||||
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
|||||||
Reference in New Issue
Block a user