Compare commits

...

9 Commits

Author SHA1 Message Date
Bel LaPointe
3c62411927 wip normalize 2024-04-15 17:09:31 -06:00
Bel LaPointe
c84d80e8d3 test message to persistence 2024-04-15 17:00:15 -06:00
Bel LaPointe
74477fc09c data design time procrastination 2024-04-15 16:38:01 -06:00
Bel LaPointe
1fd4b72b22 todo 2024-04-15 16:36:25 -06:00
Bel LaPointe
d9244e4e1c t.Parallel pls 2024-04-15 16:35:51 -06:00
Bel LaPointe
c9d3b4998b sql :memory: dont work so make a helper NewTestDriver 2024-04-15 16:34:19 -06:00
Bel LaPointe
c5e1556f61 sp 2024-04-15 16:26:24 -06:00
Bel LaPointe
d76f8e2c15 stub second pipeline 2024-04-15 16:26:16 -06:00
Bel LaPointe
ff280997b1 main can run many pipelines 2024-04-15 16:23:41 -06:00
16 changed files with 217 additions and 40 deletions

View File

@@ -14,18 +14,21 @@ import (
) )
func TestAINoop(t *testing.T) { func TestAINoop(t *testing.T) {
t.Parallel()
ai := NewAINoop() ai := NewAINoop()
testAI(t, ai) testAI(t, ai)
} }
func TestAIOllama(t *testing.T) { func TestAIOllama(t *testing.T) {
t.Parallel()
ai := NewAIOllama("http://localhost:11434", "gemma:2b") ai := NewAIOllama("http://localhost:11434", "gemma:2b")
testAI(t, ai) testAI(t, ai)
} }
func TestAILocal(t *testing.T) { func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir() d := os.TempDir()
checkpoints := "checkpoints" checkpoints := "checkpoints"
tokenizer := "tokenizer" tokenizer := "tokenizer"

View File

@@ -32,6 +32,8 @@ type Config struct {
driver Driver driver Driver
ai AI ai AI
slackToMessagePipeline Pipeline slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
} }
var ( var (
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.slackToMessagePipeline = slackToMessagePipeline result.slackToMessagePipeline = slackToMessagePipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil return result, nil
} }

View File

@@ -6,6 +6,7 @@ import (
) )
func TestNewConfig(t *testing.T) { func TestNewConfig(t *testing.T) {
t.Parallel()
if got, err := newConfigFromEnv(context.Background(), func(k string) string { if got, err := newConfigFromEnv(context.Background(), func(k string) string {
t.Logf("getenv(%s)", k) t.Logf("getenv(%s)", k)
switch k { switch k {

View File

@@ -6,6 +6,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"path"
"testing"
_ "github.com/glebarez/go-sqlite" _ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq" _ "github.com/lib/pq"
@@ -15,6 +17,15 @@ type Driver struct {
*sql.DB *sql.DB
} }
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { driver.Close() })
return driver
}
func NewDriver(ctx context.Context, conn string) (Driver, error) { func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite" engine := "sqlite"
if conn == "" { if conn == "" {

View File

@@ -6,7 +6,13 @@ import (
"time" "time"
) )
func TestNewTestDriver(t *testing.T) {
t.Parallel()
NewTestDriver(t)
}
func TestDriver(t *testing.T) { func TestDriver(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*15) ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can() defer can()

22
main.go
View File

@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg): case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
return err return err
} }
} }
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error { func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first)
errs := make(chan error) errs := make(chan error)
go func() { for i := range pipelines {
defer close(errs) go func(i int) {
defer can()
select { select {
case errs <- cfg.slackToMessagePipeline.Process(ctx): case errs <- pipelines[i].Process(ctx):
case <-ctx.Done(): case <-ctx.Done():
} }
}() }(i)
}
return errs return errs
} }

View File

@@ -19,6 +19,7 @@ import (
) )
func TestRun(t *testing.T) { func TestRun(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
cfg.AssetPattern = renderAssetPattern cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port cfg.Port = port
cfg.driver, _ = NewDriver(ctx, "") cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg) cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}

View File

@@ -7,6 +7,7 @@ import (
) )
func TestParseSlackTestdata(t *testing.T) { func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct { cases := map[string]struct {
inCh string inCh string
slackMessage slackMessage slackMessage slackMessage

32
normalize.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

18
normalize_test.go Normal file
View File

@@ -0,0 +1,18 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

47
persistence.go Normal file
View File

@@ -0,0 +1,47 @@
package main
import (
"context"
"fmt"
)
type MessageToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
}
}

34
persistence_test.go Normal file
View File

@@ -0,0 +1,34 @@
package main
import (
"context"
"testing"
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
}

View File

@@ -7,16 +7,15 @@ import (
) )
func TestPipeline(t *testing.T) { func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
driverOutput, _ := NewDriver(ctx, "") output, err := NewQueue(ctx, "output", NewTestDriver(t))
output, err := NewQueue(ctx, "output", driverOutput)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
driverInput, _ := NewDriver(ctx, "") input, err := NewQueue(ctx, "input", NewTestDriver(t))
input, err := NewQueue(ctx, "input", driverInput)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -8,15 +8,15 @@ import (
) )
func TestQueue(t *testing.T) { func TestQueue(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
driver, _ := NewDriver(ctx, "") q, err := NewQueue(ctx, "", NewTestDriver(t))
q, err := NewQueue(ctx, "", driver)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
qOther, _ := NewQueue(ctx, "other", driver) qOther, _ := NewQueue(ctx, "other", q.driver)
if reservation, _, err := q.syn(ctx); reservation != nil { if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn before any enqueues created: %v", err) t.Errorf("able to syn before any enqueues created: %v", err)

View File

@@ -10,11 +10,11 @@ type SlackToMessage struct {
} }
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) { func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver) reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
writer, err := NewQueue(ctx, "fromMessage", cfg.driver) writer, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }

View File

@@ -10,11 +10,11 @@ import (
) )
func TestSlackToMessagePipeline(t *testing.T) { func TestSlackToMessagePipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5) ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can() defer can()
driver, _ := NewDriver(ctx, "/tmp/f") pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }