package main import ( "context" "database/sql" "encoding/json" "flag" "fmt" "log" "os" "sync/atomic" "time" "pg/src/with" _ "github.com/lib/pq" ) func main() { if err := with.Context(run); err != nil { panic(err) } } func run(ctx context.Context) error { fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } return with.PSQL(ctx, *c, func(pg *sql.DB) error { log.Println("creating tables...") if _, err := pg.ExecContext(ctx, ` CREATE SCHEMA IF NOT EXISTS "queue"; CREATE TABLE IF NOT EXISTS "queue"."queue" ( id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, payload BYTEA NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), attempts INTEGER DEFAULT 0, reservation UUID, checked_out_at TIMESTAMP ); CREATE TABLE IF NOT EXISTS "queue"."archive" ( id UUID NOT NULL PRIMARY KEY, payload BYTEA NOT NULL, created_at TIMESTAMP NOT NULL, archived_at TIMESTAMP NOT NULL DEFAULT NOW() ); `); err != nil { return err } enqueue := func(a any) error { b, _ := json.Marshal(a) _, err := pg.ExecContext(ctx, `INSERT INTO "queue"."queue" (payload) VALUES ($1)`, b) return err } mustCheckout := func() (string, []byte, error) { rows, err := pg.QueryContext(ctx, ` UPDATE "queue"."queue" SET reservation = GEN_RANDOM_UUID(), attempts = attempts+1, checked_out_at = NOW() WHERE id IN ( SELECT id FROM "queue"."queue" WHERE attempts < 100 AND (reservation IS NULL OR NOW() - checked_out_at > INTERVAL '1 minute') FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING reservation, payload `) if err != nil { return "", nil, err } var reservation string var payload []byte for rows.Next() { if err := rows.Scan(&reservation, &payload); err != nil { return "", nil, fmt.Errorf("failed to scan reservation,payload: %w", err) } } return reservation, payload, rows.Err() } resolve := func(reservation string) error { _, err := pg.ExecContext(ctx, ` WITH deleted AS ( DELETE FROM "queue"."queue" WHERE reservation=$1 AND NOW() - checked_out_at < INTERVAL '1 minute' RETURNING id, payload, created_at ) INSERT INTO "queue"."archive" ( id, payload, created_at ) SELECT * FROM deleted `, reservation) return err } start := time.Now() var enqueues atomic.Uint64 var dequeues atomic.Uint64 go with.Every(ctx, 2*time.Second, func() { seconds := uint64(time.Since(start).Seconds()) if seconds < 1 { seconds = 1 } e := enqueues.Load() d := dequeues.Load() log.Printf("%d enqueues (%d per second), %d dequeues (%d per second) over %d seconds", e, e/seconds, d, d/seconds, seconds, ) }) go with.Every(ctx, 1, func() { if err := enqueue(1); err != nil { log.Printf("failed to enqueue: %v", err) } else { enqueues.Add(1) } }) with.Every(ctx, 1, func() { reservation, payload, err := mustCheckout() if err != nil { log.Println("failed checkout:", err) return } if reservation == "" { return } if string(payload) != "1" { log.Printf("bad checkout: unexpected payload %q", payload) return } if err := resolve(reservation); err != nil { log.Printf("failed to resolve: %v", err) return } dequeues.Add(1) }) return ctx.Err() }) }