package main import ( "context" "testing" "time" ) func TestPipeline(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() driverOutput, _ := NewDriver(ctx, "") output, err := NewQueue(ctx, "output", driverOutput) if err != nil { t.Fatal(err) } driverInput, _ := NewDriver(ctx, "") input, err := NewQueue(ctx, "input", driverInput) if err != nil { t.Fatal(err) } found := map[string]struct{}{} process := func(_ context.Context, v []byte) ([]byte, error) { found[string(v)] = struct{}{} return []byte("world"), nil } if err := input.Enqueue(ctx, []byte("hello")); err != nil { t.Error(err) } ing := NewPipeline(output, input, process) go func() { defer can() if err := ing.Process(ctx); err != nil && ctx.Err() == nil { t.Fatal(err) } }() if r, p, err := output.Syn(ctx); err != nil { t.Error(err) } else if string(p) != "world" { t.Errorf("Syn() = (%q, %q, %v)", r, p, err) } else if err := output.Ack(ctx, r); err != nil { t.Error(err) } if len(found) != 1 { t.Error(found) } }