Compare commits
26 Commits
a8270b524c
...
83c0ee3f53
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83c0ee3f53 | ||
|
|
9d7a175c62 | ||
|
|
1dcffdd956 | ||
|
|
580068d98b | ||
|
|
eec5c39725 | ||
|
|
9848492b1e | ||
|
|
a674022357 | ||
|
|
80df07089f | ||
|
|
d792626c2f | ||
|
|
acac2a60b0 | ||
|
|
eef78d6e39 | ||
|
|
42c5b7d7ad | ||
|
|
e85a2d25a1 | ||
|
|
8193bf7377 | ||
|
|
2f3739b24f | ||
|
|
d38352f050 | ||
|
|
ba833fa315 | ||
|
|
d7cbcb9926 | ||
|
|
961be827d0 | ||
|
|
6fbafe6700 | ||
|
|
7df7528ccf | ||
|
|
a91da082c7 | ||
|
|
af2ad44109 | ||
|
|
cabc5c00b7 | ||
|
|
84dec31e53 | ||
|
|
f2a23e5d8a |
227
.report.tmpl
Normal file
227
.report.tmpl
Normal file
@@ -0,0 +1,227 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||||
|
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
|
||||||
|
<script src="https://code.highcharts.com/10/highcharts.js"></script>
|
||||||
|
<script type="module">
|
||||||
|
const allMessages = {{ json "Marshal" .messages }};
|
||||||
|
|
||||||
|
function fillForm() {
|
||||||
|
const filterableFields = [
|
||||||
|
"Asset",
|
||||||
|
"Channel",
|
||||||
|
"Event",
|
||||||
|
"EventName",
|
||||||
|
"Resolved",
|
||||||
|
"Thread",
|
||||||
|
];
|
||||||
|
const fieldsToOptions = {};
|
||||||
|
filterableFields.map((field) => {fieldsToOptions[field] = {}});
|
||||||
|
allMessages.map((message) => {
|
||||||
|
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
|
||||||
|
});
|
||||||
|
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
|
||||||
|
|
||||||
|
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
|
||||||
|
return `
|
||||||
|
<label for="${field}">${field}</label>
|
||||||
|
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
|
||||||
|
${fieldsToOptions[field].map((option) => `
|
||||||
|
<option selected>${option}</option>
|
||||||
|
`)}
|
||||||
|
</select>
|
||||||
|
`
|
||||||
|
}).join("\n");
|
||||||
|
}
|
||||||
|
window.fillForm = fillForm;
|
||||||
|
|
||||||
|
function drawAll() {
|
||||||
|
const messages = filterMessages(allMessages)
|
||||||
|
|
||||||
|
dumpEvents(messages);
|
||||||
|
drawEventVolume(messages)
|
||||||
|
drawEventVolumeByHour(messages)
|
||||||
|
drawEventVolumeByAsset(messages)
|
||||||
|
}
|
||||||
|
window.drawAll = drawAll;
|
||||||
|
|
||||||
|
function dumpEvents(messages) {
|
||||||
|
const eventToThreads = {};
|
||||||
|
for(var m of messages) {
|
||||||
|
if (!eventToThreads[m.Event])
|
||||||
|
eventToThreads[m.Event] = [];
|
||||||
|
eventToThreads[m.Event].push(m.Thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
const threadToMessages = {};
|
||||||
|
for(var m of messages) {
|
||||||
|
if (!threadToMessages[m.Thread])
|
||||||
|
threadToMessages[m.Thread] = [];
|
||||||
|
threadToMessages[m.Thread].push(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventToMessages = {};
|
||||||
|
for(var e in eventToThreads) {
|
||||||
|
if (!eventToMessages[e])
|
||||||
|
eventToMessages[e] = [];
|
||||||
|
for (var thread of eventToThreads[e])
|
||||||
|
eventToMessages[e] = eventToMessages[e].concat(threadToMessages[thread]);
|
||||||
|
}
|
||||||
|
for(var e in eventToMessages)
|
||||||
|
eventToMessages[e].sort((a, b) => a.TS - b.TS);
|
||||||
|
var events = Object.keys(eventToMessages);
|
||||||
|
events.sort();
|
||||||
|
events.reverse();
|
||||||
|
|
||||||
|
var keys = ["TS", "Event", "EventName", "Latest"];
|
||||||
|
document.getElementById("events").innerHTML = `
|
||||||
|
<tr>
|
||||||
|
<th>TS</th>
|
||||||
|
<th>Event</th>
|
||||||
|
<th>EventName</th>
|
||||||
|
<th>Latest</th>
|
||||||
|
</tr>
|
||||||
|
${events.map((e) => `
|
||||||
|
<tr>
|
||||||
|
<td><a href="${eventToMessages[e][0].Source}">${new Date(eventToMessages[e][0].TS * 1000).toDateString()}</a></td>
|
||||||
|
<td><a href="${eventToMessages[e][0].Source}">${eventToMessages[e][0].Event}</a></td>
|
||||||
|
<td>${eventToMessages[e][0].EventName}</td>
|
||||||
|
<td><a href="${eventToMessages[e].at(-1).Source}">${eventToMessages[e].at(-1).Plaintext}</a></td>
|
||||||
|
</tr>
|
||||||
|
`).join("")}
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
|
||||||
|
function filterMessages(messages) {
|
||||||
|
const selects = document.getElementById("form").getElementsByTagName("select");
|
||||||
|
const fieldsToOptions = {};
|
||||||
|
for(var select of selects) {
|
||||||
|
fieldsToOptions[select.name] = [];
|
||||||
|
for(var option of select.getElementsByTagName("option"))
|
||||||
|
if (option.selected)
|
||||||
|
fieldsToOptions[select.name].push(option.innerHTML);
|
||||||
|
}
|
||||||
|
return messages.map((m) => {
|
||||||
|
for(var k in fieldsToOptions) {
|
||||||
|
if (fieldsToOptions[k].filter((v) => `${v}` == `${m[k]}`).length == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}).filter((m) => { return m != null });
|
||||||
|
}
|
||||||
|
|
||||||
|
function drawEventVolume(messages) {
|
||||||
|
drawEventVolumeWith(
|
||||||
|
messages,
|
||||||
|
"eventVolume",
|
||||||
|
(ts) => new Date(1000 * ts).
|
||||||
|
toLocaleDateString('en-US', {month: 'numeric', day: 'numeric', weekday: 'short'}),
|
||||||
|
(m) => m.EventName,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function drawEventVolumeWith(messages, documentId, kify, nameify) {
|
||||||
|
const points = [];
|
||||||
|
messages.forEach((m) => {
|
||||||
|
points.push({x: m.TS, name: nameify(m)});
|
||||||
|
});
|
||||||
|
|
||||||
|
var xs = points.map((point) => point.x);
|
||||||
|
if (xs && !isNaN(parseFloat(kify(xs[0])))) {
|
||||||
|
xs = xs.map(kify);
|
||||||
|
xs.sort((a, b) => parseFloat(a) - parseFloat(b));
|
||||||
|
} else {
|
||||||
|
xs.sort();
|
||||||
|
xs = xs.map(kify);
|
||||||
|
}
|
||||||
|
xs = [...new Set(xs)];
|
||||||
|
|
||||||
|
const names = [...new Set(points.map((p) => p.name))];
|
||||||
|
const nameAndData = names.map((name) => {
|
||||||
|
return {
|
||||||
|
name: name,
|
||||||
|
data: xs.map((x) => points.filter((p) => { return p.name == name && kify(p.x) == x }).length),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
draw(documentId, xs, nameAndData);
|
||||||
|
}
|
||||||
|
|
||||||
|
function drawEventVolumeByHour(messages) {
|
||||||
|
drawEventVolumeWith(
|
||||||
|
messages,
|
||||||
|
"eventVolumeByHour",
|
||||||
|
(ts) => new Date(1000 * ts).getHours(),
|
||||||
|
(m) => m.EventName,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function drawEventVolumeByAsset(messages) {}
|
||||||
|
|
||||||
|
function draw(documentId, xs, nameAndData) {
|
||||||
|
document.getElementById(documentId).innerHTML = "";
|
||||||
|
Highcharts.chart(documentId, {
|
||||||
|
chart: { type: 'column' },
|
||||||
|
title: { text: '' },
|
||||||
|
xAxis: { categories: xs },
|
||||||
|
yAxis: { allowDecimals: false, title: { text: '' } },
|
||||||
|
//legend: { enabled: false },
|
||||||
|
series: nameAndData,
|
||||||
|
plotOptions: { column: { stacking: 'normal' } },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
</script>
|
||||||
|
<style>
|
||||||
|
rows {
|
||||||
|
display: flex;
|
||||||
|
flex-direction: column;
|
||||||
|
flex-grow: 1;
|
||||||
|
}
|
||||||
|
columns {
|
||||||
|
display: flex;
|
||||||
|
flex-direction: row;
|
||||||
|
flex-grow: 1;
|
||||||
|
}
|
||||||
|
rows, columns { border: 1px solid red; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body onload="fillForm(); drawAll();" style="max-width: inherit;">
|
||||||
|
<h1>Report</h1>
|
||||||
|
<columns>
|
||||||
|
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
|
||||||
|
<columns>
|
||||||
|
<button type="submit">Apply</button>
|
||||||
|
</columns>
|
||||||
|
<rows id="form"></rows>
|
||||||
|
</form>
|
||||||
|
<rows>
|
||||||
|
<rows>
|
||||||
|
<rows>
|
||||||
|
<h2>Event Volume</h2>
|
||||||
|
<div id="eventVolume"></div>
|
||||||
|
</rows>
|
||||||
|
<columns>
|
||||||
|
<rows>
|
||||||
|
<h3>by Hour</h3>
|
||||||
|
<div id="eventVolumeByHour"></div>
|
||||||
|
</rows>
|
||||||
|
</columns>
|
||||||
|
<rows>
|
||||||
|
<h3>by Asset</h3>
|
||||||
|
<div>DRAW ME</div>
|
||||||
|
</rows>
|
||||||
|
</rows>
|
||||||
|
<rows>
|
||||||
|
<div>
|
||||||
|
<h2>Events</h2>
|
||||||
|
<table id="events">
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
</rows>
|
||||||
|
</rows>
|
||||||
|
</columns>
|
||||||
|
</body>
|
||||||
|
<footer>
|
||||||
|
</footer>
|
||||||
|
</html>
|
||||||
41
config.go
41
config.go
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
@@ -17,7 +18,7 @@ type Config struct {
|
|||||||
InitializeSlack bool
|
InitializeSlack bool
|
||||||
SlackToken string
|
SlackToken string
|
||||||
SlackChannels []string
|
SlackChannels []string
|
||||||
PostgresConn string
|
DriverConn string
|
||||||
BasicAuthUser string
|
BasicAuthUser string
|
||||||
BasicAuthPassword string
|
BasicAuthPassword string
|
||||||
FillWithTestdata bool
|
FillWithTestdata bool
|
||||||
@@ -28,12 +29,17 @@ type Config struct {
|
|||||||
AssetPattern string
|
AssetPattern string
|
||||||
DatacenterPattern string
|
DatacenterPattern string
|
||||||
EventNamePattern string
|
EventNamePattern string
|
||||||
storage Storage
|
|
||||||
queue Queue
|
|
||||||
driver Driver
|
driver Driver
|
||||||
ai AI
|
ai AI
|
||||||
|
slackToMessagePipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
|
||||||
|
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
||||||
|
renderEventNamePattern = `(\[[^\]]*\] *)?(?P<result>.*)`
|
||||||
|
)
|
||||||
|
|
||||||
func newConfig(ctx context.Context) (Config, error) {
|
func newConfig(ctx context.Context) (Config, error) {
|
||||||
return newConfigFromEnv(ctx, os.Getenv)
|
return newConfigFromEnv(ctx, os.Getenv)
|
||||||
}
|
}
|
||||||
@@ -42,9 +48,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
def := Config{
|
def := Config{
|
||||||
Port: 38080,
|
Port: 38080,
|
||||||
OllamaModel: "gemma:2b",
|
OllamaModel: "gemma:2b",
|
||||||
AssetPattern: `(dpg|svc|red)-[a-z0-9-]*`,
|
AssetPattern: renderAssetPattern,
|
||||||
DatacenterPattern: `[a-z]{4}[a-z]*-[0-9]`,
|
DatacenterPattern: renderDatacenterPattern,
|
||||||
EventNamePattern: `(^\[[^\]]*\] *)?(?P<result>.*)`,
|
EventNamePattern: renderEventNamePattern,
|
||||||
}
|
}
|
||||||
|
|
||||||
var m map[string]any
|
var m map[string]any
|
||||||
@@ -98,23 +104,18 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result.driver = NewRAM()
|
|
||||||
if result.PostgresConn != "" {
|
|
||||||
ctx, can := context.WithTimeout(ctx, time.Second*10)
|
ctx, can := context.WithTimeout(ctx, time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
pg, err := NewPostgres(ctx, result.PostgresConn)
|
driver, err := NewDriver(ctx, result.DriverConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
result.driver = pg
|
result.driver = driver
|
||||||
|
if !result.FillWithTestdata {
|
||||||
|
//} else if err := result.driver.FillWithTestdata(ctx, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
|
||||||
|
} else {
|
||||||
|
return Config{}, errors.New("not impl")
|
||||||
}
|
}
|
||||||
if result.FillWithTestdata {
|
|
||||||
if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
|
|
||||||
return Config{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result.storage = NewStorage(result.driver)
|
|
||||||
result.queue = NewQueue(result.driver)
|
|
||||||
|
|
||||||
if result.OllamaURL != "" {
|
if result.OllamaURL != "" {
|
||||||
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
result.ai = NewAIOllama(result.OllamaURL, result.OllamaModel)
|
||||||
@@ -124,5 +125,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
result.ai = NewAINoop()
|
result.ai = NewAINoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.slackToMessagePipeline = slackToMessagePipeline
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
303
driver.go
303
driver.go
@@ -5,25 +5,44 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"net/url"
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
|
|
||||||
|
_ "github.com/glebarez/go-sqlite"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Driver interface {
|
type Driver struct {
|
||||||
Close() error
|
*sql.DB
|
||||||
ForEach(context.Context, string, func(string, []byte) error) error
|
|
||||||
Get(context.Context, string, string) ([]byte, error)
|
|
||||||
Set(context.Context, string, string, []byte) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern, eventNamePattern string) error {
|
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||||
|
engine := "sqlite"
|
||||||
|
if conn == "" {
|
||||||
|
conn = ":memory:"
|
||||||
|
} else {
|
||||||
|
if u, err := url.Parse(conn); err != nil {
|
||||||
|
return Driver{}, err
|
||||||
|
} else if u.Scheme != "" {
|
||||||
|
engine = u.Scheme
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := sql.Open(engine, conn)
|
||||||
|
if err != nil {
|
||||||
|
return Driver{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
driver := Driver{DB: db}
|
||||||
|
if err := driver.setup(ctx); err != nil {
|
||||||
|
driver.Close()
|
||||||
|
return Driver{}, fmt.Errorf("failed setup: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return driver, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (driver Driver) FillWithTestdata(ctx context.Context, assetPattern, datacenterPattern, eventNamePattern string) error {
|
||||||
d := "./testdata/slack_events"
|
d := "./testdata/slack_events"
|
||||||
entries, err := os.ReadDir(d)
|
entries, err := os.ReadDir(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -49,61 +68,17 @@ func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacent
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
type Postgres struct {
|
func (driver Driver) setup(ctx context.Context) error {
|
||||||
db *sql.DB
|
_, err := driver.ExecContext(ctx, `
|
||||||
|
DROP TABLE IF EXISTS spoc_bot_vr_q;
|
||||||
|
DROP TABLE IF EXISTS spoc_bot_vr_messages;
|
||||||
|
`)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgres(ctx context.Context, conn string) (Postgres, error) {
|
func (d Driver) table(s string) (string, error) {
|
||||||
db, err := sql.Open("postgres", conn)
|
|
||||||
if err != nil {
|
|
||||||
return Postgres{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pg := Postgres{db: db}
|
|
||||||
if err := pg.setup(ctx); err != nil {
|
|
||||||
pg.Close()
|
|
||||||
return Postgres{}, fmt.Errorf("failed setup: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return pg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pg Postgres) setup(ctx context.Context) error {
|
|
||||||
tableQ, err := pg.table("q")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tableM, err := pg.table("m")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := pg.db.ExecContext(ctx, fmt.Sprintf(`
|
|
||||||
CREATE TABLE IF NOT EXISTS %s (
|
|
||||||
id TEXT NOT NULL,
|
|
||||||
v JSONB NOT NULL
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS %s (
|
|
||||||
id TEXT NOT NULL,
|
|
||||||
v JSONB NOT NULL
|
|
||||||
);
|
|
||||||
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
|
|
||||||
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
|
|
||||||
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
|
|
||||||
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
|
|
||||||
`, tableQ,
|
|
||||||
tableM,
|
|
||||||
tableQ, tableQ,
|
|
||||||
tableQ, tableQ,
|
|
||||||
tableM, tableM,
|
|
||||||
tableM, tableM,
|
|
||||||
)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pg Postgres) table(s string) (string, error) {
|
|
||||||
switch s {
|
switch s {
|
||||||
case "q":
|
case "q":
|
||||||
return "spoc_bot_vr_q", nil
|
return "spoc_bot_vr_q", nil
|
||||||
@@ -112,201 +87,3 @@ func (pg Postgres) table(s string) (string, error) {
|
|||||||
}
|
}
|
||||||
return "", errors.New("invalid table " + s)
|
return "", errors.New("invalid table " + s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pg Postgres) Close() error {
|
|
||||||
return pg.db.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pg Postgres) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
|
|
||||||
table, err := pg.table(ns)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := pg.db.QueryContext(ctx, fmt.Sprintf(`SELECT id, v FROM %s;`, table))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
var id string
|
|
||||||
var v []byte
|
|
||||||
if err := rows.Scan(&id, &v); err != nil {
|
|
||||||
return err
|
|
||||||
} else if err := cb(id, v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pg Postgres) Get(ctx context.Context, ns, id string) ([]byte, error) {
|
|
||||||
table, err := pg.table(ns)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
row := pg.db.QueryRowContext(ctx, fmt.Sprintf(`SELECT v FROM %s WHERE id='%s';`, table, id))
|
|
||||||
if err := row.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var v []byte
|
|
||||||
if err := row.Scan(&v); err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pg Postgres) Set(ctx context.Context, ns, id string, v []byte) error {
|
|
||||||
table, err := pg.table(ns)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if v == nil {
|
|
||||||
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE id='%s';`, table, id))
|
|
||||||
return err
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s (id, v) VALUES ('%s', '%s') ON CONFLICT (id) DO UPDATE SET v = '%s'`, table, id, v, v))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
type RAM struct {
|
|
||||||
m map[string]map[string][]byte
|
|
||||||
lock *sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRAM() RAM {
|
|
||||||
return RAM{
|
|
||||||
m: make(map[string]map[string][]byte),
|
|
||||||
lock: &sync.RWMutex{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ram RAM) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ram RAM) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
|
|
||||||
ram.lock.RLock()
|
|
||||||
defer ram.lock.RUnlock()
|
|
||||||
|
|
||||||
for k, v := range ram.m[ns] {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err := cb(k, v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ram RAM) Get(_ context.Context, ns, id string) ([]byte, error) {
|
|
||||||
ram.lock.RLock()
|
|
||||||
defer ram.lock.RUnlock()
|
|
||||||
|
|
||||||
if _, ok := ram.m[ns]; !ok {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return ram.m[ns][id], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
|
|
||||||
ram.lock.Lock()
|
|
||||||
defer ram.lock.Unlock()
|
|
||||||
|
|
||||||
if _, ok := ram.m[ns]; !ok {
|
|
||||||
ram.m[ns] = map[string][]byte{}
|
|
||||||
}
|
|
||||||
ram.m[ns][id] = v
|
|
||||||
if v == nil {
|
|
||||||
delete(ram.m[ns], id)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type BBolt struct {
|
|
||||||
db *bbolt.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestDBIn(d string) BBolt {
|
|
||||||
d, err := ioutil.TempDir(d, "test-db-*")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
db, err := NewDB(path.Join(d, "bb"))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return db
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDB(p string) (BBolt, error) {
|
|
||||||
db, err := bbolt.Open(p, 0600, &bbolt.Options{
|
|
||||||
Timeout: time.Second,
|
|
||||||
})
|
|
||||||
return BBolt{db: db}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bb BBolt) Close() error {
|
|
||||||
return bb.db.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
|
|
||||||
return bb.db.View(func(tx *bbolt.Tx) error {
|
|
||||||
bkt := tx.Bucket([]byte(db))
|
|
||||||
if bkt == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c := bkt.Cursor()
|
|
||||||
for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() {
|
|
||||||
if err := cb(string(k), v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctx.Err()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
|
|
||||||
var b []byte
|
|
||||||
err := bb.db.View(func(tx *bbolt.Tx) error {
|
|
||||||
bkt := tx.Bucket([]byte(db))
|
|
||||||
if bkt == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
b = bkt.Get([]byte(id))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return b, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
|
|
||||||
return bb.db.Update(func(tx *bbolt.Tx) error {
|
|
||||||
bkt := tx.Bucket([]byte(db))
|
|
||||||
if bkt == nil {
|
|
||||||
var err error
|
|
||||||
bkt, err = tx.CreateBucket([]byte(db))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if value == nil {
|
|
||||||
return bkt.Delete([]byte(id))
|
|
||||||
}
|
|
||||||
return bkt.Put([]byte(id), value)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
//go:build postgres
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPostgres(t *testing.T) {
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
conn := os.Getenv("INTEGRATION_POSTGRES_CONN")
|
|
||||||
pg, err := NewPostgres(ctx, conn)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
testDriver(t, pg)
|
|
||||||
}
|
|
||||||
@@ -2,91 +2,17 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDriverRAM(t *testing.T) {
|
func TestDriver(t *testing.T) {
|
||||||
testDriver(t, NewRAM())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFillTestdata(t *testing.T) {
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
ram := NewRAM()
|
d, err := NewDriver(ctx, "")
|
||||||
if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern); err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
n := 0
|
|
||||||
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
|
|
||||||
n += 1
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
t.Log(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDriverBBolt(t *testing.T) {
|
|
||||||
testDriver(t, NewTestDBIn(t.TempDir()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func testDriver(t *testing.T, d Driver) {
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
defer d.Close()
|
defer d.Close()
|
||||||
|
|
||||||
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
|
||||||
t.Error("cannot get from empty:", err)
|
|
||||||
} else if b != nil {
|
|
||||||
t.Error("got fake from empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.ForEach(ctx, "m", func(string, []byte) error {
|
|
||||||
return errors.New("should have no hits")
|
|
||||||
}); err != nil {
|
|
||||||
t.Error("failed to forEach empty:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil {
|
|
||||||
t.Error("cannot set from empty:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
|
||||||
t.Error("cannot get from full:", err)
|
|
||||||
} else if string(b) != `"hello world"` {
|
|
||||||
t.Error("got fake from full")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.ForEach(ctx, "m", func(id string, v []byte) error {
|
|
||||||
if id != "id" {
|
|
||||||
t.Error("for each id weird:", id)
|
|
||||||
}
|
|
||||||
if string(v) != `"hello world"` {
|
|
||||||
t.Error("for each value weird:", string(v))
|
|
||||||
}
|
|
||||||
return io.EOF
|
|
||||||
}); err != io.EOF {
|
|
||||||
t.Error("failed to forEach full:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.Set(ctx, "m", "id", nil); err != nil {
|
|
||||||
t.Error("cannot set from full:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.ForEach(ctx, "m", func(string, []byte) error {
|
|
||||||
return errors.New("should have no hits")
|
|
||||||
}); err != nil {
|
|
||||||
t.Error("failed to forEach empty:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
|
||||||
t.Error("cannot get from deleted:", err)
|
|
||||||
} else if b != nil {
|
|
||||||
t.Error("got fake from deleted")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
15
go.mod
15
go.mod
@@ -3,17 +3,24 @@ module github.com/breel-render/spoc-bot-vr
|
|||||||
go 1.22.1
|
go 1.22.1
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/go-errors/errors v1.5.1
|
github.com/glebarez/go-sqlite v1.21.2
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
github.com/lib/pq v1.10.9
|
github.com/lib/pq v1.10.9
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1
|
github.com/nikolaydubina/llama2.go v0.7.1
|
||||||
github.com/tmc/langchaingo v0.1.8
|
github.com/tmc/langchaingo v0.1.8
|
||||||
go.etcd.io/bbolt v1.3.9
|
gotest.tools/v3 v3.5.1
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/dlclark/regexp2 v1.10.0 // indirect
|
github.com/dlclark/regexp2 v1.10.0 // indirect
|
||||||
github.com/gage-technologies/mistral-go v1.0.0 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/go-cmp v0.6.0 // indirect
|
||||||
|
github.com/mattn/go-isatty v0.0.19 // indirect
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
golang.org/x/sys v0.16.0 // indirect
|
golang.org/x/sys v0.16.0 // indirect
|
||||||
|
modernc.org/libc v1.22.5 // indirect
|
||||||
|
modernc.org/mathutil v1.5.0 // indirect
|
||||||
|
modernc.org/memory v1.5.0 // indirect
|
||||||
|
modernc.org/sqlite v1.23.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
32
go.sum
32
go.sum
@@ -2,29 +2,45 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
|||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
|
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
|
||||||
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
|
||||||
github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A=
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28=
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
|
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
|
||||||
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
|
||||||
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
|
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
|
||||||
|
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
|
||||||
|
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
|
||||||
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||||
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
|
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
|
||||||
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
|
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
|
||||||
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
|
|
||||||
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
|
|
||||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
|
||||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
|
||||||
|
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||||
|
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||||
|
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||||
|
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||||
|
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||||
|
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
|
||||||
|
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
|
||||||
|
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
|
||||||
|
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=
|
||||||
|
|||||||
146
main.go
146
main.go
@@ -4,7 +4,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -37,11 +36,25 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
case err := <-processSlackToMessagePipeline(ctx, cfg):
|
||||||
|
return err
|
||||||
case err := <-listenAndServe(ctx, cfg):
|
case err := <-listenAndServe(ctx, cfg):
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
|
||||||
|
errs := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer close(errs)
|
||||||
|
select {
|
||||||
|
case errs <- cfg.slackToMessagePipeline.Process(ctx):
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return errs
|
||||||
|
}
|
||||||
|
|
||||||
func listenAndServe(ctx context.Context, cfg Config) chan error {
|
func listenAndServe(ctx context.Context, cfg Config) chan error {
|
||||||
s := http.Server{
|
s := http.Server{
|
||||||
Addr: fmt.Sprintf(":%d", cfg.Port),
|
Addr: fmt.Sprintf(":%d", cfg.Port),
|
||||||
@@ -64,12 +77,13 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
|||||||
func newHandler(cfg Config) http.HandlerFunc {
|
func newHandler(cfg Config) http.HandlerFunc {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
|
||||||
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
||||||
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
||||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
||||||
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
||||||
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
||||||
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
|
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
@@ -82,25 +96,62 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !basicAuth(cfg, w, r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
channel := r.Header.Get("slack-channel")
|
||||||
|
token := r.Header.Get("slack-oauth-token")
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req.Header.Set("Authorization", "Bearer "+token)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
defer io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
var page struct {
|
||||||
|
OK bool
|
||||||
|
Messages []json.RawMessage
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
} else if !page.OK {
|
||||||
|
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
errs := []error{}
|
||||||
|
for _, messageJSON := range page.Messages {
|
||||||
|
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if !basicAuth(cfg, w, r) {
|
if !basicAuth(cfg, w, r) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
since, err := parseSince(r.URL.Query().Get("since"))
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,19 +161,7 @@ func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
since, err := parseSince(r.URL.Query().Get("since"))
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
events, err := cfg.storage.EventsSince(r.Context(), since)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
encodeResponse(w, r, map[string]any{"events": events})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,19 +171,7 @@ func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
since, err := parseSince(r.URL.Query().Get("since"))
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
messages, err := cfg.storage.MessagesSince(r.Context(), since)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
encodeResponse(w, r, map[string]any{"messages": messages})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,19 +181,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
since, err := parseSince(r.URL.Query().Get("since"))
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
encodeResponse(w, r, map[string]any{"threads": threads})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,14 +192,9 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
||||||
|
_ = thread
|
||||||
|
|
||||||
messages, err := cfg.storage.Thread(r.Context(), thread)
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
encodeResponse(w, r, map[string]any{"thread": messages})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -246,20 +256,12 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil {
|
||||||
if errors.Is(err, ErrIrrelevantMessage) {
|
log.Printf("failed to ingest: %v", err)
|
||||||
return
|
|
||||||
} else if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
|
||||||
log.Printf("failed to ingest %+v: %v", m, err)
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Printf("ingested %v", m.ID)
|
log.Printf("ingested")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
19
main_test.go
19
main_test.go
@@ -39,10 +39,12 @@ func TestRun(t *testing.T) {
|
|||||||
u := fmt.Sprintf("http://localhost:%d", port)
|
u := fmt.Sprintf("http://localhost:%d", port)
|
||||||
|
|
||||||
cfg := Config{}
|
cfg := Config{}
|
||||||
|
cfg.DatacenterPattern = renderDatacenterPattern
|
||||||
|
cfg.AssetPattern = renderAssetPattern
|
||||||
|
cfg.EventNamePattern = renderEventNamePattern
|
||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver = NewRAM()
|
cfg.driver, _ = NewDriver(ctx, "")
|
||||||
cfg.storage = NewStorage(cfg.driver)
|
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
||||||
cfg.queue = NewQueue(cfg.driver)
|
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
|
|
||||||
@@ -120,7 +122,7 @@ func TestRun(t *testing.T) {
|
|||||||
}
|
}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if result.EventNames[0] != "[Oregon-1] Wal Receive Count Alert" {
|
} else if result.EventNames[0] != "Wal Receive Count Alert" {
|
||||||
t.Fatal(result.EventNames)
|
t.Fatal(result.EventNames)
|
||||||
} else {
|
} else {
|
||||||
t.Logf("%+v", result)
|
t.Logf("%+v", result)
|
||||||
@@ -215,14 +217,17 @@ func TestRun(t *testing.T) {
|
|||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
dec := csv.NewReader(resp.Body)
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
t.Logf("whole csv: \n%s", b)
|
||||||
|
|
||||||
|
dec := csv.NewReader(bytes.NewReader(b))
|
||||||
var lastLine []string
|
var lastLine []string
|
||||||
for {
|
for {
|
||||||
line, err := dec.Read()
|
line, err := dec.Read()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
t.Error(err)
|
t.Error("unexpected error while reading csv line:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if lastLine == nil {
|
if lastLine == nil {
|
||||||
@@ -230,7 +235,7 @@ func TestRun(t *testing.T) {
|
|||||||
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
|
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("%+v", line)
|
t.Logf("CSV line: %+v", line)
|
||||||
lastLine = line
|
lastLine = line
|
||||||
}
|
}
|
||||||
if lastLine == nil {
|
if lastLine == nil {
|
||||||
|
|||||||
43
message.go
43
message.go
@@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -59,8 +60,11 @@ func Deserialize(b []byte) (Message, error) {
|
|||||||
|
|
||||||
type (
|
type (
|
||||||
slackMessage struct {
|
slackMessage struct {
|
||||||
|
slackEvent
|
||||||
|
Type string
|
||||||
TS uint64 `json:"event_time"`
|
TS uint64 `json:"event_time"`
|
||||||
Event slackEvent
|
Event slackEvent
|
||||||
|
MessageTS string `json:"ts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
slackEvent struct {
|
slackEvent struct {
|
||||||
@@ -108,28 +112,43 @@ type (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func ParseSlack(b []byte, assetPattern, datacenterPattern, eventNamePattern string) (Message, error) {
|
func ParseSlack(b []byte, assetPattern, datacenterPattern, eventNamePattern string) (Message, error) {
|
||||||
m, err := parseSlackJSON(b)
|
return ParseSlackFromChannel(b, assetPattern, datacenterPattern, eventNamePattern, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseSlackFromChannel(b []byte, assetPattern, datacenterPattern, eventNamePattern string, ch string) (Message, error) {
|
||||||
|
m, err := parseSlackJSON(b, ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Message{}, err
|
return Message{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
asset := regexp.MustCompile(assetPattern)
|
for pattern, ptr := range map[string]*string{
|
||||||
datacenter := regexp.MustCompile(datacenterPattern)
|
assetPattern: &m.Asset,
|
||||||
eventName := regexp.MustCompile(eventNamePattern)
|
datacenterPattern: &m.Datacenter,
|
||||||
|
eventNamePattern: &m.EventName,
|
||||||
m.Asset = asset.FindString(m.Asset)
|
} {
|
||||||
m.Datacenter = datacenter.FindString(m.Datacenter)
|
r := regexp.MustCompile(pattern)
|
||||||
m.EventName = eventName.FindString(m.EventName)
|
parsed := r.FindString(*ptr)
|
||||||
|
for i, name := range r.SubexpNames() {
|
||||||
|
if i > 0 && name != "" {
|
||||||
|
parsed = r.FindStringSubmatch(*ptr)[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*ptr = parsed
|
||||||
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseSlackJSON(b []byte) (Message, error) {
|
func parseSlackJSON(b []byte, ch string) (Message, error) {
|
||||||
s, err := _parseSlackJSON(b)
|
s, err := _parseSlackJSON(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Message{}, err
|
return Message{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ch != "" {
|
||||||
|
s.Event.Channel = ch
|
||||||
|
}
|
||||||
|
|
||||||
if s.Event.Bot.Name != "" {
|
if s.Event.Bot.Name != "" {
|
||||||
if len(s.Event.Attachments) == 0 {
|
if len(s.Event.Attachments) == 0 {
|
||||||
return Message{}, ErrIrrelevantMessage
|
return Message{}, ErrIrrelevantMessage
|
||||||
@@ -177,6 +196,12 @@ func parseSlackJSON(b []byte) (Message, error) {
|
|||||||
func _parseSlackJSON(b []byte) (slackMessage, error) {
|
func _parseSlackJSON(b []byte) (slackMessage, error) {
|
||||||
var result slackMessage
|
var result slackMessage
|
||||||
err := json.Unmarshal(b, &result)
|
err := json.Unmarshal(b, &result)
|
||||||
|
switch result.Type {
|
||||||
|
case "message":
|
||||||
|
result.Event = result.slackEvent
|
||||||
|
result.TS, _ = strconv.ParseUint(strings.Split(result.MessageTS, ".")[0], 10, 64)
|
||||||
|
result.Event.ID = result.MessageTS
|
||||||
|
}
|
||||||
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
||||||
result.Event.Blocks = result.Event.Nested.Blocks
|
result.Event.Blocks = result.Event.Nested.Blocks
|
||||||
result.Event.Bot = result.Event.Nested.Bot
|
result.Event.Bot = result.Event.Nested.Bot
|
||||||
|
|||||||
@@ -1,20 +1,14 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
renderAssetPattern = `(dpg|svc|red)-[a-z0-9-]*[a-z0-9]`
|
|
||||||
renderDatacenterPattern = `[a-z]{4}[a-z]*-[0-9]`
|
|
||||||
renderEventNamePattern = `(^\[[^\]]*\] *)?(?P<result>.*)`
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestParseSlackTestdata(t *testing.T) {
|
func TestParseSlackTestdata(t *testing.T) {
|
||||||
cases := map[string]struct {
|
cases := map[string]struct {
|
||||||
|
inCh string
|
||||||
slackMessage slackMessage
|
slackMessage slackMessage
|
||||||
message Message
|
message Message
|
||||||
}{
|
}{
|
||||||
@@ -120,6 +114,21 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Resolved: true,
|
Resolved: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"reingested_alert.json": {
|
||||||
|
inCh: "C06U1DDBBU4",
|
||||||
|
message: Message{
|
||||||
|
ID: "1712892637.037639/1712892637",
|
||||||
|
TS: 1712892637,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712892637037639",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712892637.037639",
|
||||||
|
EventName: "Alertconfig Workflow Failed",
|
||||||
|
Event: "11061",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "",
|
||||||
|
Resolved: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, d := range cases {
|
for name, d := range cases {
|
||||||
@@ -130,18 +139,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("parseSlackJSON", func(t *testing.T) {
|
t.Run("ParseSlackFromChannel "+want.inCh, func(t *testing.T) {
|
||||||
got, err := parseSlackJSON(b)
|
got, err := ParseSlackFromChannel(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern, want.inCh)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if fmt.Sprintf("%+v", got) != fmt.Sprintf("%+v", want.slackMessage) {
|
|
||||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.slackMessage, got)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ParseSlack", func(t *testing.T) {
|
|
||||||
got, err := ParseSlack(b, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
44
pipeline.go
Normal file
44
pipeline.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type (
|
||||||
|
Pipeline struct {
|
||||||
|
writer Queue
|
||||||
|
reader Queue
|
||||||
|
process processFunc
|
||||||
|
}
|
||||||
|
processFunc func(context.Context, []byte) ([]byte, error)
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: process,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p Pipeline) Process(ctx context.Context) error {
|
||||||
|
ctx, can := context.WithCancel(ctx)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
reservation, read, err := p.reader.Syn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
processed, err := p.process(ctx, read)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.writer.Enqueue(ctx, processed); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := p.reader.Ack(ctx, reservation); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
52
pipeline_test.go
Normal file
52
pipeline_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPipeline(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
driverOutput, _ := NewDriver(ctx, "")
|
||||||
|
output, err := NewQueue(ctx, "output", driverOutput)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
driverInput, _ := NewDriver(ctx, "")
|
||||||
|
input, err := NewQueue(ctx, "input", driverInput)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
found := map[string]struct{}{}
|
||||||
|
process := func(_ context.Context, v []byte) ([]byte, error) {
|
||||||
|
found[string(v)] = struct{}{}
|
||||||
|
return []byte("world"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ing := NewPipeline(output, input, process)
|
||||||
|
go func() {
|
||||||
|
defer can()
|
||||||
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if r, p, err := output.Syn(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if string(p) != "world" {
|
||||||
|
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
|
||||||
|
} else if err := output.Ack(ctx, r); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(found) != 1 {
|
||||||
|
t.Error(found)
|
||||||
|
}
|
||||||
|
}
|
||||||
117
queue.go
117
queue.go
@@ -2,57 +2,118 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-errors/errors"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Queue struct {
|
type Queue struct {
|
||||||
driver Driver
|
driver Driver
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(driver Driver) Queue {
|
func NewNoopQueue() Queue {
|
||||||
return Queue{driver: driver}
|
return Queue{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Push(ctx context.Context, m Message) error {
|
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||||
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
|
if _, err := driver.ExecContext(ctx, `
|
||||||
}
|
CREATE TABLE IF NOT EXISTS queue (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
|
topic TEXT NOT NULL,
|
||||||
for {
|
updated INTEGER NOT NULL,
|
||||||
m, err := q.peekFirst(ctx)
|
reservation TEXT,
|
||||||
if err != nil {
|
payload TEXT
|
||||||
return m, err
|
);
|
||||||
|
`); err != nil {
|
||||||
|
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
||||||
}
|
}
|
||||||
|
return Queue{topic: topic, driver: driver}, nil
|
||||||
|
}
|
||||||
|
|
||||||
if !m.Empty() {
|
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
||||||
return m, nil
|
if q.driver.DB == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
|
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
||||||
|
`,
|
||||||
|
q.topic,
|
||||||
|
time.Now().Unix(),
|
||||||
|
b,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||||
|
if q.driver.DB == nil {
|
||||||
|
return "", nil, nil
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
reservation, m, err := q.syn(ctx)
|
||||||
|
if reservation != nil || err != nil {
|
||||||
|
return string(reservation), m, err
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return Message{}, ctx.Err()
|
return "", nil, ctx.Err()
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Ack(ctx context.Context, id string) error {
|
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||||
return q.driver.Set(ctx, "q", id, nil)
|
now := time.Now().Unix()
|
||||||
|
reservation := []byte(uuid.New().String())
|
||||||
|
var payload []byte
|
||||||
|
if result, err := q.driver.ExecContext(ctx, `
|
||||||
|
UPDATE queue
|
||||||
|
SET
|
||||||
|
updated = ?, reservation = ?
|
||||||
|
WHERE
|
||||||
|
id IN (
|
||||||
|
SELECT id
|
||||||
|
FROM queue
|
||||||
|
WHERE
|
||||||
|
topic == ?
|
||||||
|
AND (
|
||||||
|
reservation IS NULL
|
||||||
|
OR ? - updated > 60
|
||||||
|
)
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
`, now, reservation, q.topic, now); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
||||||
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||||
|
} else if n == 0 {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
row := q.driver.QueryRowContext(ctx, `
|
||||||
|
SELECT payload
|
||||||
|
FROM queue
|
||||||
|
WHERE reservation==?
|
||||||
|
LIMIT 1
|
||||||
|
`, reservation)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||||
|
} else if err := row.Scan(&payload); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return reservation, payload, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
|
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
||||||
var m Message
|
if q.driver.DB == nil {
|
||||||
subctx, subcan := context.WithCancel(ctx)
|
|
||||||
defer subcan()
|
|
||||||
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
|
|
||||||
m = MustDeserialize(value)
|
|
||||||
subcan()
|
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
if errors.Is(err, subctx.Err()) {
|
|
||||||
err = nil
|
|
||||||
}
|
}
|
||||||
return m, err
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
|
DELETE FROM queue
|
||||||
|
WHERE reservation==?
|
||||||
|
`, reservation)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,24 +11,64 @@ func TestQueue(t *testing.T) {
|
|||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
q := NewQueue(NewRAM())
|
driver, _ := NewDriver(ctx, "")
|
||||||
|
q, err := NewQueue(ctx, "", driver)
|
||||||
for i := 0; i < 39; i++ {
|
if err != nil {
|
||||||
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
|
t.Fatal(err)
|
||||||
t.Fatal(i, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
qOther, _ := NewQueue(ctx, "other", driver)
|
||||||
|
|
||||||
found := map[uint64]struct{}{}
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
for i := 0; i < 39; i++ {
|
t.Errorf("able to syn before any enqueues created: %v", err)
|
||||||
if m, err := q.PeekFirst(ctx); err != nil {
|
|
||||||
t.Fatal(i, err)
|
|
||||||
} else if _, ok := found[m.TS]; ok {
|
|
||||||
t.Error(i, m.TS)
|
|
||||||
} else if err := q.Ack(ctx, m.ID); err != nil {
|
|
||||||
t.Fatal(i, err)
|
|
||||||
} else {
|
} else {
|
||||||
found[m.TS] = struct{}{}
|
t.Logf("sync before enqueues: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("enqueue", func(t *testing.T) {
|
||||||
|
for i := 0; i < 39; i++ {
|
||||||
|
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
||||||
|
t.Fatal(i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("syn ack", func(t *testing.T) {
|
||||||
|
found := map[string]struct{}{}
|
||||||
|
for i := 0; i < 39; i++ {
|
||||||
|
if reservation, b, err := q.Syn(ctx); err != nil {
|
||||||
|
t.Fatal(i, "syn err", err)
|
||||||
|
} else if _, ok := found[string(b)]; ok {
|
||||||
|
t.Errorf("syn'd %q twice (%+v)", b, found)
|
||||||
|
} else if err := q.Ack(ctx, reservation); err != nil {
|
||||||
|
t.Fatal(i, "failed to ack", err)
|
||||||
|
} else {
|
||||||
|
found[string(b)] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
|
t.Errorf("able to syn 1 more message than created: %v", err)
|
||||||
|
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
|
||||||
|
t.Errorf("unable to syn from other topic: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Logf("empty q.syn = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("noop", func(t *testing.T) {
|
||||||
|
q := NewNoopQueue()
|
||||||
|
if err := q.Enqueue(nil, nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if _, _, err := q.Syn(nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if err := q.Ack(nil, ""); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
130
report.tmpl
130
report.tmpl
@@ -1,130 +0,0 @@
|
|||||||
<!DOCTYPE html>
|
|
||||||
<html>
|
|
||||||
<head>
|
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
||||||
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css">
|
|
||||||
<script type="module">
|
|
||||||
import * as d3 from "https://cdn.jsdelivr.net/npm/d3@7/+esm";
|
|
||||||
</script>
|
|
||||||
<script>
|
|
||||||
const allMessages = {{ json "Marshal" .messages }};
|
|
||||||
console.log(allMessages);
|
|
||||||
|
|
||||||
function fillForm() {
|
|
||||||
const filterableFields = [
|
|
||||||
"EventName",
|
|
||||||
"Asset",
|
|
||||||
];
|
|
||||||
const fieldsToOptions = {};
|
|
||||||
filterableFields.map((field) => {fieldsToOptions[field] = {}});
|
|
||||||
allMessages.map((message) => {
|
|
||||||
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field][message[field]] = true});
|
|
||||||
});
|
|
||||||
Object.keys(fieldsToOptions).map((field) => {fieldsToOptions[field] = Object.keys(fieldsToOptions[field]); fieldsToOptions[field].sort();});
|
|
||||||
|
|
||||||
document.getElementById("form").innerHTML = Object.keys(fieldsToOptions).map((field) => {
|
|
||||||
return `
|
|
||||||
<label for="${field}">${field}</label>
|
|
||||||
<select name="${field}" multiple ${fieldsToOptions[field].length > 10 ? "size=10" : `size=${fieldsToOptions[field].length}`}>
|
|
||||||
${fieldsToOptions[field].map((option) => `
|
|
||||||
<option selected>${option}</option>
|
|
||||||
`)}
|
|
||||||
</select>
|
|
||||||
`
|
|
||||||
}).join("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
function drawAll() {
|
|
||||||
const messages = filterMessages(allMessages)
|
|
||||||
drawEventVolumeByName(messages)
|
|
||||||
drawEventVolumeByWeekday(messages)
|
|
||||||
drawEventVolumeByHour(messages)
|
|
||||||
drawEventVolumeByAsset(messages)
|
|
||||||
}
|
|
||||||
|
|
||||||
function filterMessages(messages) {
|
|
||||||
const filters = document.getElementById("form");
|
|
||||||
console.log(filters);
|
|
||||||
return messages.map(() => {
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function drawEventVolumeByName() {}
|
|
||||||
|
|
||||||
function drawEventVolumeByWeekday() {}
|
|
||||||
|
|
||||||
function drawEventVolumeByHour() {}
|
|
||||||
|
|
||||||
function drawEventVolumeByAsset() {}
|
|
||||||
</script>
|
|
||||||
<style>
|
|
||||||
rows {
|
|
||||||
display: flex;
|
|
||||||
flex-direction: column;
|
|
||||||
flex-grow: 1;
|
|
||||||
}
|
|
||||||
columns {
|
|
||||||
display: flex;
|
|
||||||
flex-direction: row;
|
|
||||||
flex-grow: 1;
|
|
||||||
}
|
|
||||||
rows, columns { border: 1px solid red; }
|
|
||||||
</style>
|
|
||||||
</head>
|
|
||||||
<body onload="fillForm(); drawAll();">
|
|
||||||
<h1>Report</h1>
|
|
||||||
<columns>
|
|
||||||
<form style="width: 16em; flex-shrink: 0;" onsubmit="drawAll(); return false;">
|
|
||||||
<columns>
|
|
||||||
<button type="submit">Apply</button>
|
|
||||||
</columns>
|
|
||||||
<rows id="form"></rows>
|
|
||||||
</form>
|
|
||||||
<rows>
|
|
||||||
<rows>
|
|
||||||
<rows>
|
|
||||||
<h2>Event Volume by Name</h2>
|
|
||||||
<div>DRAW ME</div>
|
|
||||||
</rows>
|
|
||||||
<columns>
|
|
||||||
<rows>
|
|
||||||
<h3>by Weekday</h3>
|
|
||||||
<div>DRAW ME</div>
|
|
||||||
</rows>
|
|
||||||
<rows>
|
|
||||||
<h3>by Hour</h3>
|
|
||||||
<div>DRAW ME</div>
|
|
||||||
</rows>
|
|
||||||
</columns>
|
|
||||||
<rows>
|
|
||||||
<h3>by Asset</h3>
|
|
||||||
<div>DRAW ME</div>
|
|
||||||
</rows>
|
|
||||||
</rows>
|
|
||||||
<rows>
|
|
||||||
<div>
|
|
||||||
<h2>Events</h2>
|
|
||||||
<table>
|
|
||||||
<tr>
|
|
||||||
<th>TS</th>
|
|
||||||
<th>Event</th>
|
|
||||||
<th>EventName</th>
|
|
||||||
<th>Latest</th>
|
|
||||||
</tr>
|
|
||||||
{{ range .events.Events }}
|
|
||||||
<tr>
|
|
||||||
<td><a href="{{ .First.Source }}">{{ time "Unix" .First.TS | time "Time.Format" "Mon Jan 02" }}</a></td>
|
|
||||||
<td><a href="https://TODO">{{ .Event }}</a></td>
|
|
||||||
<td>{{ .First.EventName }}</td>
|
|
||||||
<td><a href="{{ .Last.Source }}">{{ .Last.Plaintext }}</a></td>
|
|
||||||
</tr>
|
|
||||||
{{ end }}
|
|
||||||
</table>
|
|
||||||
</div>
|
|
||||||
</rows>
|
|
||||||
</rows>
|
|
||||||
</columns>
|
|
||||||
</body>
|
|
||||||
<footer>
|
|
||||||
</footer>
|
|
||||||
</html>
|
|
||||||
36
slack.go
Normal file
36
slack.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SlackToMessage struct {
|
||||||
|
pipeline Pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: newSlackToMessageProcess(cfg),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSlackToMessageProcess(cfg Config) processFunc {
|
||||||
|
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||||
|
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
||||||
|
}
|
||||||
|
return m.Serialize(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
50
slack_test.go
Normal file
50
slack_test.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gotest.tools/v3/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSlackToMessagePipeline(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
driver, _ := NewDriver(ctx, "/tmp/f")
|
||||||
|
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
if err := pipeline.Process(ctx); err != nil && ctx.Err() == nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
want := Message{
|
||||||
|
ID: "1712927439.728409/1712927439",
|
||||||
|
TS: 1712927439,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712927439.728409",
|
||||||
|
EventName: "",
|
||||||
|
Event: "11071",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
}
|
||||||
|
|
||||||
|
b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json")
|
||||||
|
if err := pipeline.reader.Enqueue(ctx, b); err != nil {
|
||||||
|
t.Fatal("failed to enqueue", err)
|
||||||
|
}
|
||||||
|
if _, b2, err := pipeline.writer.Syn(ctx); err != nil {
|
||||||
|
t.Fatal("failed to syn", err)
|
||||||
|
} else if m := MustDeserialize(b2); false {
|
||||||
|
} else {
|
||||||
|
assert.DeepEqual(t, want, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
96
storage.go
96
storage.go
@@ -1,96 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrNotFound = errors.New("not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Storage struct {
|
|
||||||
driver Driver
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStorage(driver Driver) Storage {
|
|
||||||
return Storage{driver: driver}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
|
|
||||||
return s.messagesWhere(ctx, func(m Message) bool {
|
|
||||||
return !t.After(m.Time())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) Threads(ctx context.Context) ([]string, error) {
|
|
||||||
return s.ThreadsSince(ctx, time.Unix(0, 0))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
|
|
||||||
messages, err := s.MessagesSince(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
values := map[string]struct{}{}
|
|
||||||
for _, m := range messages {
|
|
||||||
values[fielder(m)] = struct{}{}
|
|
||||||
}
|
|
||||||
result := make([]string, 0, len(values))
|
|
||||||
for k := range values {
|
|
||||||
result = append(result, k)
|
|
||||||
}
|
|
||||||
sort.Strings(result)
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
|
|
||||||
return s.messagesWhere(ctx, func(m Message) bool {
|
|
||||||
return m.Thread == thread
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
|
|
||||||
result := make([]Message, 0)
|
|
||||||
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
|
|
||||||
m := MustDeserialize(v)
|
|
||||||
if !where(m) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
result = append(result, m)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
sort.Slice(result, func(i, j int) bool {
|
|
||||||
return result[i].TS < result[j].TS
|
|
||||||
})
|
|
||||||
return result, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) Upsert(ctx context.Context, m Message) error {
|
|
||||||
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
|
|
||||||
b, err := s.driver.Get(ctx, "m", id)
|
|
||||||
if err != nil {
|
|
||||||
return Message{}, err
|
|
||||||
}
|
|
||||||
if b == nil {
|
|
||||||
return Message{}, ErrNotFound
|
|
||||||
}
|
|
||||||
return MustDeserialize(b), nil
|
|
||||||
}
|
|
||||||
@@ -1,67 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestStorage(t *testing.T) {
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
t.Run("Threads", func(t *testing.T) {
|
|
||||||
s := NewStorage(NewRAM())
|
|
||||||
|
|
||||||
mX1 := Message{ID: "1", Thread: "X", TS: 1}
|
|
||||||
mX2 := Message{ID: "2", Thread: "X", TS: 2}
|
|
||||||
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
|
|
||||||
|
|
||||||
for _, m := range []Message{mX1, mX2, mY1} {
|
|
||||||
if err := s.Upsert(ctx, m); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if threads, err := s.Threads(ctx); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
} else if len(threads) != 2 {
|
|
||||||
t.Error(threads)
|
|
||||||
} else if threads[0] != "X" {
|
|
||||||
t.Error(threads, "X")
|
|
||||||
} else if threads[1] != "Y" {
|
|
||||||
t.Error(threads, "Y")
|
|
||||||
}
|
|
||||||
|
|
||||||
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
} else if len(threads) != 1 {
|
|
||||||
t.Error(threads)
|
|
||||||
} else if threads[0] != "Y" {
|
|
||||||
t.Error(threads[0])
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Get Upsert", func(t *testing.T) {
|
|
||||||
s := NewStorage(NewRAM())
|
|
||||||
|
|
||||||
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
|
|
||||||
t.Error("failed to get 404", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m := Message{
|
|
||||||
ID: "id",
|
|
||||||
TS: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.Upsert(ctx, m); err != nil {
|
|
||||||
t.Error("failed to upsert", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if m2, err := s.Get(ctx, "id"); err != nil {
|
|
||||||
t.Error("failed to get", err)
|
|
||||||
} else if m != m2 {
|
|
||||||
t.Error(m2)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
57
testdata/slack_events/reingested_alert.json
vendored
Normal file
57
testdata/slack_events/reingested_alert.json
vendored
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
{
|
||||||
|
"user": "U03RUK7FBUY",
|
||||||
|
"type": "message",
|
||||||
|
"ts": "1712892637.037639",
|
||||||
|
"edited": {
|
||||||
|
"user": "B03RHGBPH2M",
|
||||||
|
"ts": "1712896236.000000"
|
||||||
|
},
|
||||||
|
"bot_id": "B03RHGBPH2M",
|
||||||
|
"app_id": "A286WATV2",
|
||||||
|
"text": "",
|
||||||
|
"team": "T9RQLQ0KV",
|
||||||
|
"bot_profile": {
|
||||||
|
"id": "B03RHGBPH2M",
|
||||||
|
"app_id": "A286WATV2",
|
||||||
|
"name": "Opsgenie for Alert Management",
|
||||||
|
"icons": {
|
||||||
|
"image_36": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_36.png",
|
||||||
|
"image_48": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_48.png",
|
||||||
|
"image_72": "https://avatars.slack-edge.com/2019-05-30/652285939191_7831939cc30ef7159561_72.png"
|
||||||
|
},
|
||||||
|
"deleted": false,
|
||||||
|
"updated": 1658887059,
|
||||||
|
"team_id": "T9RQLQ0KV"
|
||||||
|
},
|
||||||
|
"attachments": [
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"color": "2ecc71",
|
||||||
|
"fallback": "\"[Grafana]: Firing: Alertconfig Workflow Failed\" <https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfcb2ead-1712892636514|11061>\nTags: alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
"text": "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
"title": "#11061: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
"title_link": "https://opsg.in/a/i/render/bdbbe5a6-738b-4643-9267-39d8dfc$2ead-1712892636514",
|
||||||
|
"callback_id": "bbd4a269-08a9-470e-ba79-ce238ac03dc7_05fa2e9b-bec4-4a7e-842d-36043d267a13_11061",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"value": "P3",
|
||||||
|
"title": "Priority",
|
||||||
|
"short": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
"title": "Tags",
|
||||||
|
"short": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "Datastores Non-Critical",
|
||||||
|
"title": "Routed Teams",
|
||||||
|
"short": true
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"mrkdwn_in": [
|
||||||
|
"text"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user