Compare commits
15 Commits
709f2ac254
...
d9d91193dd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9d91193dd | ||
|
|
2033cefc2a | ||
|
|
600a2e0111 | ||
|
|
4f3b8ec866 | ||
|
|
c4a7eaf04a | ||
|
|
5557c0920a | ||
|
|
c7f5cdb040 | ||
|
|
a8e8fdc451 | ||
|
|
d88a8bb23a | ||
|
|
39c0056190 | ||
|
|
d87af2fadc | ||
|
|
9bc47bfde6 | ||
|
|
098986eb07 | ||
|
|
5bc068451f | ||
|
|
5fa21d0cd9 |
13
README.md
13
README.md
@@ -4,18 +4,15 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
|
||||
|
||||
## TODO
|
||||
|
||||
- to class design for joins and external tables
|
||||
- dedupe multi channels
|
||||
- add Team to Message
|
||||
- from sync ingest to ingestQueue that retries a few times so user messages can find parent
|
||||
- limit queue retries
|
||||
- share postgres with Grafana
|
||||
- new dash in Grafana
|
||||
- dedupe critical+noncritical
|
||||
- what SLO/SLI can I help benoit with
|
||||
- break into smaller goals
|
||||
- sell to the team
|
||||
- scott; like to keep state in incident.io and zendesk
|
||||
|
||||
- @spoc -ignore, @spoc -s summary
|
||||
- limit rps from rpc/slackscrape
|
||||
- rpc/slackscrape to async
|
||||
- limit queue retries
|
||||
|
||||
```
|
||||
erDiagram
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
@@ -106,8 +107,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
ctx, can := context.WithTimeout(ctx, time.Second*10)
|
||||
ctx, can := context.WithTimeout(ctx, time.Minute)
|
||||
defer can()
|
||||
|
||||
driver, err := NewDriver(ctx, result.DriverConn)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
@@ -118,6 +120,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
} else {
|
||||
return Config{}, errors.New("not impl")
|
||||
}
|
||||
if result.Debug {
|
||||
log.Printf("connected to driver at %s (%s @%s)", result.DriverConn, result.driver.engine, result.driver.conn)
|
||||
}
|
||||
|
||||
storage, err := NewStorage(ctx, result.driver)
|
||||
if err != nil {
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
)
|
||||
|
||||
type Driver struct {
|
||||
engine string
|
||||
conn string
|
||||
*sql.DB
|
||||
}
|
||||
|
||||
@@ -47,7 +49,7 @@ func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||
return Driver{}, err
|
||||
}
|
||||
|
||||
driver := Driver{DB: db}
|
||||
driver := Driver{DB: db, conn: conn, engine: engine}
|
||||
if err := driver.setup(ctx); err != nil {
|
||||
driver.Close()
|
||||
return Driver{}, fmt.Errorf("failed setup: %w", err)
|
||||
|
||||
53
driver_integration_test.go
Normal file
53
driver_integration_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
//go:build integration
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
func TestDriverIntegration(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*30)
|
||||
defer can()
|
||||
|
||||
driver, err := NewDriver(ctx, os.Getenv("DRIVER_CONN"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer driver.Close()
|
||||
|
||||
q, err := NewQueue(ctx, t.Name(), driver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qV := []byte("hello")
|
||||
if err := q.Enqueue(ctx, qV); err != nil {
|
||||
t.Error("q cannot enqueue:", err)
|
||||
} else if reservation, v, err := q.Syn(ctx); err != nil {
|
||||
t.Error("q cannot syn:", err)
|
||||
} else if string(v) != string(qV) {
|
||||
t.Error("q enqueued wrong:", string(v))
|
||||
} else if len(reservation) == 0 {
|
||||
t.Error("q didnt have reservation")
|
||||
} else if err := q.Ack(ctx, reservation); err != nil {
|
||||
t.Error("q cannot ack:", err)
|
||||
}
|
||||
|
||||
s, err := NewStorage(ctx, driver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
evt := model.Event{ID: "x", Name: "y"}
|
||||
if err := s.UpsertEvent(ctx, evt); err != nil {
|
||||
t.Error("s cannot upsert:", err)
|
||||
} else if e, err := s.GetEvent(ctx, evt.ID); err != nil {
|
||||
t.Error("s cannot get:", err)
|
||||
} else if e != evt {
|
||||
t.Error("s upserted wrong:", e)
|
||||
}
|
||||
}
|
||||
160
main.go
160
main.go
@@ -48,7 +48,6 @@ func run(ctx context.Context, cfg Config) error {
|
||||
|
||||
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
|
||||
ctx, can := context.WithCancel(ctx)
|
||||
defer can()
|
||||
|
||||
pipelines = append(pipelines, first)
|
||||
errs := make(chan error)
|
||||
@@ -86,11 +85,6 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
||||
func newHandler(cfg Config) http.HandlerFunc {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
||||
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
||||
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
||||
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||
|
||||
@@ -113,97 +107,81 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||
channel := r.Header.Get("slack-channel")
|
||||
token := r.Header.Get("slack-oauth-token")
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
httpc := http.Client{Timeout: time.Second}
|
||||
get := func(url string) ([]byte, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req = req.WithContext(r.Context())
|
||||
|
||||
var page struct {
|
||||
OK bool
|
||||
Messages []json.RawMessage
|
||||
resp, err := httpc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
return io.ReadAll(resp.Body)
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
} else if !page.OK {
|
||||
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||
errs = append(errs, err)
|
||||
|
||||
n := 0
|
||||
|
||||
for len(urls) > 0 {
|
||||
url := urls[0]
|
||||
urls = urls[1:]
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
body, err := get(url)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
var page struct {
|
||||
Messages []json.RawMessage
|
||||
}
|
||||
if err := json.Unmarshal(body, &page); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
if cfg.Debug {
|
||||
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
||||
}
|
||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
n += 1
|
||||
}
|
||||
if !strings.Contains(url, "ts=") {
|
||||
var peek struct {
|
||||
ThreadTS string `json:"thread_ts"`
|
||||
}
|
||||
json.Unmarshal(messageJSON, &peek)
|
||||
if peek.ThreadTS != "" {
|
||||
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
||||
_ = thread
|
||||
|
||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
166
main_test.go
166
main_test.go
@@ -3,8 +3,6 @@ package main
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -45,8 +43,11 @@ func TestRun(t *testing.T) {
|
||||
cfg.EventNamePattern = renderEventNamePattern
|
||||
cfg.Port = port
|
||||
cfg.driver = NewTestDriver(t)
|
||||
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
||||
cfg.SlackToken = "redacted"
|
||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
||||
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
||||
|
||||
go func() {
|
||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||
@@ -82,165 +83,4 @@ func TestRun(t *testing.T) {
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/messages", func(t *testing.T) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
var result struct {
|
||||
Messages []any
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(result.Messages) != 1 {
|
||||
t.Fatal(result.Messages)
|
||||
} else {
|
||||
t.Logf("%+v", result)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/eventnames", func(t *testing.T) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
var result struct {
|
||||
EventNames []string
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if result.EventNames[0] != "Wal Receive Count Alert" {
|
||||
t.Fatal(result.EventNames)
|
||||
} else {
|
||||
t.Logf("%+v", result)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/events", func(t *testing.T) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
var result struct {
|
||||
Events []string
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if result.Events[0] != "11067" {
|
||||
t.Fatal(result.Events)
|
||||
} else {
|
||||
t.Logf("%+v", result)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/threads", func(t *testing.T) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
var result struct {
|
||||
Threads []string
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if result.Threads[0] != "1712911957.023359" {
|
||||
t.Fatal(result.Threads)
|
||||
} else {
|
||||
t.Logf("%+v", result)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Thread []any
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(result.Thread) != 1 {
|
||||
t.Fatal(result.Thread)
|
||||
} else {
|
||||
t.Logf("%+v", result)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
|
||||
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Set("Accept", "text/csv")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||
}
|
||||
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
t.Logf("whole csv: \n%s", b)
|
||||
|
||||
dec := csv.NewReader(bytes.NewReader(b))
|
||||
var lastLine []string
|
||||
for {
|
||||
line, err := dec.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
t.Error("unexpected error while reading csv line:", err)
|
||||
}
|
||||
|
||||
if lastLine == nil {
|
||||
} else if len(lastLine) != len(line) {
|
||||
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
|
||||
}
|
||||
|
||||
t.Logf("CSV line: %+v", line)
|
||||
lastLine = line
|
||||
}
|
||||
if lastLine == nil {
|
||||
t.Error("no lines found")
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,18 +4,16 @@ package model
|
||||
type Message struct {
|
||||
Updated uint64
|
||||
ID string
|
||||
URL string
|
||||
TS uint64
|
||||
Author string
|
||||
Plaintext string
|
||||
ThreadID string
|
||||
}
|
||||
|
||||
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
||||
func NewMessage(ID string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
||||
return Message{
|
||||
Updated: updated(),
|
||||
ID: ID,
|
||||
URL: URL,
|
||||
TS: TS,
|
||||
Author: Author,
|
||||
Plaintext: Plaintext,
|
||||
|
||||
@@ -2,15 +2,23 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
type ModelToPersistence struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
type ModelIDs struct {
|
||||
Event string
|
||||
Message string
|
||||
Thread string
|
||||
}
|
||||
|
||||
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
reader, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
@@ -18,15 +26,41 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer = NewNoopQueue()
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newModelToPersistenceProcess(cfg.driver),
|
||||
process: newModelToPersistenceProcess(cfg.storage),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newModelToPersistenceProcess(driver Driver) processFunc {
|
||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||
return nil, errors.New("not impl")
|
||||
func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||
var m Models
|
||||
if err := json.Unmarshal(models, &m); err != nil {
|
||||
return nil, fmt.Errorf("received non models payload: %w", err)
|
||||
}
|
||||
|
||||
if m.Event.Empty() {
|
||||
} else if err := storage.UpsertEvent(ctx, m.Event); err != nil {
|
||||
return nil, fmt.Errorf("failed to persist event: %w", err)
|
||||
}
|
||||
|
||||
if m.Thread.Empty() {
|
||||
} else if err := storage.UpsertThread(ctx, m.Thread); err != nil {
|
||||
return nil, fmt.Errorf("failed to persist thread: %w", err)
|
||||
}
|
||||
|
||||
if m.Message.Empty() {
|
||||
} else if err := storage.UpsertMessage(ctx, m.Message); err != nil {
|
||||
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("persisted models")
|
||||
return json.Marshal(ModelIDs{
|
||||
Event: m.Event.ID,
|
||||
Thread: m.Thread.ID,
|
||||
Message: m.Message.ID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,11 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
func TestModelToPersistenceProcessor(t *testing.T) {
|
||||
@@ -12,7 +15,49 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
process := newModelToPersistenceProcess(d)
|
||||
s, _ := NewStorage(ctx, d)
|
||||
process := newModelToPersistenceProcess(s)
|
||||
|
||||
_, _ = ctx, process
|
||||
|
||||
inputModels := Models{
|
||||
Event: model.Event{ID: "event", Asset: "event-asset"},
|
||||
//Thread: {ID: "thread", Channel: "thread-channel"},
|
||||
Message: model.Message{ID: "message", Plaintext: "message-plaintext"},
|
||||
}
|
||||
input, _ := json.Marshal(inputModels)
|
||||
|
||||
var outputModelIDs ModelIDs
|
||||
var n int
|
||||
if output, err := process(ctx, input); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := json.Unmarshal(output, &outputModelIDs); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if outputModelIDs != (ModelIDs{Event: "event", Message: "message"}) {
|
||||
t.Error(outputModelIDs)
|
||||
}
|
||||
|
||||
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`); row.Err() != nil {
|
||||
t.Error("cant count events:", row.Err())
|
||||
} else if err := row.Scan(&n); err != nil {
|
||||
t.Error("cant count events:", err)
|
||||
} else if n != 1 {
|
||||
t.Error("bad event count:", n)
|
||||
}
|
||||
|
||||
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM threads`); row.Err() != nil {
|
||||
t.Error("cant count threads:", row.Err())
|
||||
} else if err := row.Scan(&n); err != nil {
|
||||
t.Error("cant count threads:", err)
|
||||
} else if n != 0 {
|
||||
t.Error("bad thread count:", n)
|
||||
}
|
||||
|
||||
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM messages`); row.Err() != nil {
|
||||
t.Error("cant count messages:", row.Err())
|
||||
} else if err := row.Scan(&n); err != nil {
|
||||
t.Error("cant count messages:", err)
|
||||
} else if n != 1 {
|
||||
t.Error("bad message count:", n)
|
||||
}
|
||||
}
|
||||
|
||||
15
pipeline.go
15
pipeline.go
@@ -1,6 +1,9 @@
|
||||
package main
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
)
|
||||
|
||||
type (
|
||||
Pipeline struct {
|
||||
@@ -22,7 +25,14 @@ func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
|
||||
func (p Pipeline) Process(ctx context.Context) error {
|
||||
ctx, can := context.WithCancel(ctx)
|
||||
defer can()
|
||||
err := p.processUntilErr(ctx)
|
||||
if err != nil {
|
||||
log.Printf("pipeline failed to process: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p Pipeline) processUntilErr(ctx context.Context) error {
|
||||
for ctx.Err() == nil {
|
||||
reservation, read, err := p.reader.Syn(ctx)
|
||||
if err != nil {
|
||||
@@ -32,7 +42,8 @@ func (p Pipeline) Process(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.writer.Enqueue(ctx, processed); err != nil {
|
||||
if processed == nil {
|
||||
} else if err := p.writer.Enqueue(ctx, processed); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.reader.Ack(ctx, reservation); err != nil {
|
||||
|
||||
@@ -6,6 +6,47 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
output, _ := NewQueue(ctx, "output", NewTestDriver(t))
|
||||
input, _ := NewQueue(ctx, "input", NewTestDriver(t))
|
||||
|
||||
calls := 0
|
||||
process := func(_ context.Context, v []byte) ([]byte, error) {
|
||||
calls += 1
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
ing := NewPipeline(output, input, process)
|
||||
go func() {
|
||||
defer can()
|
||||
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
for ctx.Err() == nil {
|
||||
if calls != 0 {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
}
|
||||
}
|
||||
|
||||
if r, _, _ := output.syn(ctx); len(r) != 0 {
|
||||
t.Error("something was pushed to out queue even though processor didnt emit content")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipeline(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
|
||||
29
queue.go
29
queue.go
@@ -20,7 +20,7 @@ func NewNoopQueue() Queue {
|
||||
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||
if _, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS queue (
|
||||
id INTEGER PRIMARY KEY,
|
||||
id TEXT PRIMARY KEY,
|
||||
topic TEXT NOT NULL,
|
||||
updated INTEGER NOT NULL,
|
||||
reservation TEXT,
|
||||
@@ -37,11 +37,12 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
||||
return nil
|
||||
}
|
||||
_, err := q.driver.ExecContext(ctx, `
|
||||
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
||||
`,
|
||||
q.topic,
|
||||
time.Now().Unix(),
|
||||
b,
|
||||
uuid.New().String(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
@@ -59,7 +60,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", nil, ctx.Err()
|
||||
case <-time.After(time.Second):
|
||||
case <-time.After(time.Millisecond * 500):
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,16 +72,16 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||
if result, err := q.driver.ExecContext(ctx, `
|
||||
UPDATE queue
|
||||
SET
|
||||
updated = ?, reservation = ?
|
||||
updated = $1, reservation = $2
|
||||
WHERE
|
||||
id IN (
|
||||
SELECT id
|
||||
FROM queue
|
||||
WHERE
|
||||
topic == ?
|
||||
topic = $3
|
||||
AND (
|
||||
reservation IS NULL
|
||||
OR ? - updated > 60
|
||||
OR $4 - updated > 60
|
||||
)
|
||||
LIMIT 1
|
||||
)
|
||||
@@ -95,7 +96,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||
row := q.driver.QueryRowContext(ctx, `
|
||||
SELECT payload
|
||||
FROM queue
|
||||
WHERE reservation==?
|
||||
WHERE reservation=$1
|
||||
LIMIT 1
|
||||
`, reservation)
|
||||
if err := row.Err(); err != nil {
|
||||
@@ -108,12 +109,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||
}
|
||||
|
||||
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
||||
return q.ack(ctx, []byte(reservation))
|
||||
}
|
||||
|
||||
func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
||||
if q.driver.DB == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := q.driver.ExecContext(ctx, `
|
||||
result, err := q.driver.ExecContext(ctx, `
|
||||
DELETE FROM queue
|
||||
WHERE reservation==?
|
||||
WHERE reservation=$1
|
||||
`, reservation)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n, _ := result.RowsAffected(); n != 1 {
|
||||
return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
30
slack.go
30
slack.go
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -32,7 +33,7 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
writer, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
@@ -46,8 +47,13 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
|
||||
func newSlackToModelProcess(cfg Config) processFunc {
|
||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||
s, err := parseSlack(slack)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
||||
if cfg.Debug {
|
||||
log.Printf("%v: %s => %+v", err, slack, s)
|
||||
}
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize slack %v", err)
|
||||
}
|
||||
|
||||
for pattern, ptr := range map[string]*string{
|
||||
@@ -67,16 +73,18 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
||||
|
||||
event := model.Event{}
|
||||
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
|
||||
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
|
||||
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, s.Team, s.Resolved)
|
||||
}
|
||||
message := model.Message{}
|
||||
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
|
||||
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
|
||||
message = model.NewMessage(s.ID, s.TS, s.Author, s.Plaintext, s.Thread)
|
||||
}
|
||||
thread := model.Thread{}
|
||||
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||
}
|
||||
|
||||
log.Printf("parsed slack message into models")
|
||||
return json.Marshal(Models{
|
||||
Event: event,
|
||||
Message: message,
|
||||
@@ -98,6 +106,8 @@ type (
|
||||
Asset string
|
||||
Resolved bool
|
||||
Datacenter string
|
||||
Author string
|
||||
Team string
|
||||
}
|
||||
|
||||
slackMessage struct {
|
||||
@@ -118,6 +128,7 @@ type (
|
||||
ParentID string `json:"thread_ts"`
|
||||
Text string
|
||||
Blocks []slackBlock
|
||||
User string
|
||||
// bot
|
||||
Bot slackBot `json:"bot_profile"`
|
||||
Attachments []slackAttachment
|
||||
@@ -171,9 +182,13 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
||||
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||
}
|
||||
var tagsField string
|
||||
var teamField string
|
||||
for _, field := range s.Event.Attachments[0].Fields {
|
||||
if field.Title == "Tags" {
|
||||
switch field.Title {
|
||||
case "Tags":
|
||||
tagsField = field.Value
|
||||
case "Routed Teams":
|
||||
teamField = field.Value
|
||||
}
|
||||
}
|
||||
return parsedSlackMessage{
|
||||
@@ -188,6 +203,8 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
||||
Asset: s.Event.Attachments[0].Text,
|
||||
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||
Datacenter: tagsField,
|
||||
Author: s.Event.Bot.Name,
|
||||
Team: teamField,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -205,6 +222,7 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
||||
Plaintext: s.Event.Text,
|
||||
Asset: "",
|
||||
Datacenter: "",
|
||||
Author: s.Event.User,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -40,14 +40,13 @@ func TestSlackToModelPipeline(t *testing.T) {
|
||||
"Alertconfig Workflow Failed",
|
||||
"",
|
||||
"",
|
||||
"TODO",
|
||||
"Datastores Non-Critical",
|
||||
false,
|
||||
),
|
||||
Message: model.NewMessage(
|
||||
"1712927439.728409/1712927439",
|
||||
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
||||
1712927439,
|
||||
"TODO",
|
||||
"Opsgenie for Alert Management",
|
||||
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
"1712927439.728409",
|
||||
),
|
||||
@@ -129,6 +128,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Event: "",
|
||||
Plaintext: "I gotta do this",
|
||||
Asset: "",
|
||||
Author: "U06868T6ADV",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert.json": {
|
||||
@@ -164,6 +164,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
Author: "Opsgenie for Alert Management",
|
||||
Team: "Datastores Non-Critical",
|
||||
},
|
||||
},
|
||||
"opsgenie_alert_resolved.json": {
|
||||
@@ -200,6 +202,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Resolved: true,
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
Author: "Opsgenie for Alert Management",
|
||||
Team: "Datastores Non-Critical",
|
||||
},
|
||||
},
|
||||
"reingested_alert.json": {
|
||||
@@ -215,6 +219,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||
Resolved: true,
|
||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||
Author: "Opsgenie for Alert Management",
|
||||
Team: "Datastores Non-Critical",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
16
storage.go
16
storage.go
@@ -43,7 +43,7 @@ func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
|
||||
|
||||
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
|
||||
v := model.Event{}
|
||||
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
|
||||
err := s.selectOne(ctx, "events", &v, "ID = $1", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
|
||||
|
||||
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
|
||||
v := model.Message{}
|
||||
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
|
||||
err := s.selectOne(ctx, "messages", &v, "ID = $1", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error
|
||||
|
||||
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
|
||||
v := model.Thread{}
|
||||
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
|
||||
err := s.selectOne(ctx, "threads", &v, "ID = $1", ID)
|
||||
return v, err
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
||||
}
|
||||
|
||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||
if questions := strings.Count(clause, "?"); questions != len(args) {
|
||||
if questions := strings.Count(clause, "$"); questions != len(args) {
|
||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||
}
|
||||
|
||||
@@ -115,10 +115,6 @@ func (s Storage) upsert(ctx context.Context, table string, v any) error {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range keys {
|
||||
values = append(values, values[i])
|
||||
}
|
||||
|
||||
q := fmt.Sprintf(`
|
||||
INSERT INTO %s (%s) VALUES (%s)
|
||||
ON CONFLICT (ID) DO UPDATE SET %s
|
||||
@@ -144,11 +140,11 @@ func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
|
||||
}
|
||||
args := make([]string, len(keys))
|
||||
for i := range args {
|
||||
args[i] = "?"
|
||||
args[i] = fmt.Sprintf("$%d", i+1)
|
||||
}
|
||||
keyArgs := make([]string, len(keys))
|
||||
for i := range keyArgs {
|
||||
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
|
||||
keyArgs[i] = fmt.Sprintf("%s=$%d", keys[i], i+1)
|
||||
}
|
||||
values := make([]any, len(keys))
|
||||
for i := range values {
|
||||
|
||||
@@ -67,7 +67,6 @@ func TestStorage(t *testing.T) {
|
||||
t.Run("upsert get message", func(t *testing.T) {
|
||||
m := model.NewMessage(
|
||||
"ID",
|
||||
"URL",
|
||||
1,
|
||||
"Author",
|
||||
"Plaintext",
|
||||
|
||||
Reference in New Issue
Block a user