From 43738a1d7877697f773161a435f9cb78f92a01cc Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:33:13 -0700 Subject: [PATCH] restub i do it myself --- cmd/pg-pubsub-demo/main.go | 127 +++++++------------------------------ 1 file changed, 23 insertions(+), 104 deletions(-) diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index d1782a8..ecb5505 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -3,13 +3,11 @@ package main import ( "context" "database/sql" - "encoding/json" "flag" "fmt" + "io" "log" - "math/rand" "os" - "strconv" "time" "pg/src/with" @@ -31,109 +29,25 @@ func run(ctx context.Context) error { } return with.PSQL(ctx, *c, func(pg *sql.DB) error { - log.Println("creating tables...") - for _, sql := range []string{ - `CREATE SCHEMA IF NOT EXISTS "pubsub"`, - - `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a" ( - partition INT PRIMARY KEY, - partition_offset BIGINT NOT NULL DEFAULT 0 - )`, - `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_0" ( - id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, - partition_offset BIGINT UNIQUE NOT NULL, - payload BYTEA NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT NOW() - )`, - `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_1" ( - id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, - partition_offset BIGINT UNIQUE NOT NULL, - payload BYTEA NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT NOW() - )`, - - `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (0, 0) ON CONFLICT DO NOTHING`, - `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (1, 0) ON CONFLICT DO NOTHING`, - - `CREATE TABLE IF NOT EXISTS "pubsub"."consumer_partition_offset" ( - consumer_group TEXT NOT NULL, - partition INT REFERENCES "pubsub"."topic_a"("partition"), - partition_offset BIGINT NOT NULL DEFAULT 0, - PRIMARY KEY (consumer_group, partition) - )`, - } { - if _, err := pg.ExecContext(ctx, sql); err != nil { - return fmt.Errorf("failed setup sql %s: %w", sql, err) - } - } - - pub := func(a any) error { - b, _ := json.Marshal(a) - partition := rand.Int() % 2 - partitionS := strconv.Itoa(partition) - _, err := pg.ExecContext(ctx, ` - WITH reserve AS ( - UPDATE "pubsub"."topic_a" - SET partition_offset=partition_offset+1 - WHERE partition=`+partitionS+` - RETURNING (partition_offset-1) AS offset - ) - INSERT INTO "pubsub"."topic_a_partition_`+partitionS+`" (partition_offset, payload) - SELECT reserve.offset, $1 FROM reserve - `, b) + if _, err := pg.ExecContext(ctx, ` + `); err != nil { return err } - sub := func(group string) (any, error) { - partition := rand.Int() % 2 - partitionS := strconv.Itoa(partition) - if _, err := pg.ExecContext(ctx, ` - INSERT INTO - "pubsub"."consumer_partition_offset" - (consumer_group, partition) - VALUES - ($1, $2) - ON CONFLICT DO NOTHING - `, group, partition); err != nil { - return nil, err - } - - row := pg.QueryRowContext(ctx, ` - WITH tip AS ( - SELECT partition_offset-1 - FROM "pubsub"."topic_a" - WHERE partition=`+partitionS+` - ) - , to_claim AS ( - SELECT partition_offset - FROM "pubsub"."consumer_partition_offset" - WHERE consumer_group=$1 AND partition=$2 - FOR UPDATE SKIP LOCKED - ) - , claimed AS ( - UPDATE "pubsub"."consumer_partition_offset" consumer_partition_offset - SET consumer_partition_offset.partition_offset=to_claim.partition_offset+1 - FROM to_claim - WHERE consumer_partition_offset.consumer_group=$1 AND consumer_partition_offset.partition=$2 - RETURNING consumer_partition_offset.partition_offset - ) - SELECT partition_offset FROM claimed - `, group, partition) - if err := row.Err(); err != nil { - return nil, err - } - - var result any - if err := row.Scan(&result); err != nil { - return nil, err - } - - return result, row.Err() + pub := func(topic string, a any) error { + return io.EOF + } + sub := func(topic, group string) (int, any, error) { + return 0, nil, io.EOF + } + commit := func(topic, group string, offset int) error { + return io.EOF } go with.Every(ctx, time.Second, func() { for i := 0; i < 2; i++ { - if err := pub(1); err != nil { + topic := fmt.Sprintf("topic_%d", i) + if err := pub(topic, 1); err != nil { log.Printf("failed pub: %v", err) } else { log.Printf("pubbed") @@ -144,11 +58,16 @@ func run(ctx context.Context) error { with.Every(ctx, time.Second, func() { if err := func() error { for i := 0; i < 2; i++ { - group := fmt.Sprintf("group_%d", i) - if v, err := sub(group); err != nil { - return err - } else { - log.Printf("%s => %v", group, v) + topic := fmt.Sprintf("topic_%d", i) + for j := 0; j < 2; j++ { + group := fmt.Sprintf("group_%d", i) + if offset, v, err := sub(topic, group); err != nil { + return err + } else if err := commit(topic, group, offset); err != nil { + return err + } else { + log.Printf("subbed/committed offset %d: %v", offset, v) + } } } return nil