// 75 lines, 1.7 KiB, Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestQueue(t *testing.T) {
|
|
t.Parallel()
|
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer can()
|
|
|
|
q, err := NewQueue(ctx, "", NewTestDriver(t))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
qOther, _ := NewQueue(ctx, "other", q.driver)
|
|
|
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
|
t.Errorf("able to syn before any enqueues created: %v", err)
|
|
} else {
|
|
t.Logf("sync before enqueues: %v", err)
|
|
}
|
|
|
|
t.Run("enqueue", func(t *testing.T) {
|
|
for i := 0; i < 39; i++ {
|
|
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
|
t.Fatal(i, err)
|
|
}
|
|
}
|
|
})
|
|
|
|
if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
t.Run("syn ack", func(t *testing.T) {
|
|
found := map[string]struct{}{}
|
|
for i := 0; i < 39; i++ {
|
|
if reservation, b, err := q.Syn(ctx); err != nil {
|
|
t.Fatal(i, "syn err", err)
|
|
} else if _, ok := found[string(b)]; ok {
|
|
t.Errorf("syn'd %q twice (%+v)", b, found)
|
|
} else if err := q.Ack(ctx, reservation); err != nil {
|
|
t.Fatal(i, "failed to ack", err)
|
|
} else {
|
|
found[string(b)] = struct{}{}
|
|
}
|
|
}
|
|
})
|
|
|
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
|
t.Errorf("able to syn 1 more message than created: %v", err)
|
|
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
|
|
t.Errorf("unable to syn from other topic: %v", err)
|
|
} else {
|
|
t.Logf("empty q.syn = %v", err)
|
|
}
|
|
|
|
t.Run("noop", func(t *testing.T) {
|
|
q := NewNoopQueue()
|
|
if err := q.Enqueue(nil, nil); err != nil {
|
|
t.Error(err)
|
|
}
|
|
if _, _, err := q.Syn(nil); err != nil {
|
|
t.Error(err)
|
|
}
|
|
if err := q.Ack(nil, ""); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
}
|