Compare commits

...

9 Commits

Author SHA1 Message Date
Bel LaPointe
709f2ac254 slack to model pipeline tested and K 2024-04-16 07:19:25 -06:00
Bel LaPointe
ba06796b8c save what i need from old .Message 2024-04-16 07:05:30 -06:00
Bel LaPointe
f38c183fe8 stubbing 2024-04-16 06:53:30 -06:00
Bel LaPointe
8ae8f47753 tests run and fail again 2024-04-16 06:52:07 -06:00
Bel LaPointe
e372be4288 rm temp 2024-04-16 06:46:30 -06:00
Bel LaPointe
acfd95e5af everything just werks for storage 2024-04-16 06:45:40 -06:00
Bel LaPointe
d70a0e313f STORAGE TEST WERKS 2024-04-16 06:43:25 -06:00
Bel LaPointe
0cecd5ea04 add external id to .model 2024-04-16 05:55:35 -06:00
Bel LaPointe
a7d5d021d6 fill model 2024-04-16 05:51:08 -06:00
20 changed files with 816 additions and 310 deletions

View File

@@ -13,27 +13,27 @@ import (
)
type Config struct {
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
storage Storage
ai AI
slackToModelPipeline Pipeline
modelToPersistencePipeline Pipeline
}
var (
@@ -119,6 +119,12 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, errors.New("not impl")
}
storage, err := NewStorage(ctx, result.driver)
if err != nil {
return Config{}, err
}
result.storage = storage
if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
} else if result.LocalCheckpoint != "" && result.LocalTokenizer != "" {
@@ -127,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.ai = NewAINoop()
}
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackToMessagePipeline = slackToMessagePipeline
result.slackToModelPipeline = slackToModelPipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
result.modelToPersistencePipeline = modelToPersistencePipeline
return result, nil
}

View File

@@ -17,8 +17,12 @@ type Driver struct {
*sql.DB
}
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
func NewTestDriver(t *testing.T, optionalP ...string) Driver {
p := path.Join(t.TempDir(), "db")
if len(optionalP) > 0 {
p = optionalP[0]
}
driver, err := NewDriver(context.Background(), p)
if err != nil {
t.Fatal(err)
}

2
go.mod
View File

@@ -16,9 +16,11 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect

4
go.sum
View File

@@ -18,6 +18,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,6 +36,8 @@ golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=

View File

@@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error {
case <-ctx.Done():
return ctx.Err()
case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
@@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
}
errs := []error{}
for _, messageJSON := range page.Messages {
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
}
}
@@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return
}
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@@ -45,7 +45,6 @@ func TestRun(t *testing.T) {
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port
cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}
@@ -96,7 +95,7 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Messages []Message
Messages []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
@@ -189,7 +188,7 @@ func TestRun(t *testing.T) {
}
var result struct {
Thread []Message
Thread []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)

View File

@@ -1,157 +0,0 @@
package main
import (
"os"
"path"
"testing"
)
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
inCh string
slackMessage slackMessage
message Message
}{
"human_thread_message_from_opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712930706,
Event: slackEvent{
ID: "1712930706.598629",
Channel: "C06U1DDBBU4",
ParentID: "1712927439.728409",
Text: "I gotta do this",
Blocks: []slackBlock{{
Elements: []slackElement{{
Elements: []slackElement{{
RichText: "I gotta do this",
}},
}},
}},
Bot: slackBot{
Name: "",
},
Attachments: []slackAttachment{},
},
},
message: Message{
ID: "1712927439.728409/1712930706",
TS: 1712930706,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "",
Plaintext: "I gotta do this",
Asset: "",
},
},
"opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712927439,
Event: slackEvent{
ID: "1712927439.728409",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{{}, {}, {}},
}},
},
},
message: Message{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "Alertconfig Workflow Failed",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: Message{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
"reingested_alert.json": {
inCh: "C06U1DDBBU4",
message: Message{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
}
for name, d := range cases {
want := d
t.Run(name, func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
if err != nil {
t.Fatal(err)
}
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
if err != nil {
t.Fatal(err)
}
if got != want.message {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}
}

View File

@@ -1,4 +1,37 @@
package model
import "time"
type Event struct {
Updated uint64
ID string
URL string
TS uint64
Name string
Asset string
Datacenter string
Team string
Resolved bool
}
func NewEvent(ID, URL string, TS uint64, Name, Asset, Datacenter, Team string, Resolved bool) Event {
return Event{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Name: Name,
Asset: Asset,
Datacenter: Datacenter,
Team: Team,
Resolved: Resolved,
}
}
func (e Event) Empty() bool {
return e == (Event{})
}
func updated() uint64 {
return uint64(time.Now().UnixNano() / int64(time.Millisecond))
}

View File

@@ -1 +1,28 @@
package model
// THREAD ||--|{ MESSAGE: "populated by"
type Message struct {
Updated uint64
ID string
URL string
TS uint64
Author string
Plaintext string
ThreadID string
}
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
return Message{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Author: Author,
Plaintext: Plaintext,
ThreadID: ThreadID,
}
}
func (m Message) Empty() bool {
return m == (Message{})
}

View File

@@ -7,10 +7,12 @@ erDiagram
THREAD ||--|{ MESSAGE: "populated by"
MESSAGE {
ID str
URL str
TS number
Plaintext str
Author str
}
THREAD {
ID str
@@ -23,5 +25,6 @@ erDiagram
Asset str
Resolved bool
Datacenter str
Team str
}
`

View File

@@ -1 +1,26 @@
package model
// EVENT ||--|{ THREAD: "spawns"
type Thread struct {
Updated uint64
ID string
URL string
TS uint64
Channel string
EventID string
}
func NewThread(ID, URL string, TS uint64, Channel string, EventID string) Thread {
return Thread{
Updated: updated(),
ID: ID,
URL: URL,
TS: TS,
Channel: Channel,
EventID: EventID,
}
}
func (t Thread) Empty() bool {
return t == (Thread{})
}

View File

@@ -1,32 +0,0 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

View File

@@ -1,18 +0,0 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

View File

@@ -2,14 +2,14 @@ package main
import (
"context"
"fmt"
"errors"
)
type MessageToPersistence struct {
type ModelToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
@@ -21,27 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
process: newModelToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
func newModelToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
return nil, errors.New("not impl")
}
}

View File

@@ -6,29 +6,13 @@ import (
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
func TestModelToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
process := newModelToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
_, _ = ctx, process
}

217
slack.go
View File

@@ -2,14 +2,32 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
type SlackToMessage struct {
var (
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
)
type SlackToModel struct {
pipeline Pipeline
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
type Models struct {
Event model.Event
Message model.Message
Thread model.Thread
}
func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil {
return Pipeline{}, err
@@ -21,16 +39,203 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error
return Pipeline{
writer: writer,
reader: reader,
process: newSlackToMessageProcess(cfg),
process: newSlackToModelProcess(cfg),
}, nil
}
func newSlackToMessageProcess(cfg Config) processFunc {
func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
s, err := parseSlack(slack)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
}
return m.Serialize(), nil
for pattern, ptr := range map[string]*string{
cfg.AssetPattern: &s.Asset,
cfg.DatacenterPattern: &s.Datacenter,
cfg.EventNamePattern: &s.EventName,
} {
r := regexp.MustCompile(pattern)
parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
}
event := model.Event{}
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
}
message := model.Message{}
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
}
thread := model.Thread{}
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
}
return json.Marshal(Models{
Event: event,
Message: message,
Thread: thread,
})
}
}
type (
parsedSlackMessage struct {
ID string
TS uint64
Source string
Channel string
Thread string
EventName string
Event string
Plaintext string
Asset string
Resolved bool
Datacenter string
}
slackMessage struct {
slackEvent
Type string
TS uint64 `json:"event_time"`
Event slackEvent
MessageTS string `json:"ts"`
}
slackEvent struct {
ID string `json:"event_ts"`
Channel string
// rewrites
Nested *slackEvent `json:"message"`
PreviousMessage *slackEvent `json:"previous_message"`
// human
ParentID string `json:"thread_ts"`
Text string
Blocks []slackBlock
// bot
Bot slackBot `json:"bot_profile"`
Attachments []slackAttachment
}
slackBlock struct {
Elements []slackElement
}
slackElement struct {
Elements []slackElement
RichText string `json:"text"`
}
slackBot struct {
Name string
}
slackAttachment struct {
Color string
Title string
Text string
Fields []slackField
Actions []slackAction
}
slackField struct {
Value string
Title string
}
slackAction struct{}
)
func parseSlack(b []byte) (parsedSlackMessage, error) {
s, err := _parseSlack(b)
if err != nil {
return parsedSlackMessage{}, err
}
/*
if ch != "" {
s.Event.Channel = ch
}
*/
if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 {
return parsedSlackMessage{}, ErrIrrelevantMessage
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
return parsedSlackMessage{}, ErrIrrelevantMessage
}
var tagsField string
for _, field := range s.Event.Attachments[0].Fields {
if field.Title == "Tags" {
tagsField = field.Value
}
}
return parsedSlackMessage{
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
TS: s.TS,
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ID, ".", "")),
Channel: s.Event.Channel,
Thread: s.Event.ID,
EventName: strings.Split(s.Event.Attachments[0].Title, ": Firing: ")[1],
Event: strings.TrimPrefix(strings.Split(s.Event.Attachments[0].Title, ":")[0], "#"),
Plaintext: s.Event.Attachments[0].Text,
Asset: s.Event.Attachments[0].Text,
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
Datacenter: tagsField,
}, nil
}
if s.Event.ParentID == "" {
return parsedSlackMessage{}, ErrIrrelevantMessage
}
return parsedSlackMessage{
ID: fmt.Sprintf("%s/%v", s.Event.ParentID, s.TS),
TS: s.TS,
Source: fmt.Sprintf(`https://renderinc.slack.com/archives/%s/p%s`, s.Event.Channel, strings.ReplaceAll(s.Event.ParentID, ".", "")),
Channel: s.Event.Channel,
Thread: s.Event.ParentID,
EventName: "",
Event: "",
Plaintext: s.Event.Text,
Asset: "",
Datacenter: "",
}, nil
}
func _parseSlack(b []byte) (slackMessage, error) {
var result slackMessage
err := json.Unmarshal(b, &result)
switch result.Type {
case "message":
result.Event = result.slackEvent
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
result.Event.ID = result.MessageTS
}
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot
result.Event.Attachments = result.Event.Nested.Attachments
result.Event.Nested = nil
}
if result.Event.PreviousMessage != nil {
if result.Event.PreviousMessage.ID != "" {
result.Event.ID = result.Event.PreviousMessage.ID
}
result.Event.PreviousMessage = nil
}
return result, err
}
func (this slackEvent) Empty() bool {
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
}
func (this parsedSlackMessage) Time() time.Time {
return time.Unix(int64(this.TS), 0)
}

View File

@@ -2,19 +2,27 @@ package main
import (
"context"
"encoding/json"
"os"
"path"
"testing"
"time"
"gotest.tools/v3/assert"
"github.com/breel-render/spoc-bot-vr/model"
"gotest.tools/assert"
)
func TestSlackToMessagePipeline(t *testing.T) {
func TestSlackToModelPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
pipeline, err := NewSlackToModelPipeline(ctx, Config{
driver: NewTestDriver(t),
AssetPattern: renderAssetPattern,
DatacenterPattern: renderDatacenterPattern,
EventNamePattern: renderEventNamePattern,
})
if err != nil {
t.Fatal(err)
}
@@ -24,27 +32,214 @@ func TestSlackToMessagePipeline(t *testing.T) {
}
}()
want := Message{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
want := Models{
Event: model.NewEvent(
"11071",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"Alertconfig Workflow Failed",
"",
"",
"TODO",
false,
),
Message: model.NewMessage(
"1712927439.728409/1712927439",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"TODO",
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"1712927439.728409",
),
Thread: model.NewThread(
"1712927439.728409",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439,
"C06U1DDBBU4",
"11071",
),
/*
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
*/
}
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal("failed to enqueue", err)
}
var got Models
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
t.Fatal("failed to syn", err)
} else if m := MustDeserialize(b2); false {
} else if err := json.Unmarshal(b2, &got); err != nil {
t.Fatal("failed to parse outqueue:", err)
} else {
assert.DeepEqual(t, want, m)
want.Event.Updated = 0
want.Message.Updated = 0
want.Thread.Updated = 0
got.Event.Updated = 0
got.Message.Updated = 0
got.Thread.Updated = 0
assert.DeepEqual(t, want, got)
}
}
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
slackMessage slackMessage
message parsedSlackMessage
}{
"human_thread_message_from_opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712930706,
Event: slackEvent{
ID: "1712930706.598629",
Channel: "C06U1DDBBU4",
ParentID: "1712927439.728409",
Text: "I gotta do this",
Blocks: []slackBlock{{
Elements: []slackElement{{
Elements: []slackElement{{
RichText: "I gotta do this",
}},
}},
}},
Bot: slackBot{
Name: "",
},
Attachments: []slackAttachment{},
},
},
message: parsedSlackMessage{
ID: "1712927439.728409/1712930706",
TS: 1712930706,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "",
Plaintext: "I gotta do this",
Asset: "",
},
},
"opsgenie_alert.json": {
slackMessage: slackMessage{
TS: 1712927439,
Event: slackEvent{
ID: "1712927439.728409",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "F4511E",
Title: "#11071: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{{}, {}, {}},
}},
},
},
message: parsedSlackMessage{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "Alertconfig Workflow Failed",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
"opsgenie_alert_resolved.json": {
slackMessage: slackMessage{
TS: 1712916339,
Event: slackEvent{
ID: "1712916339.000300",
Channel: "C06U1DDBBU4",
Bot: slackBot{
Name: "Opsgenie for Alert Management",
},
Attachments: []slackAttachment{{
Color: "2ecc71",
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Fields: []slackField{
{Value: "P3", Title: "Priority"},
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
},
Actions: []slackAction{},
}},
},
},
message: parsedSlackMessage{
ID: "1712916339.000300/1712916339",
TS: 1712916339,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
Channel: "C06U1DDBBU4",
Thread: "1712916339.000300",
EventName: "Alertconfig Workflow Failed",
Event: "11069",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
"reingested_alert.json": {
message: parsedSlackMessage{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives//p1712892637037639",
//Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
},
},
}
for name, d := range cases {
want := d
t.Run(name, func(t *testing.T) {
b, err := os.ReadFile(path.Join("testdata", "slack_events", name))
if err != nil {
t.Fatal(err)
}
t.Run("parseSlack", func(t *testing.T) {
got, err := parseSlack(b)
if err != nil {
t.Fatal(err)
}
if got != want.message {
assert.DeepEqual(t, want.message, got)
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
}
if time := got.Time(); time.Unix() != int64(got.TS) {
t.Error("not unix time", got.TS, time)
}
})
})
}
}

159
storage.go Normal file
View File

@@ -0,0 +1,159 @@
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/breel-render/spoc-bot-vr/model"
)
type Storage struct {
driver Driver
}
func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
if _, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS events (ID TEXT UNIQUE);
CREATE TABLE IF NOT EXISTS messages (ID TEXT UNIQUE);
CREATE TABLE IF NOT EXISTS threads (ID TEXT UNIQUE);
`); err != nil {
return Storage{}, err
}
for table, v := range map[string]any{
"events": model.Event{},
"messages": model.Message{},
"threads": model.Thread{},
} {
b, _ := json.Marshal(v)
var m map[string]struct{}
json.Unmarshal(b, &m)
for k := range m {
if k == `ID` {
continue
}
driver.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s TEXT`, table, k))
}
}
return Storage{driver: driver}, nil
}
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
v := model.Event{}
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
return s.upsert(ctx, "events", event)
}
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
v := model.Message{}
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error {
return s.upsert(ctx, "messages", message)
}
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
v := model.Thread{}
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
return v, err
}
func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
return s.upsert(ctx, "threads", thread)
}
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
if questions := strings.Count(clause, "?"); questions != len(args) {
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
}
keys, _, _, _, err := keysArgsKeyargsValues(v)
if err != nil {
return err
}
for i := range args {
args[i], _ = json.Marshal(args[i])
}
q := fmt.Sprintf(`
SELECT %s FROM %s WHERE %s
`, strings.Join(keys, ", "), table, clause)
row := s.driver.QueryRowContext(ctx, q, args...)
if err := row.Err(); err != nil {
return err
}
scanTargets := make([]any, len(keys))
for i := range scanTargets {
scanTargets[i] = &[]byte{}
}
if err := row.Scan(scanTargets...); err != nil {
return err
}
m := map[string]json.RawMessage{}
for i, k := range keys {
m[k] = *scanTargets[i].(*[]byte)
}
b, _ := json.Marshal(m)
return json.Unmarshal(b, v)
}
func (s Storage) upsert(ctx context.Context, table string, v any) error {
keys, args, keyArgs, values, err := keysArgsKeyargsValues(v)
if err != nil || len(keys) == 0 {
return err
}
for i := range keys {
values = append(values, values[i])
}
q := fmt.Sprintf(`
INSERT INTO %s (%s) VALUES (%s)
ON CONFLICT (ID) DO UPDATE SET %s
`, table, strings.Join(keys, ", "), strings.Join(args, ", "), strings.Join(keyArgs, ", "))
if result, err := s.driver.ExecContext(ctx, q, values...); err != nil {
return err
} else if n, err := result.RowsAffected(); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("UpsertMessage affected %v rows", n)
}
return nil
}
func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
b, _ := json.Marshal(v)
var m map[string]json.RawMessage
err := json.Unmarshal(b, &m)
keys := []string{}
for k := range m {
keys = append(keys, k)
}
args := make([]string, len(keys))
for i := range args {
args[i] = "?"
}
keyArgs := make([]string, len(keys))
for i := range keyArgs {
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
}
values := make([]any, len(keys))
for i := range values {
values[i] = []byte(m[keys[i]])
}
return keys, args, keyArgs, values, err
}

89
storage_test.go Normal file
View File

@@ -0,0 +1,89 @@
package main
import (
"context"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Minute)
defer can()
s, err := NewStorage(ctx, NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
t.Run("upsert get event", func(t *testing.T) {
m := model.NewEvent(
"ID",
"URL",
1,
"Name",
"Asset",
"Datacenter",
"Team",
true,
)
if err := s.UpsertEvent(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertEvent(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetEvent(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
t.Run("upsert get thread", func(t *testing.T) {
m := model.NewThread(
"ID",
"URL",
1,
"Channel",
"EventID",
)
if err := s.UpsertThread(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertThread(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetThread(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
t.Run("upsert get message", func(t *testing.T) {
m := model.NewMessage(
"ID",
"URL",
1,
"Author",
"Plaintext",
"ThreadID",
)
if err := s.UpsertMessage(ctx, m); err != nil {
t.Fatal("unexpected error on insert:", err)
} else if err := s.UpsertMessage(ctx, m); err != nil {
t.Fatal("unexpected error on noop update:", err)
}
if got, err := s.GetMessage(ctx, m.ID); err != nil {
t.Fatal("unexpected error on get:", err)
} else if got != m {
t.Fatal("unexpected result from get:", got)
}
})
}