Compare commits

...

32 Commits

Author SHA1 Message Date
Bel LaPointe
31fe916000 DO AS I WILL 2024-04-19 15:15:08 -06:00
Bel LaPointe
d31042e971 prompt up again 2024-04-19 15:07:12 -06:00
Bel LaPointe
2e7d58fd13 prompt up 2024-04-19 14:56:21 -06:00
Bel LaPointe
93672e67a6 gr 2024-04-19 14:33:12 -06:00
Bel LaPointe
1b06f727fd OBEY 2024-04-19 14:25:01 -06:00
Bel LaPointe
3ae62390cf more prompt 2024-04-19 14:18:18 -06:00
Bel LaPointe
5cfb89bc64 up queue timeout to 10min for ai reasons 2024-04-19 14:14:07 -06:00
Bel LaPointe
b554be6282 update prompt 2024-04-19 14:10:48 -06:00
Bel LaPointe
5785ea37ae better recap prompts by doing an intro with the OP 2024-04-19 14:01:24 -06:00
Bel LaPointe
f27d416a5a get recap prompt from $RECAP_PROMPT 2024-04-19 13:32:12 -06:00
Bel LaPointe
81793876f8 readme 2024-04-19 13:24:48 -06:00
Bel LaPointe
6d81164161 impl PersistenceToRecap pipeline where each resolved event gets an ai recap of each of its threads that have messages persisted under the thread as a Recap column 2024-04-19 13:19:14 -06:00
Bel LaPointe
20256bd6b4 try and raise ollama timeout 2024-04-19 13:18:12 -06:00
Bel LaPointe
e5e98e2890 reenable queue new_persistence 2024-04-19 12:44:58 -06:00
Bel LaPointe
4fb26ec775 fix sqlite :memory: dont actually work 2024-04-19 12:42:36 -06:00
Bel LaPointe
782b9ec3cf if no therads or no messages then no ai 2024-04-19 12:39:50 -06:00
Bel LaPointe
9d7f69bd8a default ollama model to llama3 2024-04-19 12:36:25 -06:00
Bel LaPointe
12de99da57 impl GET /api/v1/rpc/aievent?id=123 2024-04-19 12:34:49 -06:00
Bel LaPointe
81fe8070ca impl storage GetEventThreads 2024-04-19 12:25:40 -06:00
Bel LaPointe
79de56e236 impl storage.GetThreadMessages 2024-04-19 12:21:31 -06:00
Bel LaPointe
f485b5ea88 from OLLAMA_U_R_L to OLLAMA_URL 2024-04-19 11:54:09 -06:00
Bel LaPointe
894536d209 oops drop bad log 2024-04-18 15:05:25 -06:00
Bel LaPointe
f8861a73b5 async slack scrape goes up to ?since 2024-04-18 14:56:33 -06:00
Bel LaPointe
14de286415 go test -tags=ai -v -run=AI works with ollama which is cool and fast with llama3 2024-04-18 14:08:28 -06:00
Bel LaPointe
8557ddc522 boo 2024-04-17 17:32:07 -06:00
Bel LaPointe
1e43c2a14e split 2024-04-17 17:28:48 -06:00
Bel LaPointe
04c574ffec f it 2024-04-17 16:58:06 -06:00
Bel LaPointe
b2f64037e2 GET /api/v1/version 2024-04-17 16:23:15 -06:00
Bel LaPointe
fbd151f9ef when initializing slack, stash token in driver 2024-04-17 16:07:43 -06:00
Bel LaPointe
5f21098fdc test and update asset pattern to catch ip addresses 2024-04-17 03:58:43 -06:00
Bel LaPointe
7c2d663401 do not cry to me 2024-04-16 15:08:21 -06:00
Bel LaPointe
95b0394199 slack can parse optional channel wrapper for scrape 2024-04-16 15:03:24 -06:00
21 changed files with 783 additions and 372 deletions

View File

@@ -4,14 +4,9 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
## TODO ## TODO
- dedupe critical+noncritical
- what SLO/SLI can I help benoit with - what SLO/SLI can I help benoit with
- break into smaller goals
- sell to the team
- scott; like to keep state in incident.io and zendesk - scott; like to keep state in incident.io and zendesk
- @spoc -ignore, @spoc -s summary - @spoc -ignore, @spoc -s summary
- limit rps from rpc/slackscrape
- rpc/slackscrape to async
- limit queue retries - limit queue retries
``` ```

139
ai.go
View File

@@ -1,13 +1,10 @@
package main package main
import ( import (
"bytes"
"context" "context"
"os" "net/http"
"strings" "time"
nn "github.com/nikolaydubina/llama2.go/exp/nnfast"
"github.com/nikolaydubina/llama2.go/llama2"
"github.com/tmc/langchaingo/llms" "github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama" "github.com/tmc/langchaingo/llms/ollama"
) )
@@ -37,133 +34,23 @@ func NewAIOllama(url, model string) AIOllama {
} }
func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) { func (ai AIOllama) Do(ctx context.Context, prompt string) (string, error) {
c := &http.Client{
Timeout: time.Hour,
Transport: &http.Transport{
//DisableKeepAlives: true,
IdleConnTimeout: time.Hour,
ResponseHeaderTimeout: time.Hour,
ExpectContinueTimeout: time.Hour,
},
}
defer c.CloseIdleConnections()
llm, err := ollama.New( llm, err := ollama.New(
ollama.WithModel(ai.model), ollama.WithModel(ai.model),
ollama.WithServerURL(ai.url), ollama.WithServerURL(ai.url),
ollama.WithHTTPClient(c),
) )
if err != nil { if err != nil {
return "", err return "", err
} }
return llms.GenerateFromSinglePrompt(ctx, llm, prompt) return llms.GenerateFromSinglePrompt(ctx, llm, prompt)
} }
type AILocal struct {
checkpointPath string
tokenizerPath string
temperature float64
steps int
topp float64
}
func NewAILocal(
checkpointPath string,
tokenizerPath string,
temperature float64,
steps int,
topp float64,
) AILocal {
return AILocal{
checkpointPath: checkpointPath,
tokenizerPath: tokenizerPath,
temperature: temperature,
steps: steps,
topp: topp,
}
}
// https://github.com/nikolaydubina/llama2.go/blob/master/main.go
func (ai AILocal) Do(ctx context.Context, prompt string) (string, error) {
checkpointFile, err := os.OpenFile(ai.checkpointPath, os.O_RDONLY, 0)
if err != nil {
return "", err
}
defer checkpointFile.Close()
config, err := llama2.NewConfigFromCheckpoint(checkpointFile)
if err != nil {
return "", err
}
isSharedWeights := config.VocabSize > 0
if config.VocabSize < 0 {
config.VocabSize = -config.VocabSize
}
tokenizerFile, err := os.OpenFile(ai.tokenizerPath, os.O_RDONLY, 0)
if err != nil {
return "", err
}
defer tokenizerFile.Close()
vocab := llama2.NewVocabFromFile(config.VocabSize, tokenizerFile)
w := llama2.NewTransformerWeightsFromCheckpoint(config, checkpointFile, isSharedWeights)
// right now we cannot run for more than config.SeqLen steps
steps := ai.steps
if steps <= 0 || steps > config.SeqLen {
steps = config.SeqLen
}
runState := llama2.NewRunState(config)
promptTokens := vocab.Encode(strings.ReplaceAll(prompt, "\n", "<0x0A>"))
out := bytes.NewBuffer(nil)
// the current position we are in
var token int = 1 // 1 = BOS token in llama-2 sentencepiece
var pos = 0
for pos < steps {
// forward the transformer to get logits for the next token
llama2.Transformer(token, pos, config, runState, w)
var next int
if pos < len(promptTokens) {
next = promptTokens[pos]
} else {
// sample the next token
if ai.temperature == 0 {
// greedy argmax sampling
next = nn.ArgMax(runState.Logits)
} else {
// apply the temperature to the logits
for q := 0; q < config.VocabSize; q++ {
runState.Logits[q] /= float32(ai.temperature)
}
// apply softmax to the logits to the probabilities for next token
nn.SoftMax(runState.Logits)
// we now want to sample from this distribution to get the next token
if ai.topp <= 0 || ai.topp >= 1 {
// simply sample from the predicted probability distribution
next = nn.Sample(runState.Logits)
} else {
// top-p (nucleus) sampling, clamping the least likely tokens to zero
next = nn.SampleTopP(runState.Logits, float32(ai.topp))
}
}
}
pos++
// data-dependent terminating condition: the BOS (1) token delimits sequences
if next == 1 {
break
}
// following BOS (1) token, sentencepiece decoder strips any leading whitespace
var tokenStr string
if token == 1 && vocab.Words[next][0] == ' ' {
tokenStr = vocab.Words[next][1:]
} else {
tokenStr = vocab.Words[next]
}
out.Write([]byte(tokenStr))
// advance forward
token = next
}
out.Write([]byte("\n"))
return strings.ReplaceAll(string(out.Bytes()), "<0x0A>", "\n"), nil
}

View File

@@ -4,11 +4,6 @@ package main
import ( import (
"context" "context"
"fmt"
"io"
"net/http"
"os"
"path"
"testing" "testing"
"time" "time"
) )
@@ -22,53 +17,7 @@ func TestAINoop(t *testing.T) {
func TestAIOllama(t *testing.T) { func TestAIOllama(t *testing.T) {
t.Parallel() t.Parallel()
ai := NewAIOllama("http://localhost:11434", "gemma:2b") ai := NewAIOllama("http://localhost:11434", "llama3")
testAI(t, ai)
}
func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir()
checkpoints := "checkpoints"
tokenizer := "tokenizer"
for u, p := range map[string]*string{
"https://huggingface.co/karpathy/tinyllamas/resolve/main/stories110M.bin": &checkpoints,
"https://github.com/karpathy/llama2.c/raw/master/tokenizer.bin": &tokenizer,
} {
func() {
*p = path.Base(u)
if _, err := os.Stat(path.Join(d, *p)); os.IsNotExist(err) {
t.Logf("downloading %s from %s", u, *p)
resp, err := http.Get(u)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
f, err := os.Create(path.Join(d, *p))
if err != nil {
t.Fatal(err)
}
defer f.Close()
if _, err := io.Copy(f, resp.Body); err != nil {
f.Close()
os.Remove(path.Join(d, *p))
t.Fatal(err)
}
}
}()
}
ai := NewAILocal(
path.Join(d, checkpoints),
path.Join(d, tokenizer),
0.9,
256,
0.9,
)
testAI(t, ai) testAI(t, ai)
} }
@@ -78,7 +27,7 @@ func testAI(t *testing.T, ai AI) {
defer can() defer can()
t.Run("mvp", func(t *testing.T) { t.Run("mvp", func(t *testing.T) {
if result, err := ai.Do(ctx, "hello world"); err != nil { if result, err := ai.Do(ctx, "Tell me a fun fact."); err != nil {
t.Fatal(err) t.Fatal(err)
} else if len(result) < 3 { } else if len(result) < 3 {
t.Error(result) t.Error(result)
@@ -87,6 +36,7 @@ func testAI(t *testing.T, ai AI) {
} }
}) })
/*
t.Run("simulation", func(t *testing.T) { t.Run("simulation", func(t *testing.T) {
d := NewRAM() d := NewRAM()
FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern) FillWithTestdata(ctx, d, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
@@ -114,5 +64,5 @@ func testAI(t *testing.T, ai AI) {
} }
t.Logf("\n\t%s\n->\n\t%s", input, result) t.Logf("\n\t%s\n->\n\t%s", input, result)
}) })
*/
} }

View File

@@ -23,10 +23,10 @@ type Config struct {
BasicAuthUser string BasicAuthUser string
BasicAuthPassword string BasicAuthPassword string
FillWithTestdata bool FillWithTestdata bool
OllamaURL string OllamaUrl string
OllamaModel string OllamaModel string
LocalCheckpoint string RecapPromptIntro string
LocalTokenizer string RecapPrompt string
AssetPattern string AssetPattern string
DatacenterPattern string DatacenterPattern string
EventNamePattern string EventNamePattern string
@@ -34,11 +34,13 @@ type Config struct {
storage Storage storage Storage
ai AI ai AI
slackToModelPipeline Pipeline slackToModelPipeline Pipeline
slackScrapePipeline Pipeline
modelToPersistencePipeline Pipeline modelToPersistencePipeline Pipeline
persistenceToRecapPipeline Pipeline
} }
var ( var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]` renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]|ip-[0-9]+-[0-9]+-[0-9]+-[0-9]+\.[a-z]+-[a-z]+-[0-9]+\.compute\.internal`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]` renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)` renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
) )
@@ -50,10 +52,12 @@ func newConfig(ctx context.Context) (Config, error) {
func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) { func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, error) {
def := Config{ def := Config{
Port: 38080, Port: 38080,
OllamaModel: "gemma:2b", OllamaModel: "llama3",
AssetPattern: renderAssetPattern, AssetPattern: renderAssetPattern,
DatacenterPattern: renderDatacenterPattern, DatacenterPattern: renderDatacenterPattern,
EventNamePattern: renderEventNamePattern, EventNamePattern: renderEventNamePattern,
RecapPromptIntro: "A Slack thread began with the following original post.",
RecapPrompt: "What is the summary of the responses to the Slack thread consisting of the following messages? Limit the summary to one sentence. Do not include any leading text. Be as brief as possible. No context is needed.",
} }
var m map[string]any var m map[string]any
@@ -130,10 +134,8 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.storage = storage result.storage = storage
if result.OllamaURL != "" { if result.OllamaUrl != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel) result.ai = NewAIOllama(result.OllamaUrl, result.OllamaModel)
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
result.ai = NewAILocal(result.LocalCheckpoint, result.LocalTokenizer, 0.9, 128, 0.9)
} else { } else {
result.ai = NewAINoop() result.ai = NewAINoop()
} }
@@ -150,5 +152,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.modelToPersistencePipeline = modelToPersistencePipeline result.modelToPersistencePipeline = modelToPersistencePipeline
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackScrapePipeline = slackScrapePipeline
persistenceToRecapPipeline, err := NewPersistenceToRecapPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToRecapPipeline = persistenceToRecapPipeline
return result, nil return result, nil
} }

View File

@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"os"
"path" "path"
"testing" "testing"
@@ -35,7 +36,12 @@ func NewTestDriver(t *testing.T, optionalP ...string) Driver {
func NewDriver(ctx context.Context, conn string) (Driver, error) { func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite" engine := "sqlite"
if conn == "" { if conn == "" {
conn = ":memory:" f, err := os.CreateTemp(os.TempDir(), "spoc-bot-vr-undef-*.db")
if err != nil {
return Driver{}, err
}
f.Close()
conn = f.Name()
} else { } else {
if u, err := url.Parse(conn); err != nil { if u, err := url.Parse(conn); err != nil {
return Driver{}, err return Driver{}, err

5
go.mod
View File

@@ -6,9 +6,9 @@ require (
github.com/glebarez/go-sqlite v1.21.2 github.com/glebarez/go-sqlite v1.21.2
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8 github.com/tmc/langchaingo v0.1.8
gotest.tools/v3 v3.5.1 golang.org/x/time v0.5.0
gotest.tools v2.2.0+incompatible
) )
require ( require (
@@ -20,7 +20,6 @@ require (
github.com/pkoukk/tiktoken-go v0.1.6 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
modernc.org/libc v1.22.5 // indirect modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect

6
go.sum
View File

@@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
@@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=

139
main.go
View File

@@ -39,6 +39,8 @@ func run(ctx context.Context, cfg Config) error {
case err := <-processPipelines(ctx, case err := <-processPipelines(ctx,
cfg.slackToModelPipeline, cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline, cfg.modelToPersistencePipeline,
cfg.slackScrapePipeline,
cfg.persistenceToRecapPipeline,
): ):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
@@ -85,8 +87,10 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc { func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug { if cfg.Debug {
@@ -99,89 +103,51 @@ func newHandler(cfg Config) http.HandlerFunc {
} }
} }
var Version = "undef"
func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
json.NewEncoder(w).Encode(map[string]any{"version": Version})
}
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
event := r.URL.Query().Get("id")
b, _ := json.Marshal(ModelIDs{Event: event})
if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"event": event})
}
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) { if !basicAuth(cfg, w, r) {
return return
} }
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel} since, err := parseSince(r.URL.Query().Get("since"))
httpc := http.Client{Timeout: time.Second}
get := func(url string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil { if err != nil {
return nil, err http.Error(w, err.Error(), http.StatusBadRequest)
}
req.Header.Set("Authorization", "Bearer "+token)
req = req.WithContext(r.Context())
resp, err := httpc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
return io.ReadAll(resp.Body)
}
n := 0
for len(urls) > 0 {
url := urls[0]
urls = urls[1:]
select {
case <-r.Context().Done():
case <-time.After(time.Second):
}
body, err := get(url)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return return
} }
var page struct { job, _ := json.Marshal(SlackScrape{
Messages []json.RawMessage Latest: time.Now().Unix(),
} Oldest: since.Unix(),
if err := json.Unmarshal(body, &page); err != nil { ThreadTS: "",
Channel: r.Header.Get("slack-channel"),
Token: r.Header.Get("slack-oauth-token"),
})
if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
errs := []error{}
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
} else {
n += 1
}
if !strings.Contains(url, "ts=") {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
}
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
}
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
} }
} }
@@ -195,12 +161,13 @@ func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack { if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize return handlerPostAPIV1EventsSlackInitialize(cfg)
} }
return _newHandlerPostAPIV1EventsSlack(cfg) return _newHandlerPostAPIV1EventsSlack(cfg)
} }
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body) b, _ := io.ReadAll(r.Body)
var challenge struct { var challenge struct {
Token string Token string
@@ -211,14 +178,32 @@ func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Reques
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
cfg.driver.ExecContext(r.Context(), `
CREATE TABLE
IF NOT EXISTS
initialization (
label TEXT,
token TEXT,
updated TIMESTAMP
)
`)
if _, err := cfg.driver.ExecContext(r.Context(), `
INSERT
INTO initialization (label, token, updated)
VALUES ('slack_events_webhook_token', $1, $2)
`, challenge.Token, time.Now().UTC()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Println("stashed new slack initialization token", challenge.Token)
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
}
} }
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body) body, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b)) r.Body = io.NopCloser(bytes.NewReader(body))
var allowList struct { var allowList struct {
Token string Token string
@@ -226,7 +211,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
Channel string Channel string
} }
} }
if err := json.Unmarshal(b, &allowList); err != nil { if err := json.Unmarshal(body, &allowList); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} else if allowList.Token != cfg.SlackToken { } else if allowList.Token != cfg.SlackToken {
@@ -243,7 +228,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return return
} }
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil { if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil {
log.Printf("failed to ingest: %v", err) log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@@ -36,6 +36,7 @@ func TestRun(t *testing.T) {
return int(port) return int(port)
}() }()
u := fmt.Sprintf("http://localhost:%d", port) u := fmt.Sprintf("http://localhost:%d", port)
var err error
cfg := Config{} cfg := Config{}
cfg.DatacenterPattern = renderDatacenterPattern cfg.DatacenterPattern = renderDatacenterPattern
@@ -44,10 +45,16 @@ func TestRun(t *testing.T) {
cfg.Port = port cfg.Port = port
cfg.driver = NewTestDriver(t) cfg.driver = NewTestDriver(t)
cfg.storage, _ = NewStorage(ctx, cfg.driver) cfg.storage, _ = NewStorage(ctx, cfg.driver)
cfg.ai = NewAINoop()
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg) cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg) cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
cfg.persistenceToRecapPipeline, err = NewPersistenceToRecapPipeline(ctx, cfg)
if err != nil {
t.Fatal(err)
}
go func() { go func() {
if err := run(ctx, cfg); err != nil && ctx.Err() == nil { if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
@@ -83,4 +90,41 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b) t.Fatalf("(%d) %s", resp.StatusCode, b)
} }
}) })
t.Run("GET /api/v1/rpc/recapevent", func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", "human_thread_message_from_opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
b, err = os.ReadFile(path.Join("testdata", "slack_events", "opsgenie_alert.json"))
if err != nil {
t.Fatal(err)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal(err)
}
for ctx.Err() == nil {
if thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409"); thread.Recap != "" {
break
}
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 100):
}
}
if err := ctx.Err(); err != nil {
t.Fatal("timed out waiting for recap")
}
thread, _ := cfg.storage.GetThread(ctx, "1712927439.728409")
if thread.Recap == "" {
t.Error(thread.Recap)
}
t.Log(thread.Recap)
})
} }

View File

@@ -8,6 +8,7 @@ type Thread struct {
TS uint64 TS uint64
Channel string Channel string
EventID string EventID string
Recap string
} }
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread { func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {

View File

@@ -26,15 +26,14 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
writer = NewNoopQueue()
return Pipeline{ return Pipeline{
writer: writer, writer: writer,
reader: reader, reader: reader,
process: newModelToPersistenceProcess(cfg.storage), process: newModelToPersistenceProcess(cfg, cfg.storage),
}, nil }, nil
} }
func newModelToPersistenceProcess(storage Storage) processFunc { func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
return func(ctx context.Context, models []byte) ([]byte, error) { return func(ctx context.Context, models []byte) ([]byte, error) {
var m Models var m Models
if err := json.Unmarshal(models, &m); err != nil { if err := json.Unmarshal(models, &m); err != nil {
@@ -56,7 +55,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
return nil, fmt.Errorf("failed to persist message: %w", err) return nil, fmt.Errorf("failed to persist message: %w", err)
} }
if cfg.Debug {
log.Printf("persisted models") log.Printf("persisted models")
}
return json.Marshal(ModelIDs{ return json.Marshal(ModelIDs{
Event: m.Event.ID, Event: m.Event.ID,
Thread: m.Thread.ID, Thread: m.Thread.ID,

View File

@@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
d := NewTestDriver(t) d := NewTestDriver(t)
s, _ := NewStorage(ctx, d) s, _ := NewStorage(ctx, d)
process := newModelToPersistenceProcess(s) process := newModelToPersistenceProcess(Config{}, s)
_, _ = ctx, process _, _ = ctx, process

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -36,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
if q.driver.DB == nil { if q.driver.DB == nil {
return nil return nil
} }
_, err := q.driver.ExecContext(ctx, ` result, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3) INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
`, `,
uuid.New().String(),
q.topic, q.topic,
time.Now().Unix(), time.Now().Unix(),
b, b,
uuid.New().String(),
) )
if err != nil {
return err return err
}
if n, err := result.RowsAffected(); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
}
return nil
} }
func (q Queue) Syn(ctx context.Context) (string, []byte, error) { func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
@@ -81,7 +90,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
topic = $3 topic = $3
AND ( AND (
reservation IS NULL reservation IS NULL
OR $4 - updated > 60 OR $4 - updated > 600
) )
LIMIT 1 LIMIT 1
) )
@@ -101,7 +110,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
`, reservation) `, reservation)
if err := row.Err(); err != nil { if err := row.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err) return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
} else if err := row.Scan(&payload); err != nil { } else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err) return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
} }

87
recap.go Normal file
View File

@@ -0,0 +1,87 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
)
type PersistenceToRecap struct {
pipeline Pipeline
}
// NewPersistenceToRecapPipeline wires up the pipeline that consumes the
// "new_persistence" queue and produces AI recaps for resolved events'
// threads. It writes nothing downstream, so the writer is a noop queue.
func NewPersistenceToRecapPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	q, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	p := Pipeline{
		reader:  q,
		writer:  NewNoopQueue(),
		process: newPersistenceToRecapProcess(cfg),
	}
	return p, nil
}
// newPersistenceToRecapProcess returns the process step for the
// persistence-to-recap pipeline. For each payload naming a resolved
// event it generates and persists an AI recap of every thread of that
// event that has at least one reply. Payloads without an event ID and
// events that are not resolved are skipped without error.
//
// The original implementation used an if/else-if chain with empty
// branch bodies and an inline closure; this version uses guard clauses
// and a named helper with identical behavior (including the debug log
// firing on skip paths).
func newPersistenceToRecapProcess(cfg Config) processFunc {
	return func(ctx context.Context, modelIDs []byte) ([]byte, error) {
		var m ModelIDs
		if err := json.Unmarshal(modelIDs, &m); err != nil {
			return nil, fmt.Errorf("received non model ids payload: %w", err)
		}
		if m.Event != "" {
			event, err := cfg.storage.GetEvent(ctx, m.Event)
			if err != nil {
				return nil, err
			}
			// Only resolved events get recapped; unresolved ones are
			// revisited when a later persistence marks them resolved.
			if event.Resolved {
				if err := recapEventThreads(ctx, cfg, event.ID); err != nil {
					return nil, err
				}
			}
		}
		if cfg.Debug {
			log.Printf("persisted recap")
		}
		return nil, nil
	}
}

// recapEventThreads asks the AI for a recap of each thread of the given
// event that has at least one reply beyond its root message, and stores
// the recap back on the thread.
func recapEventThreads(ctx context.Context, cfg Config, eventID string) error {
	threads, err := cfg.storage.GetEventThreads(ctx, eventID)
	if err != nil {
		return err
	}
	for _, thread := range threads {
		messages, err := cfg.storage.GetThreadMessages(ctx, thread.ID)
		if err != nil {
			return err
		}
		if len(messages) < 2 {
			// A lone root message has nothing worth recapping.
			continue
		}
		// Prompt layout: intro, the thread's root message, the recap
		// instruction, then each reply as an "author\ntext" paragraph,
		// all joined by blank lines.
		prompt := []string{
			cfg.RecapPromptIntro,
			"---",
			messages[0].Plaintext,
			"---",
			cfg.RecapPrompt,
			"---",
		}
		for _, message := range messages[1:] {
			prompt = append(prompt, fmt.Sprintf("%s\n%s", message.Author, message.Plaintext))
		}
		recap, err := cfg.ai.Do(ctx, strings.Join(prompt, "\n\n"))
		if err != nil {
			return err
		}
		thread.Recap = recap
		if err := cfg.storage.UpsertThread(ctx, thread); err != nil {
			return err
		}
		log.Println("recapped", thread.ID)
		if cfg.Debug {
			log.Printf("Recapped %q as %q from %q/%q and %+v", thread.ID, thread.Recap, cfg.RecapPromptIntro, cfg.RecapPrompt, messages)
		}
	}
	return nil
}

50
recap_test.go Normal file
View File

@@ -0,0 +1,50 @@
package main
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
// TestNewPersistenceToRecapProcess seeds a resolved event with one
// thread holding a root message plus a reply, runs the recap process
// once, and verifies the thread gained a non-empty recap.
//
// Fix over the original: errors from NewStorage and json.Marshal were
// silently discarded with `_`; they now fail the test immediately.
func TestNewPersistenceToRecapProcess(t *testing.T) {
	ctx, can := context.WithTimeout(context.Background(), time.Second*10)
	defer can()
	d := NewTestDriver(t)
	s, err := NewStorage(ctx, d)
	if err != nil {
		t.Fatal(err)
	}
	cfg := Config{
		driver:  d,
		storage: s,
		ai:      NewAINoop(),
		Debug:   true,
	}
	proc := newPersistenceToRecapProcess(cfg)
	if err := s.UpsertEvent(ctx, model.NewEvent("Event", "", 0, "", "", "", "", true)); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertThread(ctx, model.NewThread("Thread", "", 0, "", "Event")); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertMessage(ctx, model.NewMessage("Root", 0, "bot", "an alert has fired", "Thread")); err != nil {
		t.Fatal(err)
	} else if err := s.UpsertMessage(ctx, model.NewMessage("Message", 0, "me", "hello world", "Thread")); err != nil {
		t.Fatal(err)
	}
	b, err := json.Marshal(ModelIDs{Event: "Event"})
	if err != nil {
		t.Fatal(err)
	}
	if _, err := proc(ctx, b); err != nil {
		t.Error(err)
	}
	if thread, err := s.GetThread(ctx, "Thread"); err != nil {
		t.Error(err)
	} else if thread.Recap == "" {
		t.Error("no recap:", thread.Recap)
	} else {
		t.Logf("%+v", thread)
	}
}

View File

@@ -61,14 +61,7 @@ func newSlackToModelProcess(cfg Config) processFunc {
cfg.DatacenterPattern: &s.Datacenter, cfg.DatacenterPattern: &s.Datacenter,
cfg.EventNamePattern: &s.EventName, cfg.EventNamePattern: &s.EventName,
} { } {
r := regexp.MustCompile(pattern) *ptr = withPattern(pattern, *ptr)
parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
} }
event := model.Event{} event := model.Event{}
@@ -84,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event) thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
} }
if cfg.Debug {
log.Printf("parsed slack message into models") log.Printf("parsed slack message into models")
}
return json.Marshal(Models{ return json.Marshal(Models{
Event: event, Event: event,
Message: message, Message: message,
@@ -93,6 +88,17 @@ func newSlackToModelProcess(cfg Config) processFunc {
} }
} }
// withPattern extracts from given the portion matched by pattern. If
// the pattern defines named capture groups, the value of the last named
// group is returned instead of the whole match; if the pattern does not
// match at all, the empty string is returned.
//
// Fix over the original: FindStringSubmatch was re-evaluated inside the
// loop and indexed without a nil check, so a named-group pattern with
// no match panicked (index into a nil slice). The submatch is now
// computed once and guarded.
func withPattern(pattern string, given string) string {
	r := regexp.MustCompile(pattern)
	parsed := r.FindString(given)
	submatch := r.FindStringSubmatch(given)
	if submatch == nil {
		// No match: parsed is already "" — avoid indexing a nil slice.
		return parsed
	}
	for i, name := range r.SubexpNames() {
		if i > 0 && name != "" {
			parsed = submatch[i]
		}
	}
	return parsed
}
type ( type (
parsedSlackMessage struct { parsedSlackMessage struct {
ID string ID string
@@ -227,6 +233,11 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
} }
func _parseSlack(b []byte) (slackMessage, error) { func _parseSlack(b []byte) (slackMessage, error) {
var wrapper ChannelWrapper
if err := json.Unmarshal(b, &wrapper); err == nil && len(wrapper.V) > 0 {
b = wrapper.V
}
var result slackMessage var result slackMessage
err := json.Unmarshal(b, &result) err := json.Unmarshal(b, &result)
switch result.Type { switch result.Type {
@@ -247,6 +258,9 @@ func _parseSlack(b []byte) (slackMessage, error) {
} }
result.Event.PreviousMessage = nil result.Event.PreviousMessage = nil
} }
if wrapper.Channel != "" {
result.Event.Channel = wrapper.Channel
}
return result, err return result, err
} }
@@ -257,3 +271,8 @@ func (this slackEvent) Empty() bool {
func (this parsedSlackMessage) Time() time.Time { func (this parsedSlackMessage) Time() time.Time {
return time.Unix(int64(this.TS), 0) return time.Unix(int64(this.TS), 0)
} }
// ChannelWrapper is an envelope the scraper places around a raw Slack
// message payload (V) so the channel it was scraped from (Channel)
// survives into parsing; _parseSlack unwraps it when the payload
// decodes as this shape and V is non-empty.
type ChannelWrapper struct {
	Channel string
	V       json.RawMessage
}

View File

@@ -41,7 +41,7 @@ func TestSlackToModelPipeline(t *testing.T) {
"", "",
"", "",
"Datastores Non-Critical", "Datastores Non-Critical",
false, true,
), ),
Message: model.NewMessage( Message: model.NewMessage(
"1712927439.728409/1712927439", "1712927439.728409/1712927439",
@@ -141,7 +141,7 @@ func TestParseSlackTestdata(t *testing.T) {
Name: "Opsgenie for Alert Management", Name: "Opsgenie for Alert Management",
}, },
Attachments: []slackAttachment{{ Attachments: []slackAttachment{{
Color: "F4511E", Color: "2ecc71",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed", Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{ Fields: []slackField{
@@ -166,6 +166,7 @@ func TestParseSlackTestdata(t *testing.T) {
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management", Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical", Team: "Datastores Non-Critical",
Resolved: true,
}, },
}, },
"opsgenie_alert_resolved.json": { "opsgenie_alert_resolved.json": {
@@ -249,3 +250,54 @@ func TestParseSlackTestdata(t *testing.T) {
}) })
} }
} }
// TestWrappedSlack verifies that _parseSlack unwraps a ChannelWrapper:
// the inner payload parses to the same event (same ParentID) and the
// wrapper's Channel overrides the event's channel.
//
// Fix over the original: errors from os.ReadFile and json.Marshal were
// silently discarded with `_`; a missing testdata file now fails fast
// instead of producing a confusing downstream parse failure.
func TestWrappedSlack(t *testing.T) {
	b, err := os.ReadFile("testdata/slack_events/human_thread_message_from_opsgenie_alert.json")
	if err != nil {
		t.Fatal(err)
	}
	b2, err := json.Marshal(ChannelWrapper{Channel: "X", V: json.RawMessage(b)})
	if err != nil {
		t.Fatal(err)
	}
	if got, err := _parseSlack(b); err != nil {
		t.Fatal(err)
	} else if got2, err := _parseSlack(b2); err != nil {
		t.Fatal(err)
	} else if got2.Event.Channel != "X" {
		t.Error(got2.Event.Channel)
	} else if got2.Event.ParentID == "" {
		t.Error(got2.Event)
	} else if got.Event.ParentID != got2.Event.ParentID {
		t.Error(got, got2)
	}
}
// TestWithPattern checks withPattern against the render asset pattern
// for node hostnames, redis instance IDs, and postgres instance IDs.
func TestWithPattern(t *testing.T) {
	for name, tc := range map[string]struct {
		in      string
		pattern string
		want    string
	}{
		"pods unavailable on node": {
			in:      `pods are unavailable on node ip-12-345-67-890.xx-yyyyy-1.compute.internal.`,
			pattern: renderAssetPattern,
			want:    `ip-12-345-67-890.xx-yyyyy-1.compute.internal`,
		},
		"redis err": {
			in:      `Redis instance red-abc123 is emitting Some error repeatedly`,
			pattern: renderAssetPattern,
			want:    `red-abc123`,
		},
		"pg err": {
			in:      `db dpg-xyz123 is in a pinch`,
			pattern: renderAssetPattern,
			want:    `dpg-xyz123`,
		},
	} {
		tc := tc // capture for the subtest closure (pre-Go 1.22 idiom)
		t.Run(name, func(t *testing.T) {
			if got := withPattern(tc.pattern, tc.in); got != tc.want {
				t.Errorf("withPattern(%q, %q) expected %q but got %q", tc.pattern, tc.in, tc.want, got)
			}
		})
	}
}

149
slackscrape.go Normal file
View File

@@ -0,0 +1,149 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"time"
"golang.org/x/time/rate"
)
// SlackScrape is a queued job describing one Slack API fetch: a page of
// conversations.history for Channel, or of conversations.replies when
// ThreadTS is set.
type SlackScrape struct {
	Latest   int64  // upper ts bound passed verbatim to Slack; assumes 0 means "from now" — TODO confirm Slack's handling of latest=0
	Oldest   int64  // lower ts bound; omitted from the request when 0
	ThreadTS string // when non-empty, scrape this thread's replies instead of channel history
	Channel  string // Slack channel ID to scrape
	Token    string // bearer token used to authorize the request
}
// NewSlackScrapePipeline builds the pipeline that pulls SlackScrape
// jobs off the "slack_channels_to_scrape" queue, calls the Slack API,
// and feeds each fetched message into the "new_persistence" queue.
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	// cfg is a local copy; the reader is stored on it so that
	// newSlackScrapeProcess can re-enqueue follow-up jobs (thread
	// fanout, pagination) back onto its own queue.
	cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	return Pipeline{
		writer:  writer,
		reader:  cfg.slackScrapePipeline.reader,
		process: newSlackScrapeProcess(cfg),
	}, nil
}
// newSlackScrapeProcess returns the process step for the Slack scrape
// pipeline. Each payload is a SlackScrape job; the step fetches one
// page of conversations.history (or conversations.replies when
// ThreadTS is set), forwards every message — wrapped in a
// ChannelWrapper tagged with its channel — to the slack-to-model
// pipeline, and re-enqueues follow-up jobs for discovered threads and
// for the next page when the page was full.
func newSlackScrapeProcess(cfg Config) processFunc {
	// Shared limiter: at most one Slack call every two seconds across
	// all jobs this process handles.
	limiter := rate.NewLimiter(0.5, 1)
	return func(ctx context.Context, jobb []byte) ([]byte, error) {
		if err := limiter.Wait(ctx); err != nil {
			return nil, err
		}
		var job SlackScrape
		if err := json.Unmarshal(jobb, &job); err != nil {
			return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
		}
		// Build the request URL: channel history by default, thread
		// replies when the job carries a thread timestamp.
		u := url.URL{
			Scheme: "https",
			Host:   "slack.com",
			Path:   "/api/conversations.history",
		}
		q := url.Values{}
		q.Set("channel", job.Channel)
		q.Set("latest", strconv.FormatInt(job.Latest, 10))
		q.Set("limit", "999")
		q.Set("inclusive", "true")
		if job.ThreadTS != "" {
			u.Path = "/api/conversations.replies"
			q.Set("ts", job.ThreadTS)
		}
		if job.Oldest != 0 {
			q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
		}
		u.RawQuery = q.Encode()
		// NOTE(review): this local shadows the net/url package for the
		// rest of the closure.
		url := u.String()
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+job.Token)
		req = req.WithContext(ctx)
		// NOTE(review): a 1s total client timeout looks tight for
		// 999-message pages — confirm against observed Slack latency.
		httpc := http.Client{Timeout: time.Second}
		resp, err := httpc.Do(req)
		if err != nil {
			return nil, err
		}
		// Defers run LIFO: drain the body first, then close it, so the
		// transport can reuse the connection.
		defer resp.Body.Close()
		defer io.Copy(io.Discard, resp.Body)
		if resp.StatusCode != http.StatusOK {
			b, _ := io.ReadAll(resp.Body)
			return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
		}
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, err
		}
		var page struct {
			Messages []json.RawMessage
		}
		if err := json.Unmarshal(body, &page); err != nil {
			return nil, err
		}
		// Track the oldest ts seen on this page; it becomes "latest"
		// for the next page.
		// NOTE(review): when job.Latest is 0 the `< newLatest` guard
		// below can never fire, so a full first page would re-enqueue
		// an identical job — confirm whether Latest==0 can coincide
		// with 999-message pages.
		newLatest := float64(job.Latest)
		for _, messageJSON := range page.Messages {
			if cfg.Debug {
				log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
			}
			// Forward the raw message, tagged with its channel, to the
			// slack-to-model pipeline.
			b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
			if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
				return nil, err
			}
			var peekTS struct {
				TS float64 `json:"ts,string"`
			}
			if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
				newLatest = peekTS.TS
			}
			// Channel-history scrapes fan out one replies-scrape per
			// thread root discovered on the page; reply scrapes do not
			// recurse further.
			if job.ThreadTS == "" {
				var peek struct {
					ThreadTS string `json:"thread_ts"`
				}
				json.Unmarshal(messageJSON, &peek)
				if peek.ThreadTS != "" {
					clone := job
					clone.ThreadTS = peek.ThreadTS
					clone.Oldest = 0
					b, _ := json.Marshal(clone)
					if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
						return nil, err
					}
					log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
				}
			}
		}
		// A full page (999 = the requested limit) implies more history
		// remains: re-enqueue the same job with latest moved back to
		// the oldest ts just seen.
		if len(page.Messages) == 999 {
			clone := job
			clone.Latest = int64(newLatest)
			b, _ := json.Marshal(clone)
			if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
				return nil, err
			}
			log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
		}
		log.Printf("scraped %v from %s", len(page.Messages), url)
		return nil, nil
	}
}

View File

@@ -67,10 +67,112 @@ func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error)
return v, err return v, err
} }
// GetEventThreads returns all threads whose EventID matches ID,
// ordered by TS ascending.
func (s Storage) GetEventThreads(ctx context.Context, ID string) ([]model.Thread, error) {
	return s.selectThreadsWhere(ctx, "EventID = $1", ID)
}
// GetThreadMessages returns all messages whose ThreadID matches ID,
// ordered by TS ascending.
func (s Storage) GetThreadMessages(ctx context.Context, ID string) ([]model.Message, error) {
	return s.selectMessagesWhere(ctx, "ThreadID = $1", ID)
}
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error { func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
return s.upsert(ctx, "threads", thread) return s.upsert(ctx, "threads", thread)
} }
// selectThreadsWhere queries the threads table with the given WHERE
// clause and returns the decoded rows ordered by TS ascending.
//
// Columns are stored JSON-encoded (see keysArgsKeyargsValues), so each
// argument is JSON-marshaled before binding, and each row is read back
// as raw bytes per column, reassembled into a JSON object keyed by
// column name, and unmarshaled into a model.Thread.
// NOTE(review): this is a near-duplicate of selectMessagesWhere;
// consider unifying them behind a shared helper.
func (s Storage) selectThreadsWhere(ctx context.Context, clause string, args ...any) ([]model.Thread, error) {
	keys, _, _, _, err := keysArgsKeyargsValues(model.Thread{})
	if err != nil {
		return nil, err
	}
	// Values are stored JSON-encoded, so encode the bind args the same way.
	args2 := make([]any, len(args))
	for i := range args {
		args2[i], _ = json.Marshal(args[i])
	}
	scanTargets := make([]any, len(keys))
	q := fmt.Sprintf(`
SELECT %s FROM threads WHERE %s
ORDER BY TS ASC
`, strings.Join(keys, ", "), clause)
	rows, err := s.driver.QueryContext(ctx, q, args2...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var result []model.Thread
	for rows.Next() {
		// Fresh *[]byte targets per row so columns don't alias.
		for i := range scanTargets {
			scanTargets[i] = &[]byte{}
		}
		if err := rows.Scan(scanTargets...); err != nil {
			return nil, err
		}
		// Rebuild a JSON object from the column bytes and decode it.
		m := map[string]json.RawMessage{}
		for i, k := range keys {
			m[k] = *scanTargets[i].(*[]byte)
		}
		b, _ := json.Marshal(m)
		var one model.Thread
		if err := json.Unmarshal(b, &one); err != nil {
			return nil, err
		}
		result = append(result, one)
	}
	return result, rows.Err()
}
// selectMessagesWhere queries the messages table with the given WHERE
// clause and returns the decoded rows ordered by TS ascending.
//
// Columns are stored JSON-encoded (see keysArgsKeyargsValues), so each
// argument is JSON-marshaled before binding, and each row is read back
// as raw bytes per column, reassembled into a JSON object keyed by
// column name, and unmarshaled into a model.Message.
// NOTE(review): this is a near-duplicate of selectThreadsWhere;
// consider unifying them behind a shared helper.
func (s Storage) selectMessagesWhere(ctx context.Context, clause string, args ...any) ([]model.Message, error) {
	keys, _, _, _, err := keysArgsKeyargsValues(model.Message{})
	if err != nil {
		return nil, err
	}
	// Values are stored JSON-encoded, so encode the bind args the same way.
	args2 := make([]any, len(args))
	for i := range args {
		args2[i], _ = json.Marshal(args[i])
	}
	scanTargets := make([]any, len(keys))
	q := fmt.Sprintf(`
SELECT %s FROM messages WHERE %s
ORDER BY TS ASC
`, strings.Join(keys, ", "), clause)
	rows, err := s.driver.QueryContext(ctx, q, args2...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var result []model.Message
	for rows.Next() {
		// Fresh *[]byte targets per row so columns don't alias.
		for i := range scanTargets {
			scanTargets[i] = &[]byte{}
		}
		if err := rows.Scan(scanTargets...); err != nil {
			return nil, err
		}
		// Rebuild a JSON object from the column bytes and decode it.
		m := map[string]json.RawMessage{}
		for i, k := range keys {
			m[k] = *scanTargets[i].(*[]byte)
		}
		b, _ := json.Marshal(m)
		var one model.Message
		if err := json.Unmarshal(b, &one); err != nil {
			return nil, err
		}
		result = append(result, one)
	}
	return result, rows.Err()
}
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error { func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
if questions := strings.Count(clause, "$"); questions != len(args) { if questions := strings.Count(clause, "$"); questions != len(args) {
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args)) return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))

View File

@@ -2,6 +2,8 @@ package main
import ( import (
"context" "context"
"fmt"
"math/rand"
"testing" "testing"
"time" "time"
@@ -85,4 +87,64 @@ func TestStorage(t *testing.T) {
t.Fatal("unexpected result from get:", got) t.Fatal("unexpected result from get:", got)
} }
}) })
t.Run("get thread messages", func(t *testing.T) {
	// A random thread ID keeps this subtest isolated from rows that
	// other subtests insert into the shared storage.
	thread := fmt.Sprintf("thread-%d", rand.Int())
	m := model.NewMessage(
		"ID",
		1,
		"Author",
		"Plaintext",
		thread,
	)
	if err := s.UpsertMessage(ctx, m); err != nil {
		t.Fatal("unexpected error on insert:", err)
	} else if m2, err := s.GetMessage(ctx, m.ID); err != nil {
		t.Fatal("unexpected error on upsert-get:", err)
	} else if m2 != m {
		t.Errorf("expected %+v but got %+v", m, m2)
	}
	// The thread should report exactly the one message just written,
	// round-tripped without loss.
	msgs, err := s.GetThreadMessages(ctx, thread)
	if err != nil {
		t.Fatal(err)
	} else if len(msgs) != 1 {
		t.Fatal(msgs)
	} else if msgs[0].ThreadID != m.ThreadID {
		t.Fatal(msgs[0].ThreadID)
	} else if msgs[0] != m {
		t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
	}
})
t.Run("get event threads", func(t *testing.T) {
	// A random event ID keeps this subtest isolated from rows that
	// other subtests insert into the shared storage.
	event := fmt.Sprintf("event-%d", rand.Int())
	m := model.NewThread(
		"ID",
		"URL",
		1,
		"Channel",
		event,
	)
	if err := s.UpsertThread(ctx, m); err != nil {
		t.Fatal("unexpected error on insert:", err)
	} else if m2, err := s.GetThread(ctx, m.ID); err != nil {
		t.Fatal("unexpected error on upsert-get:", err)
	} else if m2 != m {
		t.Errorf("expected %+v but got %+v", m, m2)
	}
	// The event should report exactly the one thread just written,
	// round-tripped without loss.
	msgs, err := s.GetEventThreads(ctx, event)
	if err != nil {
		t.Fatal(err)
	} else if len(msgs) != 1 {
		t.Fatal(msgs)
	} else if msgs[0].EventID != m.EventID {
		t.Fatal(msgs[0].EventID)
	} else if msgs[0] != m {
		t.Fatalf("wanted msgs like %+v but got %+v", m, msgs[0])
	}
})
} }

View File

@@ -29,7 +29,8 @@
"attachments": [ "attachments": [
{ {
"id": 1, "id": 1,
"color": "F4511E", "realcolor": "F4511E",
"color": "2ecc71",
"fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", "fallback": "New alert: \"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/38152bc5-bc5d-411d-9feb-d285af5b6481-1712927439305|11071>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", "text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed", "title": "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",