diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index 44cfc6d..38f3d93 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -6,7 +6,6 @@ import ( "encoding/json" "flag" "fmt" - "io" "log" "os" "time" @@ -67,9 +66,15 @@ func run(ctx context.Context) error { PRIMARY KEY (topic_name, partition, partition_offset) ) PARTITION BY HASH (topic_name, partition); - CREATE TABLE IF NOT EXISTS "pubsub".data_0 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 3, remainder 0); - CREATE TABLE IF NOT EXISTS "pubsub".data_1 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 3, remainder 1); - CREATE TABLE IF NOT EXISTS "pubsub".data_2 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 3, remainder 2); + CREATE TABLE IF NOT EXISTS "pubsub".data_0 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 3, remainder 0); + CREATE TABLE IF NOT EXISTS "pubsub".data_1 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 3, remainder 1); + CREATE TABLE IF NOT EXISTS "pubsub".data_2 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 3, remainder 2); `); err != nil { return err } @@ -107,14 +112,78 @@ func run(ctx context.Context) error { `, topic, payload) return err } - sub := func(topic, group string) (int, any, error) { - return 0, nil, io.EOF + sub := func(topic, group string) (int, int, any, error) { + if _, err := pg.ExecContext(ctx, ` + INSERT INTO "pubsub".group (name) + VALUES ($1) + ON CONFLICT DO NOTHING; + `, group); err != nil { + return 0, 0, nil, err + } + + rows, err := pg.QueryContext(ctx, ` + WITH topic AS ( + SELECT + name, + FLOOR(100 * RANDOM())::INTEGER % partitions AS partition + FROM "pubsub".topic + WHERE name=$1 + ) + , group_topic_partition_offset AS ( + SELECT + topic.name AS topic_name, + topic.partition AS partition, + COALESCE(group_topic_partition_offset.partition_offset, 0) AS partition_offset + FROM topic + LEFT JOIN "pubsub".group_topic_partition_offset group_topic_partition_offset + ON topic.name=group_topic_partition_offset.topic_name + AND topic.partition=group_topic_partition_offset.partition + AND group_topic_partition_offset.group_name=$2 + ) + SELECT + group_offset.partition, + group_offset.partition_offset, + data.payload + FROM group_topic_partition_offset group_offset + JOIN "pubsub".data data + ON group_offset.topic_name=data.topic_name + AND group_offset.partition=data.partition + AND group_offset.partition_offset=data.partition_offset + `, topic, group) + if err != nil { + return 0, 0, nil, err + } + + partition := -1 + offset := -1 + var b []byte + for rows.Next() { + if err := rows.Scan(&partition, &offset, &b); err != nil { + return 0, 0, nil, err + } + } + + var payload any + if len(b) == 0 { + } else if err := json.Unmarshal(b, &payload); err != nil { + return 0, 0, nil, err + } + + return partition, offset, payload, rows.Err() } - commit := func(topic, group string, offset int) error { - return io.EOF + commit := func(topic, group string, partition, offset int) error { + _, err := pg.ExecContext(ctx, ` + INSERT INTO "pubsub".group_topic_partition_offset + (topic_name, group_name, partition, partition_offset) + VALUES + ($1, $2, $3, $4) + ON CONFLICT (group_name, topic_name, partition) DO UPDATE + SET topic_name=$1, group_name=$2, partition=$3, partition_offset=$4 + `, topic, group, partition, offset+1) + return err } - go with.Every(ctx, time.Second, func() { + go with.Every(ctx, 100*time.Millisecond, func() { for i := 0; i < 2; i++ { topic := fmt.Sprintf("topic_%d", i) if err := pub(topic, 1); err != nil { @@ -131,12 +200,13 @@ func run(ctx context.Context) error { topic := fmt.Sprintf("topic_%d", i) for j := 0; j < 2; j++ { group := fmt.Sprintf("group_%d", i) - if offset, v, err := sub(topic, group); err != nil { - return err - } else if err := commit(topic, group, offset); err != nil { - return err + if partition, offset, v, err := sub(topic, group); err != nil { + return fmt.Errorf("failed sub: %w", err) + } else if partition == -1 { + } else if err := commit(topic, group, partition, offset); err != nil { + return fmt.Errorf("failed commit: %w", err) } else { - log.Printf("subbed/committed offset %d: %v", offset, v) + log.Printf("committed %s/%s/%d offset %d: %v", topic, group, partition, offset, v) } } }