From d2061ef2b101f4d26c5c7213edffdeefbc2cbfab Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Wed, 10 Dec 2025 13:09:24 -0700 Subject: [PATCH] i think pub works --- cmd/pg-pubsub-demo/main.go | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index 41b90cc..44cfc6d 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "encoding/json" "flag" "fmt" "io" @@ -74,7 +75,37 @@ func run(ctx context.Context) error { } pub := func(topic string, a any) error { - return io.EOF + payload, _ := json.Marshal(a) + if _, err := pg.ExecContext(ctx, ` + INSERT INTO "pubsub".topic (name) + VALUES ($1) + ON CONFLICT DO NOTHING; + `, topic); err != nil { + return err + } + _, err := pg.ExecContext(ctx, ` + WITH topic AS ( + SELECT + name, + FLOOR(100 * RANDOM())::INTEGER % partitions AS partition + FROM "pubsub".topic + WHERE name=$1 + ) + , partition_offset AS ( + SELECT + topic.partition AS partition, + COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset + FROM topic + LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition + GROUP BY topic.partition + ) + INSERT INTO "pubsub".data + (topic_name, partition, partition_offset, payload) + SELECT + $1, partition_offset.partition, partition_offset.next_offset, $2 + FROM partition_offset + `, topic, payload) + return err } sub := func(topic, group string) (int, any, error) { return 0, nil, io.EOF