package main

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestPipelineDoesntPushEmptyMessage enqueues one message on the input queue,
// runs a pipeline whose processor returns a nil payload, and checks that
// nothing shows up on the output queue once the processor has been called.
func TestPipelineDoesntPushEmptyMessage(t *testing.T) {
	t.Parallel()
	ctx, can := context.WithTimeout(context.Background(), time.Second*10)
	defer can()

	output, err := NewQueue(ctx, "output", NewTestDriver(t))
	if err != nil {
		t.Fatal(err)
	}
	input, err := NewQueue(ctx, "input", NewTestDriver(t))
	if err != nil {
		t.Fatal(err)
	}

	// calls is incremented by the processor (run under the pipeline
	// goroutine) and polled by the test goroutine, so it must be accessed
	// atomically.
	var calls int32
	process := func(_ context.Context, _ []byte) ([]byte, error) {
		atomic.AddInt32(&calls, 1)
		return nil, nil
	}
	if err := input.Enqueue(ctx, []byte("hello")); err != nil {
		t.Error(err)
	}

	ing := NewPipeline(output, input, process)
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer can()
		// t.Error rather than t.Fatal: FailNow may only be called from the
		// goroutine running the test function.
		if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
			t.Error(err)
		}
	}()
	// Cancel the pipeline and wait for its goroutine before the test returns,
	// so it can never report a failure after the test has completed.
	defer func() {
		can()
		<-done
	}()

	// Poll until the processor has run at least once or the context ends.
	tick := time.NewTicker(time.Millisecond * 100)
	defer tick.Stop()
	for atomic.LoadInt32(&calls) == 0 && ctx.Err() == nil {
		select {
		case <-ctx.Done():
		case <-tick.C:
		}
	}
	if atomic.LoadInt32(&calls) == 0 {
		// Without at least one processor call the check below would pass
		// vacuously.
		t.Fatal("processor was never called before the context ended")
	}

	if r, _, _ := output.syn(ctx); len(r) != 0 {
		t.Error("something was pushed to out queue even though processor didnt emit content")
	}
}

// TestPipeline enqueues "hello" on the input queue, runs a pipeline whose
// processor records each payload it sees and answers "world", then checks
// that "world" can be read and acknowledged on the output queue and that the
// processor saw exactly one distinct payload.
func TestPipeline(t *testing.T) {
	t.Parallel()
	ctx, can := context.WithTimeout(context.Background(), time.Second*10)
	defer can()

	output, err := NewQueue(ctx, "output", NewTestDriver(t))
	if err != nil {
		t.Fatal(err)
	}
	input, err := NewQueue(ctx, "input", NewTestDriver(t))
	if err != nil {
		t.Fatal(err)
	}

	// found is written by the processor (run under the pipeline goroutine)
	// and read by the test goroutine; mu guards both sides.
	var mu sync.Mutex
	found := map[string]struct{}{}
	process := func(_ context.Context, v []byte) ([]byte, error) {
		mu.Lock()
		found[string(v)] = struct{}{}
		mu.Unlock()
		return []byte("world"), nil
	}
	if err := input.Enqueue(ctx, []byte("hello")); err != nil {
		t.Error(err)
	}

	ing := NewPipeline(output, input, process)
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer can()
		// t.Error rather than t.Fatal: FailNow may only be called from the
		// goroutine running the test function.
		if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
			t.Error(err)
		}
	}()
	// Cancel the pipeline and wait for its goroutine before the test returns,
	// so it can never report a failure after the test has completed.
	defer func() {
		can()
		<-done
	}()

	r, p, err := output.Syn(ctx)
	switch {
	case err != nil:
		t.Error(err)
	case string(p) != "world":
		t.Errorf("Syn() = (%q, %q, %v)", r, p, err)
	default:
		if err := output.Ack(ctx, r); err != nil {
			t.Error(err)
		}
	}

	mu.Lock()
	defer mu.Unlock()
	if len(found) != 1 {
		t.Error(found)
	}
}