Compare commits

...

9 Commits

Author SHA1 Message Date
Bel LaPointe
3c62411927 wip normalize 2024-04-15 17:09:31 -06:00
Bel LaPointe
c84d80e8d3 test message to persistence 2024-04-15 17:00:15 -06:00
Bel LaPointe
74477fc09c data design time procrastination 2024-04-15 16:38:01 -06:00
Bel LaPointe
1fd4b72b22 todo 2024-04-15 16:36:25 -06:00
Bel LaPointe
d9244e4e1c t.Parallel pls 2024-04-15 16:35:51 -06:00
Bel LaPointe
c9d3b4998b sql :memory: dont work so make a helper NewTestDriver 2024-04-15 16:34:19 -06:00
Bel LaPointe
c5e1556f61 sp 2024-04-15 16:26:24 -06:00
Bel LaPointe
d76f8e2c15 stub second pipeline 2024-04-15 16:26:16 -06:00
Bel LaPointe
ff280997b1 main can run many pipelines 2024-04-15 16:23:41 -06:00
16 changed files with 217 additions and 40 deletions

View File

@@ -14,18 +14,21 @@ import (
)
func TestAINoop(t *testing.T) {
t.Parallel()
ai := NewAINoop()
testAI(t, ai)
}
func TestAIOllama(t *testing.T) {
t.Parallel()
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
testAI(t, ai)
}
func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir()
checkpoints := "checkpoints"
tokenizer := "tokenizer"

View File

@@ -13,25 +13,27 @@ import (
)
type Config struct {
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
Port int
Debug bool
InitializeSlack bool
SlackToken string
SlackChannels []string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
OllamaURL string
OllamaModel string
LocalCheckpoint string
LocalTokenizer string
AssetPattern string
DatacenterPattern string
EventNamePattern string
driver Driver
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
}
var (
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.slackToMessagePipeline = slackToMessagePipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil
}

View File

@@ -6,6 +6,7 @@ import (
)
func TestNewConfig(t *testing.T) {
t.Parallel()
if got, err := newConfigFromEnv(context.Background(), func(k string) string {
t.Logf("getenv(%s)", k)
switch k {

View File

@@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"net/url"
"path"
"testing"
_ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq"
@@ -15,6 +17,15 @@ type Driver struct {
*sql.DB
}
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { driver.Close() })
return driver
}
func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {

View File

@@ -6,7 +6,13 @@ import (
"time"
)
func TestNewTestDriver(t *testing.T) {
t.Parallel()
NewTestDriver(t)
}
func TestDriver(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()

28
main.go
View File

@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg):
case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
return err
}
}
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first)
errs := make(chan error)
go func() {
defer close(errs)
select {
case errs <- cfg.slackToMessagePipeline.Process(ctx):
case <-ctx.Done():
}
}()
for i := range pipelines {
go func(i int) {
defer can()
select {
case errs <- pipelines[i].Process(ctx):
case <-ctx.Done():
}
}(i)
}
return errs
}

View File

@@ -19,6 +19,7 @@ import (
)
func TestRun(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port
cfg.driver, _ = NewDriver(ctx, "")
cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}

View File

@@ -7,6 +7,7 @@ import (
)
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
inCh string
slackMessage slackMessage

32
normalize.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

18
normalize_test.go Normal file
View File

@@ -0,0 +1,18 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

47
persistence.go Normal file
View File

@@ -0,0 +1,47 @@
package main
import (
"context"
"fmt"
)
type MessageToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
}
}

34
persistence_test.go Normal file
View File

@@ -0,0 +1,34 @@
package main
import (
"context"
"testing"
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
}

View File

@@ -7,16 +7,15 @@ import (
)
func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driverOutput, _ := NewDriver(ctx, "")
output, err := NewQueue(ctx, "output", driverOutput)
output, err := NewQueue(ctx, "output", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
driverInput, _ := NewDriver(ctx, "")
input, err := NewQueue(ctx, "input", driverInput)
input, err := NewQueue(ctx, "input", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}

View File

@@ -8,15 +8,15 @@ import (
)
func TestQueue(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driver, _ := NewDriver(ctx, "")
q, err := NewQueue(ctx, "", driver)
q, err := NewQueue(ctx, "", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
qOther, _ := NewQueue(ctx, "other", driver)
qOther, _ := NewQueue(ctx, "other", q.driver)
if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn before any enqueues created: %v", err)

View File

@@ -10,11 +10,11 @@ type SlackToMessage struct {
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
writer, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}

View File

@@ -10,11 +10,11 @@ import (
)
func TestSlackToMessagePipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
driver, _ := NewDriver(ctx, "/tmp/f")
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
if err != nil {
t.Fatal(err)
}