From 42c5b7d7ad8fa4c8771006d970b7e9b7d00cb3cd Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 13:30:35 -0600 Subject: [PATCH] multi topic done --- .storage.go | 39 ---------------------------- .storage_test.go | 67 ------------------------------------------------ ingest.go | 22 ++++++++++++++++ queue.go | 30 +++++++++++++--------- queue_test.go | 16 +++++++++--- 5 files changed, 53 insertions(+), 121 deletions(-) delete mode 100644 .storage.go delete mode 100644 .storage_test.go create mode 100644 ingest.go diff --git a/.storage.go b/.storage.go deleted file mode 100644 index ca3089b..0000000 --- a/.storage.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "context" - "errors" - "time" -) - -var ( - ErrNotFound = errors.New("not found") -) - -type Storage struct { - driver Driver -} - -func NewStorage(driver Driver) Storage { - return Storage{driver: driver} -} - -func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) { - return nil, errors.New("not impl") -} - -func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) { - return nil, errors.New("not impl") -} - -func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) { - return nil, errors.New("not impl") -} - -func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) { - return nil, errors.New("not impl") -} - -func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) { - return nil, errors.New("not impl") -} diff --git a/.storage_test.go b/.storage_test.go deleted file mode 100644 index 072b32b..0000000 --- a/.storage_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package main - -import ( - "context" - "testing" - "time" -) - -func TestStorage(t *testing.T) { - ctx, can := context.WithTimeout(context.Background(), time.Second) - defer can() - - t.Run("Threads", func(t *testing.T) { - s := NewStorage(NewRAM()) - - mX1 := Message{ID: "1", Thread: "X", TS: 1} - mX2 := Message{ID: "2", Thread: "X", TS: 2} - mY1 := Message{ID: "1", Thread: "Y", TS: 3} - - for _, m := range []Message{mX1, mX2, mY1} { - if err := s.Upsert(ctx, m); err != nil { - t.Fatal(err) - } - } - - if threads, err := s.Threads(ctx); err != nil { - t.Error(err) - } else if len(threads) != 2 { - t.Error(threads) - } else if threads[0] != "X" { - t.Error(threads, "X") - } else if threads[1] != "Y" { - t.Error(threads, "Y") - } - - if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil { - t.Error(err) - } else if len(threads) != 1 { - t.Error(threads) - } else if threads[0] != "Y" { - t.Error(threads[0]) - } - }) - - t.Run("Get Upsert", func(t *testing.T) { - s := NewStorage(NewRAM()) - - if _, err := s.Get(ctx, "id"); err != ErrNotFound { - t.Error("failed to get 404", err) - } - - m := Message{ - ID: "id", - TS: 1, - } - - if err := s.Upsert(ctx, m); err != nil { - t.Error("failed to upsert", err) - } - - if m2, err := s.Get(ctx, "id"); err != nil { - t.Error("failed to get", err) - } else if m != m2 { - t.Error(m2) - } - }) -} diff --git a/ingest.go b/ingest.go new file mode 100644 index 0000000..98da739 --- /dev/null +++ b/ingest.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + "io" +) + +type Ingester struct { + d Driver + q Queue +} + +func NewIngester(d Driver, q Queue) Ingester { + return Ingester{ + d: d, + q: q, + } +} + +func (i Ingester) Ingest(ctx context.Context) error { + return io.EOF +} diff --git a/queue.go b/queue.go index 446defd..a9b1364 100644 --- a/queue.go +++ b/queue.go @@ -13,31 +13,34 @@ type Queue struct { } func NewQueue(ctx context.Context, driver Driver) (Queue, error) { - _, err := driver.ExecContext(ctx, ` - DROP TABLE IF EXISTS queue; + if _, err := driver.ExecContext(ctx, ` CREATE TABLE IF NOT EXISTS queue ( id INTEGER PRIMARY KEY, + topic TEXT NOT NULL, updated INTEGER NOT NULL, reservation TEXT, payload TEXT ); - `) - return Queue{driver: driver}, err + `); err != nil { + return Queue{}, fmt.Errorf("failed to create table: %w", err) + } + return Queue{driver: driver}, nil } -func (q Queue) Enqueue(ctx context.Context, b []byte) error { +func (q Queue) Enqueue(ctx context.Context, topic string, b []byte) error { _, err := q.driver.ExecContext(ctx, ` - INSERT INTO queue (updated, payload) VALUES (?, ?) + INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?) `, + topic, time.Now().Unix(), b, ) return err } -func (q Queue) Syn(ctx context.Context) (string, []byte, error) { +func (q Queue) Syn(ctx context.Context, topic string) (string, []byte, error) { for { - reservation, m, err := q.syn(ctx) + reservation, m, err := q.syn(ctx, topic) if reservation != nil || err != nil { return string(reservation), m, err } @@ -50,7 +53,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) { } } -func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { +func (q Queue) syn(ctx context.Context, topic string) ([]byte, []byte, error) { now := time.Now().Unix() reservation := []byte(uuid.New().String()) var payload []byte @@ -63,11 +66,14 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { SELECT id FROM queue WHERE - reservation IS NULL - OR ? - updated > 60 + topic == ? + AND ( + reservation IS NULL + OR ? - updated > 60 + ) LIMIT 1 ) - `, now, reservation, now); err != nil { + `, now, reservation, topic, now); err != nil { return nil, nil, fmt.Errorf("failed to assign reservation: %w", err) } else if n, err := result.RowsAffected(); err != nil { return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err) diff --git a/queue_test.go b/queue_test.go index 25edb68..1b8a304 100644 --- a/queue_test.go +++ b/queue_test.go @@ -11,7 +11,7 @@ func TestQueue(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() - driver, _ := NewDriver(ctx, "/tmp/f.db") + driver, _ := NewDriver(ctx, "") q, err := NewQueue(ctx, driver) if err != nil { t.Fatal(err) @@ -19,16 +19,20 @@ func TestQueue(t *testing.T) { t.Run("enqueue", func(t *testing.T) { for i := 0; i < 39; i++ { - if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil { + if err := q.Enqueue(ctx, "", []byte(strconv.Itoa(i))); err != nil { t.Fatal(i, err) } } }) + if err := q.Enqueue(ctx, "other", []byte(strconv.Itoa(100))); err != nil { + t.Fatal(err) + } + t.Run("syn ack", func(t *testing.T) { found := map[string]struct{}{} for i := 0; i < 39; i++ { - if reservation, b, err := q.Syn(ctx); err != nil { + if reservation, b, err := q.Syn(ctx, ""); err != nil { t.Fatal(i, "syn err", err) } else if _, ok := found[string(b)]; ok { t.Errorf("syn'd %q twice (%+v)", b, found) @@ -39,4 +43,10 @@ func TestQueue(t *testing.T) { } } }) + + if reservation, _, err := q.syn(ctx, ""); reservation != nil { + t.Errorf("able to syn 1 more message than created: %v", err) + } else if reservation, _, err := q.syn(ctx, "other"); reservation == nil { + t.Errorf("unable to syn from other topic: %v", err) + } }