package main

import (
	"context"
	"strconv"
	"testing"
	"time"
)

// TestQueue exercises the queue end to end against a test driver:
//
//  1. syn on an empty queue must not yield a reservation;
//  2. a fixed number of messages are enqueued on the default topic and one
//     message on a second topic ("other") sharing the same driver;
//  3. every message on the default topic is syn'd exactly once and acked;
//  4. the default topic is then empty again while the other topic still
//     yields its message;
//  5. the noop queue accepts Enqueue/Syn/Ack without returning an error.
func TestQueue(t *testing.T) {
	t.Parallel()

	// messages is the number of payloads enqueued on, and then drained from,
	// the default topic. Both loops below must agree on it.
	const messages = 39

	ctx, can := context.WithTimeout(context.Background(), time.Second*10)
	defer can()

	q, err := NewQueue(ctx, "", NewTestDriver(t))
	if err != nil {
		t.Fatal(err)
	}

	// Second queue on a different topic, backed by the same driver. A failure
	// here would otherwise surface later as a confusing error on qOther.
	qOther, err := NewQueue(ctx, "other", q.driver)
	if err != nil {
		t.Fatal(err)
	}

	if reservation, _, err := q.syn(ctx); reservation != nil {
		t.Errorf("able to syn before any enqueues created: %v", err)
	} else {
		t.Logf("sync before enqueues: %v", err)
	}

	t.Run("enqueue", func(t *testing.T) {
		for i := 0; i < messages; i++ {
			if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
				t.Fatal(i, err)
			}
		}
	})

	if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
		t.Fatal(err)
	}

	t.Run("syn ack", func(t *testing.T) {
		// found records every payload already delivered so that a duplicate
		// delivery can be detected.
		found := map[string]struct{}{}
		for i := 0; i < messages; i++ {
			reservation, b, err := q.Syn(ctx)
			if err != nil {
				t.Fatal(i, "syn err", err)
			}
			if _, ok := found[string(b)]; ok {
				// A duplicate is reported but deliberately not acked.
				t.Errorf("syn'd %q twice (%+v)", b, found)
				continue
			}
			if err := q.Ack(ctx, reservation); err != nil {
				t.Fatal(i, "failed to ack", err)
			}
			found[string(b)] = struct{}{}
		}
	})

	// The other topic is only probed once the default topic is confirmed to be
	// drained. Distinct names are used for the second syn so that the final
	// log line reports the error from q.syn rather than a shadowing one.
	if reservation, _, err := q.syn(ctx); reservation != nil {
		t.Errorf("able to syn 1 more message than created: %v", err)
	} else if otherReservation, _, otherErr := qOther.syn(ctx); otherReservation == nil {
		t.Errorf("unable to syn from other topic: %v", otherErr)
	} else {
		t.Logf("empty q.syn = %v", err)
	}

	t.Run("noop", func(t *testing.T) {
		q := NewNoopQueue()
		// A real context is passed instead of nil; the context package
		// documents that a nil Context must never be passed.
		if err := q.Enqueue(ctx, nil); err != nil {
			t.Error(err)
		}
		if _, _, err := q.Syn(ctx); err != nil {
			t.Error(err)
		}
		if err := q.Ack(ctx, ""); err != nil {
			t.Error(err)
		}
	})
}