From c84d80e8d3a6f39898f18498baeaa8cae1bc800a Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 17:00:15 -0600 Subject: [PATCH] test message to persistence --- persistence.go | 22 ++++++++++++++++++---- persistence_test.go | 28 +++++++++++++++++----------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/persistence.go b/persistence.go index a17c04c..f34264a 100644 --- a/persistence.go +++ b/persistence.go @@ -2,7 +2,7 @@ package main import ( "context" - "io" + "fmt" ) type MessageToPersistence struct { @@ -26,8 +26,22 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, } func newMessageToPersistenceProcess(driver Driver) processFunc { - return func(ctx context.Context, slack []byte) ([]byte, error) { - // TODO table design lets go - return nil, io.EOF + return func(ctx context.Context, msg []byte) ([]byte, error) { + m, err := Deserialize(msg) + if err != nil { + return nil, err + } + if result, err := driver.ExecContext(ctx, ` + CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT); + INSERT INTO messages (id, v) VALUES (?, ?) + ON CONFLICT(id) DO UPDATE set v = ?; + `, m.ID, msg, msg); err != nil { + return nil, err + } else if n, err := result.RowsAffected(); err != nil { + return nil, err + } else if n != 1 { + return nil, fmt.Errorf("upserting event to persistence modified %v rows", n) + } + return msg, nil } } diff --git a/persistence_test.go b/persistence_test.go index 4899b15..d12adf3 100644 --- a/persistence_test.go +++ b/persistence_test.go @@ -11,18 +11,24 @@ func TestMessageToPersistenceProcessor(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() - cases := map[string]string{} + d := NewTestDriver(t) + process := newMessageToPersistenceProcess(d) - for given, wantd := range cases { - want := wantd - t.Run(given, func(t *testing.T) { - process := newMessageToPersistenceProcess(NewTestDriver(t)) + if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil { + t.Fatal(err) + } else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil { + t.Fatal("failed to upsert on redundant process", err) + } - if got, err := process(ctx, []byte(given)); err != nil { - t.Fatal(err) - } else if string(got) != string(want) { - t.Errorf("wanted %q but got %q", want, got) - } - }) + var id, v []byte + row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x") + if err := row.Err(); err != nil { + t.Fatal(err) + } else if err := row.Scan(&id, &v); err != nil { + t.Fatal(err) + } else if string(id) != "x" { + t.Fatal(string(id)) + } else if string(v) != `{"ID":"x"}` { + t.Fatal(string(v)) } }