pg passing mvp test with queue
parent
c7f5cdb040
commit
5557c0920a
|
|
@ -0,0 +1,53 @@
|
|||
//go:build integration
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/breel-render/spoc-bot-vr/model"
|
||||
)
|
||||
|
||||
func TestDriverIntegration(t *testing.T) {
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*30)
|
||||
defer can()
|
||||
|
||||
driver, err := NewDriver(ctx, os.Getenv("DRIVER_CONN"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer driver.Close()
|
||||
|
||||
q, err := NewQueue(ctx, t.Name(), driver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qV := []byte("hello")
|
||||
if err := q.Enqueue(ctx, qV); err != nil {
|
||||
t.Error("q cannot enqueue:", err)
|
||||
} else if reservation, v, err := q.Syn(ctx); err != nil {
|
||||
t.Error("q cannot syn:", err)
|
||||
} else if string(v) != string(qV) {
|
||||
t.Error("q enqueued wrong:", string(v))
|
||||
} else if len(reservation) == 0 {
|
||||
t.Error("q didnt have reservation")
|
||||
} else if err := q.Ack(ctx, reservation); err != nil {
|
||||
t.Error("q cannot ack:", err)
|
||||
}
|
||||
|
||||
s, err := NewStorage(ctx, driver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
evt := model.Event{ID: "x", Name: "y"}
|
||||
if err := s.UpsertEvent(ctx, evt); err != nil {
|
||||
t.Error("s cannot upsert:", err)
|
||||
} else if e, err := s.GetEvent(ctx, evt.ID); err != nil {
|
||||
t.Error("s cannot get:", err)
|
||||
} else if e != evt {
|
||||
t.Error("s upserted wrong:", e)
|
||||
}
|
||||
}
|
||||
15
queue.go
15
queue.go
|
|
@ -20,7 +20,7 @@ func NewNoopQueue() Queue {
|
|||
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||
if _, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS queue (
|
||||
id INTEGER PRIMARY KEY,
|
||||
id TEXT PRIMARY KEY,
|
||||
topic TEXT NOT NULL,
|
||||
updated INTEGER NOT NULL,
|
||||
reservation TEXT,
|
||||
|
|
@ -37,11 +37,12 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
|||
return nil
|
||||
}
|
||||
_, err := q.driver.ExecContext(ctx, `
|
||||
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
||||
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3)
|
||||
`,
|
||||
q.topic,
|
||||
time.Now().Unix(),
|
||||
b,
|
||||
uuid.New().String(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
@ -71,16 +72,16 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||
if result, err := q.driver.ExecContext(ctx, `
|
||||
UPDATE queue
|
||||
SET
|
||||
updated = ?, reservation = ?
|
||||
updated = $1, reservation = $2
|
||||
WHERE
|
||||
id IN (
|
||||
SELECT id
|
||||
FROM queue
|
||||
WHERE
|
||||
topic == ?
|
||||
topic = $3
|
||||
AND (
|
||||
reservation IS NULL
|
||||
OR ? - updated > 60
|
||||
OR $4 - updated > 60
|
||||
)
|
||||
LIMIT 1
|
||||
)
|
||||
|
|
@ -95,7 +96,7 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|||
row := q.driver.QueryRowContext(ctx, `
|
||||
SELECT payload
|
||||
FROM queue
|
||||
WHERE reservation==?
|
||||
WHERE reservation=$1
|
||||
LIMIT 1
|
||||
`, reservation)
|
||||
if err := row.Err(); err != nil {
|
||||
|
|
@ -117,7 +118,7 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
|||
}
|
||||
result, err := q.driver.ExecContext(ctx, `
|
||||
DELETE FROM queue
|
||||
WHERE reservation==?
|
||||
WHERE reservation=$1
|
||||
`, reservation)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
Loading…
Reference in New Issue