Compare commits

..

9 Commits

Author SHA1 Message Date
Bel LaPointe
709f2ac254 slack to model pipeline tested and K 2024-04-16 07:19:25 -06:00
Bel LaPointe
ba06796b8c save what i need from old .Message 2024-04-16 07:05:30 -06:00
Bel LaPointe
f38c183fe8 stubbing 2024-04-16 06:53:30 -06:00
Bel LaPointe
8ae8f47753 tests run and fail again 2024-04-16 06:52:07 -06:00
Bel LaPointe
e372be4288 rm temp 2024-04-16 06:46:30 -06:00
Bel LaPointe
acfd95e5af everything just werks for storage 2024-04-16 06:45:40 -06:00
Bel LaPointe
d70a0e313f STORAGE TEST WERKS 2024-04-16 06:43:25 -06:00
Bel LaPointe
0cecd5ea04 add external id to .model 2024-04-16 05:55:35 -06:00
Bel LaPointe
a7d5d021d6 fill model 2024-04-16 05:51:08 -06:00
20 changed files with 816 additions and 310 deletions

View File

@@ -30,10 +30,10 @@ type Config struct {
DatacenterPattern string
EventNamePattern string
driver Driver
storage Storage
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
slackToModelPipeline Pipeline
modelToPersistencePipeline Pipeline
}
var (
@@ -119,6 +119,12 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, errors.New("not impl")
}
storage, err := NewStorage(ctx, result.driver)
if err != nil {
return Config{}, err
}
result.storage = storage
if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
@@ -127,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.ai = NewAINoop()
}
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackToMessagePipeline = slackToMessagePipeline
result.slackToModelPipeline = slackToModelPipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
result.modelToPersistencePipeline = modelToPersistencePipeline
return result, nil
}

View File

@@ -17,8 +17,12 @@ type Driver struct {
*sql.DB
}
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
func NewTestDriver(t *testing.T, optionalP ...string) Driver {
p := path.Join(t.TempDir(), "db")
if len(optionalP) > 0 {
p = optionalP[0]
}
driver, err := NewDriver(context.Background(), p)
if err != nil {
t.Fatal(err)
}

2
go.mod
View File

@@ -16,9 +16,11 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect

4
go.sum
View File

@@ -18,6 +18,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,6 +36,8 @@ golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=

View File

@@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error {
case <-ctx.Done():
return ctx.Err()
case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
@@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
}
errs := []error{}
for _, messageJSON := range page.Messages {
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
}
}
@@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return
}
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@@ -45,7 +45,6 @@ func TestRun(t *testing.T) {
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port
cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}
@@ -96,7 +95,7 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Messages []Message
Messages []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
@@ -189,7 +188,7 @@ func TestRun(t *testing.T) {
}
var result struct {
Thread []Message
Thread []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)

View File

@@ -1,157 +0,0 @@
package main
import (
"os"
"path"
"testing"
)
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
inCh string
slackMessage slackMessage
message Message
}{
"human_thread_message_from_opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712930706,
Event: slackEvent{
ID: "1712930706.598629",
Channel: "C06U1DDBBU4",
ParentID: "1712927439.728409",
Text: "I gotta do this",
Blocks: []slackBlock{{
Elements: []slackElement{{
Elements: []slackElement{{
RichText: "I gotta do this",
}},
}},
}},
Bot: slackBot{
Name: "",
},
Attachments: []slackAttachment{},
},
},
message: Message{
ID: "1712927439.728409/1712930706",
TS: 1712930706,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "",
Plaintext: "I gotta do this",
Asset: "",
},
},
"opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712927439,
Event: slackEvent{
ID: "1712927439.728409",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{{}, {}, {}},
}},
},
},
message: Message{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "Alertconfig Workflow Failed",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: Message{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
"reingested_alert.json": {
inCh: "C06U1DDBBU4",
message: Message{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
}
for name, d := range cases {
want := d
t.Run(name, func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
if err != nil {
t.Fatal(err)
}
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
if err != nil {
t.Fatal(err)
}
if got != want.message {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}
}

View File

@@ -1,4 +1,37 @@
package model
import "time"
type Event struct {
Updated uint64
ID string
URL string
TS uint64
Name string
Asset string
Datacenter string
Team string
Resolved bool
}
func NewEvent(ID, URL string, TS uint64, Name, Asset, Datacenter, Team string, Resolved bool) Event {
return Event{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Name: Name,
Asset: Asset,
Datacenter: Datacenter,
Team: Team,
Resolved: Resolved,
}
}
func (e Event) Empty() bool {
return e == (Event{})
}
func updated() uint64 {
return uint64(time.Now().UnixNano() / int64(time.Millisecond))
}

View File

@@ -1 +1,28 @@
package model
// THREAD ||--|{ MESSAGE: "populated by"
type Message struct {
Updated uint64
ID string
URL string
TS uint64
Author string
Plaintext string
ThreadID string
}
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
return Message{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Author: Author,
Plaintext: Plaintext,
ThreadID: ThreadID,
}
}
func (m Message) Empty() bool {
return m == (Message{})
}

View File

@@ -7,10 +7,12 @@ erDiagram
THREAD ||--|{ MESSAGE: "populated by"
MESSAGE {
ID str
URL str
TS number
Plaintext str
Author str
}
THREAD {
ID str
@@ -23,5 +25,6 @@ erDiagram
Asset str
Resolved bool
Datacenter str
Team str
}
`

View File

@@ -1 +1,26 @@
package model
// EVENT ||--|{ THREAD: "spawns"
type Thread struct {
Updated uint64
ID string
URL string
TS uint64
Channel string
EventID string
}
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
return Thread{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Channel: Channel,
EventID: EventID,
}
}
func (t Thread) Empty() bool {
return t == (Thread{})
}

View File

@@ -1,32 +0,0 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

View File

@@ -1,18 +0,0 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

View File

@@ -2,14 +2,14 @@ package main
import (
"context"
"fmt"
"errors"
)
type MessageToPersistence struct {
type ModelToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
@@ -21,27 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
process: newModelToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
func newModelToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
return nil, errors.New("not impl")
}
}

View File

@@ -6,29 +6,13 @@ import (
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
func TestModelToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
process := newModelToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
_, _ = ctx, process
}

217
slack.go
View File

@@ -2,14 +2,32 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
type SlackToMessage struct {
var (
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
)
type SlackToModel struct {
pipeline Pipeline
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
type Models struct {
Event model.Event
Message model.Message
Thread model.Thread
}
func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil {
return Pipeline{}, err
@@ -21,16 +39,203 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error
return Pipeline{
writer: writer,
reader: reader,
process: newSlackToMessageProcess(cfg),
process: newSlackToModelProcess(cfg),
}, nil
}
func newSlackToMessageProcess(cfg Config) processFunc {
func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
s, err := parseSlack(slack)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
}
return m.Serialize(), nil
for pattern, ptr := range map[string]*string{
cfg.AssetPattern: &s.Asset,
cfg.DatacenterPattern: &s.Datacenter,
cfg.EventNamePattern: &s.EventName,
} {
r := regexp.MustCompile(pattern)
parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
}
event := model.Event{}
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
}
message := model.Message{}
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
}
thread := model.Thread{}
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
}
return json.Marshal(Models{
Event: event,
Message: message,
Thread: thread,
})
}
}
type (
parsedSlackMessage struct {
ID string
TS uint64
Source string
Channel string
Thread string
EventName string
Event string
Plaintext string
Asset string
Resolved bool
Datacenter string
}
slackMessage struct {
slackEvent
Type string
TS uint64 `json:"event_time"`
Event slackEvent
MessageTS string `json:"ts"`
}
slackEvent struct {
ID string `json:"event_ts"`
Channel string
// rewrites
Nested *slackEvent `json:"message"`
PreviousMessage *slackEvent `json:"previous_message"`
// human
ParentID string `json:"thread_ts"`
Text string
Blocks []slackBlock
// bot
Bot slackBot `json:"bot_profile"`
Attachments []slackAttachment
}
slackBlock struct {
Elements []slackElement
}
slackElement struct {
Elements []slackElement
RichText string `json:"text"`
}
slackBot struct {
Name string
}
slackAttachment struct {
Color string
Title string
Text string
Fields []slackField
Actions []slackAction
}
slackField struct {
Value string
Title string
}
slackAction struct{}
)
func parseSlack(b []byte) (parsedSlackMessage, error) {
s, err := _parseSlack(b)
if err != nil {
return parsedSlackMessage{}, err
}
/*
if ch != "" {
s.Event.Channel = ch
}
*/
if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 {
return parsedSlackMessage{}, ErrIrrelevantMessage
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return parsedSlackMessage{}, ErrIrrelevantMessage
}
var tagsField string
for _, field := range s.Event.Attachments[0].Fields {
if field.Title == "Tags" {
tagsField = field.Value
}
}
return parsedSlackMessage{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
TS: s.TS,
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")),
Channel: s.Event.Channel,
Thread: s.Event.ID,
EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1],
Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"),
Plaintext: s.Event.Attachments[0].Text,
Asset: s.Event.Attachments[0].Text,
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
Datacenter: tagsField,
}, nil
}
if s.Event.ParentID == "" {
return parsedSlackMessage{}, ErrIrrelevantMessage
}
return parsedSlackMessage{
ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS),
TS: s.TS,
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")),
Channel: s.Event.Channel,
Thread: s.Event.ParentID,
EventName: "",
Event: "",
Plaintext: s.Event.Text,
Asset: "",
Datacenter: "",
}, nil
}
func _parseSlack(b []byte) (slackMessage, error) {
var result slackMessage
err := json.Unmarshal(b, &result)
switch result.Type {
case "message":
result.Event = result.slackEvent
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
result.Event.ID = result.MessageTS
}
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot
result.Event.Attachments = result.Event.Nested.Attachments
result.Event.Nested = nil
}
if result.Event.PreviousMessage != nil {
if result.Event.PreviousMessage.ID != "" {
result.Event.ID = result.Event.PreviousMessage.ID
}
result.Event.PreviousMessage = nil
}
return result, err
}
func (this slackEvent) Empty() bool {
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
}
func (this parsedSlackMessage) Time() time.Time {
return time.Unix(int64(this.TS), 0)
}

View File

@@ -2,19 +2,27 @@ package main
import (
"context"
"encoding/json"
"os"
"path"
"testing"
"time"
"gotest.tools/v3/assert"
"github.com/breel-render/spoc-bot-vr/model"
"gotest.tools/assert"
)
func TestSlackToMessagePipeline(t *testing.T) {
func TestSlackToModelPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
pipeline, err := NewSlackToModelPipeline(ctx, Config{
driver: NewTestDriver(t),
AssetPattern: renderAssetPattern,
DatacenterPattern: renderDatacenterPattern,
EventNamePattern: renderEventNamePattern,
})
if err != nil {
t.Fatal(err)
}
@@ -24,7 +32,33 @@ func TestSlackToMessagePipeline(t *testing.T) {
}
}()
want := Message{
want := Models{
Event: model.NewEvent(
"11071",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"Alertconfig Workflow Failed",
"",
"",
"TODO",
false,
),
Message: model.NewMessage(
"1712927439.728409/1712927439",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"TODO",
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"1712927439.728409",
),
Thread: model.NewThread(
"1712927439.728409",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"C06U1DDBBU4",
"11071",
),
/*
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
@@ -35,16 +69,177 @@ func TestSlackToMessagePipeline(t *testing.T) {
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
*/
}
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal("failed to enqueue", err)
}
var got Models
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
t.Fatal("failed to syn", err)
} else if m := MustDeserialize(b2); false {
} else if err := json.Unmarshal(b2, &got); err != nil {
t.Fatal("failed to parse outqueue:", err)
} else {
assert.DeepEqual(t, want, m)
want.Event.Updated = 0
want.Message.Updated = 0
want.Thread.Updated = 0
got.Event.Updated = 0
got.Message.Updated = 0
got.Thread.Updated = 0
assert.DeepEqual(t, want, got)
}
}
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
slackMessage slackMessage
message parsedSlackMessage
}{
"human_thread_message_from_opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712930706,
Event: slackEvent{
ID: "1712930706.598629",
Channel: "C06U1DDBBU4",
ParentID: "1712927439.728409",
Text: "I gotta do this",
Blocks: []slackBlock{{
Elements: []slackElement{{
Elements: []slackElement{{
RichText: "I gotta do this",
}},
}},
}},
Bot: slackBot{
Name: "",
},
Attachments: []slackAttachment{},
},
},
message: parsedSlackMessage{
ID: "1712927439.728409/1712930706",
TS: 1712930706,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "",
Plaintext: "I gotta do this",
Asset: "",
},
},
"opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712927439,
Event: slackEvent{
ID: "1712927439.728409",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{{}, {}, {}},
}},
},
},
message: parsedSlackMessage{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "Alertconfig Workflow Failed",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: parsedSlackMessage{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
"reingested_alert.json": {
message: parsedSlackMessage{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives//p1712892637037639",
//Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
}
for name, d := range cases {
want := d
t.Run(name, func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
if err != nil {
t.Fatal(err)
}
t.Run("parseSlack", func(t *testing.T) {
got, err := parseSlack(b)
if err != nil {
t.Fatal(err)
}
if got != want.message {
assert.DeepEqual(t, want.message, got)
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}
}

159
storage.go Normal file
View File

@@ -0,0 +1,159 @@
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/breel-render/spoc-bot-vr/model"
)
type Storage struct {
driver Driver
}
func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
if _, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS events (ID TEXT UNIQUE);
CREATE TABLE IF NOT EXISTS messages (ID TEXT UNIQUE);
CREATE TABLE IF NOT EXISTS threads (ID TEXT UNIQUE);
`); err != nil {
return Storage{}, err
}
for table, v := range map[string]any{
"events": model.Event{},
"messages": model.Message{},
"threads": model.Thread{},
} {
b, _ := json.Marshal(v)
var m map[string]struct{}
json.Unmarshal(b, &m)
for k := range m {
if k == `ID` {
continue
}
driver.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s TEXT`, table, k))
}
}
return Storage{driver: driver}, nil
}
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
v := model.Event{}
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
return s.upsert(ctx, "events", event)
}
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
v := model.Message{}
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error {
return s.upsert(ctx, "messages", message)
}
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
v := model.Thread{}
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
return s.upsert(ctx, "threads", thread)
}
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
if questions := strings.Count(clause, "?"); questions != len(args) {
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
}
keys, _, _, _, err := keysArgsKeyargsValues(v)
if err != nil {
return err
}
for i := range args {
args[i], _ = json.Marshal(args[i])
}
q := fmt.Sprintf(`
SELECT %s FROM %s WHERE %s
`, strings.Join(keys, ", "), table, clause)
row := s.driver.QueryRowContext(ctx, q, args...)
if err := row.Err(); err != nil {
return err
}
scanTargets := make([]any, len(keys))
for i := range scanTargets {
scanTargets[i] = &[]byte{}
}
if err := row.Scan(scanTargets...); err != nil {
return err
}
m := map[string]json.RawMessage{}
for i, k := range keys {
m[k] = *scanTargets[i].(*[]byte)
}
b, _ := json.Marshal(m)
return json.Unmarshal(b, v)
}
func (s Storage) upsert(ctx context.Context, table string, v any) error {
keys, args, keyArgs, values, err := keysArgsKeyargsValues(v)
if err != nil || len(keys) == 0 {
return err
}
for i := range keys {
values = append(values, values[i])
}
q := fmt.Sprintf(`
INSERT INTO %s (%s) VALUES (%s)
ON CONFLICT (ID) DO UPDATE SET %s
`, table, strings.Join(keys, ", "), strings.Join(args, ", "), strings.Join(keyArgs, ", "))
if result, err := s.driver.ExecContext(ctx, q, values...); err != nil {
return err
} else if n, err := result.RowsAffected(); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("UpsertMessage affected %v rows", n)
}
return nil
}
func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
b, _ := json.Marshal(v)
var m map[string]json.RawMessage
err := json.Unmarshal(b, &m)
keys := []string{}
for k := range m {
keys = append(keys, k)
}
args := make([]string, len(keys))
for i := range args {
args[i] = "?"
}
keyArgs := make([]string, len(keys))
for i := range keyArgs {
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
}
values := make([]any, len(keys))
for i := range values {
values[i] = []byte(m[keys[i]])
}
return keys, args, keyArgs, values, err
}

89
storage_test.go Normal file
View File

@@ -0,0 +1,89 @@
package main
import (
"context"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Minute)
defer can()
s, err := NewStorage(ctx, NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
t.Run("upsert get event", func(t *testing.T) {
m := model.NewEvent(
"ID",
"URL",
1,
"Name",
"Asset",
"Datacenter",
"Team",
true,
)
if err := s.UpsertEvent(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertEvent(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetEvent(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
t.Run("upsert get thread", func(t *testing.T) {
m := model.NewThread(
"ID",
"URL",
1,
"Channel",
"EventID",
)
if err := s.UpsertThread(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertThread(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetThread(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
t.Run("upsert get message", func(t *testing.T) {
m := model.NewMessage(
"ID",
"URL",
1,
"Author",
"Plaintext",
"ThreadID",
)
if err := s.UpsertMessage(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertMessage(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetMessage(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
}