Compare commits

...

9 Commits

Author SHA1 Message Date
Bel LaPointe
3c62411927 wip normalize 2024-04-15 17:09:31 -06:00
Bel LaPointe
c84d80e8d3 test message to persistence 2024-04-15 17:00:15 -06:00
Bel LaPointe
74477fc09c data design time procrastination 2024-04-15 16:38:01 -06:00
Bel LaPointe
1fd4b72b22 todo 2024-04-15 16:36:25 -06:00
Bel LaPointe
d9244e4e1c t.Parallel pls 2024-04-15 16:35:51 -06:00
Bel LaPointe
c9d3b4998b sql :memory: dont work so make a helper NewTestDriver 2024-04-15 16:34:19 -06:00
Bel LaPointe
c5e1556f61 sp 2024-04-15 16:26:24 -06:00
Bel LaPointe
d76f8e2c15 stub second pipeline 2024-04-15 16:26:16 -06:00
Bel LaPointe
ff280997b1 main can run many pipelines 2024-04-15 16:23:41 -06:00
16 changed files with 217 additions and 40 deletions

View File

@@ -14,18 +14,21 @@ import (
) )
func TestAINoop(t *testing.T) { func TestAINoop(t *testing.T) {
t.Parallel()
ai := NewAINoop() ai := NewAINoop()
testAI(t, ai) testAI(t, ai)
} }
func TestAIOllama(t *testing.T) { func TestAIOllama(t *testing.T) {
t.Parallel()
ai := NewAIOllama("http://localhost:11434", "gemma:2b") ai := NewAIOllama("http://localhost:11434", "gemma:2b")
testAI(t, ai) testAI(t, ai)
} }
func TestAILocal(t *testing.T) { func TestAILocal(t *testing.T) {
t.Parallel()
d := os.TempDir() d := os.TempDir()
checkpoints := "checkpoints" checkpoints := "checkpoints"
tokenizer := "tokenizer" tokenizer := "tokenizer"

View File

@@ -13,25 +13,27 @@ import (
) )
type Config struct { type Config struct {
Port int Port int
Debug bool Debug bool
InitializeSlack bool InitializeSlack bool
SlackToken string SlackToken string
SlackChannels []string SlackChannels []string
DriverConn string DriverConn string
BasicAuthUser string BasicAuthUser string
BasicAuthPassword string BasicAuthPassword string
FillWithTestdata bool FillWithTestdata bool
OllamaURL string OllamaURL string
OllamaModel string OllamaModel string
LocalCheckpoint string LocalCheckpoint string
LocalTokenizer string LocalTokenizer string
AssetPattern string AssetPattern string
DatacenterPattern string DatacenterPattern string
EventNamePattern string EventNamePattern string
driver Driver driver Driver
ai AI ai AI
slackToMessagePipeline Pipeline slackToMessagePipeline Pipeline
messageToPersistencePipeline Pipeline
persistenceToNormalizedPipeline Pipeline
} }
var ( var (
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.slackToMessagePipeline = slackToMessagePipeline result.slackToMessagePipeline = slackToMessagePipeline
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.messageToPersistencePipeline = messageToPersistencePipeline
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
return result, nil return result, nil
} }

View File

@@ -6,6 +6,7 @@ import (
) )
func TestNewConfig(t *testing.T) { func TestNewConfig(t *testing.T) {
t.Parallel()
if got, err := newConfigFromEnv(context.Background(), func(k string) string { if got, err := newConfigFromEnv(context.Background(), func(k string) string {
t.Logf("getenv(%s)", k) t.Logf("getenv(%s)", k)
switch k { switch k {

View File

@@ -6,6 +6,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"path"
"testing"
_ "github.com/glebarez/go-sqlite" _ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq" _ "github.com/lib/pq"
@@ -15,6 +17,15 @@ type Driver struct {
*sql.DB *sql.DB
} }
func NewTestDriver(t *testing.T) Driver {
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { driver.Close() })
return driver
}
func NewDriver(ctx context.Context, conn string) (Driver, error) { func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite" engine := "sqlite"
if conn == "" { if conn == "" {

View File

@@ -6,7 +6,13 @@ import (
"time" "time"
) )
func TestNewTestDriver(t *testing.T) {
t.Parallel()
NewTestDriver(t)
}
func TestDriver(t *testing.T) { func TestDriver(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*15) ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can() defer can()

28
main.go
View File

@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err := <-processSlackToMessagePipeline(ctx, cfg): case err := <-processPipelines(ctx,
cfg.slackToMessagePipeline,
cfg.messageToPersistencePipeline,
cfg.persistenceToNormalizedPipeline,
):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
return err return err
} }
} }
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error { func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
ctx, can := context.WithCancel(ctx)
defer can()
pipelines = append(pipelines, first)
errs := make(chan error) errs := make(chan error)
go func() { for i := range pipelines {
defer close(errs) go func(i int) {
select { defer can()
case errs <- cfg.slackToMessagePipeline.Process(ctx): select {
case <-ctx.Done(): case errs <- pipelines[i].Process(ctx):
} case <-ctx.Done():
}() }
}(i)
}
return errs return errs
} }

View File

@@ -19,6 +19,7 @@ import (
) )
func TestRun(t *testing.T) { func TestRun(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
cfg.AssetPattern = renderAssetPattern cfg.AssetPattern = renderAssetPattern
cfg.EventNamePattern = renderEventNamePattern cfg.EventNamePattern = renderEventNamePattern
cfg.Port = port cfg.Port = port
cfg.driver, _ = NewDriver(ctx, "") cfg.driver = NewTestDriver(t)
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg) cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}

View File

@@ -7,6 +7,7 @@ import (
) )
func TestParseSlackTestdata(t *testing.T) { func TestParseSlackTestdata(t *testing.T) {
t.Parallel()
cases := map[string]struct { cases := map[string]struct {
inCh string inCh string
slackMessage slackMessage slackMessage slackMessage

32
normalize.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"errors"
)
type PersistenceToNormalized struct {
pipeline Pipeline
}
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newPersistenceToNormalizedProcess(cfg.driver),
}, nil
}
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
return nil, errors.New("not impl")
}
}

18
normalize_test.go Normal file
View File

@@ -0,0 +1,18 @@
package main
import (
"context"
"testing"
"time"
)
func TestPersistenceToNormalizedProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newPersistenceToNormalizedProcess(d)
_, _ = ctx, process
}

47
persistence.go Normal file
View File

@@ -0,0 +1,47 @@
package main
import (
"context"
"fmt"
)
type MessageToPersistence struct {
pipeline Pipeline
}
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil {
return Pipeline{}, err
}
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
if err != nil {
return Pipeline{}, err
}
return Pipeline{
writer: writer,
reader: reader,
process: newMessageToPersistenceProcess(cfg.driver),
}, nil
}
func newMessageToPersistenceProcess(driver Driver) processFunc {
return func(ctx context.Context, msg []byte) ([]byte, error) {
m, err := Deserialize(msg)
if err != nil {
return nil, err
}
if result, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
INSERT INTO messages (id, v) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set v = ?;
`, m.ID, msg, msg); err != nil {
return nil, err
} else if n, err := result.RowsAffected(); err != nil {
return nil, err
} else if n != 1 {
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
}
return msg, nil
}
}

34
persistence_test.go Normal file
View File

@@ -0,0 +1,34 @@
package main
import (
"context"
"testing"
"time"
)
func TestMessageToPersistenceProcessor(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
d := NewTestDriver(t)
process := newMessageToPersistenceProcess(d)
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal(err)
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
t.Fatal("failed to upsert on redundant process", err)
}
var id, v []byte
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
if err := row.Err(); err != nil {
t.Fatal(err)
} else if err := row.Scan(&id, &v); err != nil {
t.Fatal(err)
} else if string(id) != "x" {
t.Fatal(string(id))
} else if string(v) != `{"ID":"x"}` {
t.Fatal(string(v))
}
}

View File

@@ -7,16 +7,15 @@ import (
) )
func TestPipeline(t *testing.T) { func TestPipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
driverOutput, _ := NewDriver(ctx, "") output, err := NewQueue(ctx, "output", NewTestDriver(t))
output, err := NewQueue(ctx, "output", driverOutput)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
driverInput, _ := NewDriver(ctx, "") input, err := NewQueue(ctx, "input", NewTestDriver(t))
input, err := NewQueue(ctx, "input", driverInput)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -8,15 +8,15 @@ import (
) )
func TestQueue(t *testing.T) { func TestQueue(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*10) ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can() defer can()
driver, _ := NewDriver(ctx, "") q, err := NewQueue(ctx, "", NewTestDriver(t))
q, err := NewQueue(ctx, "", driver)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
qOther, _ := NewQueue(ctx, "other", driver) qOther, _ := NewQueue(ctx, "other", q.driver)
if reservation, _, err := q.syn(ctx); reservation != nil { if reservation, _, err := q.syn(ctx); reservation != nil {
t.Errorf("able to syn before any enqueues created: %v", err) t.Errorf("able to syn before any enqueues created: %v", err)

View File

@@ -10,11 +10,11 @@ type SlackToMessage struct {
} }
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) { func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "fromSlack", cfg.driver) reader, err := NewQueue(ctx, "slack_event", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }
writer, err := NewQueue(ctx, "fromMessage", cfg.driver) writer, err := NewQueue(ctx, "new_message", cfg.driver)
if err != nil { if err != nil {
return Pipeline{}, err return Pipeline{}, err
} }

View File

@@ -10,11 +10,11 @@ import (
) )
func TestSlackToMessagePipeline(t *testing.T) { func TestSlackToMessagePipeline(t *testing.T) {
t.Parallel()
ctx, can := context.WithTimeout(context.Background(), time.Second*5) ctx, can := context.WithTimeout(context.Background(), time.Second*5)
defer can() defer can()
driver, _ := NewDriver(ctx, "/tmp/f") pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }