Compare commits

..

9 Commits

Author SHA1 Message Date
Bel LaPointe
3c62411927 wip normalize 2024-04-15 17:09:31 -06:00
Bel LaPointe
c84d80e8d3 test message to persistence 2024-04-15 17:00:15 -06:00
Bel LaPointe
74477fc09c data design time procrastination 2024-04-15 16:38:01 -06:00
Bel LaPointe
1fd4b72b22 todo 2024-04-15 16:36:25 -06:00
Bel LaPointe
d9244e4e1c t.Parallel pls 2024-04-15 16:35:51 -06:00
Bel LaPointe
c9d3b4998b sql :memory: dont work so make a helper NewTestDriver 2024-04-15 16:34:19 -06:00
Bel LaPointe
c5e1556f61 sp 2024-04-15 16:26:24 -06:00
Bel LaPointe
d76f8e2c15 stub second pipeline 2024-04-15 16:26:16 -06:00
Bel LaPointe
ff280997b1 main can run many pipelines 2024-04-15 16:23:41 -06:00
16 changed files with 217 additions and 40 deletions

View File

@@ -14,18 +14,21 @@ import (
)
func TestAINoop(t *testing.T) {
t.Parallel()
ai := NewAINoop()
testAI(t, ai)
}
func TestAIOllama(t *testing.T) {
t.Parallel()
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
testAI(t, ai)
}
func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir()
checkpoints := "checkpoints"
tokenizer := "tokenizer"

View File

@@ -32,6 +32,8 @@ type Config struct {
driver Driver
ai AI
slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
}
var (
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
}
result.slackToMessagePipeline = slackToMessagePipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil
}

View File

@@ -6,6 +6,7 @@ import (
)
func TestNewConfig(t *testing.T) {
t.Parallel()
if got, err := newConfigFromEnv(context.Background(), func(k string) string {
t.Logf("getenv(%s)", k)
switch k {

View File

@@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"net/url"
"path"
"testing"
_ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq"
@@ -15,6 +17,15 @@ type Driver struct {
*sql.DB
}
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { driver.Close() })
return driver
}
func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {

View File

@@ -6,7 +6,13 @@ import (
"time"
)
func TestNewTestDriver(t *testing.T) {
t.Parallel()
NewTestDriver(t)
}
func TestDriver(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()

22
main.go
View File

@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg):
case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
):
return err
case err := <-listenAndServe(ctx, cfg):
return err
}
}
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first)
errs := make(chan error)
go func() {
defer close(errs)
for i := range pipelines {
go func(i int) {
defer can()
select {
case errs <- cfg.slackToMessagePipeline.Process(ctx):
case errs <- pipelines[i].Process(ctx):
case <-ctx.Done():
}
}()
}(i)
}
return errs
}

View File

@@ -19,6 +19,7 @@ import (
)
func TestRun(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port
cfg.driver, _ = NewDriver(ctx, "")
cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"}

View File

@@ -7,6 +7,7 @@ import (
)
func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct {
inCh string
slackMessage slackMessage

32
normalize.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

18
normalize_test.go Normal file
View File

@@ -0,0 +1,18 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

47
persistence.go Normal file
View File

@@ -0,0 +1,47 @@
package main
import (
"context"
"fmt"
)
type MessageToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
}
}

34
persistence_test.go Normal file
View File

@@ -0,0 +1,34 @@
package main
import (
"context"
"testing"
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
}

View File

@@ -7,16 +7,15 @@ import (
)
func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driverOutput, _ := NewDriver(ctx, "")
output, err := NewQueue(ctx, "output", driverOutput)
output, err := NewQueue(ctx, "output", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
driverInput, _ := NewDriver(ctx, "")
input, err := NewQueue(ctx, "input", driverInput)
input, err := NewQueue(ctx, "input", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}

View File

@@ -8,15 +8,15 @@ import (
)
func TestQueue(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
driver, _ := NewDriver(ctx, "")
q, err := NewQueue(ctx, "", driver)
q, err := NewQueue(ctx, "", NewTestDriver(t))
if err != nil {
t.Fatal(err)
}
qOther, _ := NewQueue(ctx, "other", driver)
qOther, _ := NewQueue(ctx, "other", q.driver)
if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn before any enqueues created: %v", err)

View File

@@ -10,11 +10,11 @@ type SlackToMessage struct {
}
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
writer, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}

View File

@@ -10,11 +10,11 @@ import (
)
func TestSlackToMessagePipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can()
driver, _ := NewDriver(ctx, "/tmp/f")
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
if err != nil {
t.Fatal(err)
}