diff --git a/config.go b/config.go index f3bfcae..e00c3a0 100644 --- a/config.go +++ b/config.go @@ -13,25 +13,26 @@ import ( ) type Config struct { - Port int - Debug bool - InitializeSlack bool - SlackToken string - SlackChannels []string - DriverConn string - BasicAuthUser string - BasicAuthPassword string - FillWithTestdata bool - OllamaURL string - OllamaModel string - LocalCheckpoint string - LocalTokenizer string - AssetPattern string - DatacenterPattern string - EventNamePattern string - driver Driver - ai AI - slackToMessagePipeline Pipeline + Port int + Debug bool + InitializeSlack bool + SlackToken string + SlackChannels []string + DriverConn string + BasicAuthUser string + BasicAuthPassword string + FillWithTestdata bool + OllamaURL string + OllamaModel string + LocalCheckpoint string + LocalTokenizer string + AssetPattern string + DatacenterPattern string + EventNamePattern string + driver Driver + ai AI + slackToMessagePipeline Pipeline + messageToPersistencePipeline Pipeline } var ( @@ -131,5 +132,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, } result.slackToMessagePipeline = slackToMessagePipeline + messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result) + if err != nil { + return Config{}, err + } + result.messageToPersistencePipeline = messageToPersistencePipeline + return result, nil } diff --git a/main.go b/main.go index baff566..b649cca 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,7 @@ func run(ctx context.Context, cfg Config) error { select { case <-ctx.Done(): return ctx.Err() - case err := <-processPipelines(ctx, cfg.slackToMessagePipeline): + case err := <-processPipelines(ctx, cfg.slackToMessagePipeline, cfg.messageToPersistencePipeline): return err case err := <-listenAndServe(ctx, cfg): return err @@ -51,7 +51,7 @@ func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline errs := make(chan error) for i := range pipelines { go func(i int) { - defer close(errs) + defer can() select { case errs <- pipelines[i].Process(ctx): case <-ctx.Done(): diff --git a/persistance.go b/persistance.go new file mode 100644 index 0000000..21a832b --- /dev/null +++ b/persistance.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "fmt" +) + +type MessageToPersistence struct { + pipeline Pipeline +} + +func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { + reader, err := NewQueue(ctx, "fromSlack", cfg.driver) + if err != nil { + return Pipeline{}, err + } + writer, err := NewQueue(ctx, "fromMessage", cfg.driver) + if err != nil { + return Pipeline{}, err + } + return Pipeline{ + writer: writer, + reader: reader, + process: newMessageToPersistenceProcess(cfg), + }, nil +} + +func newMessageToPersistenceProcess(cfg Config) processFunc { + return func(ctx context.Context, slack []byte) ([]byte, error) { + m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) + if err != nil { + return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack) + } + return m.Serialize(), nil + } +}