From 0db7008aac09661a9444dffd94a2e1b10dca7576 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Tue, 9 Dec 2025 16:49:46 -0700 Subject: [PATCH] pg-queue-demo --- cmd/{pg-queue => pg-queue-demo}/README.md | 0 cmd/pg-queue-demo/main.go | 164 ++++++++++++++++++++++ cmd/pg-queue/main.go | 31 ---- 3 files changed, 164 insertions(+), 31 deletions(-) rename cmd/{pg-queue => pg-queue-demo}/README.md (100%) create mode 100644 cmd/pg-queue-demo/main.go delete mode 100644 cmd/pg-queue/main.go diff --git a/cmd/pg-queue/README.md b/cmd/pg-queue-demo/README.md similarity index 100% rename from cmd/pg-queue/README.md rename to cmd/pg-queue-demo/README.md diff --git a/cmd/pg-queue-demo/main.go b/cmd/pg-queue-demo/main.go new file mode 100644 index 0000000..4d4286c --- /dev/null +++ b/cmd/pg-queue-demo/main.go @@ -0,0 +1,164 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "sync/atomic" + "time" + + "pg/src/with" + + _ "github.com/lib/pq" +) + +func main() { + if err := with.Context(run); err != nil { + panic(err) + } +} + +func run(ctx context.Context) error { + fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") + if err := fs.Parse(os.Args[1:]); err != nil { + panic(err) + } + + return with.PSQL(ctx, *c, func(pg *sql.DB) error { + log.Println("creating tables...") + if _, err := pg.ExecContext(ctx, ` + CREATE SCHEMA IF NOT EXISTS "queue"; + + CREATE TABLE IF NOT EXISTS "queue"."queue" ( + id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, + payload BYTEA NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + attempts INTEGER DEFAULT 0, + reservation UUID, + checked_out_at TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS "queue"."archive" ( + id UUID NOT NULL PRIMARY KEY, + payload BYTEA NOT NULL, + created_at TIMESTAMP NOT NULL, + archived_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + `); err != nil { + return err + } + + enqueue := func(a any) error { + b, _ := json.Marshal(a) + _, err := pg.ExecContext(ctx, `INSERT INTO "queue"."queue" (payload) VALUES ($1)`, b) + return err + } + + mustCheckout := func() (string, []byte, error) { + rows, err := pg.QueryContext(ctx, ` + UPDATE "queue"."queue" + SET + reservation = GEN_RANDOM_UUID(), + attempts = attempts+1, + checked_out_at = NOW() + WHERE + id IN ( + SELECT id + FROM "queue"."queue" + WHERE + attempts < 100 + AND (reservation IS NULL OR NOW() - checked_out_at > INTERVAL '1 minute') + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING reservation, payload + `) + if err != nil { + return "", nil, err + } + + var reservation string + var payload []byte + for rows.Next() { + if err := rows.Scan(&reservation, &payload); err != nil { + return "", nil, fmt.Errorf("failed to scan reservation,payload: %w", err) + } + } + + return reservation, payload, rows.Err() + } + + resolve := func(reservation string) error { + _, err := pg.ExecContext(ctx, ` + WITH deleted AS ( + DELETE FROM "queue"."queue" + WHERE + reservation=$1 + AND NOW() - checked_out_at < INTERVAL '1 minute' + RETURNING id, payload, created_at + ) + INSERT INTO "queue"."archive" ( + id, payload, created_at + ) SELECT * FROM deleted + `, reservation) + return err + } + + start := time.Now() + var enqueues atomic.Uint64 + var dequeues atomic.Uint64 + go with.Every(ctx, 2*time.Second, func() { + seconds := uint64(time.Since(start).Seconds()) + if seconds < 1 { + seconds = 1 + } + e := enqueues.Load() + d := dequeues.Load() + log.Printf("%d enqueues (%d per second), %d dequeues (%d per second) over %d seconds", + e, + e/seconds, + d, + d/seconds, + seconds, + ) + }) + + go with.Every(ctx, 1, func() { + if err := enqueue(1); err != nil { + log.Printf("failed to enqueue: %v", err) + } else { + enqueues.Add(1) + } + }) + + with.Every(ctx, 1, func() { + reservation, payload, err := mustCheckout() + if err != nil { + log.Println("failed checkout:", err) + return + } + + if reservation == "" { + return + } + + if string(payload) != "1" { + log.Printf("bad checkout: unexpected payload %q", payload) + return + } + + if err := resolve(reservation); err != nil { + log.Printf("failed to resolve: %v", err) + return + } + + dequeues.Add(1) + }) + return ctx.Err() + }) +} diff --git a/cmd/pg-queue/main.go b/cmd/pg-queue/main.go deleted file mode 100644 index 40f1794..0000000 --- a/cmd/pg-queue/main.go +++ /dev/null @@ -1,31 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "flag" - "os" - - "pg/src/with" - - _ "github.com/lib/pq" -) - -func main() { - if err := with.Context(run); err != nil { - panic(err) - } -} - -func run(ctx context.Context) error { - fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) - c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") - if err := fs.Parse(os.Args[1:]); err != nil { - panic(err) - } - - return with.PSQL(ctx, *c, func(pg *sql.DB) error { - <-ctx.Done() - return nil - }) -}