Compare commits
15 Commits
709f2ac254
...
d9d91193dd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9d91193dd | ||
|
|
2033cefc2a | ||
|
|
600a2e0111 | ||
|
|
4f3b8ec866 | ||
|
|
c4a7eaf04a | ||
|
|
5557c0920a | ||
|
|
c7f5cdb040 | ||
|
|
a8e8fdc451 | ||
|
|
d88a8bb23a | ||
|
|
39c0056190 | ||
|
|
d87af2fadc | ||
|
|
9bc47bfde6 | ||
|
|
098986eb07 | ||
|
|
5bc068451f | ||
|
|
5fa21d0cd9 |
13
README.md
13
README.md
@@ -4,18 +4,15 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
|
|||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- to class design for joins and external tables
|
- dedupe critical+noncritical
|
||||||
- dedupe multi channels
|
|
||||||
- add Team to Message
|
|
||||||
- from sync ingest to ingestQueue that retries a few times so user messages can find parent
|
|
||||||
- limit queue retries
|
|
||||||
- share postgres with Grafana
|
|
||||||
- new dash in Grafana
|
|
||||||
- what SLO/SLI can I help benoit with
|
- what SLO/SLI can I help benoit with
|
||||||
- break into smaller goals
|
- break into smaller goals
|
||||||
- sell to the team
|
- sell to the team
|
||||||
- scott; like to keep state in incident.io and zendesk
|
- scott; like to keep state in incident.io and zendesk
|
||||||
|
- @spoc -ignore, @spoc -s summary
|
||||||
|
- limit rps from rpc/slackscrape
|
||||||
|
- rpc/slackscrape to async
|
||||||
|
- limit queue retries
|
||||||
|
|
||||||
```
|
```
|
||||||
erDiagram
|
erDiagram
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -106,8 +107,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, can := context.WithTimeout(ctx, time.Second*10)
|
ctx, can := context.WithTimeout(ctx, time.Minute)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, err := NewDriver(ctx, result.DriverConn)
|
driver, err := NewDriver(ctx, result.DriverConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
@@ -118,6 +120,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
} else {
|
} else {
|
||||||
return Config{}, errors.New("not impl")
|
return Config{}, errors.New("not impl")
|
||||||
}
|
}
|
||||||
|
if result.Debug {
|
||||||
|
log.Printf("connected to driver at %s (%s @%s)", result.DriverConn, result.driver.engine, result.driver.conn)
|
||||||
|
}
|
||||||
|
|
||||||
storage, err := NewStorage(ctx, result.driver)
|
storage, err := NewStorage(ctx, result.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -14,6 +14,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
|
engine string
|
||||||
|
conn string
|
||||||
*sql.DB
|
*sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,7 +49,7 @@ func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
|||||||
return Driver{}, err
|
return Driver{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
driver := Driver{DB: db}
|
driver := Driver{DB: db, conn: conn, engine: engine}
|
||||||
if err := driver.setup(ctx); err != nil {
|
if err := driver.setup(ctx); err != nil {
|
||||||
driver.Close()
|
driver.Close()
|
||||||
return Driver{}, fmt.Errorf("failed setup: %w", err)
|
return Driver{}, fmt.Errorf("failed setup: %w", err)
|
||||||
|
|||||||
53
driver_integration_test.go
Normal file
53
driver_integration_test.go
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDriverIntegration(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*30)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
driver, err := NewDriver(ctx, os.Getenv("DRIVER_CONN"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer driver.Close()
|
||||||
|
|
||||||
|
q, err := NewQueue(ctx, t.Name(), driver)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
qV := []byte("hello")
|
||||||
|
if err := q.Enqueue(ctx, qV); err != nil {
|
||||||
|
t.Error("q cannot enqueue:", err)
|
||||||
|
} else if reservation, v, err := q.Syn(ctx); err != nil {
|
||||||
|
t.Error("q cannot syn:", err)
|
||||||
|
} else if string(v) != string(qV) {
|
||||||
|
t.Error("q enqueued wrong:", string(v))
|
||||||
|
} else if len(reservation) == 0 {
|
||||||
|
t.Error("q didnt have reservation")
|
||||||
|
} else if err := q.Ack(ctx, reservation); err != nil {
|
||||||
|
t.Error("q cannot ack:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := NewStorage(ctx, driver)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
evt := model.Event{ID: "x", Name: "y"}
|
||||||
|
if err := s.UpsertEvent(ctx, evt); err != nil {
|
||||||
|
t.Error("s cannot upsert:", err)
|
||||||
|
} else if e, err := s.GetEvent(ctx, evt.ID); err != nil {
|
||||||
|
t.Error("s cannot get:", err)
|
||||||
|
} else if e != evt {
|
||||||
|
t.Error("s upserted wrong:", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
160
main.go
160
main.go
@@ -48,7 +48,6 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
|
|
||||||
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
|
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
|
||||||
ctx, can := context.WithCancel(ctx)
|
ctx, can := context.WithCancel(ctx)
|
||||||
defer can()
|
|
||||||
|
|
||||||
pipelines = append(pipelines, first)
|
pipelines = append(pipelines, first)
|
||||||
errs := make(chan error)
|
errs := make(chan error)
|
||||||
@@ -86,11 +85,6 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
|||||||
func newHandler(cfg Config) http.HandlerFunc {
|
func newHandler(cfg Config) http.HandlerFunc {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
|
||||||
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
|
||||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
|
||||||
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
|
||||||
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||||
|
|
||||||
@@ -113,97 +107,81 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
|||||||
channel := r.Header.Get("slack-channel")
|
channel := r.Header.Get("slack-channel")
|
||||||
token := r.Header.Get("slack-oauth-token")
|
token := r.Header.Get("slack-oauth-token")
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
|
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
req.Header.Set("Authorization", "Bearer "+token)
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
httpc := http.Client{Timeout: time.Second}
|
||||||
if err != nil {
|
get := func(url string) ([]byte, error) {
|
||||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
return
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
defer resp.Body.Close()
|
}
|
||||||
defer io.Copy(io.Discard, resp.Body)
|
req.Header.Set("Authorization", "Bearer "+token)
|
||||||
|
req = req.WithContext(r.Context())
|
||||||
|
|
||||||
var page struct {
|
resp, err := httpc.Do(req)
|
||||||
OK bool
|
if err != nil {
|
||||||
Messages []json.RawMessage
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
defer io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
||||||
|
}
|
||||||
|
return io.ReadAll(resp.Body)
|
||||||
}
|
}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
n := 0
|
||||||
return
|
|
||||||
} else if !page.OK {
|
for len(urls) > 0 {
|
||||||
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
|
url := urls[0]
|
||||||
return
|
urls = urls[1:]
|
||||||
}
|
select {
|
||||||
errs := []error{}
|
case <-r.Context().Done():
|
||||||
for _, messageJSON := range page.Messages {
|
case <-time.After(time.Second):
|
||||||
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
}
|
||||||
errs = append(errs, err)
|
body, err := get(url)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var page struct {
|
||||||
|
Messages []json.RawMessage
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &page); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
errs := []error{}
|
||||||
|
for _, messageJSON := range page.Messages {
|
||||||
|
if cfg.Debug {
|
||||||
|
log.Printf("rpc/scrapeslack => %s", messageJSON)
|
||||||
|
}
|
||||||
|
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
} else {
|
||||||
|
n += 1
|
||||||
|
}
|
||||||
|
if !strings.Contains(url, "ts=") {
|
||||||
|
var peek struct {
|
||||||
|
ThreadTS string `json:"thread_ts"`
|
||||||
|
}
|
||||||
|
json.Unmarshal(messageJSON, &peek)
|
||||||
|
if peek.ThreadTS != "" {
|
||||||
|
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) > 0 {
|
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
|
||||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !basicAuth(cfg, w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !basicAuth(cfg, w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !basicAuth(cfg, w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !basicAuth(cfg, w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !basicAuth(cfg, w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
|
||||||
_ = thread
|
|
||||||
|
|
||||||
http.Error(w, "not impl", http.StatusNotImplemented)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
166
main_test.go
166
main_test.go
@@ -3,8 +3,6 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/csv"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -45,8 +43,11 @@ func TestRun(t *testing.T) {
|
|||||||
cfg.EventNamePattern = renderEventNamePattern
|
cfg.EventNamePattern = renderEventNamePattern
|
||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver = NewTestDriver(t)
|
cfg.driver = NewTestDriver(t)
|
||||||
|
cfg.storage, _ = NewStorage(ctx, cfg.driver)
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
|
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
|
||||||
|
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||||
@@ -82,165 +83,4 @@ func TestRun(t *testing.T) {
|
|||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("GET /api/v1/messages", func(t *testing.T) {
|
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
var result struct {
|
|
||||||
Messages []any
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if len(result.Messages) != 1 {
|
|
||||||
t.Fatal(result.Messages)
|
|
||||||
} else {
|
|
||||||
t.Logf("%+v", result)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("GET /api/v1/eventnames", func(t *testing.T) {
|
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
var result struct {
|
|
||||||
EventNames []string
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if result.EventNames[0] != "Wal Receive Count Alert" {
|
|
||||||
t.Fatal(result.EventNames)
|
|
||||||
} else {
|
|
||||||
t.Logf("%+v", result)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("GET /api/v1/events", func(t *testing.T) {
|
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
var result struct {
|
|
||||||
Events []string
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if result.Events[0] != "11067" {
|
|
||||||
t.Fatal(result.Events)
|
|
||||||
} else {
|
|
||||||
t.Logf("%+v", result)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("GET /api/v1/threads", func(t *testing.T) {
|
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
var result struct {
|
|
||||||
Threads []string
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if result.Threads[0] != "1712911957.023359" {
|
|
||||||
t.Fatal(result.Threads)
|
|
||||||
} else {
|
|
||||||
t.Logf("%+v", result)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
|
|
||||||
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
var result struct {
|
|
||||||
Thread []any
|
|
||||||
}
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if len(result.Thread) != 1 {
|
|
||||||
t.Fatal(result.Thread)
|
|
||||||
} else {
|
|
||||||
t.Logf("%+v", result)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
|
|
||||||
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
req.Header.Set("Accept", "text/csv")
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Fatalf("(%d) %s", resp.StatusCode, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
t.Logf("whole csv: \n%s", b)
|
|
||||||
|
|
||||||
dec := csv.NewReader(bytes.NewReader(b))
|
|
||||||
var lastLine []string
|
|
||||||
for {
|
|
||||||
line, err := dec.Read()
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
t.Error("unexpected error while reading csv line:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastLine == nil {
|
|
||||||
} else if len(lastLine) != len(line) {
|
|
||||||
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("CSV line: %+v", line)
|
|
||||||
lastLine = line
|
|
||||||
}
|
|
||||||
if lastLine == nil {
|
|
||||||
t.Error("no lines found")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,18 +4,16 @@ package model
|
|||||||
type Message struct {
|
type Message struct {
|
||||||
Updated uint64
|
Updated uint64
|
||||||
ID string
|
ID string
|
||||||
URL string
|
|
||||||
TS uint64
|
TS uint64
|
||||||
Author string
|
Author string
|
||||||
Plaintext string
|
Plaintext string
|
||||||
ThreadID string
|
ThreadID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessage(ID, URL string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
func NewMessage(ID string, TS uint64, Author, Plaintext string, ThreadID string) Message {
|
||||||
return Message{
|
return Message{
|
||||||
Updated: updated(),
|
Updated: updated(),
|
||||||
ID: ID,
|
ID: ID,
|
||||||
URL: URL,
|
|
||||||
TS: TS,
|
TS: TS,
|
||||||
Author: Author,
|
Author: Author,
|
||||||
Plaintext: Plaintext,
|
Plaintext: Plaintext,
|
||||||
|
|||||||
@@ -2,15 +2,23 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ModelToPersistence struct {
|
type ModelToPersistence struct {
|
||||||
pipeline Pipeline
|
pipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ModelIDs struct {
|
||||||
|
Event string
|
||||||
|
Message string
|
||||||
|
Thread string
|
||||||
|
}
|
||||||
|
|
||||||
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
reader, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
@@ -18,15 +26,41 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
|
writer = NewNoopQueue()
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newModelToPersistenceProcess(cfg.driver),
|
process: newModelToPersistenceProcess(cfg.storage),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newModelToPersistenceProcess(driver Driver) processFunc {
|
func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||||
return nil, errors.New("not impl")
|
var m Models
|
||||||
|
if err := json.Unmarshal(models, &m); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non models payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Event.Empty() {
|
||||||
|
} else if err := storage.UpsertEvent(ctx, m.Event); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Thread.Empty() {
|
||||||
|
} else if err := storage.UpsertThread(ctx, m.Thread); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist thread: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.Empty() {
|
||||||
|
} else if err := storage.UpsertMessage(ctx, m.Message); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("persisted models")
|
||||||
|
return json.Marshal(ModelIDs{
|
||||||
|
Event: m.Event.ID,
|
||||||
|
Thread: m.Thread.ID,
|
||||||
|
Message: m.Message.ID,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,8 +2,11 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestModelToPersistenceProcessor(t *testing.T) {
|
func TestModelToPersistenceProcessor(t *testing.T) {
|
||||||
@@ -12,7 +15,49 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
|||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
d := NewTestDriver(t)
|
d := NewTestDriver(t)
|
||||||
process := newModelToPersistenceProcess(d)
|
s, _ := NewStorage(ctx, d)
|
||||||
|
process := newModelToPersistenceProcess(s)
|
||||||
|
|
||||||
_, _ = ctx, process
|
_, _ = ctx, process
|
||||||
|
|
||||||
|
inputModels := Models{
|
||||||
|
Event: model.Event{ID: "event", Asset: "event-asset"},
|
||||||
|
//Thread: {ID: "thread", Channel: "thread-channel"},
|
||||||
|
Message: model.Message{ID: "message", Plaintext: "message-plaintext"},
|
||||||
|
}
|
||||||
|
input, _ := json.Marshal(inputModels)
|
||||||
|
|
||||||
|
var outputModelIDs ModelIDs
|
||||||
|
var n int
|
||||||
|
if output, err := process(ctx, input); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := json.Unmarshal(output, &outputModelIDs); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if outputModelIDs != (ModelIDs{Event: "event", Message: "message"}) {
|
||||||
|
t.Error(outputModelIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`); row.Err() != nil {
|
||||||
|
t.Error("cant count events:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count events:", err)
|
||||||
|
} else if n != 1 {
|
||||||
|
t.Error("bad event count:", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM threads`); row.Err() != nil {
|
||||||
|
t.Error("cant count threads:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count threads:", err)
|
||||||
|
} else if n != 0 {
|
||||||
|
t.Error("bad thread count:", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM messages`); row.Err() != nil {
|
||||||
|
t.Error("cant count messages:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count messages:", err)
|
||||||
|
} else if n != 1 {
|
||||||
|
t.Error("bad message count:", n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
15
pipeline.go
15
pipeline.go
@@ -1,6 +1,9 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Pipeline struct {
|
Pipeline struct {
|
||||||
@@ -22,7 +25,14 @@ func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
|
|||||||
func (p Pipeline) Process(ctx context.Context) error {
|
func (p Pipeline) Process(ctx context.Context) error {
|
||||||
ctx, can := context.WithCancel(ctx)
|
ctx, can := context.WithCancel(ctx)
|
||||||
defer can()
|
defer can()
|
||||||
|
err := p.processUntilErr(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("pipeline failed to process: %v", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p Pipeline) processUntilErr(ctx context.Context) error {
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
reservation, read, err := p.reader.Syn(ctx)
|
reservation, read, err := p.reader.Syn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -32,7 +42,8 @@ func (p Pipeline) Process(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := p.writer.Enqueue(ctx, processed); err != nil {
|
if processed == nil {
|
||||||
|
} else if err := p.writer.Enqueue(ctx, processed); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := p.reader.Ack(ctx, reservation); err != nil {
|
if err := p.reader.Ack(ctx, reservation); err != nil {
|
||||||
|
|||||||
@@ -6,6 +6,47 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
output, _ := NewQueue(ctx, "output", NewTestDriver(t))
|
||||||
|
input, _ := NewQueue(ctx, "input", NewTestDriver(t))
|
||||||
|
|
||||||
|
calls := 0
|
||||||
|
process := func(_ context.Context, v []byte) ([]byte, error) {
|
||||||
|
calls += 1
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ing := NewPipeline(output, input, process)
|
||||||
|
go func() {
|
||||||
|
defer can()
|
||||||
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
if calls != 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if r, _, _ := output.syn(ctx); len(r) != 0 {
|
||||||
|
t.Error("something was pushed to out queue even though processor didnt emit content")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPipeline(t *testing.T) {
|
func TestPipeline(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
|||||||
29
queue.go
29
queue.go
@@ -20,7 +20,7 @@ func NewNoopQueue() Queue {
|
|||||||
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||||
if _, err := driver.ExecContext(ctx, `
|
if _, err := driver.ExecContext(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS queue (
|
CREATE TABLE IF NOT EXISTS queue (
|
||||||
id INTEGER PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
topic TEXT NOT NULL,
|
topic TEXT NOT NULL,
|
||||||
updated INTEGER NOT NULL,
|
updated INTEGER NOT NULL,
|
||||||
reservation TEXT,
|
reservation TEXT,
|
||||||
@@ -37,11 +37,12 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
||||||
`,
|
`,
|
||||||
q.topic,
|
q.topic,
|
||||||
time.Now().Unix(),
|
time.Now().Unix(),
|
||||||
b,
|
b,
|
||||||
|
uuid.New().String(),
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -59,7 +60,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return "", nil, ctx.Err()
|
return "", nil, ctx.Err()
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Millisecond * 500):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -71,16 +72,16 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
if result, err := q.driver.ExecContext(ctx, `
|
if result, err := q.driver.ExecContext(ctx, `
|
||||||
UPDATE queue
|
UPDATE queue
|
||||||
SET
|
SET
|
||||||
updated = ?, reservation = ?
|
updated = $1, reservation = $2
|
||||||
WHERE
|
WHERE
|
||||||
id IN (
|
id IN (
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE
|
WHERE
|
||||||
topic == ?
|
topic = $3
|
||||||
AND (
|
AND (
|
||||||
reservation IS NULL
|
reservation IS NULL
|
||||||
OR ? - updated > 60
|
OR $4 - updated > 60
|
||||||
)
|
)
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
@@ -95,7 +96,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
row := q.driver.QueryRowContext(ctx, `
|
row := q.driver.QueryRowContext(ctx, `
|
||||||
SELECT payload
|
SELECT payload
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE reservation==?
|
WHERE reservation=$1
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`, reservation)
|
`, reservation)
|
||||||
if err := row.Err(); err != nil {
|
if err := row.Err(); err != nil {
|
||||||
@@ -108,12 +109,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
||||||
|
return q.ack(ctx, []byte(reservation))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
||||||
if q.driver.DB == nil {
|
if q.driver.DB == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
result, err := q.driver.ExecContext(ctx, `
|
||||||
DELETE FROM queue
|
DELETE FROM queue
|
||||||
WHERE reservation==?
|
WHERE reservation=$1
|
||||||
`, reservation)
|
`, reservation)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if n, _ := result.RowsAffected(); n != 1 {
|
||||||
|
return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
30
slack.go
30
slack.go
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -32,7 +33,7 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer, err := NewQueue(ctx, "new_message", cfg.driver)
|
writer, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
@@ -46,8 +47,13 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
|
|||||||
func newSlackToModelProcess(cfg Config) processFunc {
|
func newSlackToModelProcess(cfg Config) processFunc {
|
||||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||||
s, err := parseSlack(slack)
|
s, err := parseSlack(slack)
|
||||||
if err != nil {
|
if cfg.Debug {
|
||||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
log.Printf("%v: %s => %+v", err, slack, s)
|
||||||
|
}
|
||||||
|
if errors.Is(err, ErrIrrelevantMessage) {
|
||||||
|
return nil, nil
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to deserialize slack %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for pattern, ptr := range map[string]*string{
|
for pattern, ptr := range map[string]*string{
|
||||||
@@ -67,16 +73,18 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
|||||||
|
|
||||||
event := model.Event{}
|
event := model.Event{}
|
||||||
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
|
if s.Event != "" && s.Source != "" && s.TS > 0 && s.EventName != "" {
|
||||||
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, "TODO", s.Resolved)
|
event = model.NewEvent(s.Event, s.Source, s.TS, s.EventName, s.Asset, s.Datacenter, s.Team, s.Resolved)
|
||||||
}
|
}
|
||||||
message := model.Message{}
|
message := model.Message{}
|
||||||
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
|
if s.ID != "" && s.Source != "" && s.TS > 0 && s.Thread != "" {
|
||||||
message = model.NewMessage(s.ID, s.Source, s.TS, "TODO", s.Plaintext, s.Thread)
|
message = model.NewMessage(s.ID, s.TS, s.Author, s.Plaintext, s.Thread)
|
||||||
}
|
}
|
||||||
thread := model.Thread{}
|
thread := model.Thread{}
|
||||||
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
||||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("parsed slack message into models")
|
||||||
return json.Marshal(Models{
|
return json.Marshal(Models{
|
||||||
Event: event,
|
Event: event,
|
||||||
Message: message,
|
Message: message,
|
||||||
@@ -98,6 +106,8 @@ type (
|
|||||||
Asset string
|
Asset string
|
||||||
Resolved bool
|
Resolved bool
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
Author string
|
||||||
|
Team string
|
||||||
}
|
}
|
||||||
|
|
||||||
slackMessage struct {
|
slackMessage struct {
|
||||||
@@ -118,6 +128,7 @@ type (
|
|||||||
ParentID string `json:"thread_ts"`
|
ParentID string `json:"thread_ts"`
|
||||||
Text string
|
Text string
|
||||||
Blocks []slackBlock
|
Blocks []slackBlock
|
||||||
|
User string
|
||||||
// bot
|
// bot
|
||||||
Bot slackBot `json:"bot_profile"`
|
Bot slackBot `json:"bot_profile"`
|
||||||
Attachments []slackAttachment
|
Attachments []slackAttachment
|
||||||
@@ -171,9 +182,13 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
|||||||
return parsedSlackMessage{}, ErrIrrelevantMessage
|
return parsedSlackMessage{}, ErrIrrelevantMessage
|
||||||
}
|
}
|
||||||
var tagsField string
|
var tagsField string
|
||||||
|
var teamField string
|
||||||
for _, field := range s.Event.Attachments[0].Fields {
|
for _, field := range s.Event.Attachments[0].Fields {
|
||||||
if field.Title == "Tags" {
|
switch field.Title {
|
||||||
|
case "Tags":
|
||||||
tagsField = field.Value
|
tagsField = field.Value
|
||||||
|
case "Routed Teams":
|
||||||
|
teamField = field.Value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return parsedSlackMessage{
|
return parsedSlackMessage{
|
||||||
@@ -188,6 +203,8 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
|||||||
Asset: s.Event.Attachments[0].Text,
|
Asset: s.Event.Attachments[0].Text,
|
||||||
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||||
Datacenter: tagsField,
|
Datacenter: tagsField,
|
||||||
|
Author: s.Event.Bot.Name,
|
||||||
|
Team: teamField,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,6 +222,7 @@ func parseSlack(b []byte) (parsedSlackMessage, error) {
|
|||||||
Plaintext: s.Event.Text,
|
Plaintext: s.Event.Text,
|
||||||
Asset: "",
|
Asset: "",
|
||||||
Datacenter: "",
|
Datacenter: "",
|
||||||
|
Author: s.Event.User,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,14 +40,13 @@ func TestSlackToModelPipeline(t *testing.T) {
|
|||||||
"Alertconfig Workflow Failed",
|
"Alertconfig Workflow Failed",
|
||||||
"",
|
"",
|
||||||
"",
|
"",
|
||||||
"TODO",
|
"Datastores Non-Critical",
|
||||||
false,
|
false,
|
||||||
),
|
),
|
||||||
Message: model.NewMessage(
|
Message: model.NewMessage(
|
||||||
"1712927439.728409/1712927439",
|
"1712927439.728409/1712927439",
|
||||||
"https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409",
|
|
||||||
1712927439,
|
1712927439,
|
||||||
"TODO",
|
"Opsgenie for Alert Management",
|
||||||
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
"At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
"1712927439.728409",
|
"1712927439.728409",
|
||||||
),
|
),
|
||||||
@@ -129,6 +128,7 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Event: "",
|
Event: "",
|
||||||
Plaintext: "I gotta do this",
|
Plaintext: "I gotta do this",
|
||||||
Asset: "",
|
Asset: "",
|
||||||
|
Author: "U06868T6ADV",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"opsgenie_alert.json": {
|
"opsgenie_alert.json": {
|
||||||
@@ -164,6 +164,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
Author: "Opsgenie for Alert Management",
|
||||||
|
Team: "Datastores Non-Critical",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"opsgenie_alert_resolved.json": {
|
"opsgenie_alert_resolved.json": {
|
||||||
@@ -200,6 +202,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Resolved: true,
|
Resolved: true,
|
||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
Author: "Opsgenie for Alert Management",
|
||||||
|
Team: "Datastores Non-Critical",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"reingested_alert.json": {
|
"reingested_alert.json": {
|
||||||
@@ -215,6 +219,8 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
Resolved: true,
|
Resolved: true,
|
||||||
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb",
|
||||||
|
Author: "Opsgenie for Alert Management",
|
||||||
|
Team: "Datastores Non-Critical",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
16
storage.go
16
storage.go
@@ -43,7 +43,7 @@ func NewStorage(ctx context.Context, driver Driver) (Storage, error) {
|
|||||||
|
|
||||||
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
|
func (s Storage) GetEvent(ctx context.Context, ID string) (model.Event, error) {
|
||||||
v := model.Event{}
|
v := model.Event{}
|
||||||
err := s.selectOne(ctx, "events", &v, "ID = ?", ID)
|
err := s.selectOne(ctx, "events", &v, "ID = $1", ID)
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ func (s Storage) UpsertEvent(ctx context.Context, event model.Event) error {
|
|||||||
|
|
||||||
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
|
func (s Storage) GetMessage(ctx context.Context, ID string) (model.Message, error) {
|
||||||
v := model.Message{}
|
v := model.Message{}
|
||||||
err := s.selectOne(ctx, "messages", &v, "ID = ?", ID)
|
err := s.selectOne(ctx, "messages", &v, "ID = $1", ID)
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,7 +63,7 @@ func (s Storage) UpsertMessage(ctx context.Context, message model.Message) error
|
|||||||
|
|
||||||
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
|
func (s Storage) GetThread(ctx context.Context, ID string) (model.Thread, error) {
|
||||||
v := model.Thread{}
|
v := model.Thread{}
|
||||||
err := s.selectOne(ctx, "threads", &v, "ID = ?", ID)
|
err := s.selectOne(ctx, "threads", &v, "ID = $1", ID)
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,7 +72,7 @@ func (s Storage) UpsertThread(ctx context.Context, thread model.Thread) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
func (s Storage) selectOne(ctx context.Context, table string, v any, clause string, args ...any) error {
|
||||||
if questions := strings.Count(clause, "?"); questions != len(args) {
|
if questions := strings.Count(clause, "$"); questions != len(args) {
|
||||||
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
return fmt.Errorf("expected %v args for clause but found %v", questions, len(args))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,10 +115,6 @@ func (s Storage) upsert(ctx context.Context, table string, v any) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range keys {
|
|
||||||
values = append(values, values[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
q := fmt.Sprintf(`
|
q := fmt.Sprintf(`
|
||||||
INSERT INTO %s (%s) VALUES (%s)
|
INSERT INTO %s (%s) VALUES (%s)
|
||||||
ON CONFLICT (ID) DO UPDATE SET %s
|
ON CONFLICT (ID) DO UPDATE SET %s
|
||||||
@@ -144,11 +140,11 @@ func keysArgsKeyargsValues(v any) ([]string, []string, []string, []any, error) {
|
|||||||
}
|
}
|
||||||
args := make([]string, len(keys))
|
args := make([]string, len(keys))
|
||||||
for i := range args {
|
for i := range args {
|
||||||
args[i] = "?"
|
args[i] = fmt.Sprintf("$%d", i+1)
|
||||||
}
|
}
|
||||||
keyArgs := make([]string, len(keys))
|
keyArgs := make([]string, len(keys))
|
||||||
for i := range keyArgs {
|
for i := range keyArgs {
|
||||||
keyArgs[i] = fmt.Sprintf("%s=?", keys[i])
|
keyArgs[i] = fmt.Sprintf("%s=$%d", keys[i], i+1)
|
||||||
}
|
}
|
||||||
values := make([]any, len(keys))
|
values := make([]any, len(keys))
|
||||||
for i := range values {
|
for i := range values {
|
||||||
|
|||||||
@@ -67,7 +67,6 @@ func TestStorage(t *testing.T) {
|
|||||||
t.Run("upsert get message", func(t *testing.T) {
|
t.Run("upsert get message", func(t *testing.T) {
|
||||||
m := model.NewMessage(
|
m := model.NewMessage(
|
||||||
"ID",
|
"ID",
|
||||||
"URL",
|
|
||||||
1,
|
1,
|
||||||
"Author",
|
"Author",
|
||||||
"Plaintext",
|
"Plaintext",
|
||||||
|
|||||||
Reference in New Issue
Block a user