Compare commits
9 Commits
44db0c6939
...
709f2ac254
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
709f2ac254 | ||
|
|
ba06796b8c | ||
|
|
f38c183fe8 | ||
|
|
8ae8f47753 | ||
|
|
e372be4288 | ||
|
|
acfd95e5af | ||
|
|
d70a0e313f | ||
|
|
0cecd5ea04 | ||
|
|
a7d5d021d6 |
62
config.go
62
config.go
@@ -13,27 +13,27 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
Debug bool
|
Debug bool
|
||||||
InitializeSlack bool
|
InitializeSlack bool
|
||||||
SlackToken string
|
SlackToken string
|
||||||
SlackChannels []string
|
SlackChannels []string
|
||||||
DriverConn string
|
DriverConn string
|
||||||
BasicAuthUser string
|
BasicAuthUser string
|
||||||
BasicAuthPassword string
|
BasicAuthPassword string
|
||||||
FillWithTestdata bool
|
FillWithTestdata bool
|
||||||
OllamaURL string
|
OllamaURL string
|
||||||
OllamaModel string
|
OllamaModel string
|
||||||
LocalCheckpoint string
|
LocalCheckpoint string
|
||||||
LocalTokenizer string
|
LocalTokenizer string
|
||||||
AssetPattern string
|
AssetPattern string
|
||||||
DatacenterPattern string
|
DatacenterPattern string
|
||||||
EventNamePattern string
|
EventNamePattern string
|
||||||
driver Driver
|
driver Driver
|
||||||
ai AI
|
storage Storage
|
||||||
slackToMessagePipeline Pipeline
|
ai AI
|
||||||
messageToPersistencePipeline Pipeline
|
slackToModelPipeline Pipeline
|
||||||
persistenceToNormalizedPipeline Pipeline
|
modelToPersistencePipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -119,6 +119,12 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
return Config{}, errors.New("not impl")
|
return Config{}, errors.New("not impl")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
storage, err := NewStorage(ctx, result.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.storage = storage
|
||||||
|
|
||||||
if result.OllamaURL != "" {
|
if result.OllamaURL != "" {
|
||||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
||||||
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
|
||||||
@@ -127,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
result.ai = NewAINoop()
|
result.ai = NewAINoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
|
slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
result.slackToMessagePipeline = slackToMessagePipeline
|
result.slackToModelPipeline = slackToModelPipeline
|
||||||
|
|
||||||
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
|
modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
result.messageToPersistencePipeline = messageToPersistencePipeline
|
result.modelToPersistencePipeline = modelToPersistencePipeline
|
||||||
|
|
||||||
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
|
|
||||||
if err != nil {
|
|
||||||
return Config{}, err
|
|
||||||
}
|
|
||||||
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
|
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,12 @@ type Driver struct {
|
|||||||
*sql.DB
|
*sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestDriver(t *testing.T) Driver {
|
func NewTestDriver(t *testing.T, optionalP ...string) Driver {
|
||||||
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
|
p := path.Join(t.TempDir(), "db")
|
||||||
|
if len(optionalP) > 0 {
|
||||||
|
p = optionalP[0]
|
||||||
|
}
|
||||||
|
driver, err := NewDriver(context.Background(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -16,9 +16,11 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/google/go-cmp v0.6.0 // indirect
|
github.com/google/go-cmp v0.6.0 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||||
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
golang.org/x/sys v0.16.0 // indirect
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
|
gotest.tools v2.2.0+incompatible // indirect
|
||||||
modernc.org/libc v1.22.5 // indirect
|
modernc.org/libc v1.22.5 // indirect
|
||||||
modernc.org/mathutil v1.5.0 // indirect
|
modernc.org/mathutil v1.5.0 // indirect
|
||||||
modernc.org/memory v1.5.0 // indirect
|
modernc.org/memory v1.5.0 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -18,6 +18,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
|
|||||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
||||||
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
@@ -34,6 +36,8 @@ golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
|||||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||||
|
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||||
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||||
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||||
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||||
|
|||||||
9
main.go
9
main.go
@@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case err := <-processPipelines(ctx,
|
case err := <-processPipelines(ctx,
|
||||||
cfg.slackToMessagePipeline,
|
cfg.slackToModelPipeline,
|
||||||
cfg.messageToPersistencePipeline,
|
cfg.modelToPersistencePipeline,
|
||||||
cfg.persistenceToNormalizedPipeline,
|
|
||||||
):
|
):
|
||||||
return err
|
return err
|
||||||
case err := <-listenAndServe(ctx, cfg):
|
case err := <-listenAndServe(ctx, cfg):
|
||||||
@@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
errs := []error{}
|
errs := []error{}
|
||||||
for _, messageJSON := range page.Messages {
|
for _, messageJSON := range page.Messages {
|
||||||
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||||
log.Printf("failed to ingest: %v", err)
|
log.Printf("failed to ingest: %v", err)
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -45,7 +45,6 @@ func TestRun(t *testing.T) {
|
|||||||
cfg.EventNamePattern = renderEventNamePattern
|
cfg.EventNamePattern = renderEventNamePattern
|
||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver = NewTestDriver(t)
|
cfg.driver = NewTestDriver(t)
|
||||||
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
|
|
||||||
@@ -96,7 +95,7 @@ func TestRun(t *testing.T) {
|
|||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||||
}
|
}
|
||||||
var result struct {
|
var result struct {
|
||||||
Messages []Message
|
Messages []any
|
||||||
}
|
}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -189,7 +188,7 @@ func TestRun(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var result struct {
|
var result struct {
|
||||||
Thread []Message
|
Thread []any
|
||||||
}
|
}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|||||||
157
message_test.go
157
message_test.go
@@ -1,157 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestParseSlackTestdata(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
cases := map[string]struct {
|
|
||||||
inCh string
|
|
||||||
slackMessage slackMessage
|
|
||||||
message Message
|
|
||||||
}{
|
|
||||||
"human_thread_message_from_opsgenie_alert.json": {
|
|
||||||
slackMessage: slackMessage{
|
|
||||||
TS: 1712930706,
|
|
||||||
Event: slackEvent{
|
|
||||||
ID: "1712930706.598629",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
ParentID: "1712927439.728409",
|
|
||||||
Text: "I gotta do this",
|
|
||||||
Blocks: []slackBlock{{
|
|
||||||
Elements: []slackElement{{
|
|
||||||
Elements: []slackElement{{
|
|
||||||
RichText: "I gotta do this",
|
|
||||||
}},
|
|
||||||
}},
|
|
||||||
}},
|
|
||||||
Bot: slackBot{
|
|
||||||
Name: "",
|
|
||||||
},
|
|
||||||
Attachments: []slackAttachment{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
message: Message{
|
|
||||||
ID: "1712927439.728409/1712930706",
|
|
||||||
TS: 1712930706,
|
|
||||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Thread: "1712927439.728409",
|
|
||||||
EventName: "",
|
|
||||||
Event: "",
|
|
||||||
Plaintext: "I gotta do this",
|
|
||||||
Asset: "",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"opsgenie_alert.json": {
|
|
||||||
slackMessage: slackMessage{
|
|
||||||
TS: 1712927439,
|
|
||||||
Event: slackEvent{
|
|
||||||
ID: "1712927439.728409",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Bot: slackBot{
|
|
||||||
Name: "Opsgenie for Alert Management",
|
|
||||||
},
|
|
||||||
Attachments: []slackAttachment{{
|
|
||||||
Color: "F4511E",
|
|
||||||
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
|
||||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
|
||||||
Fields: []slackField{
|
|
||||||
{Value: "P3", Title: "Priority"},
|
|
||||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
|
||||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
|
||||||
},
|
|
||||||
Actions: []slackAction{{}, {}, {}},
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
message: Message{
|
|
||||||
ID: "1712927439.728409/1712927439",
|
|
||||||
TS: 1712927439,
|
|
||||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Thread: "1712927439.728409",
|
|
||||||
EventName: "Alertconfig Workflow Failed",
|
|
||||||
Event: "11071",
|
|
||||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
|
||||||
Asset: "",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"opsgenie_alert_resolved.json": {
|
|
||||||
slackMessage: slackMessage{
|
|
||||||
TS: 1712916339,
|
|
||||||
Event: slackEvent{
|
|
||||||
ID: "1712916339.000300",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Bot: slackBot{
|
|
||||||
Name: "Opsgenie for Alert Management",
|
|
||||||
},
|
|
||||||
Attachments: []slackAttachment{{
|
|
||||||
Color: "2ecc71",
|
|
||||||
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
|
||||||
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
|
||||||
Fields: []slackField{
|
|
||||||
{Value: "P3", Title: "Priority"},
|
|
||||||
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
|
||||||
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
|
||||||
},
|
|
||||||
Actions: []slackAction{},
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
message: Message{
|
|
||||||
ID: "1712916339.000300/1712916339",
|
|
||||||
TS: 1712916339,
|
|
||||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Thread: "1712916339.000300",
|
|
||||||
EventName: "Alertconfig Workflow Failed",
|
|
||||||
Event: "11069",
|
|
||||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
|
||||||
Asset: "",
|
|
||||||
Resolved: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"reingested_alert.json": {
|
|
||||||
inCh: "C06U1DDBBU4",
|
|
||||||
message: Message{
|
|
||||||
ID: "1712892637.037639/1712892637",
|
|
||||||
TS: 1712892637,
|
|
||||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
|
|
||||||
Channel: "C06U1DDBBU4",
|
|
||||||
Thread: "1712892637.037639",
|
|
||||||
EventName: "Alertconfig Workflow Failed",
|
|
||||||
Event: "11061",
|
|
||||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
|
||||||
Asset: "",
|
|
||||||
Resolved: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, d := range cases {
|
|
||||||
want := d
|
|
||||||
t.Run(name, func(t *testing.T) {
|
|
||||||
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
|
|
||||||
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if got != want.message {
|
|
||||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
|
||||||
}
|
|
||||||
if time := got.Time(); time.Unix() != int64(got.TS) {
|
|
||||||
t.Error("not unix time", got.TS, time)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,37 @@
|
|||||||
package model
|
package model
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
Updated uint64
|
||||||
|
ID string
|
||||||
|
URL string
|
||||||
|
TS uint64
|
||||||
|
Name string
|
||||||
|
Asset string
|
||||||
|
Datacenter string
|
||||||
|
Team string
|
||||||
|
Resolved bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEvent(ID, URL string, TS uint64, Name, Asset, Datacenter, Team string, Resolved bool) Event {
|
||||||
|
return Event{
|
||||||
|
Updated: updated(),
|
||||||
|
ID: ID,
|
||||||
|
URL: URL,
|
||||||
|
TS: TS,
|
||||||
|
Name: Name,
|
||||||
|
Asset: Asset,
|
||||||
|
Datacenter: Datacenter,
|
||||||
|
Team: Team,
|
||||||
|
Resolved: Resolved,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e Event) Empty() bool {
|
||||||
|
return e == (Event{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func updated() uint64 {
|
||||||
|
return uint64(time.Now().UnixNano() / int64(time.Millisecond))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1 +1,28 @@
|
|||||||
package model
|
package model
|
||||||
|
|
||||||
|
// THREAD ||--|{ MESSAGE: "populated by"
|
||||||
|
type Message struct {
|
||||||
|
Updated uint64
|
||||||
|
ID string
|
||||||
|
URL string
|
||||||
|
TS uint64
|
||||||
|
Author string
|
||||||
|
Plaintext string
|
||||||
|
ThreadID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
||||||
|
return Message{
|
||||||
|
Updated: updated(),
|
||||||
|
ID: ID,
|
||||||
|
URL: URL,
|
||||||
|
TS: TS,
|
||||||
|
Author: Author,
|
||||||
|
Plaintext: Plaintext,
|
||||||
|
ThreadID: ThreadID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m Message) Empty() bool {
|
||||||
|
return m == (Message{})
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,10 +7,12 @@ erDiagram
|
|||||||
THREAD ||--|{ MESSAGE: "populated by"
|
THREAD ||--|{ MESSAGE: "populated by"
|
||||||
|
|
||||||
MESSAGE {
|
MESSAGE {
|
||||||
|
|
||||||
ID str
|
ID str
|
||||||
URL str
|
URL str
|
||||||
TS number
|
TS number
|
||||||
Plaintext str
|
Plaintext str
|
||||||
|
Author str
|
||||||
}
|
}
|
||||||
THREAD {
|
THREAD {
|
||||||
ID str
|
ID str
|
||||||
@@ -23,5 +25,6 @@ erDiagram
|
|||||||
Asset str
|
Asset str
|
||||||
Resolved bool
|
Resolved bool
|
||||||
Datacenter str
|
Datacenter str
|
||||||
|
Team str
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
|
|||||||
@@ -1 +1,26 @@
|
|||||||
package model
|
package model
|
||||||
|
|
||||||
|
// EVENT ||--|{ THREAD: "spawns"
|
||||||
|
type Thread struct {
|
||||||
|
Updated uint64
|
||||||
|
ID string
|
||||||
|
URL string
|
||||||
|
TS uint64
|
||||||
|
Channel string
|
||||||
|
EventID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
|
||||||
|
return Thread{
|
||||||
|
Updated: updated(),
|
||||||
|
ID: ID,
|
||||||
|
URL: URL,
|
||||||
|
TS: TS,
|
||||||
|
Channel: Channel,
|
||||||
|
EventID: EventID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t Thread) Empty() bool {
|
||||||
|
return t == (Thread{})
|
||||||
|
}
|
||||||
|
|||||||
32
normalize.go
32
normalize.go
@@ -1,32 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PersistenceToNormalized struct {
|
|
||||||
pipeline Pipeline
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
|
||||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
|
||||||
if err != nil {
|
|
||||||
return Pipeline{}, err
|
|
||||||
}
|
|
||||||
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
|
||||||
if err != nil {
|
|
||||||
return Pipeline{}, err
|
|
||||||
}
|
|
||||||
return Pipeline{
|
|
||||||
writer: writer,
|
|
||||||
reader: reader,
|
|
||||||
process: newPersistenceToNormalizedProcess(cfg.driver),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
|
|
||||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,18 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPersistenceToNormalizedProcessor(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
d := NewTestDriver(t)
|
|
||||||
process := newPersistenceToNormalizedProcess(d)
|
|
||||||
|
|
||||||
_, _ = ctx, process
|
|
||||||
}
|
|
||||||
@@ -2,14 +2,14 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageToPersistence struct {
|
type ModelToPersistence struct {
|
||||||
pipeline Pipeline
|
pipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
@@ -21,27 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
|
|||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newMessageToPersistenceProcess(cfg.driver),
|
process: newModelToPersistenceProcess(cfg.driver),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
func newModelToPersistenceProcess(driver Driver) processFunc {
|
||||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||||
m, err := Deserialize(msg)
|
return nil, errors.New("not impl")
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if result, err := driver.ExecContext(ctx, `
|
|
||||||
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
|
|
||||||
INSERT INTO messages (id, v) VALUES (?, ?)
|
|
||||||
ON CONFLICT(id) DO UPDATE set v = ?;
|
|
||||||
`, m.ID, msg, msg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if n, err := result.RowsAffected(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if n != 1 {
|
|
||||||
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
|
|
||||||
}
|
|
||||||
return msg, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,29 +6,13 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMessageToPersistenceProcessor(t *testing.T) {
|
func TestModelToPersistenceProcessor(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
d := NewTestDriver(t)
|
d := NewTestDriver(t)
|
||||||
process := newMessageToPersistenceProcess(d)
|
process := newModelToPersistenceProcess(d)
|
||||||
|
|
||||||
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
_, _ = ctx, process
|
||||||
t.Fatal(err)
|
|
||||||
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
|
||||||
t.Fatal("failed to upsert on redundant process", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var id, v []byte
|
|
||||||
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
|
||||||
if err := row.Err(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if err := row.Scan(&id, &v); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if string(id) != "x" {
|
|
||||||
t.Fatal(string(id))
|
|
||||||
} else if string(v) != `{"ID":"x"}` {
|
|
||||||
t.Fatal(string(v))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
217
slack.go
217
slack.go
@@ -2,14 +2,32 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SlackToMessage struct {
|
var (
|
||||||
|
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
|
||||||
|
)
|
||||||
|
|
||||||
|
type SlackToModel struct {
|
||||||
pipeline Pipeline
|
pipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
type Models struct {
|
||||||
|
Event model.Event
|
||||||
|
Message model.Message
|
||||||
|
Thread model.Thread
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
|
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
@@ -21,16 +39,203 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error
|
|||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newSlackToMessageProcess(cfg),
|
process: newSlackToModelProcess(cfg),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSlackToMessageProcess(cfg Config) processFunc {
|
func newSlackToModelProcess(cfg Config) processFunc {
|
||||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||||
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
s, err := parseSlack(slack)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
||||||
}
|
}
|
||||||
return m.Serialize(), nil
|
|
||||||
|
for pattern, ptr := range map[string]*string{
|
||||||
|
cfg.AssetPattern: &s.Asset,
|
||||||
|
cfg.DatacenterPattern: &s.Datacenter,
|
||||||
|
cfg.EventNamePattern: &s.EventName,
|
||||||
|
} {
|
||||||
|
r := regexp.MustCompile(pattern)
|
||||||
|
parsed := r.FindString(*ptr)
|
||||||
|
for i, name := range r.SubexpNames() {
|
||||||
|
if i > 0 && name != "" {
|
||||||
|
parsed = r.FindStringSubmatch(*ptr)[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*ptr = parsed
|
||||||
|
}
|
||||||
|
|
||||||
|
event := model.Event{}
|
||||||
|
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
|
||||||
|
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
|
||||||
|
}
|
||||||
|
message := model.Message{}
|
||||||
|
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
|
||||||
|
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
|
||||||
|
}
|
||||||
|
thread := model.Thread{}
|
||||||
|
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
||||||
|
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||||
|
}
|
||||||
|
return json.Marshal(Models{
|
||||||
|
Event: event,
|
||||||
|
Message: message,
|
||||||
|
Thread: thread,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
parsedSlackMessage struct {
|
||||||
|
ID string
|
||||||
|
TS uint64
|
||||||
|
Source string
|
||||||
|
Channel string
|
||||||
|
Thread string
|
||||||
|
EventName string
|
||||||
|
Event string
|
||||||
|
Plaintext string
|
||||||
|
Asset string
|
||||||
|
Resolved bool
|
||||||
|
Datacenter string
|
||||||
|
}
|
||||||
|
|
||||||
|
slackMessage struct {
|
||||||
|
slackEvent
|
||||||
|
Type string
|
||||||
|
TS uint64 `json:"event_time"`
|
||||||
|
Event slackEvent
|
||||||
|
MessageTS string `json:"ts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
slackEvent struct {
|
||||||
|
ID string `json:"event_ts"`
|
||||||
|
Channel string
|
||||||
|
// rewrites
|
||||||
|
Nested *slackEvent `json:"message"`
|
||||||
|
PreviousMessage *slackEvent `json:"previous_message"`
|
||||||
|
// human
|
||||||
|
ParentID string `json:"thread_ts"`
|
||||||
|
Text string
|
||||||
|
Blocks []slackBlock
|
||||||
|
// bot
|
||||||
|
Bot slackBot `json:"bot_profile"`
|
||||||
|
Attachments []slackAttachment
|
||||||
|
}
|
||||||
|
|
||||||
|
slackBlock struct {
|
||||||
|
Elements []slackElement
|
||||||
|
}
|
||||||
|
|
||||||
|
slackElement struct {
|
||||||
|
Elements []slackElement
|
||||||
|
RichText string `json:"text"`
|
||||||
|
}
|
||||||
|
|
||||||
|
slackBot struct {
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
slackAttachment struct {
|
||||||
|
Color string
|
||||||
|
Title string
|
||||||
|
Text string
|
||||||
|
Fields []slackField
|
||||||
|
Actions []slackAction
|
||||||
|
}
|
||||||
|
|
||||||
|
slackField struct {
|
||||||
|
Value string
|
||||||
|
Title string
|
||||||
|
}
|
||||||
|
|
||||||
|
slackAction struct{}
|
||||||
|
)
|
||||||
|
|
||||||
|
func parseSlack(b []byte) (parsedSlackMessage, error) {
|
||||||
|
s, err := _parseSlack(b)
|
||||||
|
if err != nil {
|
||||||
|
return parsedSlackMessage{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
if ch != "" {
|
||||||
|
s.Event.Channel = ch
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
if s.Event.Bot.Name != "" {
|
||||||
|
if len(s.Event.Attachments) == 0 {
|
||||||
|
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||||
|
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
|
||||||
|
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||||
|
}
|
||||||
|
var tagsField string
|
||||||
|
for _, field := range s.Event.Attachments[0].Fields {
|
||||||
|
if field.Title == "Tags" {
|
||||||
|
tagsField = field.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parsedSlackMessage{
|
||||||
|
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
|
||||||
|
TS: s.TS,
|
||||||
|
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")),
|
||||||
|
Channel: s.Event.Channel,
|
||||||
|
Thread: s.Event.ID,
|
||||||
|
EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1],
|
||||||
|
Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"),
|
||||||
|
Plaintext: s.Event.Attachments[0].Text,
|
||||||
|
Asset: s.Event.Attachments[0].Text,
|
||||||
|
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||||
|
Datacenter: tagsField,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.Event.ParentID == "" {
|
||||||
|
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||||
|
}
|
||||||
|
return parsedSlackMessage{
|
||||||
|
ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS),
|
||||||
|
TS: s.TS,
|
||||||
|
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")),
|
||||||
|
Channel: s.Event.Channel,
|
||||||
|
Thread: s.Event.ParentID,
|
||||||
|
EventName: "",
|
||||||
|
Event: "",
|
||||||
|
Plaintext: s.Event.Text,
|
||||||
|
Asset: "",
|
||||||
|
Datacenter: "",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func _parseSlack(b []byte) (slackMessage, error) {
|
||||||
|
var result slackMessage
|
||||||
|
err := json.Unmarshal(b, &result)
|
||||||
|
switch result.Type {
|
||||||
|
case "message":
|
||||||
|
result.Event = result.slackEvent
|
||||||
|
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
|
||||||
|
result.Event.ID = result.MessageTS
|
||||||
|
}
|
||||||
|
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
||||||
|
result.Event.Blocks = result.Event.Nested.Blocks
|
||||||
|
result.Event.Bot = result.Event.Nested.Bot
|
||||||
|
result.Event.Attachments = result.Event.Nested.Attachments
|
||||||
|
result.Event.Nested = nil
|
||||||
|
}
|
||||||
|
if result.Event.PreviousMessage != nil {
|
||||||
|
if result.Event.PreviousMessage.ID != "" {
|
||||||
|
result.Event.ID = result.Event.PreviousMessage.ID
|
||||||
|
}
|
||||||
|
result.Event.PreviousMessage = nil
|
||||||
|
}
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this slackEvent) Empty() bool {
|
||||||
|
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this parsedSlackMessage) Time() time.Time {
|
||||||
|
return time.Unix(int64(this.TS), 0)
|
||||||
|
}
|
||||||
|
|||||||
227
slack_test.go
227
slack_test.go
@@ -2,19 +2,27 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gotest.tools/v3/assert"
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
"gotest.tools/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSlackToMessagePipeline(t *testing.T) {
|
func TestSlackToModelPipeline(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
|
pipeline, err := NewSlackToModelPipeline(ctx, Config{
|
||||||
|
driver: NewTestDriver(t),
|
||||||
|
AssetPattern: renderAssetPattern,
|
||||||
|
DatacenterPattern: renderDatacenterPattern,
|
||||||
|
EventNamePattern: renderEventNamePattern,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -24,27 +32,214 @@ func TestSlackToMessagePipeline(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
want := Message{
|
want := Models{
|
||||||
ID: "1712927439.728409/1712927439",
|
Event: model.NewEvent(
|
||||||
TS: 1712927439,
|
"11071",
|
||||||
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
Channel: "C06U1DDBBU4",
|
1712927439,
|
||||||
Thread: "1712927439.728409",
|
"Alertconfig Workflow Failed",
|
||||||
EventName: "",
|
"",
|
||||||
Event: "11071",
|
"",
|
||||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
"TODO",
|
||||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
false,
|
||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
),
|
||||||
|
Message: model.NewMessage(
|
||||||
|
"1712927439.728409/1712927439",
|
||||||
|
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
1712927439,
|
||||||
|
"TODO",
|
||||||
|
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
"1712927439.728409",
|
||||||
|
),
|
||||||
|
Thread: model.NewThread(
|
||||||
|
"1712927439.728409",
|
||||||
|
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
1712927439,
|
||||||
|
"C06U1DDBBU4",
|
||||||
|
"11071",
|
||||||
|
),
|
||||||
|
/*
|
||||||
|
ID: "1712927439.728409/1712927439",
|
||||||
|
TS: 1712927439,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712927439.728409",
|
||||||
|
EventName: "",
|
||||||
|
Event: "11071",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
|
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
|
||||||
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
|
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
t.Fatal("failed to enqueue", err)
|
t.Fatal("failed to enqueue", err)
|
||||||
}
|
}
|
||||||
|
var got Models
|
||||||
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
|
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
|
||||||
t.Fatal("failed to syn", err)
|
t.Fatal("failed to syn", err)
|
||||||
} else if m := MustDeserialize(b2); false {
|
} else if err := json.Unmarshal(b2, &got); err != nil {
|
||||||
|
t.Fatal("failed to parse outqueue:", err)
|
||||||
} else {
|
} else {
|
||||||
assert.DeepEqual(t, want, m)
|
want.Event.Updated = 0
|
||||||
|
want.Message.Updated = 0
|
||||||
|
want.Thread.Updated = 0
|
||||||
|
got.Event.Updated = 0
|
||||||
|
got.Message.Updated = 0
|
||||||
|
got.Thread.Updated = 0
|
||||||
|
assert.DeepEqual(t, want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseSlackTestdata(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
cases := map[string]struct {
|
||||||
|
slackMessage slackMessage
|
||||||
|
message parsedSlackMessage
|
||||||
|
}{
|
||||||
|
"human_thread_message_from_opsgenie_alert.json": {
|
||||||
|
slackMessage: slackMessage{
|
||||||
|
TS: 1712930706,
|
||||||
|
Event: slackEvent{
|
||||||
|
ID: "1712930706.598629",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
ParentID: "1712927439.728409",
|
||||||
|
Text: "I gotta do this",
|
||||||
|
Blocks: []slackBlock{{
|
||||||
|
Elements: []slackElement{{
|
||||||
|
Elements: []slackElement{{
|
||||||
|
RichText: "I gotta do this",
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
Bot: slackBot{
|
||||||
|
Name: "",
|
||||||
|
},
|
||||||
|
Attachments: []slackAttachment{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: parsedSlackMessage{
|
||||||
|
ID: "1712927439.728409/1712930706",
|
||||||
|
TS: 1712930706,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712927439.728409",
|
||||||
|
EventName: "",
|
||||||
|
Event: "",
|
||||||
|
Plaintext: "I gotta do this",
|
||||||
|
Asset: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"opsgenie_alert.json": {
|
||||||
|
slackMessage: slackMessage{
|
||||||
|
TS: 1712927439,
|
||||||
|
Event: slackEvent{
|
||||||
|
ID: "1712927439.728409",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Bot: slackBot{
|
||||||
|
Name: "Opsgenie for Alert Management",
|
||||||
|
},
|
||||||
|
Attachments: []slackAttachment{{
|
||||||
|
Color: "F4511E",
|
||||||
|
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Fields: []slackField{
|
||||||
|
{Value: "P3", Title: "Priority"},
|
||||||
|
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||||
|
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||||
|
},
|
||||||
|
Actions: []slackAction{{}, {}, {}},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: parsedSlackMessage{
|
||||||
|
ID: "1712927439.728409/1712927439",
|
||||||
|
TS: 1712927439,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712927439.728409",
|
||||||
|
EventName: "Alertconfig Workflow Failed",
|
||||||
|
Event: "11071",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"opsgenie_alert_resolved.json": {
|
||||||
|
slackMessage: slackMessage{
|
||||||
|
TS: 1712916339,
|
||||||
|
Event: slackEvent{
|
||||||
|
ID: "1712916339.000300",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Bot: slackBot{
|
||||||
|
Name: "Opsgenie for Alert Management",
|
||||||
|
},
|
||||||
|
Attachments: []slackAttachment{{
|
||||||
|
Color: "2ecc71",
|
||||||
|
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Fields: []slackField{
|
||||||
|
{Value: "P3", Title: "Priority"},
|
||||||
|
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||||
|
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||||
|
},
|
||||||
|
Actions: []slackAction{},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: parsedSlackMessage{
|
||||||
|
ID: "1712916339.000300/1712916339",
|
||||||
|
TS: 1712916339,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712916339.000300",
|
||||||
|
EventName: "Alertconfig Workflow Failed",
|
||||||
|
Event: "11069",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Resolved: true,
|
||||||
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"reingested_alert.json": {
|
||||||
|
message: parsedSlackMessage{
|
||||||
|
ID: "1712892637.037639/1712892637",
|
||||||
|
TS: 1712892637,
|
||||||
|
Source: "https://renderinc.slack.com/archives//p1712892637037639",
|
||||||
|
//Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712892637.037639",
|
||||||
|
EventName: "Alertconfig Workflow Failed",
|
||||||
|
Event: "11061",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Resolved: true,
|
||||||
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, d := range cases {
|
||||||
|
want := d
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("parseSlack", func(t *testing.T) {
|
||||||
|
got, err := parseSlack(b)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if got != want.message {
|
||||||
|
assert.DeepEqual(t, want.message, got)
|
||||||
|
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
||||||
|
}
|
||||||
|
if time := got.Time(); time.Unix() != int64(got.TS) {
|
||||||
|
t.Error("not unix time", got.TS, time)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
159
storage.go
Normal file
159
storage.go
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Storage struct {
|
||||||
|
driver Driver
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
|
||||||
|
if _, err := driver.ExecContext(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS events (ID TEXT UNIQUE);
|
||||||
|
CREATE TABLE IF NOT EXISTS messages (ID TEXT UNIQUE);
|
||||||
|
CREATE TABLE IF NOT EXISTS threads (ID TEXT UNIQUE);
|
||||||
|
`); err != nil {
|
||||||
|
return Storage{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for table, v := range map[string]any{
|
||||||
|
"events": model.Event{},
|
||||||
|
"messages": model.Message{},
|
||||||
|
"threads": model.Thread{},
|
||||||
|
} {
|
||||||
|
b, _ := json.Marshal(v)
|
||||||
|
var m map[string]struct{}
|
||||||
|
json.Unmarshal(b, &m)
|
||||||
|
for k := range m {
|
||||||
|
if k == `ID` {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
driver.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s TEXT`, table, k))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Storage{driver: driver}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
|
||||||
|
v := model.Event{}
|
||||||
|
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
|
||||||
|
return v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
|
||||||
|
return s.upsert(ctx, "events", event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
|
||||||
|
v := model.Message{}
|
||||||
|
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
|
||||||
|
return v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error {
|
||||||
|
return s.upsert(ctx, "messages", message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
|
||||||
|
v := model.Thread{}
|
||||||
|
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
|
||||||
|
return v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
||||||
|
return s.upsert(ctx, "threads", thread)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||||
|
if questions := strings.Count(clause, "?"); questions != len(args) {
|
||||||
|
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||||
|
}
|
||||||
|
|
||||||
|
keys, _, _, _, err := keysArgsKeyargsValues(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range args {
|
||||||
|
args[i], _ = json.Marshal(args[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
SELECT %s FROM %s WHERE %s
|
||||||
|
`, strings.Join(keys, ", "), table, clause)
|
||||||
|
row := s.driver.QueryRowContext(ctx, q, args...)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
scanTargets := make([]any, len(keys))
|
||||||
|
for i := range scanTargets {
|
||||||
|
scanTargets[i] = &[]byte{}
|
||||||
|
}
|
||||||
|
if err := row.Scan(scanTargets...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := map[string]json.RawMessage{}
|
||||||
|
for i, k := range keys {
|
||||||
|
m[k] = *scanTargets[i].(*[]byte)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(m)
|
||||||
|
return json.Unmarshal(b, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) upsert(ctx context.Context, table string, v any) error {
|
||||||
|
keys, args, keyArgs, values, err := keysArgsKeyargsValues(v)
|
||||||
|
if err != nil || len(keys) == 0 {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range keys {
|
||||||
|
values = append(values, values[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
INSERT INTO %s (%s) VALUES (%s)
|
||||||
|
ON CONFLICT (ID) DO UPDATE SET %s
|
||||||
|
`, table, strings.Join(keys, ", "), strings.Join(args, ", "), strings.Join(keyArgs, ", "))
|
||||||
|
if result, err := s.driver.ExecContext(ctx, q, values...); err != nil {
|
||||||
|
return err
|
||||||
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if n != 1 {
|
||||||
|
return fmt.Errorf("UpsertMessage affected %v rows", n)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
|
||||||
|
b, _ := json.Marshal(v)
|
||||||
|
var m map[string]json.RawMessage
|
||||||
|
err := json.Unmarshal(b, &m)
|
||||||
|
|
||||||
|
keys := []string{}
|
||||||
|
for k := range m {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
args := make([]string, len(keys))
|
||||||
|
for i := range args {
|
||||||
|
args[i] = "?"
|
||||||
|
}
|
||||||
|
keyArgs := make([]string, len(keys))
|
||||||
|
for i := range keyArgs {
|
||||||
|
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
|
||||||
|
}
|
||||||
|
values := make([]any, len(keys))
|
||||||
|
for i := range values {
|
||||||
|
values[i] = []byte(m[keys[i]])
|
||||||
|
}
|
||||||
|
|
||||||
|
return keys, args, keyArgs, values, err
|
||||||
|
}
|
||||||
89
storage_test.go
Normal file
89
storage_test.go
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStorage(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Minute)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
s, err := NewStorage(ctx, NewTestDriver(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("upsert get event", func(t *testing.T) {
|
||||||
|
m := model.NewEvent(
|
||||||
|
"ID",
|
||||||
|
"URL",
|
||||||
|
1,
|
||||||
|
"Name",
|
||||||
|
"Asset",
|
||||||
|
"Datacenter",
|
||||||
|
"Team",
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertEvent(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if err := s.UpsertEvent(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on noop update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, err := s.GetEvent(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on get:", err)
|
||||||
|
} else if got != m {
|
||||||
|
t.Fatal("unexpected result from get:", got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("upsert get thread", func(t *testing.T) {
|
||||||
|
m := model.NewThread(
|
||||||
|
"ID",
|
||||||
|
"URL",
|
||||||
|
1,
|
||||||
|
"Channel",
|
||||||
|
"EventID",
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertThread(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if err := s.UpsertThread(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on noop update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, err := s.GetThread(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on get:", err)
|
||||||
|
} else if got != m {
|
||||||
|
t.Fatal("unexpected result from get:", got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("upsert get message", func(t *testing.T) {
|
||||||
|
m := model.NewMessage(
|
||||||
|
"ID",
|
||||||
|
"URL",
|
||||||
|
1,
|
||||||
|
"Author",
|
||||||
|
"Plaintext",
|
||||||
|
"ThreadID",
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.UpsertMessage(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on insert:", err)
|
||||||
|
} else if err := s.UpsertMessage(ctx, m); err != nil {
|
||||||
|
t.Fatal("unexpected error on noop update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, err := s.GetMessage(ctx, m.ID); err != nil {
|
||||||
|
t.Fatal("unexpected error on get:", err)
|
||||||
|
} else if got != m {
|
||||||
|
t.Fatal("unexpected result from get:", got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user