diff --git a/persistence.go b/persistence.go index 9985a78..eacfabe 100644 --- a/persistence.go +++ b/persistence.go @@ -26,7 +26,6 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e if err != nil { return Pipeline{}, err } - writer = NewNoopQueue() return Pipeline{ writer: writer, reader: reader, diff --git a/slackscrape.go b/slackscrape.go index a46230f..8185cc1 100644 --- a/slackscrape.go +++ b/slackscrape.go @@ -23,15 +23,17 @@ type SlackScrape struct { } func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) { - reader, err := NewQueue(ctx, "slack_channels_to_scrape", cfg.driver) + writer, err := NewQueue(ctx, "new_persistence", cfg.driver) + if err != nil { + return Pipeline{}, err + } + cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver) if err != nil { return Pipeline{}, err } - cfg.slackScrapePipeline.reader = reader - writer := NewNoopQueue() return Pipeline{ writer: writer, - reader: reader, + reader: cfg.slackScrapePipeline.reader, process: newSlackScrapeProcess(cfg), }, nil }