From 3c62411927e4dd6b5d25c1ea2340d25bdbf2cd60 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 17:09:31 -0600 Subject: [PATCH] wip normalize --- config.go | 47 +++++++++++++++++++++++++++-------------------- main.go | 6 +++++- normalize.go | 32 ++++++++++++++++++++++++++++++++ normalize_test.go | 18 ++++++++++++++++++ 4 files changed, 82 insertions(+), 21 deletions(-) create mode 100644 normalize.go create mode 100644 normalize_test.go diff --git a/config.go b/config.go index e00c3a0..dea18ab 100644 --- a/config.go +++ b/config.go @@ -13,26 +13,27 @@ import ( ) type Config struct { - Port int - Debug bool - InitializeSlack bool - SlackToken string - SlackChannels []string - DriverConn string - BasicAuthUser string - BasicAuthPassword string - FillWithTestdata bool - OllamaURL string - OllamaModel string - LocalCheckpoint string - LocalTokenizer string - AssetPattern string - DatacenterPattern string - EventNamePattern string - driver Driver - ai AI - slackToMessagePipeline Pipeline - messageToPersistencePipeline Pipeline + Port int + Debug bool + InitializeSlack bool + SlackToken string + SlackChannels []string + DriverConn string + BasicAuthUser string + BasicAuthPassword string + FillWithTestdata bool + OllamaURL string + OllamaModel string + LocalCheckpoint string + LocalTokenizer string + AssetPattern string + DatacenterPattern string + EventNamePattern string + driver Driver + ai AI + slackToMessagePipeline Pipeline + messageToPersistencePipeline Pipeline + persistenceToNormalizedPipeline Pipeline } var ( @@ -138,5 +139,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, } result.messageToPersistencePipeline = messageToPersistencePipeline + persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result) + if err != nil { + return Config{}, err + } + result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline + return result, nil } diff --git a/main.go b/main.go index b649cca..ad7a27a 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,11 @@ func run(ctx context.Context, cfg Config) error { select { case <-ctx.Done(): return ctx.Err() - case err := <-processPipelines(ctx, cfg.slackToMessagePipeline, cfg.messageToPersistencePipeline): + case err := <-processPipelines(ctx, + cfg.slackToMessagePipeline, + cfg.messageToPersistencePipeline, + cfg.persistenceToNormalizedPipeline, + ): return err case err := <-listenAndServe(ctx, cfg): return err diff --git a/normalize.go b/normalize.go new file mode 100644 index 0000000..1f4a474 --- /dev/null +++ b/normalize.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "errors" +) + +type PersistenceToNormalized struct { + pipeline Pipeline +} + +func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) { + reader, err := NewQueue(ctx, "new_message", cfg.driver) + if err != nil { + return Pipeline{}, err + } + writer, err := NewQueue(ctx, "new_persistence", cfg.driver) + if err != nil { + return Pipeline{}, err + } + return Pipeline{ + writer: writer, + reader: reader, + process: newPersistenceToNormalizedProcess(cfg.driver), + }, nil +} + +func newPersistenceToNormalizedProcess(driver Driver) processFunc { + return func(ctx context.Context, msg []byte) ([]byte, error) { + return nil, errors.New("not impl") + } +} diff --git a/normalize_test.go b/normalize_test.go new file mode 100644 index 0000000..4523d5a --- /dev/null +++ b/normalize_test.go @@ -0,0 +1,18 @@ +package main + +import ( + "context" + "testing" + "time" +) + +func TestPersistenceToNormalizedProcessor(t *testing.T) { + t.Parallel() + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + + d := NewTestDriver(t) + process := newPersistenceToNormalizedProcess(d) + + _, _ = ctx, process +}