pg-queue-demo
parent
18d8d9af77
commit
0db7008aac
|
|
@ -0,0 +1,164 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"pg/src/with"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if err := with.Context(run); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context) error {
|
||||
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||
log.Println("creating tables...")
|
||||
if _, err := pg.ExecContext(ctx, `
|
||||
CREATE SCHEMA IF NOT EXISTS "queue";
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "queue"."queue" (
|
||||
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
|
||||
payload BYTEA NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
attempts INTEGER DEFAULT 0,
|
||||
reservation UUID,
|
||||
checked_out_at TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "queue"."archive" (
|
||||
id UUID NOT NULL PRIMARY KEY,
|
||||
payload BYTEA NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
archived_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||||
);
|
||||
`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
enqueue := func(a any) error {
|
||||
b, _ := json.Marshal(a)
|
||||
_, err := pg.ExecContext(ctx, `INSERT INTO "queue"."queue" (payload) VALUES ($1)`, b)
|
||||
return err
|
||||
}
|
||||
|
||||
mustCheckout := func() (string, []byte, error) {
|
||||
rows, err := pg.QueryContext(ctx, `
|
||||
UPDATE "queue"."queue"
|
||||
SET
|
||||
reservation = GEN_RANDOM_UUID(),
|
||||
attempts = attempts+1,
|
||||
checked_out_at = NOW()
|
||||
WHERE
|
||||
id IN (
|
||||
SELECT id
|
||||
FROM "queue"."queue"
|
||||
WHERE
|
||||
attempts < 100
|
||||
AND (reservation IS NULL OR NOW() - checked_out_at > INTERVAL '1 minute')
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING reservation, payload
|
||||
`)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
var reservation string
|
||||
var payload []byte
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&reservation, &payload); err != nil {
|
||||
return "", nil, fmt.Errorf("failed to scan reservation,payload: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return reservation, payload, rows.Err()
|
||||
}
|
||||
|
||||
resolve := func(reservation string) error {
|
||||
_, err := pg.ExecContext(ctx, `
|
||||
WITH deleted AS (
|
||||
DELETE FROM "queue"."queue"
|
||||
WHERE
|
||||
reservation=$1
|
||||
AND NOW() - checked_out_at < INTERVAL '1 minute'
|
||||
RETURNING id, payload, created_at
|
||||
)
|
||||
INSERT INTO "queue"."archive" (
|
||||
id, payload, created_at
|
||||
) SELECT * FROM deleted
|
||||
`, reservation)
|
||||
return err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
var enqueues atomic.Uint64
|
||||
var dequeues atomic.Uint64
|
||||
go with.Every(ctx, 2*time.Second, func() {
|
||||
seconds := uint64(time.Since(start).Seconds())
|
||||
if seconds < 1 {
|
||||
seconds = 1
|
||||
}
|
||||
e := enqueues.Load()
|
||||
d := dequeues.Load()
|
||||
log.Printf("%d enqueues (%d per second), %d dequeues (%d per second) over %d seconds",
|
||||
e,
|
||||
e/seconds,
|
||||
d,
|
||||
d/seconds,
|
||||
seconds,
|
||||
)
|
||||
})
|
||||
|
||||
go with.Every(ctx, 1, func() {
|
||||
if err := enqueue(1); err != nil {
|
||||
log.Printf("failed to enqueue: %v", err)
|
||||
} else {
|
||||
enqueues.Add(1)
|
||||
}
|
||||
})
|
||||
|
||||
with.Every(ctx, 1, func() {
|
||||
reservation, payload, err := mustCheckout()
|
||||
if err != nil {
|
||||
log.Println("failed checkout:", err)
|
||||
return
|
||||
}
|
||||
|
||||
if reservation == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if string(payload) != "1" {
|
||||
log.Printf("bad checkout: unexpected payload %q", payload)
|
||||
return
|
||||
}
|
||||
|
||||
if err := resolve(reservation); err != nil {
|
||||
log.Printf("failed to resolve: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
dequeues.Add(1)
|
||||
})
|
||||
return ctx.Err()
|
||||
})
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"flag"
|
||||
"os"
|
||||
|
||||
"pg/src/with"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if err := with.Context(run); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context) error {
|
||||
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue