From d792626c2fed9b7f78d63e71afe1fd815bf0f9f0 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 15:18:03 -0600 Subject: [PATCH] test ingest --- ingest_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ queue.go | 17 +++++------------ queue_test.go | 8 ++++++++ 3 files changed, 65 insertions(+), 12 deletions(-) create mode 100644 ingest_test.go diff --git a/ingest_test.go b/ingest_test.go new file mode 100644 index 0000000..4eac6fc --- /dev/null +++ b/ingest_test.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "testing" + "time" +) + +func TestIngester(t *testing.T) { + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + + driverOutput, _ := NewDriver(ctx, "") + output, err := NewQueue(ctx, "output", driverOutput) + if err != nil { + t.Fatal(err) + } + driverInput, _ := NewDriver(ctx, "") + input, err := NewQueue(ctx, "input", driverInput) + if err != nil { + t.Fatal(err) + } + found := map[string]struct{}{} + process := func(_ context.Context, v []byte) ([]byte, error) { + found[string(v)] = struct{}{} + return []byte("world"), nil + } + + if err := input.Enqueue(ctx, []byte("hello")); err != nil { + t.Error(err) + } + + ing := NewIngester(output, input, process) + go func() { + defer can() + if err := ing.Process(ctx); err != nil && ctx.Err() == nil { + t.Fatal(err) + } + }() + + if r, p, err := output.Syn(ctx); err != nil { + t.Error(err) + } else if string(p) != "world" { + t.Errorf("Syn() = (%q, %q, %v)", r, p, err) + } else if err := output.Ack(ctx, r); err != nil { + t.Error(err) + } + + if len(found) != 1 { + t.Error(found) + } +} diff --git a/queue.go b/queue.go index 7b75571..ed98d06 100644 --- a/queue.go +++ b/queue.go @@ -89,26 +89,19 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { } else if n, err := result.RowsAffected(); err != nil { return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err) } else if n == 0 { - return nil, nil, fmt.Errorf("failed to assign reservation: zero updates") + return nil, nil, nil } - rows, err := q.driver.QueryContext(ctx, ` + row := q.driver.QueryRowContext(ctx, ` SELECT payload FROM queue WHERE reservation==? LIMIT 1 `, reservation) - if err != nil { + if err := row.Err(); err != nil { return nil, nil, fmt.Errorf("failed to query reservation: %w", err) - } - defer rows.Close() - for rows.Next() { - if err := rows.Scan(&payload); err != nil { - return nil, nil, fmt.Errorf("failed to parse reservation: %w", err) - } - } - if err := rows.Err(); err != nil { - return nil, nil, fmt.Errorf("failed to page reservation: %w", err) + } else if err := row.Scan(&payload); err != nil { + return nil, nil, fmt.Errorf("failed to parse reservation: %w", err) } return reservation, payload, nil diff --git a/queue_test.go b/queue_test.go index bbd41e3..dfbb328 100644 --- a/queue_test.go +++ b/queue_test.go @@ -18,6 +18,12 @@ func TestQueue(t *testing.T) { } qOther, _ := NewQueue(ctx, "other", driver) + if reservation, _, err := q.syn(ctx); reservation != nil { + t.Errorf("able to syn before any enqueues created: %v", err) + } else { + t.Logf("sync before enqueues: %v", err) + } + t.Run("enqueue", func(t *testing.T) { for i := 0; i < 39; i++ { if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil { @@ -49,6 +55,8 @@ func TestQueue(t *testing.T) { t.Errorf("able to syn 1 more message than created: %v", err) } else if reservation, _, err := qOther.syn(ctx); reservation == nil { t.Errorf("unable to syn from other topic: %v", err) + } else { + t.Logf("empty q.syn = %v", err) } t.Run("noop", func(t *testing.T) {