Compare commits

...

26 Commits

Author SHA1 Message Date
Bel LaPointe
83c0ee3f53 ok report still botched but im werkin on it 2024-04-15 16:13:31 -06:00
Bel LaPointe
9d7a175c62 at least main_test runs 2024-04-15 16:12:41 -06:00
Bel LaPointe
1dcffdd956 ew compile errs 2024-04-15 16:04:12 -06:00
Bel LaPointe
580068d98b revive message and test slack pipeline parses slack into message 2024-04-15 15:57:56 -06:00
Bel LaPointe
eec5c39725 go mod tidy 2024-04-15 15:50:29 -06:00
Bel LaPointe
9848492b1e no test driver non driver things 2024-04-15 15:50:18 -06:00
Bel LaPointe
a674022357 revive ai, config*.go 2024-04-15 15:49:48 -06:00
Bel LaPointe
80df07089f rename ingest to pipeline 2024-04-15 15:22:23 -06:00
Bel LaPointe
d792626c2f test ingest 2024-04-15 15:18:03 -06:00
Bel LaPointe
acac2a60b0 finish ingest loop 2024-04-15 14:19:53 -06:00
Bel LaPointe
eef78d6e39 noop queue and topics embedded 2024-04-15 14:18:39 -06:00
Bel LaPointe
42c5b7d7ad multi topic done 2024-04-15 13:30:35 -06:00
Bel LaPointe
e85a2d25a1 queue from Dequeue to Syn for SynAck 2024-04-15 13:18:14 -06:00
Bel LaPointe
8193bf7377 f sql jeez 2024-04-15 13:08:21 -06:00
Bel LaPointe
2f3739b24f functions are good 2024-04-15 09:14:59 -06:00
Bel LaPointe
d38352f050 ooooo it is pretty 2024-04-15 09:09:09 -06:00
Bel LaPointe
ba833fa315 this is getting k 2024-04-15 08:45:20 -06:00
Bel LaPointe
d7cbcb9926 d3js not for me 2024-04-15 08:23:10 -06:00
Bel LaPointe
961be827d0 log less 2024-04-15 07:35:39 -06:00
Bel LaPointe
6fbafe6700 PUT /api/v1/rpc/scrapeslack 2024-04-15 07:34:30 -06:00
Bel LaPointe
7df7528ccf can parse slack messages from scraping channel history too 2024-04-15 07:34:21 -06:00
Bel LaPointe
a91da082c7 no max width on report.tmpl 2024-04-15 06:52:27 -06:00
Bel LaPointe
af2ad44109 remove debug console.log 2024-04-15 06:51:21 -06:00
Bel LaPointe
cabc5c00b7 dynamic alert dump via filters 2024-04-15 06:50:41 -06:00
Bel LaPointe
84dec31e53 more filter fields 2024-04-15 06:00:28 -06:00
Bel LaPointe
f2a23e5d8a extract named Pattern result if a group is named 2024-04-15 05:55:30 -06:00
23 changed files with 872 additions and 856 deletions

227
.report.tmpl Normal file
View File

@@ -0,0 +1,227 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
<script src="https://code.highcharts.com/10/highcharts.js"></script>
<script type="module">
const allMessages = {{ json "Marshal" .messages }};
function fillForm() {
const filterableFields = [
"Asset",
"Channel",
"Event",
"EventName",
"Resolved",
"Thread",
];
const fieldsToOptions = {};
filterableFields.map((field) => {fieldsToOptions[field] = {}});
allMessages.map((message) => {
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
});
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
return `
<label for="${field}">${field}</label>
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
${fieldsToOptions[field].map((option) => `
<option selected>${option}</option>
`)}
</select>
`
}).join("\n");
}
window.fillForm = fillForm;
function drawAll() {
const messages = filterMessages(allMessages)
dumpEvents(messages);
drawEventVolume(messages)
drawEventVolumeByHour(messages)
drawEventVolumeByAsset(messages)
}
window.drawAll = drawAll;
function dumpEvents(messages) {
const eventToThreads = {};
for(var m of messages) {
if (!eventToThreads[m.Event])
eventToThreads[m.Event] = [];
eventToThreads[m.Event].push(m.Thread);
}
const threadToMessages = {};
for(var m of messages) {
if (!threadToMessages[m.Thread])
threadToMessages[m.Thread] = [];
threadToMessages[m.Thread].push(m);
}
const eventToMessages = {};
for(var e in eventToThreads) {
if (!eventToMessages[e])
eventToMessages[e] = [];
for (var thread of eventToThreads[e])
eventToMessages[e] = eventToMessages[e].concat(threadToMessages[thread]);
}
for(var e in eventToMessages)
eventToMessages[e].sort((a, b) => a.TS - b.TS);
var events = Object.keys(eventToMessages);
events.sort();
events.reverse();
var keys = ["TS", "Event", "EventName", "Latest"];
document.getElementById("events").innerHTML = `
<tr>
<th>TS</th>
<th>Event</th>
<th>EventName</th>
<th>Latest</th>
</tr>
${events.map((e) => `
<tr>
<td><a href="${eventToMessages[e][0].Source}">${new Date(eventToMessages[e][0].TS * 1000).toDateString()}</a></td>
<td><a href="${eventToMessages[e][0].Source}">${eventToMessages[e][0].Event}</a></td>
<td>${eventToMessages[e][0].EventName}</td>
<td><a href="${eventToMessages[e].at(-1).Source}">${eventToMessages[e].at(-1).Plaintext}</a></td>
</tr>
`).join("")}
`;
}
function filterMessages(messages) {
const selects = document.getElementById("form").getElementsByTagName("select");
const fieldsToOptions = {};
for(var select of selects) {
fieldsToOptions[select.name] = [];
for(var option of select.getElementsByTagName("option"))
if (option.selected)
fieldsToOptions[select.name].push(option.innerHTML);
}
return messages.map((m) => {
for(var k in fieldsToOptions) {
if (fieldsToOptions[k].filter((v) => `${v}` == `${m[k]}`).length == 0) {
return null;
}
}
return m;
}).filter((m) => { return m != null });
}
function drawEventVolume(messages) {
drawEventVolumeWith(
messages,
"eventVolume",
(ts) => new Date(1000 * ts).
toLocaleDateString('en-US', {month: 'numeric', day: 'numeric', weekday: 'short'}),
(m) => m.EventName,
);
}
function drawEventVolumeWith(messages, documentId, kify, nameify) {
const points = [];
messages.forEach((m) => {
points.push({x: m.TS, name: nameify(m)});
});
var xs = points.map((point) => point.x);
if (xs && !isNaN(parseFloat(kify(xs[0])))) {
xs = xs.map(kify);
xs.sort((a, b) => parseFloat(a) - parseFloat(b));
} else {
xs.sort();
xs = xs.map(kify);
}
xs = [...new Set(xs)];
const names = [...new Set(points.map((p) => p.name))];
const nameAndData = names.map((name) => {
return {
name: name,
data: xs.map((x) => points.filter((p) => { return p.name == name && kify(p.x) == x }).length),
}
});
draw(documentId, xs, nameAndData);
}
function drawEventVolumeByHour(messages) {
drawEventVolumeWith(
messages,
"eventVolumeByHour",
(ts) => new Date(1000 * ts).getHours(),
(m) => m.EventName,
);
}
function drawEventVolumeByAsset(messages) {}
function draw(documentId, xs, nameAndData) {
document.getElementById(documentId).innerHTML = "";
Highcharts.chart(documentId, {
chart: { type: 'column' },
title: { text: '' },
xAxis: { categories: xs },
yAxis: { allowDecimals: false, title: { text: '' } },
//legend: { enabled: false },
series: nameAndData,
plotOptions: { column: { stacking: 'normal' } },
});
}
</script>
<style>
rows {
display: flex;
flex-direction: column;
flex-grow: 1;
}
columns {
display: flex;
flex-direction: row;
flex-grow: 1;
}
rows, columns { border: 1px solid red; }
</style>
</head>
<body onload="fillForm(); drawAll();" style="max-width: inherit;">
<h1>Report</h1>
<columns>
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
<columns>
<button type="submit">Apply</button>
</columns>
<rows id="form"></rows>
</form>
<rows>
<rows>
<rows>
<h2>Event Volume</h2>
<div id="eventVolume"></div>
</rows>
<columns>
<rows>
<h3>by Hour</h3>
<div id="eventVolumeByHour"></div>
</rows>
</columns>
<rows>
<h3>by Asset</h3>
<div>DRAW ME</div>
</rows>
</rows>
<rows>
<div>
<h2>Events</h2>
<table id="events">
</table>
</div>
</rows>
</rows>
</columns>
</body>
<footer>
</footer>
</html>

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"regexp" "regexp"
@@ -17,7 +18,7 @@ type Config struct {
InitializeSlack bool InitializeSlack bool
SlackToken string SlackToken string
SlackChannels []string SlackChannels []string
PostgresConn string DriverConn string
BasicAuthUser string BasicAuthUser string
BasicAuthPassword string BasicAuthPassword string
FillWithTestdata bool FillWithTestdata bool
@@ -28,12 +29,17 @@ type Config struct {
AssetPattern string AssetPattern string
DatacenterPattern string DatacenterPattern string
EventNamePattern string EventNamePattern string
storage Storage
queue Queue
driver Driver driver Driver
ai AI ai AI
slackToMessagePipeline Pipeline
} }
var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
)
func newConfig(ctx context.Context) (Config, error) { func newConfig(ctx context.Context) (Config, error) {
return newConfigFromEnv(ctx, os.Getenv) return newConfigFromEnv(ctx, os.Getenv)
} }
@@ -42,9 +48,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
def := Config{ def := Config{
Port: 38080, Port: 38080,
OllamaModel: "gemma:2b", OllamaModel: "gemma:2b",
AssetPattern: `(dpg|svc|red)-[a-z0-9-]*`, AssetPattern: renderAssetPattern,
DatacenterPattern: `[a-z]{4}[a-z]*-[0-9]`, DatacenterPattern: renderDatacenterPattern,
EventNamePattern: `(^\[[^\]]*\] *)?(?P<result>.*)`, EventNamePattern: renderEventNamePattern,
} }
var m map[string]any var m map[string]any
@@ -98,23 +104,18 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, err return Config{}, err
} }
result.driver = NewRAM()
if result.PostgresConn != "" {
ctx, can := context.WithTimeout(ctx, time.Second*10) ctx, can := context.WithTimeout(ctx, time.Second*10)
defer can() defer can()
pg, err := NewPostgres(ctx, result.PostgresConn) driver, err := NewDriver(ctx, result.DriverConn)
if err != nil { if err != nil {
return Config{}, err return Config{}, err
} }
result.driver = pg result.driver = driver
if !result.FillWithTestdata {
//} else if err := result.driver.FillWithTestdata(ctx, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
} else {
return Config{}, errors.New("not impl")
} }
if result.FillWithTestdata {
if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
return Config{}, err
}
}
result.storage = NewStorage(result.driver)
result.queue = NewQueue(result.driver)
if result.OllamaURL != "" { if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel) result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
@@ -124,5 +125,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.ai = NewAINoop() result.ai = NewAINoop()
} }
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackToMessagePipeline = slackToMessagePipeline
return result, nil return result, nil
} }

303
driver.go
View File

@@ -5,25 +5,44 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "net/url"
"os"
"path"
"sync"
"time"
"go.etcd.io/bbolt"
_ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
type Driver interface { type Driver struct {
Close() error *sql.DB
ForEach(context.Context, string, func(string, []byte) error) error
Get(context.Context, string, string) ([]byte, error)
Set(context.Context, string, string, []byte) error
} }
func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern, eventNamePattern string) error { func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {
conn = ":memory:"
} else {
if u, err := url.Parse(conn); err != nil {
return Driver{}, err
} else if u.Scheme != "" {
engine = u.Scheme
}
}
db, err := sql.Open(engine, conn)
if err != nil {
return Driver{}, err
}
driver := Driver{DB: db}
if err := driver.setup(ctx); err != nil {
driver.Close()
return Driver{}, fmt.Errorf("failed setup: %w", err)
}
return driver, nil
}
/*
func (driver Driver) FillWithTestdata(ctx context.Context, assetPattern, datacenterPattern, eventNamePattern string) error {
d := "./testdata/slack_events" d := "./testdata/slack_events"
entries, err := os.ReadDir(d) entries, err := os.ReadDir(d)
if err != nil { if err != nil {
@@ -49,61 +68,17 @@ func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacent
} }
return nil return nil
} }
*/
type Postgres struct { func (driver Driver) setup(ctx context.Context) error {
db *sql.DB _, err := driver.ExecContext(ctx, `
DROP TABLE IF EXISTS spoc_bot_vr_q;
DROP TABLE IF EXISTS spoc_bot_vr_messages;
`)
return err
} }
func NewPostgres(ctx context.Context, conn string) (Postgres, error) { func (d Driver) table(s string) (string, error) {
db, err := sql.Open("postgres", conn)
if err != nil {
return Postgres{}, err
}
pg := Postgres{db: db}
if err := pg.setup(ctx); err != nil {
pg.Close()
return Postgres{}, fmt.Errorf("failed setup: %w", err)
}
return pg, nil
}
func (pg Postgres) setup(ctx context.Context) error {
tableQ, err := pg.table("q")
if err != nil {
return err
}
tableM, err := pg.table("m")
if err != nil {
return err
}
if _, err := pg.db.ExecContext(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
`, tableQ,
tableM,
tableQ, tableQ,
tableQ, tableQ,
tableM, tableM,
tableM, tableM,
)); err != nil {
return err
}
return nil
}
func (pg Postgres) table(s string) (string, error) {
switch s { switch s {
case "q": case "q":
return "spoc_bot_vr_q", nil return "spoc_bot_vr_q", nil
@@ -112,201 +87,3 @@ func (pg Postgres) table(s string) (string, error) {
} }
return "", errors.New("invalid table " + s) return "", errors.New("invalid table " + s)
} }
func (pg Postgres) Close() error {
return pg.db.Close()
}
func (pg Postgres) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
table, err := pg.table(ns)
if err != nil {
return err
}
rows, err := pg.db.QueryContext(ctx, fmt.Sprintf(`SELECT id, v FROM %s;`, table))
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id string
var v []byte
if err := rows.Scan(&id, &v); err != nil {
return err
} else if err := cb(id, v); err != nil {
return err
}
}
return ctx.Err()
}
func (pg Postgres) Get(ctx context.Context, ns, id string) ([]byte, error) {
table, err := pg.table(ns)
if err != nil {
return nil, err
}
row := pg.db.QueryRowContext(ctx, fmt.Sprintf(`SELECT v FROM %s WHERE id='%s';`, table, id))
if err := row.Err(); err != nil {
return nil, err
}
var v []byte
if err := row.Scan(&v); err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
return v, nil
}
func (pg Postgres) Set(ctx context.Context, ns, id string, v []byte) error {
table, err := pg.table(ns)
if err != nil {
return err
}
if v == nil {
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE id='%s';`, table, id))
return err
}
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s (id, v) VALUES ('%s', '%s') ON CONFLICT (id) DO UPDATE SET v = '%s'`, table, id, v, v))
return err
}
type RAM struct {
m map[string]map[string][]byte
lock *sync.RWMutex
}
func NewRAM() RAM {
return RAM{
m: make(map[string]map[string][]byte),
lock: &sync.RWMutex{},
}
}
func (ram RAM) Close() error {
return nil
}
func (ram RAM) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
ram.lock.RLock()
defer ram.lock.RUnlock()
for k, v := range ram.m[ns] {
if ctx.Err() != nil {
break
}
if err := cb(k, v); err != nil {
return err
}
}
return ctx.Err()
}
func (ram RAM) Get(_ context.Context, ns, id string) ([]byte, error) {
ram.lock.RLock()
defer ram.lock.RUnlock()
if _, ok := ram.m[ns]; !ok {
return nil, nil
}
return ram.m[ns][id], nil
}
func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.lock.Lock()
defer ram.lock.Unlock()
if _, ok := ram.m[ns]; !ok {
ram.m[ns] = map[string][]byte{}
}
ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil
}
type BBolt struct {
db *bbolt.DB
}
func NewTestDBIn(d string) BBolt {
d, err := ioutil.TempDir(d, "test-db-*")
if err != nil {
panic(err)
}
db, err := NewDB(path.Join(d, "bb"))
if err != nil {
panic(err)
}
return db
}
func NewDB(p string) (BBolt, error) {
db, err := bbolt.Open(p, 0600, &bbolt.Options{
Timeout: time.Second,
})
return BBolt{db: db}, err
}
func (bb BBolt) Close() error {
return bb.db.Close()
}
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
return bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
c := bkt.Cursor()
for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() {
if err := cb(string(k), v); err != nil {
return err
}
}
return ctx.Err()
})
}
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
var b []byte
err := bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
b = bkt.Get([]byte(id))
return nil
})
return b, err
}
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
return bb.db.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
var err error
bkt, err = tx.CreateBucket([]byte(db))
if err != nil {
return err
}
}
if value == nil {
return bkt.Delete([]byte(id))
}
return bkt.Put([]byte(id), value)
})
}

View File

@@ -1,22 +0,0 @@
//go:build postgres
package main
import (
"context"
"os"
"testing"
"time"
)
func TestPostgres(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
conn := os.Getenv("INTEGRATION_POSTGRES_CONN")
pg, err := NewPostgres(ctx, conn)
if err != nil {
t.Fatal(err)
}
testDriver(t, pg)
}

View File

@@ -2,91 +2,17 @@ package main
import ( import (
"context" "context"
"errors"
"io"
"testing" "testing"
"time" "time"
) )
func TestDriverRAM(t *testing.T) { func TestDriver(t *testing.T) {
testDriver(t, NewRAM())
}
func TestFillTestdata(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15) ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can() defer can()
ram := NewRAM() d, err := NewDriver(ctx, "")
if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern); err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir()))
}
func testDriver(t *testing.T, d Driver) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
defer d.Close() defer d.Close()
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from empty:", err)
} else if b != nil {
t.Error("got fake from empty")
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil {
t.Error("cannot set from empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from full:", err)
} else if string(b) != `"hello world"` {
t.Error("got fake from full")
}
if err := d.ForEach(ctx, "m", func(id string, v []byte) error {
if id != "id" {
t.Error("for each id weird:", id)
}
if string(v) != `"hello world"` {
t.Error("for each value weird:", string(v))
}
return io.EOF
}); err != io.EOF {
t.Error("failed to forEach full:", err)
}
if err := d.Set(ctx, "m", "id", nil); err != nil {
t.Error("cannot set from full:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {
t.Error("got fake from deleted")
}
} }

15
go.mod
View File

@@ -3,17 +3,24 @@ module github.com/breel-render/spoc-bot-vr
go 1.22.1 go 1.22.1
require ( require (
github.com/go-errors/errors v1.5.1 github.com/glebarez/go-sqlite v1.21.2
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1 github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8 github.com/tmc/langchaingo v0.1.8
go.etcd.io/bbolt v1.3.9 gotest.tools/v3 v3.5.1
) )
require ( require (
github.com/dlclark/regexp2 v1.10.0 // indirect github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/gage-technologies/mistral-go v1.0.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/go-cmp v0.6.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.23.1 // indirect
) )

32
go.sum
View File

@@ -2,29 +2,45 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE= github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM= github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg= github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA= github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ= github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=

146
main.go
View File

@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"log" "log"
@@ -37,11 +36,25 @@ func run(ctx context.Context, cfg Config) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg):
return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
return err return err
} }
} }
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
errs := make(chan error)
go func() {
defer close(errs)
select {
case errs <- cfg.slackToMessagePipeline.Process(ctx):
case <-ctx.Done():
}
}()
return errs
}
func listenAndServe(ctx context.Context, cfg Config) chan error { func listenAndServe(ctx context.Context, cfg Config) chan error {
s := http.Server{ s := http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port), Addr: fmt.Sprintf(":%d", cfg.Port),
@@ -64,12 +77,13 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc { func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug { if cfg.Debug {
@@ -82,25 +96,62 @@ func newHandler(cfg Config) http.HandlerFunc {
} }
} }
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
var page struct {
OK bool
Messages []json.RawMessage
}
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
} else if !page.OK {
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) { if !basicAuth(cfg, w, r) {
return return
} }
since, err := parseSince(r.URL.Query().Get("since")) http.Error(w, "not impl", http.StatusNotImplemented)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
} }
} }
@@ -110,19 +161,7 @@ func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return return
} }
since, err := parseSince(r.URL.Query().Get("since")) http.Error(w, "not impl", http.StatusNotImplemented)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := cfg.storage.EventsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"events": events})
} }
} }
@@ -132,19 +171,7 @@ func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return return
} }
since, err := parseSince(r.URL.Query().Get("since")) http.Error(w, "not impl", http.StatusNotImplemented)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
messages, err := cfg.storage.MessagesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"messages": messages})
} }
} }
@@ -154,19 +181,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return return
} }
since, err := parseSince(r.URL.Query().Get("since")) http.Error(w, "not impl", http.StatusNotImplemented)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"threads": threads})
} }
} }
@@ -177,14 +192,9 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
} }
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
_ = thread
messages, err := cfg.storage.Thread(r.Context(), thread) http.Error(w, "not impl", http.StatusNotImplemented)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"thread": messages})
} }
} }
@@ -246,20 +256,12 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return return
} }
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
if errors.Is(err, ErrIrrelevantMessage) { log.Printf("failed to ingest: %v", err)
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
log.Printf("ingested %v", m.ID) log.Printf("ingested")
} }
} }

View File

@@ -39,10 +39,12 @@ func TestRun(t *testing.T) {
u := fmt.Sprintf("http://localhost:%d", port) u := fmt.Sprintf("http://localhost:%d", port)
cfg := Config{} cfg := Config{}
cfg.DatacenterPattern = renderDatacenterPattern
cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port cfg.Port = port
cfg.driver = NewRAM() cfg.driver, _ = NewDriver(ctx, "")
cfg.storage = NewStorage(cfg.driver) cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.queue = NewQueue(cfg.driver)
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}
@@ -120,7 +122,7 @@ func TestRun(t *testing.T) {
} }
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err) t.Fatal(err)
} else if result.EventNames[0] != "[Oregon-1] Wal Receive Count Alert" { } else if result.EventNames[0] != "Wal Receive Count Alert" {
t.Fatal(result.EventNames) t.Fatal(result.EventNames)
} else { } else {
t.Logf("%+v", result) t.Logf("%+v", result)
@@ -215,14 +217,17 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b) t.Fatalf("(%d) %s", resp.StatusCode, b)
} }
dec := csv.NewReader(resp.Body) b, _ := io.ReadAll(resp.Body)
t.Logf("whole csv: \n%s", b)
dec := csv.NewReader(bytes.NewReader(b))
var lastLine []string var lastLine []string
for { for {
line, err := dec.Read() line, err := dec.Read()
if err == io.EOF { if err == io.EOF {
break break
} else if err != nil { } else if err != nil {
t.Error(err) t.Error("unexpected error while reading csv line:", err)
} }
if lastLine == nil { if lastLine == nil {
@@ -230,7 +235,7 @@ func TestRun(t *testing.T) {
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line)) t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
} }
t.Logf("%+v", line) t.Logf("CSV line: %+v", line)
lastLine = line lastLine = line
} }
if lastLine == nil { if lastLine == nil {

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"regexp" "regexp"
"strconv"
"strings" "strings"
"time" "time"
) )
@@ -59,8 +60,11 @@ func Deserialize(b []byte) (Message, error) {
type ( type (
slackMessage struct { slackMessage struct {
slackEvent
Type string
TS uint64 `json:"event_time"` TS uint64 `json:"event_time"`
Event slackEvent Event slackEvent
MessageTS string `json:"ts"`
} }
slackEvent struct { slackEvent struct {
@@ -108,28 +112,43 @@ type (
) )
func ParseSlack(b []byte, assetPattern, datacenterPattern, eventNamePattern string) (Message, error) { func ParseSlack(b []byte, assetPattern, datacenterPattern, eventNamePattern string) (Message, error) {
m, err := parseSlackJSON(b) return ParseSlackFromChannel(b, assetPattern, datacenterPattern, eventNamePattern, "")
}
func ParseSlackFromChannel(b []byte, assetPattern, datacenterPattern, eventNamePattern string, ch string) (Message, error) {
m, err := parseSlackJSON(b, ch)
if err != nil { if err != nil {
return Message{}, err return Message{}, err
} }
asset := regexp.MustCompile(assetPattern) for pattern, ptr := range map[string]*string{
datacenter := regexp.MustCompile(datacenterPattern) assetPattern: &m.Asset,
eventName := regexp.MustCompile(eventNamePattern) datacenterPattern: &m.Datacenter,
eventNamePattern: &m.EventName,
m.Asset = asset.FindString(m.Asset) } {
m.Datacenter = datacenter.FindString(m.Datacenter) r := regexp.MustCompile(pattern)
m.EventName = eventName.FindString(m.EventName) parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
}
return m, nil return m, nil
} }
func parseSlackJSON(b []byte) (Message, error) { func parseSlackJSON(b []byte, ch string) (Message, error) {
s, err := _parseSlackJSON(b) s, err := _parseSlackJSON(b)
if err != nil { if err != nil {
return Message{}, err return Message{}, err
} }
if ch != "" {
s.Event.Channel = ch
}
if s.Event.Bot.Name != "" { if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 { if len(s.Event.Attachments) == 0 {
return Message{}, ErrIrrelevantMessage return Message{}, ErrIrrelevantMessage
@@ -177,6 +196,12 @@ func parseSlackJSON(b []byte) (Message, error) {
func _parseSlackJSON(b []byte) (slackMessage, error) { func _parseSlackJSON(b []byte) (slackMessage, error) {
var result slackMessage var result slackMessage
err := json.Unmarshal(b, &result) err := json.Unmarshal(b, &result)
switch result.Type {
case "message":
result.Event = result.slackEvent
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
result.Event.ID = result.MessageTS
}
if result.Event.Nested != nil && !result.Event.Nested.Empty() { if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot result.Event.Bot = result.Event.Nested.Bot

View File

@@ -1,20 +1,14 @@
package main package main
import ( import (
"fmt"
"os" "os"
"path" "path"
"testing" "testing"
) )
var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(^\[[^\]]*\] *)?(?P<result>.*)`
)
func TestParseSlackTestdata(t *testing.T) { func TestParseSlackTestdata(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
inCh string
slackMessage slackMessage slackMessage slackMessage
message Message message Message
}{ }{
@@ -120,6 +114,21 @@ func TestParseSlackTestdata(t *testing.T) {
Resolved: true, Resolved: true,
}, },
}, },
"reingested_alert.json": {
inCh: "C06U1DDBBU4",
message: Message{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
} }
for name, d := range cases { for name, d := range cases {
@@ -130,18 +139,8 @@ func TestParseSlackTestdata(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
t.Run("parseSlackJSON", func(t *testing.T) { t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
got, err := parseSlackJSON(b) got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
if err != nil {
t.Fatal(err)
}
if fmt.Sprintf("%+v", got) != fmt.Sprintf("%+v", want.slackMessage) {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.slackMessage, got)
}
})
t.Run("ParseSlack", func(t *testing.T) {
got, err := ParseSlack(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

44
pipeline.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import "context"
type (
Pipeline struct {
writer Queue
reader Queue
process processFunc
}
processFunc func(context.Context, []byte) ([]byte, error)
)
func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
return Pipeline{
writer: writer,
reader: reader,
process: process,
}
}
func (p Pipeline) Process(ctx context.Context) error {
ctx, can := context.WithCancel(ctx)
defer can()
for ctx.Err() == nil {
reservation, read, err := p.reader.Syn(ctx)
if err != nil {
return err
}
processed, err := p.process(ctx, read)
if err != nil {
return err
}
if err := p.writer.Enqueue(ctx, processed); err != nil {
return err
}
if err := p.reader.Ack(ctx, reservation); err != nil {
return err
}
}
return ctx.Err()
}

52
pipeline_test.go Normal file
View File

@@ -0,0 +1,52 @@
package main
import (
"context"
"testing"
"time"
)
func TestPipeline(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driverOutput, _ := NewDriver(ctx, "")
output, err := NewQueue(ctx, "output", driverOutput)
if err != nil {
t.Fatal(err)
}
driverInput, _ := NewDriver(ctx, "")
input, err := NewQueue(ctx, "input", driverInput)
if err != nil {
t.Fatal(err)
}
found := map[string]struct{}{}
process := func(_ context.Context, v []byte) ([]byte, error) {
found[string(v)] = struct{}{}
return []byte("world"), nil
}
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
t.Error(err)
}
ing := NewPipeline(output, input, process)
go func() {
defer can()
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
if r, p, err := output.Syn(ctx); err != nil {
t.Error(err)
} else if string(p) != "world" {
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
} else if err := output.Ack(ctx, r); err != nil {
t.Error(err)
}
if len(found) != 1 {
t.Error(found)
}
}

117
queue.go
View File

@@ -2,57 +2,118 @@ package main
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/go-errors/errors" "github.com/google/uuid"
) )
type Queue struct { type Queue struct {
driver Driver driver Driver
topic string
} }
func NewQueue(driver Driver) Queue { func NewNoopQueue() Queue {
return Queue{driver: driver} return Queue{}
} }
func (q Queue) Push(ctx context.Context, m Message) error { func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
return q.driver.Set(ctx, "q", m.ID, m.Serialize()) if _, err := driver.ExecContext(ctx, `
} CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY,
func (q Queue) PeekFirst(ctx context.Context) (Message, error) { topic TEXT NOT NULL,
for { updated INTEGER NOT NULL,
m, err := q.peekFirst(ctx) reservation TEXT,
if err != nil { payload TEXT
return m, err );
`); err != nil {
return Queue{}, fmt.Errorf("failed to create table: %w", err)
} }
return Queue{topic: topic, driver: driver}, nil
}
if !m.Empty() { func (q Queue) Enqueue(ctx context.Context, b []byte) error {
return m, nil if q.driver.DB == nil {
return nil
}
_, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
`,
q.topic,
time.Now().Unix(),
b,
)
return err
}
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
if q.driver.DB == nil {
return "", nil, nil
}
for {
reservation, m, err := q.syn(ctx)
if reservation != nil || err != nil {
return string(reservation), m, err
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
return Message{}, ctx.Err() return "", nil, ctx.Err()
case <-time.After(time.Second): case <-time.After(time.Second):
} }
} }
} }
func (q Queue) Ack(ctx context.Context, id string) error { func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
return q.driver.Set(ctx, "q", id, nil) now := time.Now().Unix()
reservation := []byte(uuid.New().String())
var payload []byte
if result, err := q.driver.ExecContext(ctx, `
UPDATE queue
SET
updated = ?, reservation = ?
WHERE
id IN (
SELECT id
FROM queue
WHERE
topic == ?
AND (
reservation IS NULL
OR ? - updated > 60
)
LIMIT 1
)
`, now, reservation, q.topic, now); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
} else if n, err := result.RowsAffected(); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
} else if n == 0 {
return nil, nil, nil
}
row := q.driver.QueryRowContext(ctx, `
SELECT payload
FROM queue
WHERE reservation==?
LIMIT 1
`, reservation)
if err := row.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
} else if err := row.Scan(&payload); err != nil {
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
}
return reservation, payload, nil
} }
func (q Queue) peekFirst(ctx context.Context) (Message, error) { func (q Queue) Ack(ctx context.Context, reservation string) error {
var m Message if q.driver.DB == nil {
subctx, subcan := context.WithCancel(ctx)
defer subcan()
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
m = MustDeserialize(value)
subcan()
return nil return nil
})
if errors.Is(err, subctx.Err()) {
err = nil
} }
return m, err _, err := q.driver.ExecContext(ctx, `
DELETE FROM queue
WHERE reservation==?
`, reservation)
return err
} }

View File

@@ -11,24 +11,64 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
q := NewQueue(NewRAM()) driver, _ := NewDriver(ctx, "")
q, err := NewQueue(ctx, "", driver)
for i := 0; i < 39; i++ { if err != nil {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil { t.Fatal(err)
t.Fatal(i, err)
}
} }
qOther, _ := NewQueue(ctx, "other", driver)
found := map[uint64]struct{}{} if reservation, _, err := q.syn(ctx); reservation != nil {
for i := 0; i < 39; i++ { t.Errorf("able to syn before any enqueues created: %v", err)
if m, err := q.PeekFirst(ctx); err != nil {
t.Fatal(i, err)
} else if _, ok := found[m.TS]; ok {
t.Error(i, m.TS)
} else if err := q.Ack(ctx, m.ID); err != nil {
t.Fatal(i, err)
} else { } else {
found[m.TS] = struct{}{} t.Logf("sync before enqueues: %v", err)
}
t.Run("enqueue", func(t *testing.T) {
for i := 0; i < 39; i++ {
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
t.Fatal(i, err)
} }
} }
})
if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
t.Fatal(err)
}
t.Run("syn ack", func(t *testing.T) {
found := map[string]struct{}{}
for i := 0; i < 39; i++ {
if reservation, b, err := q.Syn(ctx); err != nil {
t.Fatal(i, "syn err", err)
} else if _, ok := found[string(b)]; ok {
t.Errorf("syn'd %q twice (%+v)", b, found)
} else if err := q.Ack(ctx, reservation); err != nil {
t.Fatal(i, "failed to ack", err)
} else {
found[string(b)] = struct{}{}
}
}
})
if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn 1 more message than created: %v", err)
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
t.Errorf("unable to syn from other topic: %v", err)
} else {
t.Logf("empty q.syn = %v", err)
}
t.Run("noop", func(t *testing.T) {
q := NewNoopQueue()
if err := q.Enqueue(nil, nil); err != nil {
t.Error(err)
}
if _, _, err := q.Syn(nil); err != nil {
t.Error(err)
}
if err := q.Ack(nil, ""); err != nil {
t.Error(err)
}
})
} }

View File

@@ -1,130 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
<script type="module">
import * as d3 from "https://cdn.jsdelivr.net/npm/d3@7/+esm";
</script>
<script>
const allMessages = {{ json "Marshal" .messages }};
console.log(allMessages);
function fillForm() {
const filterableFields = [
"EventName",
"Asset",
];
const fieldsToOptions = {};
filterableFields.map((field) => {fieldsToOptions[field] = {}});
allMessages.map((message) => {
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
});
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
return `
<label for="${field}">${field}</label>
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
${fieldsToOptions[field].map((option) => `
<option selected>${option}</option>
`)}
</select>
`
}).join("\n");
}
function drawAll() {
const messages = filterMessages(allMessages)
drawEventVolumeByName(messages)
drawEventVolumeByWeekday(messages)
drawEventVolumeByHour(messages)
drawEventVolumeByAsset(messages)
}
function filterMessages(messages) {
const filters = document.getElementById("form");
console.log(filters);
return messages.map(() => {
});
}
function drawEventVolumeByName() {}
function drawEventVolumeByWeekday() {}
function drawEventVolumeByHour() {}
function drawEventVolumeByAsset() {}
</script>
<style>
rows {
display: flex;
flex-direction: column;
flex-grow: 1;
}
columns {
display: flex;
flex-direction: row;
flex-grow: 1;
}
rows, columns { border: 1px solid red; }
</style>
</head>
<body onload="fillForm(); drawAll();">
<h1>Report</h1>
<columns>
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
<columns>
<button type="submit">Apply</button>
</columns>
<rows id="form"></rows>
</form>
<rows>
<rows>
<rows>
<h2>Event Volume by Name</h2>
<div>DRAW ME</div>
</rows>
<columns>
<rows>
<h3>by Weekday</h3>
<div>DRAW ME</div>
</rows>
<rows>
<h3>by Hour</h3>
<div>DRAW ME</div>
</rows>
</columns>
<rows>
<h3>by Asset</h3>
<div>DRAW ME</div>
</rows>
</rows>
<rows>
<div>
<h2>Events</h2>
<table>
<tr>
<th>TS</th>
<th>Event</th>
<th>EventName</th>
<th>Latest</th>
</tr>
{{ range .events.Events }}
<tr>
<td><a href="{{ .First.Source }}">{{ time "Unix" .First.TS | time "Time.Format" "Mon Jan 02" }}</a></td>
<td><a href="https://TODO">{{ .Event }}</a></td>
<td>{{ .First.EventName }}</td>
<td><a href="{{ .Last.Source }}">{{ .Last.Plaintext }}</a></td>
</tr>
{{ end }}
</table>
</div>
</rows>
</rows>
</columns>
</body>
<footer>
</footer>
</html>

36
slack.go Normal file
View File

@@ -0,0 +1,36 @@
package main
import (
"context"
"fmt"
)
type SlackToMessage struct {
pipeline Pipeline
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newSlackToMessageProcess(cfg),
}, nil
}
func newSlackToMessageProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
}
return m.Serialize(), nil
}
}

50
slack_test.go Normal file
View File

@@ -0,0 +1,50 @@
package main
import (
"context"
"os"
"testing"
"time"
"gotest.tools/v3/assert"
)
func TestSlackToMessagePipeline(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
driver, _ := NewDriver(ctx, "/tmp/f")
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
if err != nil {
t.Fatal(err)
}
go func() {
if err := pipeline.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
want := Message{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
}
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal("failed to enqueue", err)
}
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
t.Fatal("failed to syn", err)
} else if m := MustDeserialize(b2); false {
} else {
assert.DeepEqual(t, want, m)
}
}

View File

@@ -1,96 +0,0 @@
package main
import (
"context"
"errors"
"sort"
"time"
)
var (
ErrNotFound = errors.New("not found")
)
type Storage struct {
driver Driver
}
func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
}
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
}
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
}
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
messages, err := s.MessagesSince(ctx, t)
if err != nil {
return nil, err
}
values := map[string]struct{}{}
for _, m := range messages {
values[fielder(m)] = struct{}{}
}
result := make([]string, 0, len(values))
for k := range values {
result = append(result, k)
}
sort.Strings(result)
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "m", id)
if err != nil {
return Message{}, err
}
if b == nil {
return Message{}, ErrNotFound
}
return MustDeserialize(b), nil
}

View File

@@ -1,67 +0,0 @@
package main
import (
"context"
"testing"
"time"
)
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second)
defer can()
t.Run("Threads", func(t *testing.T) {
s := NewStorage(NewRAM())
mX1 := Message{ID: "1", Thread: "X", TS: 1}
mX2 := Message{ID: "2", Thread: "X", TS: 2}
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
for _, m := range []Message{mX1, mX2, mY1} {
if err := s.Upsert(ctx, m); err != nil {
t.Fatal(err)
}
}
if threads, err := s.Threads(ctx); err != nil {
t.Error(err)
} else if len(threads) != 2 {
t.Error(threads)
} else if threads[0] != "X" {
t.Error(threads, "X")
} else if threads[1] != "Y" {
t.Error(threads, "Y")
}
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
t.Error(err)
} else if len(threads) != 1 {
t.Error(threads)
} else if threads[0] != "Y" {
t.Error(threads[0])
}
})
t.Run("Get Upsert", func(t *testing.T) {
s := NewStorage(NewRAM())
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
t.Error("failed to get 404", err)
}
m := Message{
ID: "id",
TS: 1,
}
if err := s.Upsert(ctx, m); err != nil {
t.Error("failed to upsert", err)
}
if m2, err := s.Get(ctx, "id"); err != nil {
t.Error("failed to get", err)
} else if m != m2 {
t.Error(m2)
}
})
}

View File

@@ -0,0 +1,57 @@
{
"user": "U03RUK7FBUY",
"type": "message",
"ts": "1712892637.037639",
"edited": {
"user": "B03RHGBPH2M",
"ts": "1712896236.000000"
},
"bot_id": "B03RHGBPH2M",
"app_id": "A286WATV2",
"text": "",
"team": "T9RQLQ0KV",
"bot_profile": {
"id": "B03RHGBPH2M",
"app_id": "A286WATV2",
"name": "Opsgenie for Alert Management",
"icons": {
"image_36": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_36.png",
"image_48": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_48.png",
"image_72": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_72.png"
},
"deleted": false,
"updated": 1658887059,
"team_id": "T9RQLQ0KV"
},
"attachments": [
{
"id": 1,
"color": "2ecc71",
"fallback": "\"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfcb2ead-1712892636514|11061>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"title": "#11061: [Grafana]: Firing: Alertconfig Workflow Failed",
"title_link": "https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfc$2ead-1712892636514",
"callback_id": "bbd4a269-08a9-470e-ba79-ce238ac03dc7_05fa2e9b-bec4-4a7e-842d-36043d267a13_11061",
"fields": [
{
"value": "P3",
"title": "Priority",
"short": true
},
{
"value": "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"title": "Tags",
"short": true
},
{
"value": "Datastores Non-Critical",
"title": "Routed Teams",
"short": true
}
],
"mrkdwn_in": [
"text"
]
}
]
}