test ingest
parent
acac2a60b0
commit
d792626c2f
|
|
@ -0,0 +1,52 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIngester(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
driverOutput, _ := NewDriver(ctx, "")
|
||||||
|
output, err := NewQueue(ctx, "output", driverOutput)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
driverInput, _ := NewDriver(ctx, "")
|
||||||
|
input, err := NewQueue(ctx, "input", driverInput)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
found := map[string]struct{}{}
|
||||||
|
process := func(_ context.Context, v []byte) ([]byte, error) {
|
||||||
|
found[string(v)] = struct{}{}
|
||||||
|
return []byte("world"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ing := NewIngester(output, input, process)
|
||||||
|
go func() {
|
||||||
|
defer can()
|
||||||
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if r, p, err := output.Syn(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if string(p) != "world" {
|
||||||
|
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
|
||||||
|
} else if err := output.Ack(ctx, r); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(found) != 1 {
|
||||||
|
t.Error(found)
|
||||||
|
}
|
||||||
|
}
|
||||||
15
queue.go
15
queue.go
|
|
@ -89,27 +89,20 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||||
} else if n, err := result.RowsAffected(); err != nil {
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||||
} else if n == 0 {
|
} else if n == 0 {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: zero updates")
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := q.driver.QueryContext(ctx, `
|
row := q.driver.QueryRowContext(ctx, `
|
||||||
SELECT payload
|
SELECT payload
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE reservation==?
|
WHERE reservation==?
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`, reservation)
|
`, reservation)
|
||||||
if err != nil {
|
if err := row.Err(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
||||||
}
|
} else if err := row.Scan(&payload); err != nil {
|
||||||
defer rows.Close()
|
|
||||||
for rows.Next() {
|
|
||||||
if err := rows.Scan(&payload); err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to page reservation: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return reservation, payload, nil
|
return reservation, payload, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,12 @@ func TestQueue(t *testing.T) {
|
||||||
}
|
}
|
||||||
qOther, _ := NewQueue(ctx, "other", driver)
|
qOther, _ := NewQueue(ctx, "other", driver)
|
||||||
|
|
||||||
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
|
t.Errorf("able to syn before any enqueues created: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Logf("sync before enqueues: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
t.Run("enqueue", func(t *testing.T) {
|
t.Run("enqueue", func(t *testing.T) {
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
||||||
|
|
@ -49,6 +55,8 @@ func TestQueue(t *testing.T) {
|
||||||
t.Errorf("able to syn 1 more message than created: %v", err)
|
t.Errorf("able to syn 1 more message than created: %v", err)
|
||||||
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
|
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
|
||||||
t.Errorf("unable to syn from other topic: %v", err)
|
t.Errorf("unable to syn from other topic: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Logf("empty q.syn = %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("noop", func(t *testing.T) {
|
t.Run("noop", func(t *testing.T) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue