spoc-bot-vr/queue.go

140 lines
3.0 KiB
Go

package main
import (
"context"
"fmt"
"strings"
"time"
"github.com/google/uuid"
)
// Queue is a topic-scoped message queue stored in the "queue" database table.
//
// The zero value is usable as a no-op queue: Enqueue, Syn and ack each return
// immediately without touching the database when driver.DB is nil.
type Queue struct {
// driver is the database handle every queue statement is executed through.
driver Driver
// topic is written to each row on Enqueue and used to filter rows when
// reserving a message.
topic string
}
// NewNoopQueue returns the zero-value Queue. It has no driver, so the queue
// methods that check for a nil driver.DB return without doing any work.
func NewNoopQueue() Queue {
	var noop Queue
	return noop
}
// NewQueue makes sure the backing "queue" table exists and returns a Queue
// bound to the given topic and driver.
//
// An error from the CREATE TABLE statement is wrapped and returned together
// with a zero-value Queue.
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
	// Idempotent schema setup: safe to run on every start-up.
	const schema = `
CREATE TABLE IF NOT EXISTS queue (
id TEXT PRIMARY KEY,
topic TEXT NOT NULL,
updated INTEGER NOT NULL,
reservation TEXT,
payload TEXT
);
`
	_, err := driver.ExecContext(ctx, schema)
	if err != nil {
		return Queue{}, fmt.Errorf("failed to create table: %w", err)
	}

	q := Queue{driver: driver, topic: topic}
	return q, nil
}
// Enqueue stores b as a new, unreserved message on the queue's topic.
//
// Each message gets a fresh UUID as its id and the current Unix time as its
// "updated" stamp. On a queue without a driver this is a no-op that returns
// nil. An error is returned if the insert fails or does not affect exactly
// one row.
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
	if q.driver.DB == nil {
		return nil
	}

	id := uuid.New().String()
	stamp := time.Now().Unix()

	res, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
`, id, q.topic, stamp, b)
	if err != nil {
		return err
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return err
	case affected != 1:
		return fmt.Errorf("insert into queue %s affected %v rows", b, affected)
	}
	return nil
}
// Syn blocks until a message on the queue's topic has been reserved, then
// returns the reservation token and the message payload.
//
// It retries the reservation every 500ms until one succeeds, the reservation
// attempt fails with an error, or ctx is done (in which case ctx.Err() is
// returned). On a queue without a driver it returns ("", nil, nil) at once.
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
	if q.driver.DB == nil {
		return "", nil, nil
	}

	const pollInterval = time.Millisecond * 500

	for {
		token, msg, err := q.syn(ctx)
		if err != nil || token != nil {
			return string(token), msg, err
		}

		// Nothing available yet: wait a beat, unless the caller gives up.
		select {
		case <-time.After(pollInterval):
			continue
		case <-ctx.Done():
			return "", nil, ctx.Err()
		}
	}
}
// syn makes a single attempt to reserve one message on the queue's topic.
//
// A row is eligible when it has no reservation, or when its "updated" stamp
// (Unix seconds) is more than 600 seconds older than now, i.e. a previous
// reservation is treated as expired. The chosen row is stamped with the
// current time and a freshly generated reservation token, and its payload is
// then read back by that token.
//
// Returns (token, payload, nil) on success, (nil, nil, nil) when no row was
// eligible, and (nil, nil, err) on failure.
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
	stamp := time.Now().Unix()
	token := []byte(uuid.New().String())

	// Step 1: claim at most one eligible row by writing our token onto it.
	claimed, err := q.driver.ExecContext(ctx, `
UPDATE queue
SET
updated = $1, reservation = $2
WHERE
id IN (
SELECT id
FROM queue
WHERE
topic = $3
AND (
reservation IS NULL
OR $4 - updated > 600
)
LIMIT 1
)
`, stamp, token, q.topic, stamp)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
	}

	count, err := claimed.RowsAffected()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
	}
	if count == 0 {
		// Nothing to reserve right now.
		return nil, nil, nil
	}

	// Step 2: fetch the payload of the row we just claimed.
	row := q.driver.QueryRowContext(ctx, `
SELECT payload
FROM queue
WHERE reservation=$1
LIMIT 1
`, token)
	if err := row.Err(); err != nil {
		return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
	}

	var body []byte
	// A "no rows" scan error is tolerated: the token is still returned, with a
	// nil payload.
	// NOTE(review): matching on the error text is brittle; if the driver is
	// backed by database/sql, errors.Is(err, sql.ErrNoRows) would be sturdier —
	// confirm before changing.
	if err := row.Scan(&body); err != nil && !strings.Contains(err.Error(), "no rows in result") {
		return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
	}
	return token, body, nil
}
// Ack acknowledges the message held under the given reservation token, as
// returned by Syn, by deleting it from the queue.
func (q Queue) Ack(ctx context.Context, reservation string) error {
	token := []byte(reservation)
	return q.ack(ctx, token)
}
// ack deletes the row carrying the given reservation token.
//
// On a queue without a driver it is a no-op that returns nil. It returns an
// error if the DELETE fails, if the affected-row count cannot be read, or if
// the DELETE did not remove exactly one row (for example because the token is
// unknown).
func (q Queue) ack(ctx context.Context, reservation []byte) error {
	if q.driver.DB == nil {
		return nil
	}

	result, err := q.driver.ExecContext(ctx, `
DELETE FROM queue
WHERE reservation=$1
`, reservation)
	if err != nil {
		return err
	}

	// Surface a failure to read the count instead of reporting it as a
	// misleading "0 rows affected".
	n, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to ack %s: no count: %w", reservation, err)
	}
	if n != 1 {
		return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
	}
	return nil
}