Compare commits

...

26 Commits

Author SHA1 Message Date
Bel LaPointe
83c0ee3f53 ok report still botched but im werkin on it 2024-04-15 16:13:31 -06:00
Bel LaPointe
9d7a175c62 at least main_test runs 2024-04-15 16:12:41 -06:00
Bel LaPointe
1dcffdd956 ew compile errs 2024-04-15 16:04:12 -06:00
Bel LaPointe
580068d98b revive message and test slack pipeline parses slack into message 2024-04-15 15:57:56 -06:00
Bel LaPointe
eec5c39725 go mod tidy 2024-04-15 15:50:29 -06:00
Bel LaPointe
9848492b1e no test driver non driver things 2024-04-15 15:50:18 -06:00
Bel LaPointe
a674022357 revive ai, config*.go 2024-04-15 15:49:48 -06:00
Bel LaPointe
80df07089f rename ingest to pipeline 2024-04-15 15:22:23 -06:00
Bel LaPointe
d792626c2f test ingest 2024-04-15 15:18:03 -06:00
Bel LaPointe
acac2a60b0 finish ingest loop 2024-04-15 14:19:53 -06:00
Bel LaPointe
eef78d6e39 noop queue and topics embedded 2024-04-15 14:18:39 -06:00
Bel LaPointe
42c5b7d7ad multi topic done 2024-04-15 13:30:35 -06:00
Bel LaPointe
e85a2d25a1 queue from Dequeue to Syn for SynAck 2024-04-15 13:18:14 -06:00
Bel LaPointe
8193bf7377 f sql jeez 2024-04-15 13:08:21 -06:00
Bel LaPointe
2f3739b24f functions are good 2024-04-15 09:14:59 -06:00
Bel LaPointe
d38352f050 ooooo it is pretty 2024-04-15 09:09:09 -06:00
Bel LaPointe
ba833fa315 this is getting k 2024-04-15 08:45:20 -06:00
Bel LaPointe
d7cbcb9926 d3js not for me 2024-04-15 08:23:10 -06:00
Bel LaPointe
961be827d0 log less 2024-04-15 07:35:39 -06:00
Bel LaPointe
6fbafe6700 PUT /api/v1/rpc/scrapeslack 2024-04-15 07:34:30 -06:00
Bel LaPointe
7df7528ccf can parse slack messages from scraping channel history too 2024-04-15 07:34:21 -06:00
Bel LaPointe
a91da082c7 no max width on report.tmpl 2024-04-15 06:52:27 -06:00
Bel LaPointe
af2ad44109 remove debug console.log 2024-04-15 06:51:21 -06:00
Bel LaPointe
cabc5c00b7 dynamic alert dump via filters 2024-04-15 06:50:41 -06:00
Bel LaPointe
84dec31e53 more filter fields 2024-04-15 06:00:28 -06:00
Bel LaPointe
f2a23e5d8a extract named Pattern result if a group is named 2024-04-15 05:55:30 -06:00
23 changed files with 872 additions and 856 deletions

227
.report.tmpl Normal file
View File

@@ -0,0 +1,227 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
<script src="https://code.highcharts.com/10/highcharts.js"></script>
<script type="module">
const allMessages = {{ json "Marshal" .messages }};
function fillForm() {
const filterableFields = [
"Asset",
"Channel",
"Event",
"EventName",
"Resolved",
"Thread",
];
const fieldsToOptions = {};
filterableFields.map((field) => {fieldsToOptions[field] = {}});
allMessages.map((message) => {
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
});
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
return `
<label for="${field}">${field}</label>
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
${fieldsToOptions[field].map((option) => `
<option selected>${option}</option>
`)}
</select>
`
}).join("\n");
}
window.fillForm = fillForm;
function drawAll() {
const messages = filterMessages(allMessages)
dumpEvents(messages);
drawEventVolume(messages)
drawEventVolumeByHour(messages)
drawEventVolumeByAsset(messages)
}
window.drawAll = drawAll;
function dumpEvents(messages) {
const eventToThreads = {};
for(var m of messages) {
if (!eventToThreads[m.Event])
eventToThreads[m.Event] = [];
eventToThreads[m.Event].push(m.Thread);
}
const threadToMessages = {};
for(var m of messages) {
if (!threadToMessages[m.Thread])
threadToMessages[m.Thread] = [];
threadToMessages[m.Thread].push(m);
}
const eventToMessages = {};
for(var e in eventToThreads) {
if (!eventToMessages[e])
eventToMessages[e] = [];
for (var thread of eventToThreads[e])
eventToMessages[e] = eventToMessages[e].concat(threadToMessages[thread]);
}
for(var e in eventToMessages)
eventToMessages[e].sort((a, b) => a.TS - b.TS);
var events = Object.keys(eventToMessages);
events.sort();
events.reverse();
var keys = ["TS", "Event", "EventName", "Latest"];
document.getElementById("events").innerHTML = `
<tr>
<th>TS</th>
<th>Event</th>
<th>EventName</th>
<th>Latest</th>
</tr>
${events.map((e) => `
<tr>
<td><a href="${eventToMessages[e][0].Source}">${new Date(eventToMessages[e][0].TS * 1000).toDateString()}</a></td>
<td><a href="${eventToMessages[e][0].Source}">${eventToMessages[e][0].Event}</a></td>
<td>${eventToMessages[e][0].EventName}</td>
<td><a href="${eventToMessages[e].at(-1).Source}">${eventToMessages[e].at(-1).Plaintext}</a></td>
</tr>
`).join("")}
`;
}
function filterMessages(messages) {
const selects = document.getElementById("form").getElementsByTagName("select");
const fieldsToOptions = {};
for(var select of selects) {
fieldsToOptions[select.name] = [];
for(var option of select.getElementsByTagName("option"))
if (option.selected)
fieldsToOptions[select.name].push(option.innerHTML);
}
return messages.map((m) => {
for(var k in fieldsToOptions) {
if (fieldsToOptions[k].filter((v) => `${v}` == `${m[k]}`).length == 0) {
return null;
}
}
return m;
}).filter((m) => { return m != null });
}
function drawEventVolume(messages) {
drawEventVolumeWith(
messages,
"eventVolume",
(ts) => new Date(1000 * ts).
toLocaleDateString('en-US', {month: 'numeric', day: 'numeric', weekday: 'short'}),
(m) => m.EventName,
);
}
function drawEventVolumeWith(messages, documentId, kify, nameify) {
const points = [];
messages.forEach((m) => {
points.push({x: m.TS, name: nameify(m)});
});
var xs = points.map((point) => point.x);
if (xs && !isNaN(parseFloat(kify(xs[0])))) {
xs = xs.map(kify);
xs.sort((a, b) => parseFloat(a) - parseFloat(b));
} else {
xs.sort();
xs = xs.map(kify);
}
xs = [...new Set(xs)];
const names = [...new Set(points.map((p) => p.name))];
const nameAndData = names.map((name) => {
return {
name: name,
data: xs.map((x) => points.filter((p) => { return p.name == name && kify(p.x) == x }).length),
}
});
draw(documentId, xs, nameAndData);
}
function drawEventVolumeByHour(messages) {
drawEventVolumeWith(
messages,
"eventVolumeByHour",
(ts) => new Date(1000 * ts).getHours(),
(m) => m.EventName,
);
}
function drawEventVolumeByAsset(messages) {}
function draw(documentId, xs, nameAndData) {
document.getElementById(documentId).innerHTML = "";
Highcharts.chart(documentId, {
chart: { type: 'column' },
title: { text: '' },
xAxis: { categories: xs },
yAxis: { allowDecimals: false, title: { text: '' } },
//legend: { enabled: false },
series: nameAndData,
plotOptions: { column: { stacking: 'normal' } },
});
}
</script>
<style>
rows {
display: flex;
flex-direction: column;
flex-grow: 1;
}
columns {
display: flex;
flex-direction: row;
flex-grow: 1;
}
rows, columns { border: 1px solid red; }
</style>
</head>
<body onload="fillForm(); drawAll();" style="max-width: inherit;">
<h1>Report</h1>
<columns>
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
<columns>
<button type="submit">Apply</button>
</columns>
<rows id="form"></rows>
</form>
<rows>
<rows>
<rows>
<h2>Event Volume</h2>
<div id="eventVolume"></div>
</rows>
<columns>
<rows>
<h3>by Hour</h3>
<div id="eventVolumeByHour"></div>
</rows>
</columns>
<rows>
<h3>by Asset</h3>
<div>DRAW ME</div>
</rows>
</rows>
<rows>
<div>
<h2>Events</h2>
<table id="events">
</table>
</div>
</rows>
</rows>
</columns>
</body>
<footer>
</footer>
</html>

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
@@ -12,28 +13,33 @@ import (
)
type Config struct {
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
PostgresConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
storage Storage
queue Queue
driver Driver
ai AI
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
}
var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
)
func newConfig(ctx context.Context) (Config, error) {
return newConfigFromEnv(ctx, os.Getenv)
}
@@ -42,9 +48,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
def := Config{
Port: 38080,
OllamaModel: "gemma:2b",
AssetPattern: `(dpg|svc|red)-[a-z0-9-]*`,
DatacenterPattern: `[a-z]{4}[a-z]*-[0-9]`,
EventNamePattern: `(^\[[^\]]*\] *)?(?P<result>.*)`,
AssetPattern: renderAssetPattern,
DatacenterPattern: renderDatacenterPattern,
EventNamePattern: renderEventNamePattern,
}
var m map[string]any
@@ -98,23 +104,18 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, err
}
result.driver = NewRAM()
if result.PostgresConn != "" {
ctx, can := context.WithTimeout(ctx, time.Second*10)
defer can()
pg, err := NewPostgres(ctx, result.PostgresConn)
if err != nil {
return Config{}, err
}
result.driver = pg
ctx, can := context.WithTimeout(ctx, time.Second*10)
defer can()
driver, err := NewDriver(ctx, result.DriverConn)
if err != nil {
return Config{}, err
}
if result.FillWithTestdata {
if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
return Config{}, err
}
result.driver = driver
if !result.FillWithTestdata {
//} else if err := result.driver.FillWithTestdata(ctx, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
} else {
return Config{}, errors.New("not impl")
}
result.storage = NewStorage(result.driver)
result.queue = NewQueue(result.driver)
if result.OllamaURL != "" {
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
@@ -124,5 +125,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.ai = NewAINoop()
}
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackToMessagePipeline = slackToMessagePipeline
return result, nil
}

303
driver.go
View File

@@ -5,25 +5,44 @@ import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"
"go.etcd.io/bbolt"
"net/url"
_ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq"
)
type Driver interface {
Close() error
ForEach(context.Context, string, func(string, []byte) error) error
Get(context.Context, string, string) ([]byte, error)
Set(context.Context, string, string, []byte) error
type Driver struct {
*sql.DB
}
func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern, eventNamePattern string) error {
func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {
conn = ":memory:"
} else {
if u, err := url.Parse(conn); err != nil {
return Driver{}, err
} else if u.Scheme != "" {
engine = u.Scheme
}
}
db, err := sql.Open(engine, conn)
if err != nil {
return Driver{}, err
}
driver := Driver{DB: db}
if err := driver.setup(ctx); err != nil {
driver.Close()
return Driver{}, fmt.Errorf("failed setup: %w", err)
}
return driver, nil
}
/*
func (driver Driver) FillWithTestdata(ctx context.Context, assetPattern, datacenterPattern, eventNamePattern string) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
@@ -49,61 +68,17 @@ func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacent
}
return nil
}
*/
type Postgres struct {
db *sql.DB
func (driver Driver) setup(ctx context.Context) error {
_, err := driver.ExecContext(ctx, `
DROP TABLE IF EXISTS spoc_bot_vr_q;
DROP TABLE IF EXISTS spoc_bot_vr_messages;
`)
return err
}
func NewPostgres(ctx context.Context, conn string) (Postgres, error) {
db, err := sql.Open("postgres", conn)
if err != nil {
return Postgres{}, err
}
pg := Postgres{db: db}
if err := pg.setup(ctx); err != nil {
pg.Close()
return Postgres{}, fmt.Errorf("failed setup: %w", err)
}
return pg, nil
}
func (pg Postgres) setup(ctx context.Context) error {
tableQ, err := pg.table("q")
if err != nil {
return err
}
tableM, err := pg.table("m")
if err != nil {
return err
}
if _, err := pg.db.ExecContext(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
`, tableQ,
tableM,
tableQ, tableQ,
tableQ, tableQ,
tableM, tableM,
tableM, tableM,
)); err != nil {
return err
}
return nil
}
func (pg Postgres) table(s string) (string, error) {
func (d Driver) table(s string) (string, error) {
switch s {
case "q":
return "spoc_bot_vr_q", nil
@@ -112,201 +87,3 @@ func (pg Postgres) table(s string) (string, error) {
}
return "", errors.New("invalid table " + s)
}
func (pg Postgres) Close() error {
return pg.db.Close()
}
func (pg Postgres) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
table, err := pg.table(ns)
if err != nil {
return err
}
rows, err := pg.db.QueryContext(ctx, fmt.Sprintf(`SELECT id, v FROM %s;`, table))
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id string
var v []byte
if err := rows.Scan(&id, &v); err != nil {
return err
} else if err := cb(id, v); err != nil {
return err
}
}
return ctx.Err()
}
func (pg Postgres) Get(ctx context.Context, ns, id string) ([]byte, error) {
table, err := pg.table(ns)
if err != nil {
return nil, err
}
row := pg.db.QueryRowContext(ctx, fmt.Sprintf(`SELECT v FROM %s WHERE id='%s';`, table, id))
if err := row.Err(); err != nil {
return nil, err
}
var v []byte
if err := row.Scan(&v); err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
return v, nil
}
func (pg Postgres) Set(ctx context.Context, ns, id string, v []byte) error {
table, err := pg.table(ns)
if err != nil {
return err
}
if v == nil {
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE id='%s';`, table, id))
return err
}
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s (id, v) VALUES ('%s', '%s') ON CONFLICT (id) DO UPDATE SET v = '%s'`, table, id, v, v))
return err
}
type RAM struct {
m map[string]map[string][]byte
lock *sync.RWMutex
}
func NewRAM() RAM {
return RAM{
m: make(map[string]map[string][]byte),
lock: &sync.RWMutex{},
}
}
func (ram RAM) Close() error {
return nil
}
func (ram RAM) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
ram.lock.RLock()
defer ram.lock.RUnlock()
for k, v := range ram.m[ns] {
if ctx.Err() != nil {
break
}
if err := cb(k, v); err != nil {
return err
}
}
return ctx.Err()
}
func (ram RAM) Get(_ context.Context, ns, id string) ([]byte, error) {
ram.lock.RLock()
defer ram.lock.RUnlock()
if _, ok := ram.m[ns]; !ok {
return nil, nil
}
return ram.m[ns][id], nil
}
func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.lock.Lock()
defer ram.lock.Unlock()
if _, ok := ram.m[ns]; !ok {
ram.m[ns] = map[string][]byte{}
}
ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil
}
type BBolt struct {
db *bbolt.DB
}
func NewTestDBIn(d string) BBolt {
d, err := ioutil.TempDir(d, "test-db-*")
if err != nil {
panic(err)
}
db, err := NewDB(path.Join(d, "bb"))
if err != nil {
panic(err)
}
return db
}
func NewDB(p string) (BBolt, error) {
db, err := bbolt.Open(p, 0600, &bbolt.Options{
Timeout: time.Second,
})
return BBolt{db: db}, err
}
func (bb BBolt) Close() error {
return bb.db.Close()
}
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
return bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
c := bkt.Cursor()
for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() {
if err := cb(string(k), v); err != nil {
return err
}
}
return ctx.Err()
})
}
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
var b []byte
err := bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
b = bkt.Get([]byte(id))
return nil
})
return b, err
}
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
return bb.db.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
var err error
bkt, err = tx.CreateBucket([]byte(db))
if err != nil {
return err
}
}
if value == nil {
return bkt.Delete([]byte(id))
}
return bkt.Put([]byte(id), value)
})
}

View File

@@ -1,22 +0,0 @@
//go:build postgres
package main
import (
"context"
"os"
"testing"
"time"
)
func TestPostgres(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
conn := os.Getenv("INTEGRATION_POSTGRES_CONN")
pg, err := NewPostgres(ctx, conn)
if err != nil {
t.Fatal(err)
}
testDriver(t, pg)
}

View File

@@ -2,91 +2,17 @@ package main
import (
"context"
"errors"
"io"
"testing"
"time"
)
func TestDriverRAM(t *testing.T) {
testDriver(t, NewRAM())
}
func TestFillTestdata(t *testing.T) {
func TestDriver(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern); err != nil {
d, err := NewDriver(ctx, "")
if err != nil {
t.Fatal(err)
}
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir()))
}
func testDriver(t *testing.T, d Driver) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
defer d.Close()
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from empty:", err)
} else if b != nil {
t.Error("got fake from empty")
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil {
t.Error("cannot set from empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from full:", err)
} else if string(b) != `"hello world"` {
t.Error("got fake from full")
}
if err := d.ForEach(ctx, "m", func(id string, v []byte) error {
if id != "id" {
t.Error("for each id weird:", id)
}
if string(v) != `"hello world"` {
t.Error("for each value weird:", string(v))
}
return io.EOF
}); err != io.EOF {
t.Error("failed to forEach full:", err)
}
if err := d.Set(ctx, "m", "id", nil); err != nil {
t.Error("cannot set from full:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {
t.Error("got fake from deleted")
}
}

15
go.mod
View File

@@ -3,17 +3,24 @@ module github.com/breel-render/spoc-bot-vr
go 1.22.1
require (
github.com/go-errors/errors v1.5.1
github.com/glebarez/go-sqlite v1.21.2
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8
go.etcd.io/bbolt v1.3.9
gotest.tools/v3 v3.5.1
)
require (
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/gage-technologies/mistral-go v1.0.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.23.1 // indirect
)

32
go.sum
View File

@@ -2,29 +2,45 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A=
github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=

146
main.go
View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -37,11 +36,25 @@ func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg):
return err
case err := <-listenAndServe(ctx, cfg):
return err
}
}
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
errs := make(chan error)
go func() {
defer close(errs)
select {
case errs <- cfg.slackToMessagePipeline.Process(ctx):
case <-ctx.Done():
}
}()
return errs
}
func listenAndServe(ctx context.Context, cfg Config) chan error {
s := http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
@@ -64,12 +77,13 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
@@ -82,25 +96,62 @@ func newHandler(cfg Config) http.HandlerFunc {
}
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
var page struct {
OK bool
Messages []json.RawMessage
}
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
} else if !page.OK {
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
@@ -110,19 +161,7 @@ func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := cfg.storage.EventsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"events": events})
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
@@ -132,19 +171,7 @@ func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
messages, err := cfg.storage.MessagesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"messages": messages})
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
@@ -154,19 +181,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"threads": threads})
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
@@ -177,14 +192,9 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
}
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
_ = thread
messages, err := cfg.storage.Thread(r.Context(), thread)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"thread": messages})
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
@@ -246,20 +256,12 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return
}
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("ingested %v", m.ID)
log.Printf("ingested")
}
}

View File

@@ -39,10 +39,12 @@ func TestRun(t *testing.T) {
u := fmt.Sprintf("http://localhost:%d", port)
cfg := Config{}
cfg.DatacenterPattern = renderDatacenterPattern
cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port
cfg.driver = NewRAM()
cfg.storage = NewStorage(cfg.driver)
cfg.queue = NewQueue(cfg.driver)
cfg.driver, _ = NewDriver(ctx, "")
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}
@@ -120,7 +122,7 @@ func TestRun(t *testing.T) {
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.EventNames[0] != "[Oregon-1] Wal Receive Count Alert" {
} else if result.EventNames[0] != "Wal Receive Count Alert" {
t.Fatal(result.EventNames)
} else {
t.Logf("%+v", result)
@@ -215,14 +217,17 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
dec := csv.NewReader(resp.Body)
b, _ := io.ReadAll(resp.Body)
t.Logf("whole csv: \n%s", b)
dec := csv.NewReader(bytes.NewReader(b))
var lastLine []string
for {
line, err := dec.Read()
if err == io.EOF {
break
} else if err != nil {
t.Error(err)
t.Error("unexpected error while reading csv line:", err)
}
if lastLine == nil {
@@ -230,7 +235,7 @@ func TestRun(t *testing.T) {
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
}
t.Logf("%+v", line)
t.Logf("CSV line: %+v", line)
lastLine = line
}
if lastLine == nil {

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"time"
)
@@ -59,8 +60,11 @@ func Deserialize(b []byte) (Message, error) {
type (
slackMessage struct {
TS uint64 `json:"event_time"`
Event slackEvent
slackEvent
Type string
TS uint64 `json:"event_time"`
Event slackEvent
MessageTS string `json:"ts"`
}
slackEvent struct {
@@ -108,28 +112,43 @@ type (
)
func ParseSlack(b []byte, assetPattern, datacenterPattern, eventNamePattern string) (Message, error) {
m, err := parseSlackJSON(b)
return ParseSlackFromChannel(b, assetPattern, datacenterPattern, eventNamePattern, "")
}
func ParseSlackFromChannel(b []byte, assetPattern, datacenterPattern, eventNamePattern string, ch string) (Message, error) {
m, err := parseSlackJSON(b, ch)
if err != nil {
return Message{}, err
}
asset := regexp.MustCompile(assetPattern)
datacenter := regexp.MustCompile(datacenterPattern)
eventName := regexp.MustCompile(eventNamePattern)
m.Asset = asset.FindString(m.Asset)
m.Datacenter = datacenter.FindString(m.Datacenter)
m.EventName = eventName.FindString(m.EventName)
for pattern, ptr := range map[string]*string{
assetPattern: &m.Asset,
datacenterPattern: &m.Datacenter,
eventNamePattern: &m.EventName,
} {
r := regexp.MustCompile(pattern)
parsed := r.FindString(*ptr)
for i, name := range r.SubexpNames() {
if i > 0 && name != "" {
parsed = r.FindStringSubmatch(*ptr)[i]
}
}
*ptr = parsed
}
return m, nil
}
func parseSlackJSON(b []byte) (Message, error) {
func parseSlackJSON(b []byte, ch string) (Message, error) {
s, err := _parseSlackJSON(b)
if err != nil {
return Message{}, err
}
if ch != "" {
s.Event.Channel = ch
}
if s.Event.Bot.Name != "" {
if len(s.Event.Attachments) == 0 {
return Message{}, ErrIrrelevantMessage
@@ -177,6 +196,12 @@ func parseSlackJSON(b []byte) (Message, error) {
func _parseSlackJSON(b []byte) (slackMessage, error) {
var result slackMessage
err := json.Unmarshal(b, &result)
switch result.Type {
case "message":
result.Event = result.slackEvent
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
result.Event.ID = result.MessageTS
}
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
result.Event.Blocks = result.Event.Nested.Blocks
result.Event.Bot = result.Event.Nested.Bot

View File

@@ -1,20 +1,14 @@
package main
import (
"fmt"
"os"
"path"
"testing"
)
var (
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
renderEventNamePattern = `(^\[[^\]]*\] *)?(?P<result>.*)`
)
func TestParseSlackTestdata(t *testing.T) {
cases := map[string]struct {
inCh string
slackMessage slackMessage
message Message
}{
@@ -120,6 +114,21 @@ func TestParseSlackTestdata(t *testing.T) {
Resolved: true,
},
},
"reingested_alert.json": {
inCh: "C06U1DDBBU4",
message: Message{
ID: "1712892637.037639/1712892637",
TS: 1712892637,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
Channel: "C06U1DDBBU4",
Thread: "1712892637.037639",
EventName: "Alertconfig Workflow Failed",
Event: "11061",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "",
Resolved: true,
},
},
}
for name, d := range cases {
@@ -130,18 +139,8 @@ func TestParseSlackTestdata(t *testing.T) {
t.Fatal(err)
}
t.Run("parseSlackJSON", func(t *testing.T) {
got, err := parseSlackJSON(b)
if err != nil {
t.Fatal(err)
}
if fmt.Sprintf("%+v", got) != fmt.Sprintf("%+v", want.slackMessage) {
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.slackMessage, got)
}
})
t.Run("ParseSlack", func(t *testing.T) {
got, err := ParseSlack(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
if err != nil {
t.Fatal(err)
}

44
pipeline.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import "context"
type (
Pipeline struct {
writer Queue
reader Queue
process processFunc
}
processFunc func(context.Context, []byte) ([]byte, error)
)
func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
return Pipeline{
writer: writer,
reader: reader,
process: process,
}
}
func (p Pipeline) Process(ctx context.Context) error {
ctx, can := context.WithCancel(ctx)
defer can()
for ctx.Err() == nil {
reservation, read, err := p.reader.Syn(ctx)
if err != nil {
return err
}
processed, err := p.process(ctx, read)
if err != nil {
return err
}
if err := p.writer.Enqueue(ctx, processed); err != nil {
return err
}
if err := p.reader.Ack(ctx, reservation); err != nil {
return err
}
}
return ctx.Err()
}

52
pipeline_test.go Normal file
View File

@@ -0,0 +1,52 @@
package main
import (
"context"
"testing"
"time"
)
func TestPipeline(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driverOutput, _ := NewDriver(ctx, "")
output, err := NewQueue(ctx, "output", driverOutput)
if err != nil {
t.Fatal(err)
}
driverInput, _ := NewDriver(ctx, "")
input, err := NewQueue(ctx, "input", driverInput)
if err != nil {
t.Fatal(err)
}
found := map[string]struct{}{}
process := func(_ context.Context, v []byte) ([]byte, error) {
found[string(v)] = struct{}{}
return []byte("world"), nil
}
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
t.Error(err)
}
ing := NewPipeline(output, input, process)
go func() {
defer can()
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
if r, p, err := output.Syn(ctx); err != nil {
t.Error(err)
} else if string(p) != "world" {
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
} else if err := output.Ack(ctx, r); err != nil {
t.Error(err)
}
if len(found) != 1 {
t.Error(found)
}
}

115
queue.go
View File

@@ -2,57 +2,118 @@ package main
import (
"context"
"fmt"
"time"
"github.com/go-errors/errors"
"github.com/google/uuid"
)
type Queue struct {
driver Driver
topic string
}
func NewQueue(driver Driver) Queue {
return Queue{driver: driver}
func NewNoopQueue() Queue {
return Queue{}
}
func (q Queue) Push(ctx context.Context, m Message) error {
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
if _, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY,
topic TEXT NOT NULL,
updated INTEGER NOT NULL,
reservation TEXT,
payload TEXT
);
`); err != nil {
return Queue{}, fmt.Errorf("failed to create table: %w", err)
}
return Queue{topic: topic, driver: driver}, nil
}
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
if q.driver.DB == nil {
return nil
}
_, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
`,
q.topic,
time.Now().Unix(),
b,
)
return err
}
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
if q.driver.DB == nil {
return "", nil, nil
}
for {
m, err := q.peekFirst(ctx)
if err != nil {
return m, err
}
if !m.Empty() {
return m, nil
reservation, m, err := q.syn(ctx)
if reservation != nil || err != nil {
return string(reservation), m, err
}
select {
case <-ctx.Done():
return Message{}, ctx.Err()
return "", nil, ctx.Err()
case <-time.After(time.Second):
}
}
}
func (q Queue) Ack(ctx context.Context, id string) error {
return q.driver.Set(ctx, "q", id, nil)
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
now := time.Now().Unix()
reservation := []byte(uuid.New().String())
var payload []byte
if result, err := q.driver.ExecContext(ctx, `
UPDATE queue
SET
updated = ?, reservation = ?
WHERE
id IN (
SELECT id
FROM queue
WHERE
topic == ?
AND (
reservation IS NULL
OR ? - updated > 60
)
LIMIT 1
)
`, now, reservation, q.topic, now); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
} else if n, err := result.RowsAffected(); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
} else if n == 0 {
return nil, nil, nil
}
row := q.driver.QueryRowContext(ctx, `
SELECT payload
FROM queue
WHERE reservation==?
LIMIT 1
`, reservation)
if err := row.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
} else if err := row.Scan(&payload); err != nil {
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
}
return reservation, payload, nil
}
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
var m Message
subctx, subcan := context.WithCancel(ctx)
defer subcan()
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
m = MustDeserialize(value)
subcan()
func (q Queue) Ack(ctx context.Context, reservation string) error {
if q.driver.DB == nil {
return nil
})
if errors.Is(err, subctx.Err()) {
err = nil
}
return m, err
_, err := q.driver.ExecContext(ctx, `
DELETE FROM queue
WHERE reservation==?
`, reservation)
return err
}

View File

@@ -11,24 +11,64 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
q := NewQueue(NewRAM())
driver, _ := NewDriver(ctx, "")
q, err := NewQueue(ctx, "", driver)
if err != nil {
t.Fatal(err)
}
qOther, _ := NewQueue(ctx, "other", driver)
for i := 0; i < 39; i++ {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
t.Fatal(i, err)
}
if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn before any enqueues created: %v", err)
} else {
t.Logf("sync before enqueues: %v", err)
}
found := map[uint64]struct{}{}
for i := 0; i < 39; i++ {
if m, err := q.PeekFirst(ctx); err != nil {
t.Fatal(i, err)
} else if _, ok := found[m.TS]; ok {
t.Error(i, m.TS)
} else if err := q.Ack(ctx, m.ID); err != nil {
t.Fatal(i, err)
} else {
found[m.TS] = struct{}{}
t.Run("enqueue", func(t *testing.T) {
for i := 0; i < 39; i++ {
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
t.Fatal(i, err)
}
}
})
if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
t.Fatal(err)
}
t.Run("syn ack", func(t *testing.T) {
found := map[string]struct{}{}
for i := 0; i < 39; i++ {
if reservation, b, err := q.Syn(ctx); err != nil {
t.Fatal(i, "syn err", err)
} else if _, ok := found[string(b)]; ok {
t.Errorf("syn'd %q twice (%+v)", b, found)
} else if err := q.Ack(ctx, reservation); err != nil {
t.Fatal(i, "failed to ack", err)
} else {
found[string(b)] = struct{}{}
}
}
})
if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn 1 more message than created: %v", err)
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
t.Errorf("unable to syn from other topic: %v", err)
} else {
t.Logf("empty q.syn = %v", err)
}
t.Run("noop", func(t *testing.T) {
q := NewNoopQueue()
if err := q.Enqueue(nil, nil); err != nil {
t.Error(err)
}
if _, _, err := q.Syn(nil); err != nil {
t.Error(err)
}
if err := q.Ack(nil, ""); err != nil {
t.Error(err)
}
})
}

View File

@@ -1,130 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
<script type="module">
import * as d3 from "https://cdn.jsdelivr.net/npm/d3@7/+esm";
</script>
<script>
const allMessages = {{ json "Marshal" .messages }};
console.log(allMessages);
function fillForm() {
const filterableFields = [
"EventName",
"Asset",
];
const fieldsToOptions = {};
filterableFields.map((field) => {fieldsToOptions[field] = {}});
allMessages.map((message) => {
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
});
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
return `
<label for="${field}">${field}</label>
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
${fieldsToOptions[field].map((option) => `
<option selected>${option}</option>
`)}
</select>
`
}).join("\n");
}
function drawAll() {
const messages = filterMessages(allMessages)
drawEventVolumeByName(messages)
drawEventVolumeByWeekday(messages)
drawEventVolumeByHour(messages)
drawEventVolumeByAsset(messages)
}
function filterMessages(messages) {
const filters = document.getElementById("form");
console.log(filters);
return messages.map(() => {
});
}
function drawEventVolumeByName() {}
function drawEventVolumeByWeekday() {}
function drawEventVolumeByHour() {}
function drawEventVolumeByAsset() {}
</script>
<style>
rows {
display: flex;
flex-direction: column;
flex-grow: 1;
}
columns {
display: flex;
flex-direction: row;
flex-grow: 1;
}
rows, columns { border: 1px solid red; }
</style>
</head>
<body onload="fillForm(); drawAll();">
<h1>Report</h1>
<columns>
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
<columns>
<button type="submit">Apply</button>
</columns>
<rows id="form"></rows>
</form>
<rows>
<rows>
<rows>
<h2>Event Volume by Name</h2>
<div>DRAW ME</div>
</rows>
<columns>
<rows>
<h3>by Weekday</h3>
<div>DRAW ME</div>
</rows>
<rows>
<h3>by Hour</h3>
<div>DRAW ME</div>
</rows>
</columns>
<rows>
<h3>by Asset</h3>
<div>DRAW ME</div>
</rows>
</rows>
<rows>
<div>
<h2>Events</h2>
<table>
<tr>
<th>TS</th>
<th>Event</th>
<th>EventName</th>
<th>Latest</th>
</tr>
{{ range .events.Events }}
<tr>
<td><a href="{{ .First.Source }}">{{ time "Unix" .First.TS | time "Time.Format" "Mon Jan 02" }}</a></td>
<td><a href="https://TODO">{{ .Event }}</a></td>
<td>{{ .First.EventName }}</td>
<td><a href="{{ .Last.Source }}">{{ .Last.Plaintext }}</a></td>
</tr>
{{ end }}
</table>
</div>
</rows>
</rows>
</columns>
</body>
<footer>
</footer>
</html>

36
slack.go Normal file
View File

@@ -0,0 +1,36 @@
package main
import (
"context"
"fmt"
)
type SlackToMessage struct {
pipeline Pipeline
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newSlackToMessageProcess(cfg),
}, nil
}
func newSlackToMessageProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
}
return m.Serialize(), nil
}
}

50
slack_test.go Normal file
View File

@@ -0,0 +1,50 @@
package main
import (
"context"
"os"
"testing"
"time"
"gotest.tools/v3/assert"
)
func TestSlackToMessagePipeline(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
driver, _ := NewDriver(ctx, "/tmp/f")
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
if err != nil {
t.Fatal(err)
}
go func() {
if err := pipeline.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
want := Message{
ID: "1712927439.728409/1712927439",
TS: 1712927439,
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
Channel: "C06U1DDBBU4",
Thread: "1712927439.728409",
EventName: "",
Event: "11071",
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
}
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
t.Fatal("failed to enqueue", err)
}
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
t.Fatal("failed to syn", err)
} else if m := MustDeserialize(b2); false {
} else {
assert.DeepEqual(t, want, m)
}
}

View File

@@ -1,96 +0,0 @@
package main
import (
"context"
"errors"
"sort"
"time"
)
var (
ErrNotFound = errors.New("not found")
)
type Storage struct {
driver Driver
}
func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
}
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
}
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
}
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
messages, err := s.MessagesSince(ctx, t)
if err != nil {
return nil, err
}
values := map[string]struct{}{}
for _, m := range messages {
values[fielder(m)] = struct{}{}
}
result := make([]string, 0, len(values))
for k := range values {
result = append(result, k)
}
sort.Strings(result)
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "m", id)
if err != nil {
return Message{}, err
}
if b == nil {
return Message{}, ErrNotFound
}
return MustDeserialize(b), nil
}

View File

@@ -1,67 +0,0 @@
package main
import (
"context"
"testing"
"time"
)
func TestStorage(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second)
defer can()
t.Run("Threads", func(t *testing.T) {
s := NewStorage(NewRAM())
mX1 := Message{ID: "1", Thread: "X", TS: 1}
mX2 := Message{ID: "2", Thread: "X", TS: 2}
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
for _, m := range []Message{mX1, mX2, mY1} {
if err := s.Upsert(ctx, m); err != nil {
t.Fatal(err)
}
}
if threads, err := s.Threads(ctx); err != nil {
t.Error(err)
} else if len(threads) != 2 {
t.Error(threads)
} else if threads[0] != "X" {
t.Error(threads, "X")
} else if threads[1] != "Y" {
t.Error(threads, "Y")
}
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
t.Error(err)
} else if len(threads) != 1 {
t.Error(threads)
} else if threads[0] != "Y" {
t.Error(threads[0])
}
})
t.Run("Get Upsert", func(t *testing.T) {
s := NewStorage(NewRAM())
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
t.Error("failed to get 404", err)
}
m := Message{
ID: "id",
TS: 1,
}
if err := s.Upsert(ctx, m); err != nil {
t.Error("failed to upsert", err)
}
if m2, err := s.Get(ctx, "id"); err != nil {
t.Error("failed to get", err)
} else if m != m2 {
t.Error(m2)
}
})
}

View File

@@ -0,0 +1,57 @@
{
"user": "U03RUK7FBUY",
"type": "message",
"ts": "1712892637.037639",
"edited": {
"user": "B03RHGBPH2M",
"ts": "1712896236.000000"
},
"bot_id": "B03RHGBPH2M",
"app_id": "A286WATV2",
"text": "",
"team": "T9RQLQ0KV",
"bot_profile": {
"id": "B03RHGBPH2M",
"app_id": "A286WATV2",
"name": "Opsgenie for Alert Management",
"icons": {
"image_36": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_36.png",
"image_48": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_48.png",
"image_72": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_72.png"
},
"deleted": false,
"updated": 1658887059,
"team_id": "T9RQLQ0KV"
},
"attachments": [
{
"id": 1,
"color": "2ecc71",
"fallback": "\"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfcb2ead-1712892636514|11061>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"title": "#11061: [Grafana]: Firing: Alertconfig Workflow Failed",
"title_link": "https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfc$2ead-1712892636514",
"callback_id": "bbd4a269-08a9-470e-ba79-ce238ac03dc7_05fa2e9b-bec4-4a7e-842d-36043d267a13_11061",
"fields": [
{
"value": "P3",
"title": "Priority",
"short": true
},
{
"value": "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
"title": "Tags",
"short": true
},
{
"value": "Datastores Non-Critical",
"title": "Routed Teams",
"short": true
}
],
"mrkdwn_in": [
"text"
]
}
]
}