stub second pipeline

main
Bel LaPointe 2024-04-15 16:26:16 -06:00
parent ff280997b1
commit d76f8e2c15
3 changed files with 64 additions and 21 deletions

View File

@ -32,6 +32,7 @@ type Config struct {
driver Driver driver Driver
ai AI ai AI
slackToMessagePipeline Pipeline slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
} }
var ( var (
@ -131,5 +132,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.slackToMessagePipeline = slackToMessagePipeline result.slackToMessagePipeline = slackToMessagePipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
return result, nil return result, nil
} }

View File

@ -36,7 +36,7 @@ func run(ctx context.Context, cfg Config) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err := <-processPipelines(ctx, cfg.slackToMessagePipeline): case err := <-processPipelines(ctx, cfg.slackToMessagePipeline, cfg.messageToPersistencePipeline):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
return err return err
@ -51,7 +51,7 @@ func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline
errs := make(chan error) errs := make(chan error)
for i := range pipelines { for i := range pipelines {
go func(i int) { go func(i int) {
defer close(errs) defer can()
select { select {
case errs <- pipelines[i].Process(ctx): case errs <- pipelines[i].Process(ctx):
case <-ctx.Done(): case <-ctx.Done():

36
persistance.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"context"
"fmt"
)
type MessageToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg),
}, nil
}
func newMessageToPersistenceProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if err != nil {
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
}
return m.Serialize(), nil
}
}