multi topic done
parent
e85a2d25a1
commit
42c5b7d7ad
39
.storage.go
39
.storage.go
|
|
@ -1,39 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrNotFound = errors.New("not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Storage struct {
|
|
||||||
driver Driver
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewStorage(driver Driver) Storage {
|
|
||||||
return Storage{driver: driver}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
|
|
||||||
return nil, errors.New("not impl")
|
|
||||||
}
|
|
||||||
|
|
@ -1,67 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestStorage(t *testing.T) {
|
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
t.Run("Threads", func(t *testing.T) {
|
|
||||||
s := NewStorage(NewRAM())
|
|
||||||
|
|
||||||
mX1 := Message{ID: "1", Thread: "X", TS: 1}
|
|
||||||
mX2 := Message{ID: "2", Thread: "X", TS: 2}
|
|
||||||
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
|
|
||||||
|
|
||||||
for _, m := range []Message{mX1, mX2, mY1} {
|
|
||||||
if err := s.Upsert(ctx, m); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if threads, err := s.Threads(ctx); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
} else if len(threads) != 2 {
|
|
||||||
t.Error(threads)
|
|
||||||
} else if threads[0] != "X" {
|
|
||||||
t.Error(threads, "X")
|
|
||||||
} else if threads[1] != "Y" {
|
|
||||||
t.Error(threads, "Y")
|
|
||||||
}
|
|
||||||
|
|
||||||
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
} else if len(threads) != 1 {
|
|
||||||
t.Error(threads)
|
|
||||||
} else if threads[0] != "Y" {
|
|
||||||
t.Error(threads[0])
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Get Upsert", func(t *testing.T) {
|
|
||||||
s := NewStorage(NewRAM())
|
|
||||||
|
|
||||||
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
|
|
||||||
t.Error("failed to get 404", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m := Message{
|
|
||||||
ID: "id",
|
|
||||||
TS: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.Upsert(ctx, m); err != nil {
|
|
||||||
t.Error("failed to upsert", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if m2, err := s.Get(ctx, "id"); err != nil {
|
|
||||||
t.Error("failed to get", err)
|
|
||||||
} else if m != m2 {
|
|
||||||
t.Error(m2)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Ingester struct {
|
||||||
|
d Driver
|
||||||
|
q Queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIngester(d Driver, q Queue) Ingester {
|
||||||
|
return Ingester{
|
||||||
|
d: d,
|
||||||
|
q: q,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i Ingester) Ingest(ctx context.Context) error {
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
26
queue.go
26
queue.go
|
|
@ -13,31 +13,34 @@ type Queue struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(ctx context.Context, driver Driver) (Queue, error) {
|
func NewQueue(ctx context.Context, driver Driver) (Queue, error) {
|
||||||
_, err := driver.ExecContext(ctx, `
|
if _, err := driver.ExecContext(ctx, `
|
||||||
DROP TABLE IF EXISTS queue;
|
|
||||||
CREATE TABLE IF NOT EXISTS queue (
|
CREATE TABLE IF NOT EXISTS queue (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
updated INTEGER NOT NULL,
|
updated INTEGER NOT NULL,
|
||||||
reservation TEXT,
|
reservation TEXT,
|
||||||
payload TEXT
|
payload TEXT
|
||||||
);
|
);
|
||||||
`)
|
`); err != nil {
|
||||||
return Queue{driver: driver}, err
|
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
||||||
|
}
|
||||||
|
return Queue{driver: driver}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
func (q Queue) Enqueue(ctx context.Context, topic string, b []byte) error {
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
INSERT INTO queue (updated, payload) VALUES (?, ?)
|
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
||||||
`,
|
`,
|
||||||
|
topic,
|
||||||
time.Now().Unix(),
|
time.Now().Unix(),
|
||||||
b,
|
b,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
func (q Queue) Syn(ctx context.Context, topic string) (string, []byte, error) {
|
||||||
for {
|
for {
|
||||||
reservation, m, err := q.syn(ctx)
|
reservation, m, err := q.syn(ctx, topic)
|
||||||
if reservation != nil || err != nil {
|
if reservation != nil || err != nil {
|
||||||
return string(reservation), m, err
|
return string(reservation), m, err
|
||||||
}
|
}
|
||||||
|
|
@ -50,7 +53,7 @@ func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
func (q Queue) syn(ctx context.Context, topic string) ([]byte, []byte, error) {
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
reservation := []byte(uuid.New().String())
|
reservation := []byte(uuid.New().String())
|
||||||
var payload []byte
|
var payload []byte
|
||||||
|
|
@ -63,11 +66,14 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM queue
|
FROM queue
|
||||||
WHERE
|
WHERE
|
||||||
|
topic == ?
|
||||||
|
AND (
|
||||||
reservation IS NULL
|
reservation IS NULL
|
||||||
OR ? - updated > 60
|
OR ? - updated > 60
|
||||||
|
)
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
`, now, reservation, now); err != nil {
|
`, now, reservation, topic, now); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
||||||
} else if n, err := result.RowsAffected(); err != nil {
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ func TestQueue(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "/tmp/f.db")
|
driver, _ := NewDriver(ctx, "")
|
||||||
q, err := NewQueue(ctx, driver)
|
q, err := NewQueue(ctx, driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
@ -19,16 +19,20 @@ func TestQueue(t *testing.T) {
|
||||||
|
|
||||||
t.Run("enqueue", func(t *testing.T) {
|
t.Run("enqueue", func(t *testing.T) {
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
if err := q.Enqueue(ctx, "", []byte(strconv.Itoa(i))); err != nil {
|
||||||
t.Fatal(i, err)
|
t.Fatal(i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err := q.Enqueue(ctx, "other", []byte(strconv.Itoa(100))); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
t.Run("syn ack", func(t *testing.T) {
|
t.Run("syn ack", func(t *testing.T) {
|
||||||
found := map[string]struct{}{}
|
found := map[string]struct{}{}
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if reservation, b, err := q.Syn(ctx); err != nil {
|
if reservation, b, err := q.Syn(ctx, ""); err != nil {
|
||||||
t.Fatal(i, "syn err", err)
|
t.Fatal(i, "syn err", err)
|
||||||
} else if _, ok := found[string(b)]; ok {
|
} else if _, ok := found[string(b)]; ok {
|
||||||
t.Errorf("syn'd %q twice (%+v)", b, found)
|
t.Errorf("syn'd %q twice (%+v)", b, found)
|
||||||
|
|
@ -39,4 +43,10 @@ func TestQueue(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if reservation, _, err := q.syn(ctx, ""); reservation != nil {
|
||||||
|
t.Errorf("able to syn 1 more message than created: %v", err)
|
||||||
|
} else if reservation, _, err := q.syn(ctx, "other"); reservation == nil {
|
||||||
|
t.Errorf("unable to syn from other topic: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue