tests run and fail again

main
Bel LaPointe 2024-04-16 06:52:07 -06:00
parent e372be4288
commit 8ae8f47753
11 changed files with 75 additions and 152 deletions

View File

@ -32,9 +32,8 @@ type Config struct {
driver Driver driver Driver
storage Storage storage Storage
ai AI ai AI
slackToMessagePipeline Pipeline slackToModelPipeline Pipeline
messageToPersistencePipeline Pipeline modelToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
} }
var ( var (
@ -134,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
result.ai = NewAINoop() result.ai = NewAINoop()
} }
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result) slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result)
if err != nil { if err != nil {
return Config{}, err return Config{}, err
} }
result.slackToMessagePipeline = slackToMessagePipeline result.slackToModelPipeline = slackToModelPipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result) modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result)
if err != nil { if err != nil {
return Config{}, err return Config{}, err
} }
result.messageToPersistencePipeline = messageToPersistencePipeline result.modelToPersistencePipeline = modelToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil return result, nil
} }

View File

@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err := <-processPipelines(ctx, case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline, cfg.slackToModelPipeline,
cfg.messageToPersistencePipeline, cfg.modelToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
): ):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
} }
errs := []error{} errs := []error{}
for _, messageJSON := range page.Messages { for _, messageJSON := range page.Messages {
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
} }
@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return return
} }
if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil { if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
log.Printf("failed to ingest: %v", err) log.Printf("failed to ingest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@ -45,7 +45,6 @@ func TestRun(t *testing.T) {
cfg.EventNamePattern = renderEventNamePattern cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port cfg.Port = port
cfg.driver = NewTestDriver(t) cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}
@ -96,7 +95,7 @@ func TestRun(t *testing.T) {
t.Fatalf("(%d) %s", resp.StatusCode, b) t.Fatalf("(%d) %s", resp.StatusCode, b)
} }
var result struct { var result struct {
Messages []Message Messages []any
} }
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err) t.Fatal(err)
@ -189,7 +188,7 @@ func TestRun(t *testing.T) {
} }
var result struct { var result struct {
Thread []Message Thread []any
} }
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -1,32 +0,0 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

View File

@ -1,18 +0,0 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

View File

@ -2,13 +2,14 @@ package main
import ( import (
"context" "context"
"errors"
) )
type MessageToPersistence struct { type ModelToPersistence struct {
pipeline Pipeline pipeline Pipeline
} }
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver) reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
@ -20,19 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
return Pipeline{ return Pipeline{
writer: writer, writer: writer,
reader: reader, reader: reader,
process: newMessageToPersistenceProcess(cfg.storage), process: newModelToPersistenceProcess(cfg.driver),
}, nil }, nil
} }
func newMessageToPersistenceProcess(storage Storage) processFunc { func newModelToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) { return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg) return nil, errors.New("not impl")
if err != nil {
return nil, err
}
if err := storage.UpsertMessage(ctx, m); err != nil {
return nil, err
}
return msg, nil
} }
} }

View File

@ -6,29 +6,13 @@ import (
"time" "time"
) )
func TestMessageToPersistenceProcessor(t *testing.T) { func TestModelToPersistenceProcessor(t *testing.T) {
t.Parallel() t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
d := NewTestDriver(t) d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d) process := newModelToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil { _, _ = ctx, process
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
} }

View File

@ -2,14 +2,15 @@ package main
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
) )
type SlackToMessage struct { type SlackToModel struct {
pipeline Pipeline pipeline Pipeline
} }
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) { func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "slack_event", cfg.driver) reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
@ -21,11 +22,11 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error
return Pipeline{ return Pipeline{
writer: writer, writer: writer,
reader: reader, reader: reader,
process: newSlackToMessageProcess(cfg), process: newSlackToModelProcess(cfg),
}, nil }, nil
} }
func newSlackToMessageProcess(cfg Config) processFunc { func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) { return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if err != nil { if err != nil {
@ -34,3 +35,7 @@ func newSlackToMessageProcess(cfg Config) processFunc {
return m.Serialize(), nil return m.Serialize(), nil
} }
} }
func ParseSlack([]byte, string, string, string) (interface{ Serialize() []byte }, error) {
return nil, errors.New("not impl")
}

View File

@ -2,19 +2,16 @@ package main
import ( import (
"context" "context"
"os"
"testing" "testing"
"time" "time"
"gotest.tools/v3/assert"
) )
func TestSlackToMessagePipeline(t *testing.T) { func TestSlackToModelPipeline(t *testing.T) {
t.Parallel() t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5) ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can() defer can()
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)}) pipeline, err := NewSlackToModelPipeline(ctx, Config{driver: NewTestDriver(t)})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -24,6 +21,7 @@ func TestSlackToMessagePipeline(t *testing.T) {
} }
}() }()
/*
want := Message{ want := Message{
ID: "1712927439.728409/1712927439", ID: "1712927439.728409/1712927439",
TS: 1712927439, TS: 1712927439,
@ -47,4 +45,5 @@ func TestSlackToMessagePipeline(t *testing.T) {
} else { } else {
assert.DeepEqual(t, want, m) assert.DeepEqual(t, want, m)
} }
*/
} }