spoc-bot-vr/pipeline_test.go

52 lines
1.0 KiB
Go

package main
import (
"context"
"testing"
"time"
)
func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
output, err := NewQueue(ctx, "output", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
input, err := NewQueue(ctx, "input", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
found := map[string]struct{}{}
process := func(_ context.Context, v []byte) ([]byte, error) {
found[string(v)] = struct{}{}
return []byte("world"), nil
}
if err := input.Enqueue(ctx, []byte("hello")); err != nil {
t.Error(err)
}
ing := NewPipeline(output, input, process)
go func() {
defer can()
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}()
if r, p, err := output.Syn(ctx); err != nil {
t.Error(err)
} else if string(p) != "world" {
t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
} else if err := output.Ack(ctx, r); err != nil {
t.Error(err)
}
if len(found) != 1 {
t.Error(found)
}
}