package main import ( "context" "log" ) type ( Pipeline struct { writer Queue reader Queue process processFunc } processFunc func(context.Context, []byte) ([]byte, error) ) func NewPipeline(writer, reader Queue, process processFunc) Pipeline { return Pipeline{ writer: writer, reader: reader, process: process, } } func (p Pipeline) Process(ctx context.Context) error { ctx, can := context.WithCancel(ctx) defer can() err := p.processUntilErr(ctx) if err != nil { log.Printf("pipeline failed to process: %v", err) } return err } func (p Pipeline) processUntilErr(ctx context.Context) error { for ctx.Err() == nil { reservation, read, err := p.reader.Syn(ctx) if err != nil { return err } processed, err := p.process(ctx, read) if err != nil { return err } if processed == nil { } else if err := p.writer.Enqueue(ctx, processed); err != nil { return err } if err := p.reader.Ack(ctx, reservation); err != nil { return err } } return ctx.Err() }