sql :memory: dont work so make a helper NewTestDriver
parent
c5e1556f61
commit
c9d3b4998b
11
driver.go
11
driver.go
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
_ "github.com/glebarez/go-sqlite"
|
_ "github.com/glebarez/go-sqlite"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
@ -15,6 +17,15 @@ type Driver struct {
|
||||||
*sql.DB
|
*sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewTestDriver(t *testing.T) Driver {
|
||||||
|
driver, err := NewDriver(context.Background(), path.Join(t.TempDir(), "db"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { driver.Close() })
|
||||||
|
return driver
|
||||||
|
}
|
||||||
|
|
||||||
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
func NewDriver(ctx context.Context, conn string) (Driver, error) {
|
||||||
engine := "sqlite"
|
engine := "sqlite"
|
||||||
if conn == "" {
|
if conn == "" {
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestNewTestDriver(t *testing.T) {
|
||||||
|
NewTestDriver(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDriver(t *testing.T) {
|
func TestDriver(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ func TestRun(t *testing.T) {
|
||||||
cfg.AssetPattern = renderAssetPattern
|
cfg.AssetPattern = renderAssetPattern
|
||||||
cfg.EventNamePattern = renderEventNamePattern
|
cfg.EventNamePattern = renderEventNamePattern
|
||||||
cfg.Port = port
|
cfg.Port = port
|
||||||
cfg.driver, _ = NewDriver(ctx, "")
|
cfg.driver = NewTestDriver(t)
|
||||||
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg)
|
||||||
cfg.SlackToken = "redacted"
|
cfg.SlackToken = "redacted"
|
||||||
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
cfg.SlackChannels = []string{"C06U1DDBBU4"}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageToPersistence struct {
|
type MessageToPersistence struct {
|
||||||
|
|
@ -10,27 +10,23 @@ type MessageToPersistence struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "fromSlack", cfg.driver)
|
reader, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer, err := NewQueue(ctx, "fromMessage", cfg.driver)
|
writer, err := NewQueue(ctx, "fromPersistence", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newMessageToPersistenceProcess(cfg),
|
process: newMessageToPersistenceProcess(cfg.driver),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageToPersistenceProcess(cfg Config) processFunc {
|
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
||||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
||||||
m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
return nil, io.EOF
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to deserialize slack %w: %s", err, slack)
|
|
||||||
}
|
|
||||||
return m.Serialize(), nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMessageToPersistenceProcessor(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
cases := map[string]string{}
|
||||||
|
|
||||||
|
for given, wantd := range cases {
|
||||||
|
want := wantd
|
||||||
|
t.Run(given, func(t *testing.T) {
|
||||||
|
process := newMessageToPersistenceProcess(NewTestDriver(t))
|
||||||
|
|
||||||
|
if got, err := process(ctx, []byte(given)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(got) != string(want) {
|
||||||
|
t.Errorf("wanted %q but got %q", want, got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -10,13 +10,11 @@ func TestPipeline(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driverOutput, _ := NewDriver(ctx, "")
|
output, err := NewQueue(ctx, "output", NewTestDriver(t))
|
||||||
output, err := NewQueue(ctx, "output", driverOutput)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
driverInput, _ := NewDriver(ctx, "")
|
input, err := NewQueue(ctx, "input", NewTestDriver(t))
|
||||||
input, err := NewQueue(ctx, "input", driverInput)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,12 +11,11 @@ func TestQueue(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "")
|
q, err := NewQueue(ctx, "", NewTestDriver(t))
|
||||||
q, err := NewQueue(ctx, "", driver)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
qOther, _ := NewQueue(ctx, "other", driver)
|
qOther, _ := NewQueue(ctx, "other", q.driver)
|
||||||
|
|
||||||
if reservation, _, err := q.syn(ctx); reservation != nil {
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
t.Errorf("able to syn before any enqueues created: %v", err)
|
t.Errorf("able to syn before any enqueues created: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,7 @@ func TestSlackToMessagePipeline(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "/tmp/f")
|
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)})
|
||||||
pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: driver})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue