From 39c005619023039ff91424e26bf3c261a82356a4 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Tue, 16 Apr 2024 07:56:54 -0600 Subject: [PATCH] found running locally that i dont need rest, pipeline needs a way to drop messages as garbage --- README.md | 3 +- config.go | 7 +- driver.go | 4 +- main.go | 59 ----------------- main_test.go | 163 ----------------------------------------------- pipeline.go | 15 ++++- pipeline_test.go | 41 ++++++++++++ queue.go | 2 +- slack.go | 6 +- 9 files changed, 69 insertions(+), 231 deletions(-) diff --git a/README.md b/README.md index 8573fcb..6f1e143 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,13 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/) ## TODO -- limit queue retries - share postgres with Grafana - new dash in Grafana - what SLO/SLI can I help benoit with - break into smaller goals - sell to the team - scott; like to keep state in incident.io and zendesk - +- limit queue retries ``` erDiagram diff --git a/config.go b/config.go index 6f2bc7c..a1365f9 100644 --- a/config.go +++ b/config.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "os" "regexp" "strconv" @@ -106,8 +107,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, return Config{}, err } - ctx, can := context.WithTimeout(ctx, time.Second*10) + ctx, can := context.WithTimeout(ctx, time.Minute) defer can() + driver, err := NewDriver(ctx, result.DriverConn) if err != nil { return Config{}, err @@ -118,6 +120,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, } else { return Config{}, errors.New("not impl") } + if result.Debug { + log.Printf("connected to driver at %s (%s @%s)", result.DriverConn, result.driver.engine, result.driver.conn) + } storage, err := NewStorage(ctx, result.driver) if err != nil { diff --git a/driver.go b/driver.go index 25955c4..5faa25c 100644 --- a/driver.go +++ b/driver.go @@ -14,6 +14,8 @@ import ( ) type Driver struct { + engine string + conn string *sql.DB } @@ -47,7 +49,7 @@ func NewDriver(ctx context.Context, conn string) (Driver, error) { return Driver{}, err } - driver := Driver{DB: db} + driver := Driver{DB: db, conn: conn, engine: engine} if err := driver.setup(ctx); err != nil { driver.Close() return Driver{}, fmt.Errorf("failed setup: %w", err) diff --git a/main.go b/main.go index c1047e3..4670d23 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,6 @@ func run(ctx context.Context, cfg Config) error { func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error { ctx, can := context.WithCancel(ctx) - defer can() pipelines = append(pipelines, first) errs := make(chan error) @@ -86,11 +85,6 @@ func listenAndServe(ctx context.Context, cfg Config) chan error { func newHandler(cfg Config) http.HandlerFunc { mux := http.NewServeMux() - mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) - mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) - mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) - mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) - mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) @@ -154,59 +148,6 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { } } -func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - http.Error(w, "not impl", http.StatusNotImplemented) - } -} - -func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - http.Error(w, "not impl", http.StatusNotImplemented) - } -} - -func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - http.Error(w, "not impl", http.StatusNotImplemented) - } -} - -func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - http.Error(w, "not impl", http.StatusNotImplemented) - } -} - -func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] - _ = thread - - http.Error(w, "not impl", http.StatusNotImplemented) - } -} - func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { http.Error(w, "shoo", http.StatusForbidden) diff --git a/main_test.go b/main_test.go index 1f19a47..efdeecf 100644 --- a/main_test.go +++ b/main_test.go @@ -3,8 +3,6 @@ package main import ( "bytes" "context" - "encoding/csv" - "encoding/json" "fmt" "io" "net/http" @@ -85,165 +83,4 @@ func TestRun(t *testing.T) { t.Fatalf("(%d) %s", resp.StatusCode, b) } }) - - t.Run("GET /api/v1/messages", func(t *testing.T) { - resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u)) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - var result struct { - Messages []any - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - t.Fatal(err) - } else if len(result.Messages) != 1 { - t.Fatal(result.Messages) - } else { - t.Logf("%+v", result) - } - }) - - t.Run("GET /api/v1/eventnames", func(t *testing.T) { - resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u)) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - var result struct { - EventNames []string - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - t.Fatal(err) - } else if result.EventNames[0] != "Wal Receive Count Alert" { - t.Fatal(result.EventNames) - } else { - t.Logf("%+v", result) - } - }) - - t.Run("GET /api/v1/events", func(t *testing.T) { - resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u)) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - var result struct { - Events []string - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - t.Fatal(err) - } else if result.Events[0] != "11067" { - t.Fatal(result.Events) - } else { - t.Logf("%+v", result) - } - }) - - t.Run("GET /api/v1/threads", func(t *testing.T) { - resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u)) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - var result struct { - Threads []string - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - t.Fatal(err) - } else if result.Threads[0] != "1712911957.023359" { - t.Fatal(result.Threads) - } else { - t.Logf("%+v", result) - } - }) - - t.Run("GET /api/v1/threads/1712911957.023359", func(t *testing.T) { - resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u)) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - - var result struct { - Thread []any - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - t.Fatal(err) - } else if len(result.Thread) != 1 { - t.Fatal(result.Thread) - } else { - t.Logf("%+v", result) - } - }) - - t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil) - if err != nil { - t.Fatal(err) - } - req.Header.Set("Accept", "text/csv") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - t.Fatalf("(%d) %s", resp.StatusCode, b) - } - - b, _ := io.ReadAll(resp.Body) - t.Logf("whole csv: \n%s", b) - - dec := csv.NewReader(bytes.NewReader(b)) - var lastLine []string - for { - line, err := dec.Read() - if err == io.EOF { - break - } else if err != nil { - t.Error("unexpected error while reading csv line:", err) - } - - if lastLine == nil { - } else if len(lastLine) != len(line) { - t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line)) - } - - t.Logf("CSV line: %+v", line) - lastLine = line - } - if lastLine == nil { - t.Error("no lines found") - } - - }) } diff --git a/pipeline.go b/pipeline.go index 06f9401..8274d95 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,6 +1,9 @@ package main -import "context" +import ( + "context" + "log" +) type ( Pipeline struct { @@ -22,7 +25,14 @@ func NewPipeline(writer, reader Queue, process processFunc) Pipeline { func (p Pipeline) Process(ctx context.Context) error { ctx, can := context.WithCancel(ctx) defer can() + err := p.processUntilErr(ctx) + if err != nil { + log.Printf("pipeline failed to process: %v", err) + } + return err +} +func (p Pipeline) processUntilErr(ctx context.Context) error { for ctx.Err() == nil { reservation, read, err := p.reader.Syn(ctx) if err != nil { @@ -32,7 +42,8 @@ func (p Pipeline) Process(ctx context.Context) error { if err != nil { return err } - if err := p.writer.Enqueue(ctx, processed); err != nil { + if processed == nil { + } else if err := p.writer.Enqueue(ctx, processed); err != nil { return err } if err := p.reader.Ack(ctx, reservation); err != nil { diff --git a/pipeline_test.go b/pipeline_test.go index b2f9865..4c0c3fb 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -6,6 +6,47 @@ import ( "time" ) +func TestPipelineDoesntPushEmptyMessage(t *testing.T) { + t.Parallel() + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + + output, _ := NewQueue(ctx, "output", NewTestDriver(t)) + input, _ := NewQueue(ctx, "input", NewTestDriver(t)) + + calls := 0 + process := func(_ context.Context, v []byte) ([]byte, error) { + calls += 1 + return nil, nil + } + + if err := input.Enqueue(ctx, []byte("hello")); err != nil { + t.Error(err) + } + + ing := NewPipeline(output, input, process) + go func() { + defer can() + if err := ing.Process(ctx); err != nil && ctx.Err() == nil { + t.Fatal(err) + } + }() + + for ctx.Err() == nil { + if calls != 0 { + break + } + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond * 100): + } + } + + if r, _, _ := output.syn(ctx); len(r) != 0 { + t.Error("something was pushed to out queue even though processor didnt emit content") + } +} + func TestPipeline(t *testing.T) { t.Parallel() ctx, can := context.WithTimeout(context.Background(), time.Second*10) diff --git a/queue.go b/queue.go index ed98d06..6593aa0 100644 --- a/queue.go +++ b/queue.go @@ -59,7 +59,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) { select { case <-ctx.Done(): return "", nil, ctx.Err() - case <-time.After(time.Second): + case <-time.After(time.Millisecond * 500): } } } diff --git a/slack.go b/slack.go index 5e58978..45a282c 100644 --- a/slack.go +++ b/slack.go @@ -46,8 +46,10 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) func newSlackToModelProcess(cfg Config) processFunc { return func(ctx context.Context, slack []byte) ([]byte, error) { s, err := parseSlack(slack) - if err != nil { - return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack) + if errors.Is(err, ErrIrrelevantMessage) { + return nil, nil + } else if err != nil { + return nil, fmt.Errorf("failed to deserialize slack %v", err) } for pattern, ptr := range map[string]*string{