140 lines
3.0 KiB
Go
140 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type Queue struct {
|
|
driver Driver
|
|
topic string
|
|
}
|
|
|
|
func NewNoopQueue() Queue {
|
|
return Queue{}
|
|
}
|
|
|
|
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
|
|
if _, err := driver.ExecContext(ctx, `
|
|
CREATE TABLE IF NOT EXISTS queue (
|
|
id TEXT PRIMARY KEY,
|
|
topic TEXT NOT NULL,
|
|
updated INTEGER NOT NULL,
|
|
reservation TEXT,
|
|
payload TEXT
|
|
);
|
|
`); err != nil {
|
|
return Queue{}, fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
return Queue{topic: topic, driver: driver}, nil
|
|
}
|
|
|
|
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
|
|
if q.driver.DB == nil {
|
|
return nil
|
|
}
|
|
result, err := q.driver.ExecContext(ctx, `
|
|
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
|
|
`,
|
|
uuid.New().String(),
|
|
q.topic,
|
|
time.Now().Unix(),
|
|
b,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n, err := result.RowsAffected(); err != nil {
|
|
return err
|
|
} else if n != 1 {
|
|
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
|
|
if q.driver.DB == nil {
|
|
return "", nil, nil
|
|
}
|
|
for {
|
|
reservation, m, err := q.syn(ctx)
|
|
if reservation != nil || err != nil {
|
|
return string(reservation), m, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return "", nil, ctx.Err()
|
|
case <-time.After(time.Millisecond * 500):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
|
now := time.Now().Unix()
|
|
reservation := []byte(uuid.New().String())
|
|
var payload []byte
|
|
if result, err := q.driver.ExecContext(ctx, `
|
|
UPDATE queue
|
|
SET
|
|
updated = $1, reservation = $2
|
|
WHERE
|
|
id IN (
|
|
SELECT id
|
|
FROM queue
|
|
WHERE
|
|
topic = $3
|
|
AND (
|
|
reservation IS NULL
|
|
OR $4 - updated > 600
|
|
)
|
|
LIMIT 1
|
|
)
|
|
`, now, reservation, q.topic, now); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
|
|
} else if n, err := result.RowsAffected(); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
|
|
} else if n == 0 {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
row := q.driver.QueryRowContext(ctx, `
|
|
SELECT payload
|
|
FROM queue
|
|
WHERE reservation=$1
|
|
LIMIT 1
|
|
`, reservation)
|
|
if err := row.Err(); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
|
|
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
|
|
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
|
|
}
|
|
|
|
return reservation, payload, nil
|
|
}
|
|
|
|
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
|
return q.ack(ctx, []byte(reservation))
|
|
}
|
|
|
|
func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
|
if q.driver.DB == nil {
|
|
return nil
|
|
}
|
|
result, err := q.driver.ExecContext(ctx, `
|
|
DELETE FROM queue
|
|
WHERE reservation=$1
|
|
`, reservation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n, _ := result.RowsAffected(); n != 1 {
|
|
return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
|
|
}
|
|
return err
|
|
}
|