From 80df07089fe7ff8ca99fa358d506f3da64cc7d2b Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 15:22:23 -0600 Subject: [PATCH] rename ingest to pipeline --- ingest.go => pipeline.go | 16 ++++++++-------- ingest_test.go => pipeline_test.go | 4 ++-- slack.go | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 10 deletions(-) rename ingest.go => pipeline.go (53%) rename ingest_test.go => pipeline_test.go (92%) create mode 100644 slack.go diff --git a/ingest.go b/pipeline.go similarity index 53% rename from ingest.go rename to pipeline.go index 3f07f58..06f9401 100644 --- a/ingest.go +++ b/pipeline.go @@ -3,7 +3,7 @@ package main import "context" type ( - Ingester struct { + Pipeline struct { writer Queue reader Queue process processFunc @@ -11,31 +11,31 @@ type ( processFunc func(context.Context, []byte) ([]byte, error) ) -func NewIngester(writer, reader Queue, process processFunc) Ingester { - return Ingester{ +func NewPipeline(writer, reader Queue, process processFunc) Pipeline { + return Pipeline{ writer: writer, reader: reader, process: process, } } -func (i Ingester) Process(ctx context.Context) error { +func (p Pipeline) Process(ctx context.Context) error { ctx, can := context.WithCancel(ctx) defer can() for ctx.Err() == nil { - reservation, read, err := i.reader.Syn(ctx) + reservation, read, err := p.reader.Syn(ctx) if err != nil { return err } - processed, err := i.process(ctx, read) + processed, err := p.process(ctx, read) if err != nil { return err } - if err := i.writer.Enqueue(ctx, processed); err != nil { + if err := p.writer.Enqueue(ctx, processed); err != nil { return err } - if err := i.reader.Ack(ctx, reservation); err != nil { + if err := p.reader.Ack(ctx, reservation); err != nil { return err } } diff --git a/ingest_test.go b/pipeline_test.go similarity index 92% rename from ingest_test.go rename to pipeline_test.go index 4eac6fc..5c0622a 100644 --- a/ingest_test.go +++ b/pipeline_test.go @@ -6,7 +6,7 @@ import ( "time" ) -func TestIngester(t *testing.T) { +func TestPipeline(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() @@ -30,7 +30,7 @@ func TestIngester(t *testing.T) { t.Error(err) } - ing := NewIngester(output, input, process) + ing := NewPipeline(output, input, process) go func() { defer can() if err := ing.Process(ctx); err != nil && ctx.Err() == nil { diff --git a/slack.go b/slack.go new file mode 100644 index 0000000..eae6505 --- /dev/null +++ b/slack.go @@ -0,0 +1,14 @@ +package main + +import ( + "context" + "io" +) + +type SlackIngestion struct { + slackToMessage Pipeline +} + +func NewSlackIngestion(ctx context.Context, driver Driver) (SlackIngestion, error) { + return SlackIngestion{}, io.EOF +}