From faa128400f1dd2ff4a8348433c6b7620074e89e9 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:29:32 -0700 Subject: [PATCH] my example was not good tbh --- cmd/pg-pubsub-demo/README.md | 1 + cmd/pg-pubsub-demo/main.go | 162 +++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 cmd/pg-pubsub-demo/README.md create mode 100644 cmd/pg-pubsub-demo/main.go diff --git a/cmd/pg-pubsub-demo/README.md b/cmd/pg-pubsub-demo/README.md new file mode 100644 index 0000000..d434b6a --- /dev/null +++ b/cmd/pg-pubsub-demo/README.md @@ -0,0 +1 @@ +https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks \ No newline at end of file diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go new file mode 100644 index 0000000..d1782a8 --- /dev/null +++ b/cmd/pg-pubsub-demo/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "os" + "strconv" + "time" + + "pg/src/with" + + _ "github.com/lib/pq" +) + +func main() { + if err := with.Context(run); err != nil { + panic(err) + } +} + +func run(ctx context.Context) error { + fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") + if err := fs.Parse(os.Args[1:]); err != nil { + panic(err) + } + + return with.PSQL(ctx, *c, func(pg *sql.DB) error { + log.Println("creating tables...") + for _, sql := range []string{ + `CREATE SCHEMA IF NOT EXISTS "pubsub"`, + + `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a" ( + partition INT PRIMARY KEY, + partition_offset BIGINT NOT NULL DEFAULT 0 + )`, + `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_0" ( + id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, + partition_offset BIGINT UNIQUE NOT NULL, + payload BYTEA NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW() + )`, + `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_1" ( + id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, + partition_offset BIGINT UNIQUE NOT NULL, + payload BYTEA NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW() + )`, + + `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (0, 0) ON CONFLICT DO NOTHING`, + `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (1, 0) ON CONFLICT DO NOTHING`, + + `CREATE TABLE IF NOT EXISTS "pubsub"."consumer_partition_offset" ( + consumer_group TEXT NOT NULL, + partition INT REFERENCES "pubsub"."topic_a"("partition"), + partition_offset BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY (consumer_group, partition) + )`, + } { + if _, err := pg.ExecContext(ctx, sql); err != nil { + return fmt.Errorf("failed setup sql %s: %w", sql, err) + } + } + + pub := func(a any) error { + b, _ := json.Marshal(a) + partition := rand.Int() % 2 + partitionS := strconv.Itoa(partition) + _, err := pg.ExecContext(ctx, ` + WITH reserve AS ( + UPDATE "pubsub"."topic_a" + SET partition_offset=partition_offset+1 + WHERE partition=`+partitionS+` + RETURNING (partition_offset-1) AS offset + ) + INSERT INTO "pubsub"."topic_a_partition_`+partitionS+`" (partition_offset, payload) + SELECT reserve.offset, $1 FROM reserve + `, b) + return err + } + sub := func(group string) (any, error) { + partition := rand.Int() % 2 + partitionS := strconv.Itoa(partition) + + if _, err := pg.ExecContext(ctx, ` + INSERT INTO + "pubsub"."consumer_partition_offset" + (consumer_group, partition) + VALUES + ($1, $2) + ON CONFLICT DO NOTHING + `, group, partition); err != nil { + return nil, err + } + + row := pg.QueryRowContext(ctx, ` + WITH tip AS ( + SELECT partition_offset-1 + FROM "pubsub"."topic_a" + WHERE partition=`+partitionS+` + ) + , to_claim AS ( + SELECT partition_offset + FROM "pubsub"."consumer_partition_offset" + WHERE consumer_group=$1 AND partition=$2 + FOR UPDATE SKIP LOCKED + ) + , claimed AS ( + UPDATE "pubsub"."consumer_partition_offset" consumer_partition_offset + SET consumer_partition_offset.partition_offset=to_claim.partition_offset+1 + FROM to_claim + WHERE consumer_partition_offset.consumer_group=$1 AND consumer_partition_offset.partition=$2 + RETURNING consumer_partition_offset.partition_offset + ) + SELECT partition_offset FROM claimed + `, group, partition) + if err := row.Err(); err != nil { + return nil, err + } + + var result any + if err := row.Scan(&result); err != nil { + return nil, err + } + + return result, row.Err() + } + + go with.Every(ctx, time.Second, func() { + for i := 0; i < 2; i++ { + if err := pub(1); err != nil { + log.Printf("failed pub: %v", err) + } else { + log.Printf("pubbed") + } + } + }) + + with.Every(ctx, time.Second, func() { + if err := func() error { + for i := 0; i < 2; i++ { + group := fmt.Sprintf("group_%d", i) + if v, err := sub(group); err != nil { + return err + } else { + log.Printf("%s => %v", group, v) + } + } + return nil + }(); err != nil { + log.Printf("failed subs: %v", err) + } + }) + + return ctx.Err() + }) +}