Compare commits
9 Commits
44db0c6939
...
709f2ac254
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
709f2ac254 | ||
|
|
ba06796b8c | ||
|
|
f38c183fe8 | ||
|
|
8ae8f47753 | ||
|
|
e372be4288 | ||
|
|
acfd95e5af | ||
|
|
d70a0e313f | ||
|
|
0cecd5ea04 | ||
|
|
a7d5d021d6 |
26
config.go
26
config.go
@@ -30,10 +30,10 @@ type Config struct {
|
||||
DatacenterPattern string
|
||||
EventNamePattern string
|
||||
driver Driver
|
||||
storage Storage
|
||||
ai AI
|
||||
slackToMessagePipeline Pipeline
|
||||
messageToPersistencePipeline Pipeline
|
||||
persistenceToNormalizedPipeline Pipeline
|
||||
slackToModelPipeline Pipeline
|
||||
modelToPersistencePipeline Pipeline
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -119,6 +119,12 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
return Config{}, errors.New("not impl")
|
||||
}
|
||||
|
||||
storage, err := NewStorage(ctx, result.driver)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.storage = storage
|
||||
|
||||
if result.OllamaURL != "" {
|
||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
||||
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
||||
@@ -127,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
result.ai = NewAINoop()
|
||||
}
|
||||
|
||||
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
|
||||
slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.slackToMessagePipeline = slackToMessagePipeline
|
||||
result.slackToModelPipeline = slackToModelPipeline
|
||||
|
||||
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
|
||||
modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.messageToPersistencePipeline = messageToPersistencePipeline
|
||||
|
||||
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
|
||||
result.modelToPersistencePipeline = modelToPersistencePipeline
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -17,8 +17,12 @@ type Driver struct {
|
||||
*sql.DB
|
||||
}
|
||||
|
||||
func NewTestDriver(t *testing.T) Driver {
|
||||
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
|
||||
func NewTestDriver(t *testing.T, optionalP ...string) Driver {
|
||||
p := path.Join(t.TempDir(), "db")
|
||||
if len(optionalP) > 0 {
|
||||
p = optionalP[0]
|
||||
}
|
||||
driver, err := NewDriver(context.Background(), p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -16,9 +16,11 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
gotest.tools v2.2.0+incompatible // indirect
|
||||
modernc.org/libc v1.22.5 // indirect
|
||||
modernc.org/mathutil v1.5.0 // indirect
|
||||
modernc.org/memory v1.5.0 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@@ -18,6 +18,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
@@ -34,6 +36,8 @@ golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||
|
||||
9
main.go
9
main.go
@@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-processPipelines(ctx,
|
||||
cfg.slackToMessagePipeline,
|
||||
cfg.messageToPersistencePipeline,
|
||||
cfg.persistenceToNormalizedPipeline,
|
||||
cfg.slackToModelPipeline,
|
||||
cfg.modelToPersistencePipeline,
|
||||
):
|
||||
return err
|
||||
case err := <-listenAndServe(ctx, cfg):
|
||||
@@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
@@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||
log.Printf("failed to ingest: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
||||
@@ -45,7 +45,6 @@ func TestRun(t *testing.T) {
|
||||
cfg.EventNamePattern = renderEventNamePattern
|
||||
cfg.Port = port
|
||||
cfg.driver = NewTestDriver(t)
|
||||
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
||||
cfg.SlackToken = "redacted"
|
||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||
|
||||
@@ -96,7 +95,7 @@ func TestRun(t *testing.T) {
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
var result struct {
|
||||
Messages []Message
|
||||
Messages []any
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -189,7 +188,7 @@ func TestRun(t *testing.T) {
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Thread []Message
|
||||
Thread []any
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
157
message_test.go
157
message_test.go
@@ -1,157 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseSlackTestdata(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := map[string]struct {
|
||||
inCh string
|
||||
slackMessage slackMessage
|
||||
message Message
|
||||
}{
|
||||
"human_thread_message_from_opsgenie_alert.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712930706,
|
||||
Event: slackEvent{
|
||||
ID: "1712930706.598629",
|
||||
Channel: "C06U1DDBBU4",
|
||||
ParentID: "1712927439.728409",
|
||||
Text: "I gotta do this",
|
||||
Blocks: []slackBlock{{
|
||||
Elements: []slackElement{{
|
||||
Elements: []slackElement{{
|
||||
RichText: "I gotta do this",
|
||||
}},
|
||||
}},
|
||||
}},
|
||||
Bot: slackBot{
|
||||
Name: "",
|
||||
},
|
||||
Attachments: []slackAttachment{},
|
||||
},
|
||||
},
|
||||
message: Message{
|
||||
ID: "1712927439.728409/1712930706",
|
||||
TS: 1712930706,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712927439.728409",
|
||||
EventName: "",
|
||||
Event: "",
|
||||
Plaintext: "I gotta do this",
|
||||
Asset: "",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712927439,
|
||||
Event: slackEvent{
|
||||
ID: "1712927439.728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Bot: slackBot{
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "F4511E",
|
||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
{Value: "P3", Title: "Priority"},
|
||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||
},
|
||||
Actions: []slackAction{{}, {}, {}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
message: Message{
|
||||
ID: "1712927439.728409/1712927439",
|
||||
TS: 1712927439,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712927439.728409",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11071",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert_resolved.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712916339,
|
||||
Event: slackEvent{
|
||||
ID: "1712916339.000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Bot: slackBot{
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "2ecc71",
|
||||
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
{Value: "P3", Title: "Priority"},
|
||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||
},
|
||||
Actions: []slackAction{},
|
||||
}},
|
||||
},
|
||||
},
|
||||
message: Message{
|
||||
ID: "1712916339.000300/1712916339",
|
||||
TS: 1712916339,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712916339.000300",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11069",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "",
|
||||
Resolved: true,
|
||||
},
|
||||
},
|
||||
"reingested_alert.json": {
|
||||
inCh: "C06U1DDBBU4",
|
||||
message: Message{
|
||||
ID: "1712892637.037639/1712892637",
|
||||
TS: 1712892637,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712892637.037639",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11061",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "",
|
||||
Resolved: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, d := range cases {
|
||||
want := d
|
||||
t.Run(name, func(t *testing.T) {
|
||||
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
|
||||
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != want.message {
|
||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
||||
}
|
||||
if time := got.Time(); time.Unix() != int64(got.TS) {
|
||||
t.Error("not unix time", got.TS, time)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,37 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
type Event struct {
|
||||
Updated uint64
|
||||
ID string
|
||||
URL string
|
||||
TS uint64
|
||||
Name string
|
||||
Asset string
|
||||
Datacenter string
|
||||
Team string
|
||||
Resolved bool
|
||||
}
|
||||
|
||||
func NewEvent(ID, URL string, TS uint64, Name, Asset, Datacenter, Team string, Resolved bool) Event {
|
||||
return Event{
|
||||
Updated: updated(),
|
||||
ID: ID,
|
||||
URL: URL,
|
||||
TS: TS,
|
||||
Name: Name,
|
||||
Asset: Asset,
|
||||
Datacenter: Datacenter,
|
||||
Team: Team,
|
||||
Resolved: Resolved,
|
||||
}
|
||||
}
|
||||
|
||||
func (e Event) Empty() bool {
|
||||
return e == (Event{})
|
||||
}
|
||||
|
||||
func updated() uint64 {
|
||||
return uint64(time.Now().UnixNano() / int64(time.Millisecond))
|
||||
}
|
||||
|
||||
@@ -1 +1,28 @@
|
||||
package model
|
||||
|
||||
// THREAD ||--|{ MESSAGE: "populated by"
|
||||
type Message struct {
|
||||
Updated uint64
|
||||
ID string
|
||||
URL string
|
||||
TS uint64
|
||||
Author string
|
||||
Plaintext string
|
||||
ThreadID string
|
||||
}
|
||||
|
||||
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
||||
return Message{
|
||||
Updated: updated(),
|
||||
ID: ID,
|
||||
URL: URL,
|
||||
TS: TS,
|
||||
Author: Author,
|
||||
Plaintext: Plaintext,
|
||||
ThreadID: ThreadID,
|
||||
}
|
||||
}
|
||||
|
||||
func (m Message) Empty() bool {
|
||||
return m == (Message{})
|
||||
}
|
||||
|
||||
@@ -7,10 +7,12 @@ erDiagram
|
||||
THREAD ||--|{ MESSAGE: "populated by"
|
||||
|
||||
MESSAGE {
|
||||
|
||||
ID str
|
||||
URL str
|
||||
TS number
|
||||
Plaintext str
|
||||
Author str
|
||||
}
|
||||
THREAD {
|
||||
ID str
|
||||
@@ -23,5 +25,6 @@ erDiagram
|
||||
Asset str
|
||||
Resolved bool
|
||||
Datacenter str
|
||||
Team str
|
||||
}
|
||||
`
|
||||
|
||||
@@ -1 +1,26 @@
|
||||
package model
|
||||
|
||||
// EVENT ||--|{ THREAD: "spawns"
|
||||
type Thread struct {
|
||||
Updated uint64
|
||||
ID string
|
||||
URL string
|
||||
TS uint64
|
||||
Channel string
|
||||
EventID string
|
||||
}
|
||||
|
||||
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
||||
return Thread{
|
||||
Updated: updated(),
|
||||
ID: ID,
|
||||
URL: URL,
|
||||
TS: TS,
|
||||
Channel: Channel,
|
||||
EventID: EventID,
|
||||
}
|
||||
}
|
||||
|
||||
func (t Thread) Empty() bool {
|
||||
return t == (Thread{})
|
||||
}
|
||||
|
||||
32
normalize.go
32
normalize.go
@@ -1,32 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type PersistenceToNormalized struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newPersistenceToNormalizedProcess(cfg.driver),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
|
||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||
return nil, errors.New("not impl")
|
||||
}
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPersistenceToNormalizedProcessor(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
process := newPersistenceToNormalizedProcess(d)
|
||||
|
||||
_, _ = ctx, process
|
||||
}
|
||||
@@ -2,14 +2,14 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type MessageToPersistence struct {
|
||||
type ModelToPersistence struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
@@ -21,27 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newMessageToPersistenceProcess(cfg.driver),
|
||||
process: newModelToPersistenceProcess(cfg.driver),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
||||
func newModelToPersistenceProcess(driver Driver) processFunc {
|
||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||
m, err := Deserialize(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if result, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
|
||||
INSERT INTO messages (id, v) VALUES (?, ?)
|
||||
ON CONFLICT(id) DO UPDATE set v = ?;
|
||||
`, m.ID, msg, msg); err != nil {
|
||||
return nil, err
|
||||
} else if n, err := result.RowsAffected(); err != nil {
|
||||
return nil, err
|
||||
} else if n != 1 {
|
||||
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
|
||||
}
|
||||
return msg, nil
|
||||
return nil, errors.New("not impl")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,29 +6,13 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMessageToPersistenceProcessor(t *testing.T) {
|
||||
func TestModelToPersistenceProcessor(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
process := newMessageToPersistenceProcess(d)
|
||||
process := newModelToPersistenceProcess(d)
|
||||
|
||||
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||
t.Fatal("failed to upsert on redundant process", err)
|
||||
}
|
||||
|
||||
var id, v []byte
|
||||
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
||||
if err := row.Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := row.Scan(&id, &v); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(id) != "x" {
|
||||
t.Fatal(string(id))
|
||||
} else if string(v) != `{"ID":"x"}` {
|
||||
t.Fatal(string(v))
|
||||
}
|
||||
_, _ = ctx, process
|
||||
}
|
||||
|
||||
217
slack.go
217
slack.go
@@ -2,14 +2,32 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
type SlackToMessage struct {
|
||||
var (
|
||||
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
|
||||
)
|
||||
|
||||
type SlackToModel struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
type Models struct {
|
||||
Event model.Event
|
||||
Message model.Message
|
||||
Thread model.Thread
|
||||
}
|
||||
|
||||
func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
@@ -21,16 +39,203 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newSlackToMessageProcess(cfg),
|
||||
process: newSlackToModelProcess(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newSlackToMessageProcess(cfg Config) processFunc {
|
||||
func newSlackToModelProcess(cfg Config) processFunc {
|
||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
s, err := parseSlack(slack)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
||||
}
|
||||
return m.Serialize(), nil
|
||||
|
||||
for pattern, ptr := range map[string]*string{
|
||||
cfg.AssetPattern: &s.Asset,
|
||||
cfg.DatacenterPattern: &s.Datacenter,
|
||||
cfg.EventNamePattern: &s.EventName,
|
||||
} {
|
||||
r := regexp.MustCompile(pattern)
|
||||
parsed := r.FindString(*ptr)
|
||||
for i, name := range r.SubexpNames() {
|
||||
if i > 0 && name != "" {
|
||||
parsed = r.FindStringSubmatch(*ptr)[i]
|
||||
}
|
||||
}
|
||||
*ptr = parsed
|
||||
}
|
||||
|
||||
event := model.Event{}
|
||||
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
|
||||
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
|
||||
}
|
||||
message := model.Message{}
|
||||
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
|
||||
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
|
||||
}
|
||||
thread := model.Thread{}
|
||||
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||
}
|
||||
return json.Marshal(Models{
|
||||
Event: event,
|
||||
Message: message,
|
||||
Thread: thread,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
parsedSlackMessage struct {
|
||||
ID string
|
||||
TS uint64
|
||||
Source string
|
||||
Channel string
|
||||
Thread string
|
||||
EventName string
|
||||
Event string
|
||||
Plaintext string
|
||||
Asset string
|
||||
Resolved bool
|
||||
Datacenter string
|
||||
}
|
||||
|
||||
slackMessage struct {
|
||||
slackEvent
|
||||
Type string
|
||||
TS uint64 `json:"event_time"`
|
||||
Event slackEvent
|
||||
MessageTS string `json:"ts"`
|
||||
}
|
||||
|
||||
slackEvent struct {
|
||||
ID string `json:"event_ts"`
|
||||
Channel string
|
||||
// rewrites
|
||||
Nested *slackEvent `json:"message"`
|
||||
PreviousMessage *slackEvent `json:"previous_message"`
|
||||
// human
|
||||
ParentID string `json:"thread_ts"`
|
||||
Text string
|
||||
Blocks []slackBlock
|
||||
// bot
|
||||
Bot slackBot `json:"bot_profile"`
|
||||
Attachments []slackAttachment
|
||||
}
|
||||
|
||||
slackBlock struct {
|
||||
Elements []slackElement
|
||||
}
|
||||
|
||||
slackElement struct {
|
||||
Elements []slackElement
|
||||
RichText string `json:"text"`
|
||||
}
|
||||
|
||||
slackBot struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
slackAttachment struct {
|
||||
Color string
|
||||
Title string
|
||||
Text string
|
||||
Fields []slackField
|
||||
Actions []slackAction
|
||||
}
|
||||
|
||||
slackField struct {
|
||||
Value string
|
||||
Title string
|
||||
}
|
||||
|
||||
slackAction struct{}
|
||||
)
|
||||
|
||||
func parseSlack(b []byte) (parsedSlackMessage, error) {
|
||||
s, err := _parseSlack(b)
|
||||
if err != nil {
|
||||
return parsedSlackMessage{}, err
|
||||
}
|
||||
|
||||
/*
|
||||
if ch != "" {
|
||||
s.Event.Channel = ch
|
||||
}
|
||||
*/
|
||||
|
||||
if s.Event.Bot.Name != "" {
|
||||
if len(s.Event.Attachments) == 0 {
|
||||
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
|
||||
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||
}
|
||||
var tagsField string
|
||||
for _, field := range s.Event.Attachments[0].Fields {
|
||||
if field.Title == "Tags" {
|
||||
tagsField = field.Value
|
||||
}
|
||||
}
|
||||
return parsedSlackMessage{
|
||||
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
|
||||
TS: s.TS,
|
||||
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")),
|
||||
Channel: s.Event.Channel,
|
||||
Thread: s.Event.ID,
|
||||
EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1],
|
||||
Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"),
|
||||
Plaintext: s.Event.Attachments[0].Text,
|
||||
Asset: s.Event.Attachments[0].Text,
|
||||
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||
Datacenter: tagsField,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if s.Event.ParentID == "" {
|
||||
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||
}
|
||||
return parsedSlackMessage{
|
||||
ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS),
|
||||
TS: s.TS,
|
||||
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")),
|
||||
Channel: s.Event.Channel,
|
||||
Thread: s.Event.ParentID,
|
||||
EventName: "",
|
||||
Event: "",
|
||||
Plaintext: s.Event.Text,
|
||||
Asset: "",
|
||||
Datacenter: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func _parseSlack(b []byte) (slackMessage, error) {
|
||||
var result slackMessage
|
||||
err := json.Unmarshal(b, &result)
|
||||
switch result.Type {
|
||||
case "message":
|
||||
result.Event = result.slackEvent
|
||||
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
|
||||
result.Event.ID = result.MessageTS
|
||||
}
|
||||
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
||||
result.Event.Blocks = result.Event.Nested.Blocks
|
||||
result.Event.Bot = result.Event.Nested.Bot
|
||||
result.Event.Attachments = result.Event.Nested.Attachments
|
||||
result.Event.Nested = nil
|
||||
}
|
||||
if result.Event.PreviousMessage != nil {
|
||||
if result.Event.PreviousMessage.ID != "" {
|
||||
result.Event.ID = result.Event.PreviousMessage.ID
|
||||
}
|
||||
result.Event.PreviousMessage = nil
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (this slackEvent) Empty() bool {
|
||||
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
|
||||
}
|
||||
|
||||
func (this parsedSlackMessage) Time() time.Time {
|
||||
return time.Unix(int64(this.TS), 0)
|
||||
}
|
||||
|
||||
207
slack_test.go
207
slack_test.go
@@ -2,19 +2,27 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gotest.tools/v3/assert"
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
"gotest.tools/assert"
|
||||
)
|
||||
|
||||
func TestSlackToMessagePipeline(t *testing.T) {
|
||||
func TestSlackToModelPipeline(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer can()
|
||||
|
||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
|
||||
pipeline, err := NewSlackToModelPipeline(ctx, Config{
|
||||
driver: NewTestDriver(t),
|
||||
AssetPattern: renderAssetPattern,
|
||||
DatacenterPattern: renderDatacenterPattern,
|
||||
EventNamePattern: renderEventNamePattern,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -24,7 +32,33 @@ func TestSlackToMessagePipeline(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
want := Message{
|
||||
want := Models{
|
||||
Event: model.NewEvent(
|
||||
"11071",
|
||||
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
1712927439,
|
||||
"Alertconfig Workflow Failed",
|
||||
"",
|
||||
"",
|
||||
"TODO",
|
||||
false,
|
||||
),
|
||||
Message: model.NewMessage(
|
||||
"1712927439.728409/1712927439",
|
||||
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
1712927439,
|
||||
"TODO",
|
||||
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
"1712927439.728409",
|
||||
),
|
||||
Thread: model.NewThread(
|
||||
"1712927439.728409",
|
||||
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
1712927439,
|
||||
"C06U1DDBBU4",
|
||||
"11071",
|
||||
),
|
||||
/*
|
||||
ID: "1712927439.728409/1712927439",
|
||||
TS: 1712927439,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
@@ -35,16 +69,177 @@ func TestSlackToMessagePipeline(t *testing.T) {
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
*/
|
||||
}
|
||||
|
||||
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
|
||||
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
|
||||
t.Fatal("failed to enqueue", err)
|
||||
}
|
||||
var got Models
|
||||
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
|
||||
t.Fatal("failed to syn", err)
|
||||
} else if m := MustDeserialize(b2); false {
|
||||
} else if err := json.Unmarshal(b2, &got); err != nil {
|
||||
t.Fatal("failed to parse outqueue:", err)
|
||||
} else {
|
||||
assert.DeepEqual(t, want, m)
|
||||
want.Event.Updated = 0
|
||||
want.Message.Updated = 0
|
||||
want.Thread.Updated = 0
|
||||
got.Event.Updated = 0
|
||||
got.Message.Updated = 0
|
||||
got.Thread.Updated = 0
|
||||
assert.DeepEqual(t, want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSlackTestdata(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := map[string]struct {
|
||||
slackMessage slackMessage
|
||||
message parsedSlackMessage
|
||||
}{
|
||||
"human_thread_message_from_opsgenie_alert.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712930706,
|
||||
Event: slackEvent{
|
||||
ID: "1712930706.598629",
|
||||
Channel: "C06U1DDBBU4",
|
||||
ParentID: "1712927439.728409",
|
||||
Text: "I gotta do this",
|
||||
Blocks: []slackBlock{{
|
||||
Elements: []slackElement{{
|
||||
Elements: []slackElement{{
|
||||
RichText: "I gotta do this",
|
||||
}},
|
||||
}},
|
||||
}},
|
||||
Bot: slackBot{
|
||||
Name: "",
|
||||
},
|
||||
Attachments: []slackAttachment{},
|
||||
},
|
||||
},
|
||||
message: parsedSlackMessage{
|
||||
ID: "1712927439.728409/1712930706",
|
||||
TS: 1712930706,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712927439.728409",
|
||||
EventName: "",
|
||||
Event: "",
|
||||
Plaintext: "I gotta do this",
|
||||
Asset: "",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712927439,
|
||||
Event: slackEvent{
|
||||
ID: "1712927439.728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Bot: slackBot{
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "F4511E",
|
||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
{Value: "P3", Title: "Priority"},
|
||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||
},
|
||||
Actions: []slackAction{{}, {}, {}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
message: parsedSlackMessage{
|
||||
ID: "1712927439.728409/1712927439",
|
||||
TS: 1712927439,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712927439.728409",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11071",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert_resolved.json": {
|
||||
slackMessage: slackMessage{
|
||||
TS: 1712916339,
|
||||
Event: slackEvent{
|
||||
ID: "1712916339.000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Bot: slackBot{
|
||||
Name: "Opsgenie for Alert Management",
|
||||
},
|
||||
Attachments: []slackAttachment{{
|
||||
Color: "2ecc71",
|
||||
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Fields: []slackField{
|
||||
{Value: "P3", Title: "Priority"},
|
||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||
},
|
||||
Actions: []slackAction{},
|
||||
}},
|
||||
},
|
||||
},
|
||||
message: parsedSlackMessage{
|
||||
ID: "1712916339.000300/1712916339",
|
||||
TS: 1712916339,
|
||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
||||
Channel: "C06U1DDBBU4",
|
||||
Thread: "1712916339.000300",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11069",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Resolved: true,
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
},
|
||||
},
|
||||
"reingested_alert.json": {
|
||||
message: parsedSlackMessage{
|
||||
ID: "1712892637.037639/1712892637",
|
||||
TS: 1712892637,
|
||||
Source: "https://renderinc.slack.com/archives//p1712892637037639",
|
||||
//Channel: "C06U1DDBBU4",
|
||||
Thread: "1712892637.037639",
|
||||
EventName: "Alertconfig Workflow Failed",
|
||||
Event: "11061",
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Resolved: true,
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, d := range cases {
|
||||
want := d
|
||||
t.Run(name, func(t *testing.T) {
|
||||
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("parseSlack", func(t *testing.T) {
|
||||
got, err := parseSlack(b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got != want.message {
|
||||
assert.DeepEqual(t, want.message, got)
|
||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
||||
}
|
||||
if time := got.Time(); time.Unix() != int64(got.TS) {
|
||||
t.Error("not unix time", got.TS, time)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
159
storage.go
Normal file
159
storage.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
type Storage struct {
|
||||
driver Driver
|
||||
}
|
||||
|
||||
func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
|
||||
if _, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS events (ID TEXT UNIQUE);
|
||||
CREATE TABLE IF NOT EXISTS messages (ID TEXT UNIQUE);
|
||||
CREATE TABLE IF NOT EXISTS threads (ID TEXT UNIQUE);
|
||||
`); err != nil {
|
||||
return Storage{}, err
|
||||
}
|
||||
|
||||
for table, v := range map[string]any{
|
||||
"events": model.Event{},
|
||||
"messages": model.Message{},
|
||||
"threads": model.Thread{},
|
||||
} {
|
||||
b, _ := json.Marshal(v)
|
||||
var m map[string]struct{}
|
||||
json.Unmarshal(b, &m)
|
||||
for k := range m {
|
||||
if k == `ID` {
|
||||
continue
|
||||
}
|
||||
driver.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s TEXT`, table, k))
|
||||
}
|
||||
}
|
||||
|
||||
return Storage{driver: driver}, nil
|
||||
}
|
||||
|
||||
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
|
||||
v := model.Event{}
|
||||
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
|
||||
return s.upsert(ctx, "events", event)
|
||||
}
|
||||
|
||||
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
|
||||
v := model.Message{}
|
||||
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error {
|
||||
return s.upsert(ctx, "messages", message)
|
||||
}
|
||||
|
||||
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
|
||||
v := model.Thread{}
|
||||
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
||||
return s.upsert(ctx, "threads", thread)
|
||||
}
|
||||
|
||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||
if questions := strings.Count(clause, "?"); questions != len(args) {
|
||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||
}
|
||||
|
||||
keys, _, _, _, err := keysArgsKeyargsValues(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range args {
|
||||
args[i], _ = json.Marshal(args[i])
|
||||
}
|
||||
|
||||
q := fmt.Sprintf(`
|
||||
SELECT %s FROM %s WHERE %s
|
||||
`, strings.Join(keys, ", "), table, clause)
|
||||
row := s.driver.QueryRowContext(ctx, q, args...)
|
||||
if err := row.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scanTargets := make([]any, len(keys))
|
||||
for i := range scanTargets {
|
||||
scanTargets[i] = &[]byte{}
|
||||
}
|
||||
if err := row.Scan(scanTargets...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m := map[string]json.RawMessage{}
|
||||
for i, k := range keys {
|
||||
m[k] = *scanTargets[i].(*[]byte)
|
||||
}
|
||||
b, _ := json.Marshal(m)
|
||||
return json.Unmarshal(b, v)
|
||||
}
|
||||
|
||||
func (s Storage) upsert(ctx context.Context, table string, v any) error {
|
||||
keys, args, keyArgs, values, err := keysArgsKeyargsValues(v)
|
||||
if err != nil || len(keys) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range keys {
|
||||
values = append(values, values[i])
|
||||
}
|
||||
|
||||
q := fmt.Sprintf(`
|
||||
INSERT INTO %s (%s) VALUES (%s)
|
||||
ON CONFLICT (ID) DO UPDATE SET %s
|
||||
`, table, strings.Join(keys, ", "), strings.Join(args, ", "), strings.Join(keyArgs, ", "))
|
||||
if result, err := s.driver.ExecContext(ctx, q, values...); err != nil {
|
||||
return err
|
||||
} else if n, err := result.RowsAffected(); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("UpsertMessage affected %v rows", n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
|
||||
b, _ := json.Marshal(v)
|
||||
var m map[string]json.RawMessage
|
||||
err := json.Unmarshal(b, &m)
|
||||
|
||||
keys := []string{}
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
args := make([]string, len(keys))
|
||||
for i := range args {
|
||||
args[i] = "?"
|
||||
}
|
||||
keyArgs := make([]string, len(keys))
|
||||
for i := range keyArgs {
|
||||
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
|
||||
}
|
||||
values := make([]any, len(keys))
|
||||
for i := range values {
|
||||
values[i] = []byte(m[keys[i]])
|
||||
}
|
||||
|
||||
return keys, args, keyArgs, values, err
|
||||
}
|
||||
89
storage_test.go
Normal file
89
storage_test.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
func TestStorage(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer can()
|
||||
|
||||
s, err := NewStorage(ctx, NewTestDriver(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("upsert get event", func(t *testing.T) {
|
||||
m := model.NewEvent(
|
||||
"ID",
|
||||
"URL",
|
||||
1,
|
||||
"Name",
|
||||
"Asset",
|
||||
"Datacenter",
|
||||
"Team",
|
||||
true,
|
||||
)
|
||||
|
||||
if err := s.UpsertEvent(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on insert:", err)
|
||||
} else if err := s.UpsertEvent(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on noop update:", err)
|
||||
}
|
||||
|
||||
if got, err := s.GetEvent(ctx, m.ID); err != nil {
|
||||
t.Fatal("unexpected error on get:", err)
|
||||
} else if got != m {
|
||||
t.Fatal("unexpected result from get:", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("upsert get thread", func(t *testing.T) {
|
||||
m := model.NewThread(
|
||||
"ID",
|
||||
"URL",
|
||||
1,
|
||||
"Channel",
|
||||
"EventID",
|
||||
)
|
||||
|
||||
if err := s.UpsertThread(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on insert:", err)
|
||||
} else if err := s.UpsertThread(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on noop update:", err)
|
||||
}
|
||||
|
||||
if got, err := s.GetThread(ctx, m.ID); err != nil {
|
||||
t.Fatal("unexpected error on get:", err)
|
||||
} else if got != m {
|
||||
t.Fatal("unexpected result from get:", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("upsert get message", func(t *testing.T) {
|
||||
m := model.NewMessage(
|
||||
"ID",
|
||||
"URL",
|
||||
1,
|
||||
"Author",
|
||||
"Plaintext",
|
||||
"ThreadID",
|
||||
)
|
||||
|
||||
if err := s.UpsertMessage(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on insert:", err)
|
||||
} else if err := s.UpsertMessage(ctx, m); err != nil {
|
||||
t.Fatal("unexpected error on noop update:", err)
|
||||
}
|
||||
|
||||
if got, err := s.GetMessage(ctx, m.ID); err != nil {
|
||||
t.Fatal("unexpected error on get:", err)
|
||||
} else if got != m {
|
||||
t.Fatal("unexpected result from get:", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user