35 lines
812 B
Go
35 lines
812 B
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestMessageToPersistenceProcessor(t *testing.T) {
|
|
t.Parallel()
|
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer can()
|
|
|
|
d := NewTestDriver(t)
|
|
process := newMessageToPersistenceProcess(d)
|
|
|
|
if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
|
t.Fatal(err)
|
|
} else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil {
|
|
t.Fatal("failed to upsert on redundant process", err)
|
|
}
|
|
|
|
var id, v []byte
|
|
row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x")
|
|
if err := row.Err(); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := row.Scan(&id, &v); err != nil {
|
|
t.Fatal(err)
|
|
} else if string(id) != "x" {
|
|
t.Fatal(string(id))
|
|
} else if string(v) != `{"ID":"x"}` {
|
|
t.Fatal(string(v))
|
|
}
|
|
}
|