From 5557c0920a3b33020cd8cba899bf1dc9a17694a3 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Tue, 16 Apr 2024 08:53:09 -0600 Subject: [PATCH] pg passing mvp test with queue --- driver_integration_test.go | 53 ++++++++++++++++++++++++++++++++++++++ queue.go | 15 ++++++----- 2 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 driver_integration_test.go diff --git a/driver_integration_test.go b/driver_integration_test.go new file mode 100644 index 0000000..308a497 --- /dev/null +++ b/driver_integration_test.go @@ -0,0 +1,53 @@ +//go:build integration + +package main + +import ( + "context" + "os" + "testing" + "time" + + "github.com/breel-render/spoc-bot-vr/model" +) + +func TestDriverIntegration(t *testing.T) { + ctx, can := context.WithTimeout(context.Background(), time.Second*30) + defer can() + + driver, err := NewDriver(ctx, os.Getenv("DRIVER_CONN")) + if err != nil { + t.Fatal(err) + } + defer driver.Close() + + q, err := NewQueue(ctx, t.Name(), driver) + if err != nil { + t.Fatal(err) + } + qV := []byte("hello") + if err := q.Enqueue(ctx, qV); err != nil { + t.Error("q cannot enqueue:", err) + } else if reservation, v, err := q.Syn(ctx); err != nil { + t.Error("q cannot syn:", err) + } else if string(v) != string(qV) { + t.Error("q enqueued wrong:", string(v)) + } else if len(reservation) == 0 { + t.Error("q didnt have reservation") + } else if err := q.Ack(ctx, reservation); err != nil { + t.Error("q cannot ack:", err) + } + + s, err := NewStorage(ctx, driver) + if err != nil { + t.Fatal(err) + } + evt := model.Event{ID: "x", Name: "y"} + if err := s.UpsertEvent(ctx, evt); err != nil { + t.Error("s cannot upsert:", err) + } else if e, err := s.GetEvent(ctx, evt.ID); err != nil { + t.Error("s cannot get:", err) + } else if e != evt { + t.Error("s upserted wrong:", e) + } +} diff --git a/queue.go b/queue.go index 8ae6878..ad2118a 100644 --- a/queue.go +++ b/queue.go @@ -20,7 +20,7 @@ func NewNoopQueue() Queue { func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) { if _, err := driver.ExecContext(ctx, ` CREATE TABLE IF NOT EXISTS queue ( - id INTEGER PRIMARY KEY, + id TEXT PRIMARY KEY, topic TEXT NOT NULL, updated INTEGER NOT NULL, reservation TEXT, @@ -37,11 +37,12 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error { return nil } _, err := q.driver.ExecContext(ctx, ` - INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?) + INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3) `, q.topic, time.Now().Unix(), b, + uuid.New().String(), ) return err } @@ -71,16 +72,16 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { if result, err := q.driver.ExecContext(ctx, ` UPDATE queue SET - updated = ?, reservation = ? + updated = $1, reservation = $2 WHERE id IN ( SELECT id FROM queue WHERE - topic == ? + topic = $3 AND ( reservation IS NULL - OR ? - updated > 60 + OR $4 - updated > 60 ) LIMIT 1 ) @@ -95,7 +96,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { row := q.driver.QueryRowContext(ctx, ` SELECT payload FROM queue - WHERE reservation==? + WHERE reservation=$1 LIMIT 1 `, reservation) if err := row.Err(); err != nil { @@ -117,7 +118,7 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error { } result, err := q.driver.ExecContext(ctx, ` DELETE FROM queue - WHERE reservation==? + WHERE reservation=$1 `, reservation) if err != nil { return err