revive message and test slack pipeline parses slack into message
parent
eec5c39725
commit
580068d98b
2
go.mod
2
go.mod
|
|
@ -8,11 +8,13 @@ require (
|
|||
github.com/lib/pq v1.10.9
|
||||
github.com/nikolaydubina/llama2.go v0.7.1
|
||||
github.com/tmc/langchaingo v0.1.8
|
||||
gotest.tools/v3 v3.5.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dlclark/regexp2 v1.10.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -6,6 +6,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
|
|||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
|
||||
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
|
||||
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
|
|
@ -32,6 +34,8 @@ golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
|||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||
|
|
|
|||
32
slack.go
32
slack.go
|
|
@ -2,13 +2,35 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type SlackIngestion struct {
|
||||
slackToMessage Pipeline
|
||||
type SlackToMessage struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewSlackIngestion(ctx context.Context, driver Driver) (SlackIngestion, error) {
|
||||
return SlackIngestion{}, io.EOF
|
||||
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newSlackToMessageProcess(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newSlackToMessageProcess(cfg Config) processFunc {
|
||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
||||
}
|
||||
return m.Serialize(), nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,50 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
func TestSlackToMessagePipeline(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer can()
|
||||
|
||||
driver, _ := NewDriver(ctx, "/tmp/f")
|
||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
if err := pipeline.Process(ctx); err != nil && ctx.Err() == nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
want := Message{
|
||||
ID: "1712927439.728409/1712927439",
|
||||
TS: 1712927439,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712927439.728409",
|
||||
EventName: "",
|
||||
Event: "11071",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
}
|
||||
|
||||
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
|
||||
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
t.Fatal("failed to enqueue", err)
|
||||
}
|
||||
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
|
||||
t.Fatal("failed to syn", err)
|
||||
} else if m := MustDeserialize(b2); false {
|
||||
} else {
|
||||
assert.DeepEqual(t, want, m)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue