noop queue and topics embedded
parent
42c5b7d7ad
commit
eef78d6e39
46
ingest.go
46
ingest.go
|
|
@ -1,22 +1,42 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import "context"
|
||||||
"context"
|
|
||||||
"io"
|
type (
|
||||||
|
Ingester struct {
|
||||||
|
writer Queue
|
||||||
|
reader Queue
|
||||||
|
process processFunc
|
||||||
|
}
|
||||||
|
processFunc func(context.Context, []byte) ([]byte, error)
|
||||||
)
|
)
|
||||||
|
|
||||||
type Ingester struct {
|
func NewIngester(writer, reader Queue, process processFunc) Ingester {
|
||||||
d Driver
|
|
||||||
q Queue
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewIngester(d Driver, q Queue) Ingester {
|
|
||||||
return Ingester{
|
return Ingester{
|
||||||
d: d,
|
writer: writer,
|
||||||
q: q,
|
reader: reader,
|
||||||
|
process: process,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i Ingester) Ingest(ctx context.Context) error {
|
func (i Ingester) Process(ctx context.Context) error {
|
||||||
return io.EOF
|
ctx, can := context.WithCancel(ctx)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
for {
|
||||||
|
reservation, read, err := i.reader.Syn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
processed, err := i.process(ctx, read)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := i.writer.Enqueue(ctx, processed); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := i.reader.Ack(ctx, reservation); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
30
queue.go
30
queue.go
|
|
@ -10,9 +10,14 @@ import (
|
||||||
|
|
||||||
type Queue struct {
|
type Queue struct {
|
||||||
driver Driver
|
driver Driver
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(ctx context.Context, driver Driver) (Queue, error) {
|
func NewNoopQueue() Queue {
|
||||||
|
return Queue{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
||||||
if _, err := driver.ExecContext(ctx, `
|
if _, err := driver.ExecContext(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS queue (
|
CREATE TABLE IF NOT EXISTS queue (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
|
|
@ -24,23 +29,29 @@ func NewQueue(ctx context.Context, driver Driver) (Queue, error) {
|
||||||
`); err != nil {
|
`); err != nil {
|
||||||
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
||||||
}
|
}
|
||||||
return Queue{driver: driver}, nil
|
return Queue{topic: topic, driver: driver}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Enqueue(ctx context.Context, topic string, b []byte) error {
|
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
||||||
|
if q.driver.DB == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
INSERT INTO queue (topic, updated, payload) VALUES (?, ?, ?)
|
||||||
`,
|
`,
|
||||||
topic,
|
q.topic,
|
||||||
time.Now().Unix(),
|
time.Now().Unix(),
|
||||||
b,
|
b,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Syn(ctx context.Context, topic string) (string, []byte, error) {
|
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
||||||
|
if q.driver.DB == nil {
|
||||||
|
return "", nil, nil
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
reservation, m, err := q.syn(ctx, topic)
|
reservation, m, err := q.syn(ctx)
|
||||||
if reservation != nil || err != nil {
|
if reservation != nil || err != nil {
|
||||||
return string(reservation), m, err
|
return string(reservation), m, err
|
||||||
}
|
}
|
||||||
|
|
@ -53,7 +64,7 @@ func (q Queue) Syn(ctx context.Context, topic string) (string, []byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) syn(ctx context.Context, topic string) ([]byte, []byte, error) {
|
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
reservation := []byte(uuid.New().String())
|
reservation := []byte(uuid.New().String())
|
||||||
var payload []byte
|
var payload []byte
|
||||||
|
|
@ -73,7 +84,7 @@ func (q Queue) syn(ctx context.Context, topic string) ([]byte, []byte, error) {
|
||||||
)
|
)
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
)
|
)
|
||||||
`, now, reservation, topic, now); err != nil {
|
`, now, reservation, q.topic, now); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
||||||
} else if n, err := result.RowsAffected(); err != nil {
|
} else if n, err := result.RowsAffected(); err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
||||||
|
|
@ -104,6 +115,9 @@ func (q Queue) syn(ctx context.Context, topic string) ([]byte, []byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
||||||
|
if q.driver.DB == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
_, err := q.driver.ExecContext(ctx, `
|
||||||
DELETE FROM queue
|
DELETE FROM queue
|
||||||
WHERE reservation==?
|
WHERE reservation==?
|
||||||
|
|
|
||||||
|
|
@ -12,27 +12,28 @@ func TestQueue(t *testing.T) {
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
driver, _ := NewDriver(ctx, "")
|
driver, _ := NewDriver(ctx, "")
|
||||||
q, err := NewQueue(ctx, driver)
|
q, err := NewQueue(ctx, "", driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
qOther, _ := NewQueue(ctx, "other", driver)
|
||||||
|
|
||||||
t.Run("enqueue", func(t *testing.T) {
|
t.Run("enqueue", func(t *testing.T) {
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if err := q.Enqueue(ctx, "", []byte(strconv.Itoa(i))); err != nil {
|
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
|
||||||
t.Fatal(i, err)
|
t.Fatal(i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := q.Enqueue(ctx, "other", []byte(strconv.Itoa(100))); err != nil {
|
if err := qOther.Enqueue(ctx, []byte(strconv.Itoa(100))); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("syn ack", func(t *testing.T) {
|
t.Run("syn ack", func(t *testing.T) {
|
||||||
found := map[string]struct{}{}
|
found := map[string]struct{}{}
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if reservation, b, err := q.Syn(ctx, ""); err != nil {
|
if reservation, b, err := q.Syn(ctx); err != nil {
|
||||||
t.Fatal(i, "syn err", err)
|
t.Fatal(i, "syn err", err)
|
||||||
} else if _, ok := found[string(b)]; ok {
|
} else if _, ok := found[string(b)]; ok {
|
||||||
t.Errorf("syn'd %q twice (%+v)", b, found)
|
t.Errorf("syn'd %q twice (%+v)", b, found)
|
||||||
|
|
@ -44,9 +45,22 @@ func TestQueue(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if reservation, _, err := q.syn(ctx, ""); reservation != nil {
|
if reservation, _, err := q.syn(ctx); reservation != nil {
|
||||||
t.Errorf("able to syn 1 more message than created: %v", err)
|
t.Errorf("able to syn 1 more message than created: %v", err)
|
||||||
} else if reservation, _, err := q.syn(ctx, "other"); reservation == nil {
|
} else if reservation, _, err := qOther.syn(ctx); reservation == nil {
|
||||||
t.Errorf("unable to syn from other topic: %v", err)
|
t.Errorf("unable to syn from other topic: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Run("noop", func(t *testing.T) {
|
||||||
|
q := NewNoopQueue()
|
||||||
|
if err := q.Enqueue(nil, nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if _, _, err := q.Syn(nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if err := q.Ack(nil, ""); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue