found running locally that i dont need rest, pipeline needs a way to drop messages as garbage

main
Bel LaPointe 2024-04-16 07:56:54 -06:00
parent d87af2fadc
commit 39c0056190
9 changed files with 69 additions and 231 deletions

View File

@ -4,14 +4,13 @@ Thank you, [Sean](https://www.linkedin.com/in/sean-moore-1755a619/)
## TODO
- limit queue retries
- share postgres with Grafana
- new dash in Grafana
- what SLO/SLI can I help benoit with
- break into smaller goals
- sell to the team
- scott; like to keep state in incident.io and zendesk
- limit queue retries
```
erDiagram

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"regexp"
"strconv"
@ -106,8 +107,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, err
}
ctx, can := context.WithTimeout(ctx, time.Second*10)
ctx, can := context.WithTimeout(ctx, time.Minute)
defer can()
driver, err := NewDriver(ctx, result.DriverConn)
if err != nil {
return Config{}, err
@ -118,6 +120,9 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} else {
return Config{}, errors.New("not impl")
}
if result.Debug {
log.Printf("connected to driver at %s (%s @%s)", result.DriverConn, result.driver.engine, result.driver.conn)
}
storage, err := NewStorage(ctx, result.driver)
if err != nil {

View File

@ -14,6 +14,8 @@ import (
)
type Driver struct {
engine string
conn string
*sql.DB
}
@ -47,7 +49,7 @@ func NewDriver(ctx context.Context, conn string) (Driver, error) {
return Driver{}, err
}
driver := Driver{DB: db}
driver := Driver{DB: db, conn: conn, engine: engine}
if err := driver.setup(ctx); err != nil {
driver.Close()
return Driver{}, fmt.Errorf("failed setup: %w", err)

59
main.go
View File

@ -48,7 +48,6 @@ func run(ctx context.Context, cfg Config) error {
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first)
errs := make(chan error)
@ -86,11 +85,6 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
@ -154,59 +148,6 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
_ = thread
http.Error(w, "not impl", http.StatusNotImplemented)
}
}
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)

View File

@ -3,8 +3,6 @@ package main
import (
"bytes"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
@ -85,165 +83,4 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
})
t.Run("GET /api/v1/messages", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/messages", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Messages []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Messages) != 1 {
t.Fatal(result.Messages)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/eventnames", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/eventnames", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
EventNames []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.EventNames[0] != "Wal Receive Count Alert" {
t.Fatal(result.EventNames)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/events", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/events", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Events []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.Events[0] != "11067" {
t.Fatal(result.Events)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/threads", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Threads []string
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if result.Threads[0] != "1712911957.023359" {
t.Fatal(result.Threads)
} else {
t.Logf("%+v", result)
}
})
t.Run("GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
var result struct {
Thread []any
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err)
} else if len(result.Thread) != 1 {
t.Fatal(result.Thread)
} else {
t.Logf("%+v", result)
}
})
t.Run("CSV GET /api/v1/threads/1712911957.023359", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/threads/1712911957.023359", u), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "text/csv")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
t.Fatalf("(%d) %s", resp.StatusCode, b)
}
b, _ := io.ReadAll(resp.Body)
t.Logf("whole csv: \n%s", b)
dec := csv.NewReader(bytes.NewReader(b))
var lastLine []string
for {
line, err := dec.Read()
if err == io.EOF {
break
} else if err != nil {
t.Error("unexpected error while reading csv line:", err)
}
if lastLine == nil {
} else if len(lastLine) != len(line) {
t.Errorf("last line had %v elements but this line has %v", len(lastLine), len(line))
}
t.Logf("CSV line: %+v", line)
lastLine = line
}
if lastLine == nil {
t.Error("no lines found")
}
})
}

View File

@ -1,6 +1,9 @@
package main
import "context"
import (
"context"
"log"
)
type (
Pipeline struct {
@ -22,7 +25,14 @@ func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
func (p Pipeline) Process(ctx context.Context) error {
ctx, can := context.WithCancel(ctx)
defer can()
err := p.processUntilErr(ctx)
if err != nil {
log.Printf("pipeline failed to process: %v", err)
}
return err
}
func (p Pipeline) processUntilErr(ctx context.Context) error {
for ctx.Err() == nil {
reservation, read, err := p.reader.Syn(ctx)
if err != nil {
@ -32,7 +42,8 @@ func (p Pipeline) Process(ctx context.Context) error {
if err != nil {
return err
}
if err := p.writer.Enqueue(ctx, processed); err != nil {
if processed == nil {
} else if err := p.writer.Enqueue(ctx, processed); err != nil {
return err
}
if err := p.reader.Ack(ctx, reservation); err != nil {

View File

@ -6,6 +6,47 @@ import (
"time"
)
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
output, _ := NewQueue(ctx, "output", NewTestDriver(t))
input, _ := NewQueue(ctx, "input", NewTestDriver(t))
calls := 0
process := func(_ context.Context, v []byte) ([]byte, error) {
calls += 1
return nil, nil
}
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
t.Error(err)
}
ing := NewPipeline(output, input, process)
go func() {
defer can()
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
for ctx.Err() == nil {
if calls != 0 {
break
}
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 100):
}
}
if r, _, _ := output.syn(ctx); len(r) != 0 {
t.Error("something was pushed to out queue even though processor didnt emit content")
}
}
func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)

View File

@ -59,7 +59,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
select {
case <-ctx.Done():
return "", nil, ctx.Err()
case <-time.After(time.Second):
case <-time.After(time.Millisecond * 500):
}
}
}

View File

@ -46,8 +46,10 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
s, err := parseSlack(slack)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
if errors.Is(err, ErrIrrelevantMessage) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %v", err)
}
for pattern, ptr := range map[string]*string{