rename ingest to pipeline
parent
d792626c2f
commit
80df07089f
|
|
@ -3,7 +3,7 @@ package main
|
||||||
import "context"
|
import "context"
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Ingester struct {
|
Pipeline struct {
|
||||||
writer Queue
|
writer Queue
|
||||||
reader Queue
|
reader Queue
|
||||||
process processFunc
|
process processFunc
|
||||||
|
|
@ -11,31 +11,31 @@ type (
|
||||||
processFunc func(context.Context, []byte) ([]byte, error)
|
processFunc func(context.Context, []byte) ([]byte, error)
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewIngester(writer, reader Queue, process processFunc) Ingester {
|
func NewPipeline(writer, reader Queue, process processFunc) Pipeline {
|
||||||
return Ingester{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: process,
|
process: process,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i Ingester) Process(ctx context.Context) error {
|
func (p Pipeline) Process(ctx context.Context) error {
|
||||||
ctx, can := context.WithCancel(ctx)
|
ctx, can := context.WithCancel(ctx)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
reservation, read, err := i.reader.Syn(ctx)
|
reservation, read, err := p.reader.Syn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
processed, err := i.process(ctx, read)
|
processed, err := p.process(ctx, read)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := i.writer.Enqueue(ctx, processed); err != nil {
|
if err := p.writer.Enqueue(ctx, processed); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := i.reader.Ack(ctx, reservation); err != nil {
|
if err := p.reader.Ack(ctx, reservation); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestIngester(t *testing.T) {
|
func TestPipeline(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
|
|
@ -30,7 +30,7 @@ func TestIngester(t *testing.T) {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ing := NewIngester(output, input, process)
|
ing := NewPipeline(output, input, process)
|
||||||
go func() {
|
go func() {
|
||||||
defer can()
|
defer can()
|
||||||
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
if err := ing.Process(ctx); err != nil && ctx.Err() == nil {
|
||||||
Loading…
Reference in New Issue