test message to persistence
parent
74477fc09c
commit
c84d80e8d3
|
|
@ -2,7 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageToPersistence struct {
|
type MessageToPersistence struct {
|
||||||
|
|
@ -26,8 +26,22 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline,
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
func newMessageToPersistenceProcess(driver Driver) processFunc {
|
||||||
return func(ctx context.Context, slack []byte) ([]byte, error) {
|
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
||||||
// TODO table design lets go
|
m, err := Deserialize(msg)
|
||||||
return nil, io.EOF
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if result, err := driver.ExecContext(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT);
|
||||||
|
INSERT INTO messages (id, v) VALUES (?, ?)
|
||||||
|
ON CONFLICT(id) DO UPDATE set v = ?;
|
||||||
|
`, m.ID, msg, msg); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if n != 1 {
|
||||||
|
return nil, fmt.Errorf("upserting event to persistence modified %v rows", n)
|
||||||
|
}
|
||||||
|
return msg, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,18 +11,24 @@ func TestMessageToPersistenceProcessor(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
cases := map[string]string{}
|
d := NewTestDriver(t)
|
||||||
|
process := newMessageToPersistenceProcess(d)
|
||||||
|
|
||||||
for given, wantd := range cases {
|
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||||
want := wantd
|
|
||||||
t.Run(given, func(t *testing.T) {
|
|
||||||
process := newMessageToPersistenceProcess(NewTestDriver(t))
|
|
||||||
|
|
||||||
if got, err := process(ctx, []byte(given)); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if string(got) != string(want) {
|
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
||||||
t.Errorf("wanted %q but got %q", want, got)
|
t.Fatal("failed to upsert on redundant process", err)
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
var id, v []byte
|
||||||
|
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := row.Scan(&id, &v); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(id) != "x" {
|
||||||
|
t.Fatal(string(id))
|
||||||
|
} else if string(v) != `{"ID":"x"}` {
|
||||||
|
t.Fatal(string(v))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue