Compare commits
9 Commits
83c0ee3f53
...
3c62411927
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c62411927 | ||
|
|
c84d80e8d3 | ||
|
|
74477fc09c | ||
|
|
1fd4b72b22 | ||
|
|
d9244e4e1c | ||
|
|
c9d3b4998b | ||
|
|
c5e1556f61 | ||
|
|
d76f8e2c15 | ||
|
|
ff280997b1 |
@@ -14,18 +14,21 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestAINoop(t *testing.T) {
|
func TestAINoop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ai := NewAINoop()
|
ai := NewAINoop()
|
||||||
|
|
||||||
testAI(t, ai)
|
testAI(t, ai)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAIOllama(t *testing.T) {
|
func TestAIOllama(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
ai := NewAIOllama("http://localhost:11434", "gemma:2b")
|
||||||
|
|
||||||
testAI(t, ai)
|
testAI(t, ai)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAILocal(t *testing.T) {
|
func TestAILocal(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
d := os.TempDir()
|
d := os.TempDir()
|
||||||
checkpoints := "checkpoints"
|
checkpoints := "checkpoints"
|
||||||
tokenizer := "tokenizer"
|
tokenizer := "tokenizer"
|
||||||
|
|||||||
14
config.go
14
config.go
@@ -32,6 +32,8 @@ type Config struct {
|
|||||||
driver Driver
|
driver Driver
|
||||||
ai AI
|
ai AI
|
||||||
slackToMessagePipeline Pipeline
|
slackToMessagePipeline Pipeline
|
||||||
|
messageToPersistencePipeline Pipeline
|
||||||
|
persistenceToNormalizedPipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -131,5 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
|
|||||||
}
|
}
|
||||||
result.slackToMessagePipeline = slackToMessagePipeline
|
result.slackToMessagePipeline = slackToMessagePipeline
|
||||||
|
|
||||||
|
messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.messageToPersistencePipeline = messageToPersistencePipeline
|
||||||
|
|
||||||
|
persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestNewConfig(t *testing.T) {
|
func TestNewConfig(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
if got, err := newConfigFromEnv(context.Background(), func(k string) string {
|
if got, err := newConfigFromEnv(context.Background(), func(k string) string {
|
||||||
t.Logf("getenv(%s)", k)
|
t.Logf("getenv(%s)", k)
|
||||||
switch k {
|
switch k {
|
||||||
|
|||||||
11
driver.go
11
driver.go
@@ -6,6 +6,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
_ "github.com/glebarez/go-sqlite"
|
_ "github.com/glebarez/go-sqlite"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
@@ -15,6 +17,15 @@ type Driver struct {
|
|||||||
*sql.DB
|
*sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewTestDriver(t *testing.T) Driver {
|
||||||
|
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { driver.Close() })
|
||||||
|
return driver
|
||||||
|
}
|
||||||
|
|
||||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||||
engine := "sqlite"
|
engine := "sqlite"
|
||||||
if conn == "" {
|
if conn == "" {
|
||||||
|
|||||||
@@ -6,7 +6,13 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestNewTestDriver(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
NewTestDriver(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDriver(t *testing.T) {
|
func TestDriver(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
|
|||||||
22
main.go
22
main.go
@@ -36,22 +36,32 @@ func run(ctx context.Context, cfg Config) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case err := <-processSlackToMessagePipeline(ctx, cfg):
|
case err := <-processPipelines(ctx,
|
||||||
|
cfg.slackToMessagePipeline,
|
||||||
|
cfg.messageToPersistencePipeline,
|
||||||
|
cfg.persistenceToNormalizedPipeline,
|
||||||
|
):
|
||||||
return err
|
return err
|
||||||
case err := <-listenAndServe(ctx, cfg):
|
case err := <-listenAndServe(ctx, cfg):
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
|
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
|
||||||
|
ctx, can := context.WithCancel(ctx)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
pipelines = append(pipelines, first)
|
||||||
errs := make(chan error)
|
errs := make(chan error)
|
||||||
go func() {
|
for i := range pipelines {
|
||||||
defer close(errs)
|
go func(i int) {
|
||||||
|
defer can()
|
||||||
select {
|
select {
|
||||||
case errs <- cfg.slackToMessagePipeline.Process(ctx):
|
case errs <- pipelines[i].Process(ctx):
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}()
|
}(i)
|
||||||
|
}
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestRun(t *testing.T) {
|
func TestRun(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
@@ -43,7 +44,7 @@ func TestRun(t *testing.T) {
|
|||||||
cfg.AssetPattern = renderAssetPattern
|
cfg.AssetPattern = renderAssetPattern
|
||||||
cfg.EventNamePattern = renderEventNamePattern
|
cfg.EventNamePattern = renderEventNamePattern
|
||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver, _ = NewDriver(ctx, "")
|
cfg.driver = NewTestDriver(t)
|
||||||
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestParseSlackTestdata(t *testing.T) {
|
func TestParseSlackTestdata(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
cases := map[string]struct {
|
cases := map[string]struct {
|
||||||
inCh string
|
inCh string
|
||||||
slackMessage slackMessage
|
slackMessage slackMessage
|
||||||
|
|||||||
32
normalize.go
Normal file
32
normalize.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PersistenceToNormalized struct {
|
||||||
|
pipeline Pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: newPersistenceToNormalizedProcess(cfg.driver),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPersistenceToNormalizedProcess(driver Driver) processFunc {
|
||||||
|
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||||
|
return nil, errors.New("not impl")
|
||||||
|
}
|
||||||
|
}
|
||||||
18
normalize_test.go
Normal file
18
normalize_test.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPersistenceToNormalizedProcessor(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
d := NewTestDriver(t)
|
||||||
|
process := newPersistenceToNormalizedProcess(d)
|
||||||
|
|
||||||
|
_, _ = ctx, process
|
||||||
|
}
|
||||||
47
persistence.go
Normal file
47
persistence.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MessageToPersistence struct {
|
||||||
|
pipeline Pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
|
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
||||||
|
if err != nil {
|
||||||
|
return Pipeline{}, err
|
||||||
|
}
|
||||||
|
return Pipeline{
|
||||||
|
writer: writer,
|
||||||
|
reader: reader,
|
||||||
|
process: newMessageToPersistenceProcess(cfg.driver),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
||||||
|
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||||
|
m, err := Deserialize(msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if result, err := driver.ExecContext(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
|
||||||
|
INSERT INTO messages (id, v) VALUES (?, ?)
|
||||||
|
ON CONFLICT(id) DO UPDATE set v = ?;
|
||||||
|
`, m.ID, msg, msg); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if n != 1 {
|
||||||
|
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
|
||||||
|
}
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
34
persistence_test.go
Normal file
34
persistence_test.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMessageToPersistenceProcessor(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
d := NewTestDriver(t)
|
||||||
|
process := newMessageToPersistenceProcess(d)
|
||||||
|
|
||||||
|
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||||
|
t.Fatal("failed to upsert on redundant process", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var id, v []byte
|
||||||
|
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := row.Scan(&id, &v); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(id) != "x" {
|
||||||
|
t.Fatal(string(id))
|
||||||
|
} else if string(v) != `{"ID":"x"}` {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,16 +7,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPipeline(t *testing.T) {
|
func TestPipeline(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driverOutput, _ := NewDriver(ctx, "")
|
output, err := NewQueue(ctx, "output", NewTestDriver(t))
|
||||||
output, err := NewQueue(ctx, "output", driverOutput)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
driverInput, _ := NewDriver(ctx, "")
|
input, err := NewQueue(ctx, "input", NewTestDriver(t))
|
||||||
input, err := NewQueue(ctx, "input", driverInput)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,15 +8,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestQueue(t *testing.T) {
|
func TestQueue(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "")
|
q, err := NewQueue(ctx, "", NewTestDriver(t))
|
||||||
q, err := NewQueue(ctx, "", driver)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
qOther, _ := NewQueue(ctx, "other", driver)
|
qOther, _ := NewQueue(ctx, "other", q.driver)
|
||||||
|
|
||||||
if reservation, _, err := q.syn(ctx); reservation != nil {
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
t.Errorf("able to syn before any enqueues created: %v", err)
|
t.Errorf("able to syn before any enqueues created: %v", err)
|
||||||
|
|||||||
4
slack.go
4
slack.go
@@ -10,11 +10,11 @@ type SlackToMessage struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
|
reader, err := NewQueue(ctx, "slack_event", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
writer, err := NewQueue(ctx, "new_message", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,11 +10,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestSlackToMessagePipeline(t *testing.T) {
|
func TestSlackToMessagePipeline(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "/tmp/f")
|
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
|
||||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user