package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/google/uuid"
)

// Queue is a topic-scoped work queue persisted in a SQL table named "queue".
//
// Messages are inserted with Enqueue, claimed with Syn (which stamps the row
// with a fresh reservation token) and removed with Ack. A Queue whose
// driver.DB is nil behaves as a no-op: every public method returns
// immediately without touching a database.
type Queue struct {
	driver Driver
	topic  string
}

// NewNoopQueue returns the zero-value Queue. Its driver.DB is nil, so
// Enqueue, Syn and Ack all return immediately with zero values.
func NewNoopQueue() Queue {
	return Queue{}
}

// NewQueue creates the "queue" table if it does not already exist and returns
// a Queue bound to topic that uses driver for all statements.
//
// It returns a wrapped error if the CREATE TABLE statement fails.
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
	if _, err := driver.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS queue (
			id TEXT PRIMARY KEY,
			topic TEXT NOT NULL,
			updated INTEGER NOT NULL,
			reservation TEXT,
			payload TEXT
		);
	`); err != nil {
		return Queue{}, fmt.Errorf("failed to create table: %w", err)
	}
	return Queue{topic: topic, driver: driver}, nil
}

// Enqueue inserts b as a new, unreserved message on the queue's topic. The
// row gets a random UUID as its id and the current Unix time as "updated".
//
// It returns the driver's error if the insert fails, and an error if the
// insert did not affect exactly one row. On a no-op queue it returns nil.
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
	if q.driver.DB == nil {
		return nil
	}
	result, err := q.driver.ExecContext(ctx, `
		INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
	`,
		uuid.New().String(),
		q.topic,
		time.Now().Unix(),
		b,
	)
	if err != nil {
		return err
	}
	n, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if n != 1 {
		return fmt.Errorf("insert into queue %s affected %v rows", b, n)
	}
	return nil
}

// Syn blocks until a message on the queue's topic can be reserved, an error
// occurs, or ctx is done. It polls every 500ms while nothing is available.
//
// On success it returns the reservation token (to be passed to Ack) and the
// message payload. If ctx is done while waiting it returns ctx.Err(). On a
// no-op queue it returns ("", nil, nil) immediately without blocking.
func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
	const pollInterval = 500 * time.Millisecond

	if q.driver.DB == nil {
		return "", nil, nil
	}
	for {
		reservation, payload, err := q.syn(ctx)
		if reservation != nil || err != nil {
			return string(reservation), payload, err
		}

		// Use an explicit timer rather than time.After so the timer can be
		// released as soon as the context is cancelled.
		timer := time.NewTimer(pollInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return "", nil, ctx.Err()
		case <-timer.C:
		}
	}
}

// syn makes a single attempt to reserve one message on the queue's topic.
//
// A row is eligible when it has no reservation, or when more than 600 seconds
// have passed since its "updated" timestamp (a stale reservation). The chosen
// row is stamped with a freshly generated reservation token and the current
// time, and its payload is then read back by that token.
//
// It returns (nil, nil, nil) when no row was eligible.
func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
	now := time.Now().Unix()
	reservation := []byte(uuid.New().String())

	result, err := q.driver.ExecContext(ctx, `
		UPDATE queue
		SET
			updated = $1,
			reservation = $2
		WHERE id IN (
			SELECT id
			FROM queue
			WHERE
				topic = $3
				AND (
					reservation IS NULL
					OR $4 - updated > 600
				)
			LIMIT 1
		)
	`, now, reservation, q.topic, now)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
	}
	n, err := result.RowsAffected()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
	}
	if n == 0 {
		// Nothing eligible right now; the caller decides whether to retry.
		return nil, nil, nil
	}

	var payload []byte
	row := q.driver.QueryRowContext(ctx, `
		SELECT payload
		FROM queue
		WHERE reservation=$1
		LIMIT 1
	`, reservation)
	if err := row.Err(); err != nil {
		return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
	}
	// A "no rows" result is tolerated: the reservation is still returned, with
	// a nil payload.
	// NOTE(review): this matches on the error text; if Driver is backed by
	// database/sql, errors.Is(err, sql.ErrNoRows) would be the robust check —
	// confirm against the Driver definition before changing.
	if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
		return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
	}
	return reservation, payload, nil
}

// Ack deletes the message held under reservation, as returned by Syn.
//
// It returns an error if the delete fails or does not remove exactly one row.
// On a no-op queue it returns nil.
func (q Queue) Ack(ctx context.Context, reservation string) error {
	return q.ack(ctx, []byte(reservation))
}

// ack deletes the row whose reservation column equals reservation.
//
// The token is passed to the driver as a []byte, the same representation syn
// uses when writing it.
func (q Queue) ack(ctx context.Context, reservation []byte) error {
	if q.driver.DB == nil {
		return nil
	}
	result, err := q.driver.ExecContext(ctx, `
		DELETE FROM queue
		WHERE reservation=$1
	`, reservation)
	if err != nil {
		return err
	}
	// Surface a failure to obtain the row count instead of reporting it as a
	// misleading "0 rows affected".
	n, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to ack %s: no count: %w", reservation, err)
	}
	if n != 1 {
		return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
	}
	return nil
}