Compare commits
9 Commits
83c0ee3f53
...
3c62411927
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c62411927 | ||
|
|
c84d80e8d3 | ||
|
|
74477fc09c | ||
|
|
1fd4b72b22 | ||
|
|
d9244e4e1c | ||
|
|
c9d3b4998b | ||
|
|
c5e1556f61 | ||
|
|
d76f8e2c15 | ||
|
|
ff280997b1 |
@@ -14,18 +14,21 @@ import (
|
||||
)
|
||||
|
||||
func TestAINoop(t *testing.T) {
|
||||
t.Parallel()
|
||||
ai := NewAINoop()
|
||||
|
||||
testAI(t, ai)
|
||||
}
|
||||
|
||||
func TestAIOllama(t *testing.T) {
|
||||
t.Parallel()
|
||||
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
||||
|
||||
testAI(t, ai)
|
||||
}
|
||||
|
||||
func TestAILocal(t *testing.T) {
|
||||
t.Parallel()
|
||||
d := os.TempDir()
|
||||
checkpoints := "checkpoints"
|
||||
tokenizer := "tokenizer"
|
||||
|
||||
52
config.go
52
config.go
@@ -13,25 +13,27 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Port int
|
||||
Debug bool
|
||||
InitializeSlack bool
|
||||
SlackToken string
|
||||
SlackChannels []string
|
||||
DriverConn string
|
||||
BasicAuthUser string
|
||||
BasicAuthPassword string
|
||||
FillWithTestdata bool
|
||||
OllamaURL string
|
||||
OllamaModel string
|
||||
LocalCheckpoint string
|
||||
LocalTokenizer string
|
||||
AssetPattern string
|
||||
DatacenterPattern string
|
||||
EventNamePattern string
|
||||
driver Driver
|
||||
ai AI
|
||||
slackToMessagePipeline Pipeline
|
||||
Port int
|
||||
Debug bool
|
||||
InitializeSlack bool
|
||||
SlackToken string
|
||||
SlackChannels []string
|
||||
DriverConn string
|
||||
BasicAuthUser string
|
||||
BasicAuthPassword string
|
||||
FillWithTestdata bool
|
||||
OllamaURL string
|
||||
OllamaModel string
|
||||
LocalCheckpoint string
|
||||
LocalTokenizer string
|
||||
AssetPattern string
|
||||
DatacenterPattern string
|
||||
EventNamePattern string
|
||||
driver Driver
|
||||
ai AI
|
||||
slackToMessagePipeline Pipeline
|
||||
messageToPersistencePipeline Pipeline
|
||||
persistenceToNormalizedPipeline Pipeline
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
||||
}
|
||||
result.slackToMessagePipeline = slackToMessagePipeline
|
||||
|
||||
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.messageToPersistencePipeline = messageToPersistencePipeline
|
||||
|
||||
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
)
|
||||
|
||||
func TestNewConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
if got, err := newConfigFromEnv(context.Background(), func(k string) string {
|
||||
t.Logf("getenv(%s)", k)
|
||||
switch k {
|
||||
|
||||
11
driver.go
11
driver.go
@@ -6,6 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
_ "github.com/glebarez/go-sqlite"
|
||||
_ "github.com/lib/pq"
|
||||
@@ -15,6 +17,15 @@ type Driver struct {
|
||||
*sql.DB
|
||||
}
|
||||
|
||||
func NewTestDriver(t *testing.T) Driver {
|
||||
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { driver.Close() })
|
||||
return driver
|
||||
}
|
||||
|
||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||
engine := "sqlite"
|
||||
if conn == "" {
|
||||
|
||||
@@ -6,7 +6,13 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewTestDriver(t *testing.T) {
|
||||
t.Parallel()
|
||||
NewTestDriver(t)
|
||||
}
|
||||
|
||||
func TestDriver(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||
defer can()
|
||||
|
||||
|
||||
28
main.go
28
main.go
@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-processSlackToMessagePipeline(ctx, cfg):
|
||||
case err := <-processPipelines(ctx,
|
||||
cfg.slackToMessagePipeline,
|
||||
cfg.messageToPersistencePipeline,
|
||||
cfg.persistenceToNormalizedPipeline,
|
||||
):
|
||||
return err
|
||||
case err := <-listenAndServe(ctx, cfg):
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
|
||||
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
|
||||
ctx, can := context.WithCancel(ctx)
|
||||
defer can()
|
||||
|
||||
pipelines = append(pipelines, first)
|
||||
errs := make(chan error)
|
||||
go func() {
|
||||
defer close(errs)
|
||||
select {
|
||||
case errs <- cfg.slackToMessagePipeline.Process(ctx):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
for i := range pipelines {
|
||||
go func(i int) {
|
||||
defer can()
|
||||
select {
|
||||
case errs <- pipelines[i].Process(ctx):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
)
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
|
||||
cfg.AssetPattern = renderAssetPattern
|
||||
cfg.EventNamePattern = renderEventNamePattern
|
||||
cfg.Port = port
|
||||
cfg.driver, _ = NewDriver(ctx, "")
|
||||
cfg.driver = NewTestDriver(t)
|
||||
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
||||
cfg.SlackToken = "redacted"
|
||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
)
|
||||
|
||||
func TestParseSlackTestdata(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := map[string]struct {
|
||||
inCh string
|
||||
slackMessage slackMessage
|
||||
|
||||
32
normalize.go
Normal file
32
normalize.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type PersistenceToNormalized struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newPersistenceToNormalizedProcess(cfg.driver),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
|
||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||
return nil, errors.New("not impl")
|
||||
}
|
||||
}
|
||||
18
normalize_test.go
Normal file
18
normalize_test.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPersistenceToNormalizedProcessor(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
process := newPersistenceToNormalizedProcess(d)
|
||||
|
||||
_, _ = ctx, process
|
||||
}
|
||||
47
persistence.go
Normal file
47
persistence.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type MessageToPersistence struct {
|
||||
pipeline Pipeline
|
||||
}
|
||||
|
||||
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
return Pipeline{
|
||||
writer: writer,
|
||||
reader: reader,
|
||||
process: newMessageToPersistenceProcess(cfg.driver),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||
m, err := Deserialize(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if result, err := driver.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
|
||||
INSERT INTO messages (id, v) VALUES (?, ?)
|
||||
ON CONFLICT(id) DO UPDATE set v = ?;
|
||||
`, m.ID, msg, msg); err != nil {
|
||||
return nil, err
|
||||
} else if n, err := result.RowsAffected(); err != nil {
|
||||
return nil, err
|
||||
} else if n != 1 {
|
||||
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
|
||||
}
|
||||
return msg, nil
|
||||
}
|
||||
}
|
||||
34
persistence_test.go
Normal file
34
persistence_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMessageToPersistenceProcessor(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
d := NewTestDriver(t)
|
||||
process := newMessageToPersistenceProcess(d)
|
||||
|
||||
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||
t.Fatal("failed to upsert on redundant process", err)
|
||||
}
|
||||
|
||||
var id, v []byte
|
||||
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
||||
if err := row.Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := row.Scan(&id, &v); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(id) != "x" {
|
||||
t.Fatal(string(id))
|
||||
} else if string(v) != `{"ID":"x"}` {
|
||||
t.Fatal(string(v))
|
||||
}
|
||||
}
|
||||
@@ -7,16 +7,15 @@ import (
|
||||
)
|
||||
|
||||
func TestPipeline(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
driverOutput, _ := NewDriver(ctx, "")
|
||||
output, err := NewQueue(ctx, "output", driverOutput)
|
||||
output, err := NewQueue(ctx, "output", NewTestDriver(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
driverInput, _ := NewDriver(ctx, "")
|
||||
input, err := NewQueue(ctx, "input", driverInput)
|
||||
input, err := NewQueue(ctx, "input", NewTestDriver(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -8,15 +8,15 @@ import (
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer can()
|
||||
|
||||
driver, _ := NewDriver(ctx, "")
|
||||
q, err := NewQueue(ctx, "", driver)
|
||||
q, err := NewQueue(ctx, "", NewTestDriver(t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qOther, _ := NewQueue(ctx, "other", driver)
|
||||
qOther, _ := NewQueue(ctx, "other", q.driver)
|
||||
|
||||
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||
t.Errorf("able to syn before any enqueues created: %v", err)
|
||||
|
||||
4
slack.go
4
slack.go
@@ -10,11 +10,11 @@ type SlackToMessage struct {
|
||||
}
|
||||
|
||||
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
|
||||
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
||||
writer, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||
if err != nil {
|
||||
return Pipeline{}, err
|
||||
}
|
||||
|
||||
@@ -10,11 +10,11 @@ import (
|
||||
)
|
||||
|
||||
func TestSlackToMessagePipeline(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer can()
|
||||
|
||||
driver, _ := NewDriver(ctx, "/tmp/f")
|
||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
|
||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user