diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index 41b90cc..44cfc6d 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "encoding/json" "flag" "fmt" "io" @@ -74,7 +75,37 @@ func run(ctx context.Context) error { } pub := func(topic string, a any) error { - return io.EOF + payload, _ := json.Marshal(a) + if _, err := pg.ExecContext(ctx, ` + INSERT INTO "pubsub".topic (name) + VALUES ($1) + ON CONFLICT DO NOTHING; + `, topic); err != nil { + return err + } + _, err := pg.ExecContext(ctx, ` + WITH topic AS ( + SELECT + name, + FLOOR(100 * RANDOM())::INTEGER % partitions AS partition + FROM "pubsub".topic + WHERE name=$1 + ) + , partition_offset AS ( + SELECT + topic.partition AS partition, + COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset + FROM topic + LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition + GROUP BY topic.partition + ) + INSERT INTO "pubsub".data + (topic_name, partition, partition_offset, payload) + SELECT + $1, partition_offset.partition, partition_offset.next_offset, $2 + FROM partition_offset + `, topic, payload) + return err } sub := func(topic, group string) (int, any, error) { return 0, nil, io.EOF