wip normalize

main
Bel LaPointe 2024-04-15 17:09:31 -06:00
parent c84d80e8d3
commit 3c62411927
4 changed files with 82 additions and 21 deletions

View File

@ -13,26 +13,27 @@ import (
)
type Config struct {
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
}
var (
@ -138,5 +139,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil
}

View File

@ -36,7 +36,11 @@ func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-processPipelines(ctx, cfg.slackToMessagePipeline, cfg.messageToPersistencePipeline):
case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
return err

32
normalize.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

18
normalize_test.go Normal file
View File

@ -0,0 +1,18 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}