93 lines
1.9 KiB
Go
93 lines
1.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
|
|
t.Parallel()
|
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer can()
|
|
|
|
output, _ := NewQueue(ctx, "output", NewTestDriver(t))
|
|
input, _ := NewQueue(ctx, "input", NewTestDriver(t))
|
|
|
|
calls := 0
|
|
process := func(_ context.Context, v []byte) ([]byte, error) {
|
|
calls += 1
|
|
return nil, nil
|
|
}
|
|
|
|
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
ing := NewPipeline(output, input, process)
|
|
go func() {
|
|
defer can()
|
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
|
t.Fatal(err)
|
|
}
|
|
}()
|
|
|
|
for ctx.Err() == nil {
|
|
if calls != 0 {
|
|
break
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-time.After(time.Millisecond * 100):
|
|
}
|
|
}
|
|
|
|
if r, _, _ := output.syn(ctx); len(r) != 0 {
|
|
t.Error("something was pushed to out queue even though processor didnt emit content")
|
|
}
|
|
}
|
|
|
|
func TestPipeline(t *testing.T) {
|
|
t.Parallel()
|
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer can()
|
|
|
|
output, err := NewQueue(ctx, "output", NewTestDriver(t))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
input, err := NewQueue(ctx, "input", NewTestDriver(t))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
found := map[string]struct{}{}
|
|
process := func(_ context.Context, v []byte) ([]byte, error) {
|
|
found[string(v)] = struct{}{}
|
|
return []byte("world"), nil
|
|
}
|
|
|
|
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
ing := NewPipeline(output, input, process)
|
|
go func() {
|
|
defer can()
|
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
|
t.Fatal(err)
|
|
}
|
|
}()
|
|
|
|
if r, p, err := output.Syn(ctx); err != nil {
|
|
t.Error(err)
|
|
} else if string(p) != "world" {
|
|
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
|
|
} else if err := output.Ack(ctx, r); err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
if len(found) != 1 {
|
|
t.Error(found)
|
|
}
|
|
}
|