From 5fa21d0cd9f956ccba91c3c90d110ce36e989066 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Tue, 16 Apr 2024 07:29:42 -0600 Subject: [PATCH] model to persist pipeline tests OK --- persistence.go | 43 +++++++++++++++++++++++++++++++++++------ persistence_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++- slack.go | 2 +- 3 files changed, 84 insertions(+), 8 deletions(-) diff --git a/persistence.go b/persistence.go index 7ce003d..933fd10 100644 --- a/persistence.go +++ b/persistence.go @@ -2,15 +2,22 @@ package main import ( "context" - "errors" + "encoding/json" + "fmt" ) type ModelToPersistence struct { pipeline Pipeline } +type ModelIDs struct { + Event string + Message string + Thread string +} + func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { - reader, err := NewQueue(ctx, "new_message", cfg.driver) + reader, err := NewQueue(ctx, "new_models", cfg.driver) if err != nil { return Pipeline{}, err } @@ -21,12 +28,36 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e return Pipeline{ writer: writer, reader: reader, - process: newModelToPersistenceProcess(cfg.driver), + process: newModelToPersistenceProcess(cfg.storage), }, nil } -func newModelToPersistenceProcess(driver Driver) processFunc { - return func(ctx context.Context, msg []byte) ([]byte, error) { - return nil, errors.New("not impl") +func newModelToPersistenceProcess(storage Storage) processFunc { + return func(ctx context.Context, models []byte) ([]byte, error) { + var m Models + if err := json.Unmarshal(models, &m); err != nil { + return nil, fmt.Errorf("received non models payload: %w", err) + } + + if m.Event.Empty() { + } else if err := storage.UpsertEvent(ctx, m.Event); err != nil { + return nil, fmt.Errorf("failed to persist event: %w", err) + } + + if m.Thread.Empty() { + } else if err := storage.UpsertThread(ctx, m.Thread); err != nil { + return nil, fmt.Errorf("failed to persist thread: %w", err) + } + + if m.Message.Empty() { + } else if err := storage.UpsertMessage(ctx, m.Message); err != nil { + return nil, fmt.Errorf("failed to persist message: %w", err) + } + + return json.Marshal(ModelIDs{ + Event: m.Event.ID, + Thread: m.Thread.ID, + Message: m.Message.ID, + }) } } diff --git a/persistence_test.go b/persistence_test.go index 4256c91..8c15803 100644 --- a/persistence_test.go +++ b/persistence_test.go @@ -2,8 +2,11 @@ package main import ( "context" + "encoding/json" "testing" "time" + + "github.com/breel-render/spoc-bot-vr/model" ) func TestModelToPersistenceProcessor(t *testing.T) { @@ -12,7 +15,49 @@ func TestModelToPersistenceProcessor(t *testing.T) { defer can() d := NewTestDriver(t) - process := newModelToPersistenceProcess(d) + s, _ := NewStorage(ctx, d) + process := newModelToPersistenceProcess(s) _, _ = ctx, process + + inputModels := Models{ + Event: model.Event{ID: "event", Asset: "event-asset"}, + //Thread: {ID: "thread", Channel: "thread-channel"}, + Message: model.Message{ID: "message", Plaintext: "message-plaintext"}, + } + input, _ := json.Marshal(inputModels) + + var outputModelIDs ModelIDs + var n int + if output, err := process(ctx, input); err != nil { + t.Fatal(err) + } else if err := json.Unmarshal(output, &outputModelIDs); err != nil { + t.Fatal(err) + } else if outputModelIDs != (ModelIDs{Event: "event", Message: "message"}) { + t.Error(outputModelIDs) + } + + if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`); row.Err() != nil { + t.Error("cant count events:", row.Err()) + } else if err := row.Scan(&n); err != nil { + t.Error("cant count events:", err) + } else if n != 1 { + t.Error("bad event count:", n) + } + + if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM threads`); row.Err() != nil { + t.Error("cant count threads:", row.Err()) + } else if err := row.Scan(&n); err != nil { + t.Error("cant count threads:", err) + } else if n != 0 { + t.Error("bad thread count:", n) + } + + if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM messages`); row.Err() != nil { + t.Error("cant count messages:", row.Err()) + } else if err := row.Scan(&n); err != nil { + t.Error("cant count messages:", err) + } else if n != 1 { + t.Error("bad message count:", n) + } } diff --git a/slack.go b/slack.go index 8499e4d..e249dc4 100644 --- a/slack.go +++ b/slack.go @@ -32,7 +32,7 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) if err != nil { return Pipeline{}, err } - writer, err := NewQueue(ctx, "new_message", cfg.driver) + writer, err := NewQueue(ctx, "new_models", cfg.driver) if err != nil { return Pipeline{}, err }