Compare commits

..

15 Commits

Author SHA1 Message Date
Bel LaPointe
d9d91193dd oh yeah one big todo 2024-04-16 11:03:14 -06:00
Bel LaPointe
2033cefc2a todo now that we have https://grafana.render.com/d/a5df6a90-087b-4655-90d0-e51dcb89f568/2024-04-16-spocbotvr?orgId=1&from=now-7d&to=now&var-interval=1h&var-Teams=All&var-Assets=All&var-Name=All&var-Names=All 2024-04-16 11:01:45 -06:00
Bel LaPointe
600a2e0111 drop URL from Message as it is unfilled and superfluous 2024-04-16 09:13:44 -06:00
Bel LaPointe
4f3b8ec866 we are ready to ship 2024-04-16 09:09:44 -06:00
Bel LaPointe
c4a7eaf04a i am capped at 9args per a bug somewhere between pq and postgres but heck if i care 2024-04-16 09:06:32 -06:00
Bel LaPointe
5557c0920a pg passing mvp test with queue 2024-04-16 08:53:09 -06:00
Bel LaPointe
c7f5cdb040 TODO 2024-04-16 08:33:30 -06:00
Bel LaPointe
a8e8fdc451 synchronous fanout from channel to threads for scrape 2024-04-16 08:33:21 -06:00
Bel LaPointe
d88a8bb23a oof string != byte arr got it 2024-04-16 08:08:47 -06:00
Bel LaPointe
39c0056190 found running locally that i dont need rest, pipeline needs a way to drop messages as garbage 2024-04-16 07:56:54 -06:00
Bel LaPointe
d87af2fadc readme todo 2024-04-16 07:40:51 -06:00
Bel LaPointe
9bc47bfde6 added scraping Routing Team as Team 2024-04-16 07:40:43 -06:00
Bel LaPointe
098986eb07 nil ptr in main test on pipeline spinup race 2024-04-16 07:36:12 -06:00
Bel LaPointe
5bc068451f find Author from slack 2024-04-16 07:35:58 -06:00
Bel LaPointe
5fa21d0cd9 model to persist pipeline tests OK 2024-04-16 07:29:42 -06:00
16 changed files with 339 additions and 305 deletions

View File

@@ -4,18 +4,15 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
## TODO ## TODO
- to class design for joins and external tables - dedupe critical+noncritical
- dedupe multi channels
- add Team to Message
- from sync ingest to ingestQueue that retries a few times so user messages can find parent
- limit queue retries
- share postgres with Grafana
- new dash in Grafana
- what SLO/SLI can I help benoit with - what SLO/SLI can I help benoit with
- break into smaller goals - break into smaller goals
- sell to the team - sell to the team
- scott; like to keep state in incident.io and zendesk - scott; like to keep state in incident.io and zendesk
- @spoc -ignore, @spoc -s summary
- limit rps from rpc/slackscrape
- rpc/slackscrape to async
- limit queue retries
``` ```
erDiagram erDiagram

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"log"
"os" "os"
"regexp" "regexp"
"strconv" "strconv"
@@ -106,8 +107,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, err return Config{}, err
} }
ctx, can := context.WithTimeout(ctx, time.Second*10) ctx, can := context.WithTimeout(ctx, time.Minute)
defer can() defer can()
driver, err := NewDriver(ctx, result.DriverConn) driver, err := NewDriver(ctx, result.DriverConn)
if err != nil { if err != nil {
return Config{}, err return Config{}, err
@@ -118,6 +120,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} else { } else {
return Config{}, errors.New("not impl") return Config{}, errors.New("not impl")
} }
if result.Debug {
log.Printf("connected to driver at %s (%s @%s)", result.DriverConn, result.driver.engine, result.driver.conn)
}
storage, err := NewStorage(ctx, result.driver) storage, err := NewStorage(ctx, result.driver)
if err != nil { if err != nil {

View File

@@ -14,6 +14,8 @@ import (
) )
type Driver struct { type Driver struct {
engine string
conn string
*sql.DB *sql.DB
} }
@@ -47,7 +49,7 @@ func NewDriver(ctx context.Context, conn string) (Driver, error) {
return Driver{}, err return Driver{}, err
} }
driver := Driver{DB: db} driver := Driver{DB: db, conn: conn, engine: engine}
if err := driver.setup(ctx); err != nil { if err := driver.setup(ctx); err != nil {
driver.Close() driver.Close()
return Driver{}, fmt.Errorf("failed setup: %w", err) return Driver{}, fmt.Errorf("failed setup: %w", err)

View File

@@ -0,0 +1,53 @@
//go:build integration
package main
import (
"context"
"os"
"testing"
"time"
"github.com/breel-render/spoc-bot-vr/model"
)
func TestDriverIntegration(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*30)
defer can()
driver, err := NewDriver(ctx, os.Getenv("DRIVER_CONN"))
if err != nil {
t.Fatal(err)
}
defer driver.Close()
q, err := NewQueue(ctx, t.Name(), driver)
if err != nil {
t.Fatal(err)
}
qV := []byte("hello")
if err := q.Enqueue(ctx, qV); err != nil {
t.Error("q cannot enqueue:", err)
} else if reservation, v, err := q.Syn(ctx); err != nil {
t.Error("q cannot syn:", err)
} else if string(v) != string(qV) {
t.Error("q enqueued wrong:", string(v))
} else if len(reservation) == 0 {
t.Error("q didnt have reservation")
} else if err := q.Ack(ctx, reservation); err != nil {
t.Error("q cannot ack:", err)
}
s, err := NewStorage(ctx, driver)
if err != nil {
t.Fatal(err)
}
evt := model.Event{ID: "x", Name: "y"}
if err := s.UpsertEvent(ctx, evt); err != nil {
t.Error("s cannot upsert:", err)
} else if e, err := s.GetEvent(ctx, evt.ID); err != nil {
t.Error("s cannot get:", err)
} else if e != evt {
t.Error("s upserted wrong:", e)
}
}

160
main.go
View File

@@ -48,7 +48,6 @@ func run(ctx context.Context, cfg Config) error {
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error { func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx) ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first) pipelines = append(pipelines, first)
errs := make(chan error) errs := make(chan error)
@@ -86,11 +85,6 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc { func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
@@ -113,97 +107,81 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
channel := r.Header.Get("slack-channel") channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token") token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req) httpc := http.Client{Timeout: time.Second}
if err != nil { get := func(url string) ([]byte, error) {
http.Error(w, err.Error(), http.StatusBadGateway) req, err := http.NewRequest(http.MethodGet, url, nil)
return if err != nil {
} return nil, err
defer resp.Body.Close() }
defer io.Copy(io.Discard, resp.Body) req.Header.Set("Authorization", "Bearer "+token)
req = req.WithContext(r.Context())
var page struct { resp, err := httpc.Do(req)
OK bool if err != nil {
Messages []json.RawMessage return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
return io.ReadAll(resp.Body)
} }
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway) n := 0
return
} else if !page.OK { for len(urls) > 0 {
http.Error(w, "slack page was !.ok", http.StatusBadGateway) url := urls[0]
return urls = urls[1:]
} select {
errs := []error{} case <-r.Context().Done():
for _, messageJSON := range page.Messages { case <-time.After(time.Second):
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { }
errs = append(errs, err) body, err := get(url)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
var page struct {
Messages []json.RawMessage
}
if err := json.Unmarshal(body, &page); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
} else {
n += 1
}
if !strings.Contains(url, "ts=") {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
}
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
} }
} }
if len(errs) > 0 { json.NewEncoder(w).Encode(map[string]any{"scraped": n})
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
_ = thread
http.Error(w, "not impl", http.StatusNotImplemented)
} }
} }

View File

@@ -3,8 +3,6 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/csv"
"encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@@ -45,8 +43,11 @@ func TestRun(t *testing.T) {
cfg.EventNamePattern = renderEventNamePattern cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port cfg.Port = port
cfg.driver = NewTestDriver(t) cfg.driver = NewTestDriver(t)
cfg.storage, _ = NewStorage(ctx, cfg.driver)
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
go func() { go func() {
if err := run(ctx, cfg); err != nil && ctx.Err() == nil { if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
@@ -82,165 +83,4 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b) t.Fatalf("(%d) %s", resp.StatusCode, b)
} }
}) })
t.Run("GET /api/v1/messages", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Messages []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Messages) != 1 {
t.Fatal(result.Messages)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/eventnames", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
EventNames []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.EventNames[0] != "Wal Receive Count Alert" {
t.Fatal(result.EventNames)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/events", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Events []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.Events[0] != "11067" {
t.Fatal(result.Events)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/threads", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Threads []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.Threads[0] != "1712911957.023359" {
t.Fatal(result.Threads)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Thread []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Thread) != 1 {
t.Fatal(result.Thread)
} else {
t.Logf("%+v", result)
}
})
t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "text/csv")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
b, _ := io.ReadAll(resp.Body)
t.Logf("whole csv: \n%s", b)
dec := csv.NewReader(bytes.NewReader(b))
var lastLine []string
for {
line, err := dec.Read()
if err == io.EOF {
break
} else if err != nil {
t.Error("unexpected error while reading csv line:", err)
}
if lastLine == nil {
} else if len(lastLine) != len(line) {
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
}
t.Logf("CSV line: %+v", line)
lastLine = line
}
if lastLine == nil {
t.Error("no lines found")
}
})
} }

View File

@@ -4,18 +4,16 @@ package model
type Message struct { type Message struct {
Updated uint64 Updated uint64
ID string ID string
URL string
TS uint64 TS uint64
Author string Author string
Plaintext string Plaintext string
ThreadID string ThreadID string
} }
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message { func NewMessage(ID string, TS uint64, Author, Plaintext string, ThreadID string) Message {
return Message{ return Message{
Updated: updated(), Updated: updated(),
ID: ID, ID: ID,
URL: URL,
TS: TS, TS: TS,
Author: Author, Author: Author,
Plaintext: Plaintext, Plaintext: Plaintext,

View File

@@ -2,15 +2,23 @@ package main
import ( import (
"context" "context"
"errors" "encoding/json"
"fmt"
"log"
) )
type ModelToPersistence struct { type ModelToPersistence struct {
pipeline Pipeline pipeline Pipeline
} }
type ModelIDs struct {
Event string
Message string
Thread string
}
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver) reader, err := NewQueue(ctx, "new_models", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
@@ -18,15 +26,41 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
writer = NewNoopQueue()
return Pipeline{ return Pipeline{
writer: writer, writer: writer,
reader: reader, reader: reader,
process: newModelToPersistenceProcess(cfg.driver), process: newModelToPersistenceProcess(cfg.storage),
}, nil }, nil
} }
func newModelToPersistenceProcess(driver Driver) processFunc { func newModelToPersistenceProcess(storage Storage) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) { return func(ctx context.Context, models []byte) ([]byte, error) {
return nil, errors.New("not impl") var m Models
if err := json.Unmarshal(models, &m); err != nil {
return nil, fmt.Errorf("received non models payload: %w", err)
}
if m.Event.Empty() {
} else if err := storage.UpsertEvent(ctx, m.Event); err != nil {
return nil, fmt.Errorf("failed to persist event: %w", err)
}
if m.Thread.Empty() {
} else if err := storage.UpsertThread(ctx, m.Thread); err != nil {
return nil, fmt.Errorf("failed to persist thread: %w", err)
}
if m.Message.Empty() {
} else if err := storage.UpsertMessage(ctx, m.Message); err != nil {
return nil, fmt.Errorf("failed to persist message: %w", err)
}
log.Printf("persisted models")
return json.Marshal(ModelIDs{
Event: m.Event.ID,
Thread: m.Thread.ID,
Message: m.Message.ID,
})
} }
} }

View File

@@ -2,8 +2,11 @@ package main
import ( import (
"context" "context"
"encoding/json"
"testing" "testing"
"time" "time"
"github.com/breel-render/spoc-bot-vr/model"
) )
func TestModelToPersistenceProcessor(t *testing.T) { func TestModelToPersistenceProcessor(t *testing.T) {
@@ -12,7 +15,49 @@ func TestModelToPersistenceProcessor(t *testing.T) {
defer can() defer can()
d := NewTestDriver(t) d := NewTestDriver(t)
process := newModelToPersistenceProcess(d) s, _ := NewStorage(ctx, d)
process := newModelToPersistenceProcess(s)
_, _ = ctx, process _, _ = ctx, process
inputModels := Models{
Event: model.Event{ID: "event", Asset: "event-asset"},
//Thread: {ID: "thread", Channel: "thread-channel"},
Message: model.Message{ID: "message", Plaintext: "message-plaintext"},
}
input, _ := json.Marshal(inputModels)
var outputModelIDs ModelIDs
var n int
if output, err := process(ctx, input); err != nil {
t.Fatal(err)
} else if err := json.Unmarshal(output, &outputModelIDs); err != nil {
t.Fatal(err)
} else if outputModelIDs != (ModelIDs{Event: "event", Message: "message"}) {
t.Error(outputModelIDs)
}
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`); row.Err() != nil {
t.Error("cant count events:", row.Err())
} else if err := row.Scan(&n); err != nil {
t.Error("cant count events:", err)
} else if n != 1 {
t.Error("bad event count:", n)
}
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM threads`); row.Err() != nil {
t.Error("cant count threads:", row.Err())
} else if err := row.Scan(&n); err != nil {
t.Error("cant count threads:", err)
} else if n != 0 {
t.Error("bad thread count:", n)
}
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM messages`); row.Err() != nil {
t.Error("cant count messages:", row.Err())
} else if err := row.Scan(&n); err != nil {
t.Error("cant count messages:", err)
} else if n != 1 {
t.Error("bad message count:", n)
}
} }

View File

@@ -1,6 +1,9 @@
package main package main
import "context" import (
"context"
"log"
)
type ( type (
Pipeline struct { Pipeline struct {
@@ -22,7 +25,14 @@ func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
func (p Pipeline) Process(ctx context.Context) error { func (p Pipeline) Process(ctx context.Context) error {
ctx, can := context.WithCancel(ctx) ctx, can := context.WithCancel(ctx)
defer can() defer can()
err := p.processUntilErr(ctx)
if err != nil {
log.Printf("pipeline failed to process: %v", err)
}
return err
}
func (p Pipeline) processUntilErr(ctx context.Context) error {
for ctx.Err() == nil { for ctx.Err() == nil {
reservation, read, err := p.reader.Syn(ctx) reservation, read, err := p.reader.Syn(ctx)
if err != nil { if err != nil {
@@ -32,7 +42,8 @@ func (p Pipeline) Process(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
if err := p.writer.Enqueue(ctx, processed); err != nil { if processed == nil {
} else if err := p.writer.Enqueue(ctx, processed); err != nil {
return err return err
} }
if err := p.reader.Ack(ctx, reservation); err != nil { if err := p.reader.Ack(ctx, reservation); err != nil {

View File

@@ -6,6 +6,47 @@ import (
"time" "time"
) )
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
output, _ := NewQueue(ctx, "output", NewTestDriver(t))
input, _ := NewQueue(ctx, "input", NewTestDriver(t))
calls := 0
process := func(_ context.Context, v []byte) ([]byte, error) {
calls += 1
return nil, nil
}
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
t.Error(err)
}
ing := NewPipeline(output, input, process)
go func() {
defer can()
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
for ctx.Err() == nil {
if calls != 0 {
break
}
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 100):
}
}
if r, _, _ := output.syn(ctx); len(r) != 0 {
t.Error("something was pushed to out queue even though processor didnt emit content")
}
}
func TestPipeline(t *testing.T) { func TestPipeline(t *testing.T) {
t.Parallel() t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)

View File

@@ -20,7 +20,7 @@ func NewNoopQueue() Queue {
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) { func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
if _, err := driver.ExecContext(ctx, ` if _, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS queue ( CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY, id TEXT PRIMARY KEY,
topic TEXT NOT NULL, topic TEXT NOT NULL,
updated INTEGER NOT NULL, updated INTEGER NOT NULL,
reservation TEXT, reservation TEXT,
@@ -37,11 +37,12 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
return nil return nil
} }
_, err := q.driver.ExecContext(ctx, ` _, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?) INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
`, `,
q.topic, q.topic,
time.Now().Unix(), time.Now().Unix(),
b, b,
uuid.New().String(),
) )
return err return err
} }
@@ -59,7 +60,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return "", nil, ctx.Err() return "", nil, ctx.Err()
case <-time.After(time.Second): case <-time.After(time.Millisecond * 500):
} }
} }
} }
@@ -71,16 +72,16 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
if result, err := q.driver.ExecContext(ctx, ` if result, err := q.driver.ExecContext(ctx, `
UPDATE queue UPDATE queue
SET SET
updated = ?, reservation = ? updated = $1, reservation = $2
WHERE WHERE
id IN ( id IN (
SELECT id SELECT id
FROM queue FROM queue
WHERE WHERE
topic == ? topic = $3
AND ( AND (
reservation IS NULL reservation IS NULL
OR ? - updated > 60 OR $4 - updated > 60
) )
LIMIT 1 LIMIT 1
) )
@@ -95,7 +96,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
row := q.driver.QueryRowContext(ctx, ` row := q.driver.QueryRowContext(ctx, `
SELECT payload SELECT payload
FROM queue FROM queue
WHERE reservation==? WHERE reservation=$1
LIMIT 1 LIMIT 1
`, reservation) `, reservation)
if err := row.Err(); err != nil { if err := row.Err(); err != nil {
@@ -108,12 +109,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
} }
func (q Queue) Ack(ctx context.Context, reservation string) error { func (q Queue) Ack(ctx context.Context, reservation string) error {
return q.ack(ctx, []byte(reservation))
}
func (q Queue) ack(ctx context.Context, reservation []byte) error {
if q.driver.DB == nil { if q.driver.DB == nil {
return nil return nil
} }
_, err := q.driver.ExecContext(ctx, ` result, err := q.driver.ExecContext(ctx, `
DELETE FROM queue DELETE FROM queue
WHERE reservation==? WHERE reservation=$1
`, reservation) `, reservation)
if err != nil {
return err
}
if n, _ := result.RowsAffected(); n != 1 {
return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
}
return err return err
} }

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"log"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@@ -32,7 +33,7 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
writer, err := NewQueue(ctx, "new_message", cfg.driver) writer, err := NewQueue(ctx, "new_models", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
@@ -46,8 +47,13 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
func newSlackToModelProcess(cfg Config) processFunc { func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) { return func(ctx context.Context, slack []byte) ([]byte, error) {
s, err := parseSlack(slack) s, err := parseSlack(slack)
if err != nil { if cfg.Debug {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack) log.Printf("%v: %s => %+v", err, slack, s)
}
if errors.Is(err, ErrIrrelevantMessage) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %v", err)
} }
for pattern, ptr := range map[string]*string{ for pattern, ptr := range map[string]*string{
@@ -67,16 +73,18 @@ func newSlackToModelProcess(cfg Config) processFunc {
event := model.Event{} event := model.Event{}
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" { if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved) event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, s.Team, s.Resolved)
} }
message := model.Message{} message := model.Message{}
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" { if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread) message = model.NewMessage(s.ID, s.TS, s.Author, s.Plaintext, s.Thread)
} }
thread := model.Thread{} thread := model.Thread{}
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" { if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event) thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
} }
log.Printf("parsed slack message into models")
return json.Marshal(Models{ return json.Marshal(Models{
Event: event, Event: event,
Message: message, Message: message,
@@ -98,6 +106,8 @@ type (
Asset string Asset string
Resolved bool Resolved bool
Datacenter string Datacenter string
Author string
Team string
} }
slackMessage struct { slackMessage struct {
@@ -118,6 +128,7 @@ type (
ParentID string `json:"thread_ts"` ParentID string `json:"thread_ts"`
Text string Text string
Blocks []slackBlock Blocks []slackBlock
User string
// bot // bot
Bot slackBot `json:"bot_profile"` Bot slackBot `json:"bot_profile"`
Attachments []slackAttachment Attachments []slackAttachment
@@ -171,9 +182,13 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
return parsedSlackMessage{}, ErrIrrelevantMessage return parsedSlackMessage{}, ErrIrrelevantMessage
} }
var tagsField string var tagsField string
var teamField string
for _, field := range s.Event.Attachments[0].Fields { for _, field := range s.Event.Attachments[0].Fields {
if field.Title == "Tags" { switch field.Title {
case "Tags":
tagsField = field.Value tagsField = field.Value
case "Routed Teams":
teamField = field.Value
} }
} }
return parsedSlackMessage{ return parsedSlackMessage{
@@ -188,6 +203,8 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
Asset: s.Event.Attachments[0].Text, Asset: s.Event.Attachments[0].Text,
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"), Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
Datacenter: tagsField, Datacenter: tagsField,
Author: s.Event.Bot.Name,
Team: teamField,
}, nil }, nil
} }
@@ -205,6 +222,7 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
Plaintext: s.Event.Text, Plaintext: s.Event.Text,
Asset: "", Asset: "",
Datacenter: "", Datacenter: "",
Author: s.Event.User,
}, nil }, nil
} }

View File

@@ -40,14 +40,13 @@ func TestSlackToModelPipeline(t *testing.T) {
"Alertconfig Workflow Failed", "Alertconfig Workflow Failed",
"", "",
"", "",
"TODO", "Datastores Non-Critical",
false, false,
), ),
Message: model.NewMessage( Message: model.NewMessage(
"1712927439.728409/1712927439", "1712927439.728409/1712927439",
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
1712927439, 1712927439,
"TODO", "Opsgenie for Alert Management",
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
"1712927439.728409", "1712927439.728409",
), ),
@@ -129,6 +128,7 @@ func TestParseSlackTestdata(t *testing.T) {
Event: "", Event: "",
Plaintext: "I gotta do this", Plaintext: "I gotta do this",
Asset: "", Asset: "",
Author: "U06868T6ADV",
}, },
}, },
"opsgenie_alert.json": { "opsgenie_alert.json": {
@@ -164,6 +164,8 @@ func TestParseSlackTestdata(t *testing.T) {
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical",
}, },
}, },
"opsgenie_alert_resolved.json": { "opsgenie_alert_resolved.json": {
@@ -200,6 +202,8 @@ func TestParseSlackTestdata(t *testing.T) {
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true, Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical",
}, },
}, },
"reingested_alert.json": { "reingested_alert.json": {
@@ -215,6 +219,8 @@ func TestParseSlackTestdata(t *testing.T) {
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>", Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&amp;viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
Resolved: true, Resolved: true,
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
Author: "Opsgenie for Alert Management",
Team: "Datastores Non-Critical",
}, },
}, },
} }

View File

@@ -43,7 +43,7 @@ func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) { func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
v := model.Event{} v := model.Event{}
err := s.selectOne(ctx, "events", &v, "ID = ?", ID) err := s.selectOne(ctx, "events", &v, "ID = $1", ID)
return v, err return v, err
} }
@@ -53,7 +53,7 @@ func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) { func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
v := model.Message{} v := model.Message{}
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID) err := s.selectOne(ctx, "messages", &v, "ID = $1", ID)
return v, err return v, err
} }
@@ -63,7 +63,7 @@ func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) { func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
v := model.Thread{} v := model.Thread{}
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID) err := s.selectOne(ctx, "threads", &v, "ID = $1", ID)
return v, err return v, err
} }
@@ -72,7 +72,7 @@ func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
} }
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error { func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
if questions := strings.Count(clause, "?"); questions != len(args) { if questions := strings.Count(clause, "$"); questions != len(args) {
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args)) return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
} }
@@ -115,10 +115,6 @@ func (s Storage) upsert(ctx context.Context, table string, v any) error {
return err return err
} }
for i := range keys {
values = append(values, values[i])
}
q := fmt.Sprintf(` q := fmt.Sprintf(`
INSERT INTO %s (%s) VALUES (%s) INSERT INTO %s (%s) VALUES (%s)
ON CONFLICT (ID) DO UPDATE SET %s ON CONFLICT (ID) DO UPDATE SET %s
@@ -144,11 +140,11 @@ func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
} }
args := make([]string, len(keys)) args := make([]string, len(keys))
for i := range args { for i := range args {
args[i] = "?" args[i] = fmt.Sprintf("$%d", i+1)
} }
keyArgs := make([]string, len(keys)) keyArgs := make([]string, len(keys))
for i := range keyArgs { for i := range keyArgs {
keyArgs[i] = fmt.Sprintf("%s=?", keys[i]) keyArgs[i] = fmt.Sprintf("%s=$%d", keys[i], i+1)
} }
values := make([]any, len(keys)) values := make([]any, len(keys))
for i := range values { for i := range values {

View File

@@ -67,7 +67,6 @@ func TestStorage(t *testing.T) {
t.Run("upsert get message", func(t *testing.T) { t.Run("upsert get message", func(t *testing.T) {
m := model.NewMessage( m := model.NewMessage(
"ID", "ID",
"URL",
1, 1,
"Author", "Author",
"Plaintext", "Plaintext",