// Source: pg/cmd/pg-pubsub-demo/main.go (262 lines, 7.4 KiB, Go)

package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"sync/atomic"
"time"
"pg/src/with"
_ "github.com/lib/pq"
)
// main passes run to with.Context and panics if an error comes back.
func main() {
	err := with.Context(run)
	if err != nil {
		panic(err)
	}
}
// run is the body of the pub/sub demo.
//
// It parses the -c connection-string flag, then inside with.PSQL it:
//   - creates the "pubsub" schema and its tables if they do not exist,
//   - starts one publisher goroutine per topic (topic_0, topic_1),
//   - starts two subscriber goroutines per topic that read the message at the
//     group's current offset and then commit offset+1,
//   - starts a goroutine that logs publish/subscribe throughput every 2s.
//
// It then blocks until ctx is done and returns ctx.Err(). A flag-parsing or
// schema-setup failure is returned immediately.
func run(ctx context.Context) error {
	fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
	c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
	if err := fs.Parse(os.Args[1:]); err != nil {
		// The flag set uses ContinueOnError, so Parse reports the problem as
		// an error; hand it to the caller instead of panicking here.
		return err
	}

	return with.PSQL(ctx, *c, func(pg *sql.DB) error {
		log.Println("setup...")
		// The SQL below is kept flush-left so the text sent to the server is
		// unchanged. data is hash-partitioned into ten child tables.
		if _, err := pg.ExecContext(ctx, `
CREATE SCHEMA IF NOT EXISTS "pubsub";
CREATE TABLE IF NOT EXISTS "pubsub".topic (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
partitions INTEGER NOT NULL DEFAULT 10,
name TEXT UNIQUE
);
CREATE TABLE IF NOT EXISTS "pubsub".group (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
name TEXT UNIQUE
);
CREATE UNLOGGED TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset (
group_name TEXT REFERENCES "pubsub".group(name) ON DELETE CASCADE,
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
partition INTEGER NOT NULL DEFAULT 0,
partition_offset INTEGER NOT NULL,
PRIMARY KEY (group_name, topic_name, partition)
);
CREATE TABLE IF NOT EXISTS "pubsub".data (
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
partition INTEGER NOT NULL DEFAULT 0,
partition_offset INTEGER NOT NULL DEFAULT 0,
payload BYTEA NOT NULL,
PRIMARY KEY (topic_name, partition, partition_offset)
) PARTITION BY HASH (topic_name, partition);
CREATE TABLE IF NOT EXISTS "pubsub".data_0
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 0);
CREATE TABLE IF NOT EXISTS "pubsub".data_1
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 1);
CREATE TABLE IF NOT EXISTS "pubsub".data_2
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 2);
CREATE TABLE IF NOT EXISTS "pubsub".data_3
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 3);
CREATE TABLE IF NOT EXISTS "pubsub".data_4
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 4);
CREATE TABLE IF NOT EXISTS "pubsub".data_5
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 5);
CREATE TABLE IF NOT EXISTS "pubsub".data_6
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 6);
CREATE TABLE IF NOT EXISTS "pubsub".data_7
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 7);
CREATE TABLE IF NOT EXISTS "pubsub".data_8
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 8);
CREATE TABLE IF NOT EXISTS "pubsub".data_9
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 9);
`); err != nil {
			return err
		}

		// pub JSON-encodes a, makes sure the topic row exists, and inserts the
		// payload into a randomly chosen partition of that topic at the next
		// free offset (MAX(partition_offset)+1, or 0 for an empty partition).
		//
		// NOTE(review): the next offset is computed without any locking, so
		// concurrent publishers to the same topic could presumably collide on
		// the primary key; this demo runs one publisher per topic.
		pub := func(topic string, a any) error {
			payload, err := json.Marshal(a)
			if err != nil {
				return fmt.Errorf("marshal payload: %w", err)
			}
			if _, err := pg.ExecContext(ctx, `
INSERT INTO "pubsub".topic (name)
VALUES ($1)
ON CONFLICT DO NOTHING;
`, topic); err != nil {
				return err
			}
			_, err = pg.ExecContext(ctx, `
WITH topic AS (
SELECT
name,
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
FROM "pubsub".topic
WHERE name=$1
)
, partition_offset AS (
SELECT
topic.partition AS partition,
COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset
FROM topic
LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition
GROUP BY topic.partition
)
INSERT INTO "pubsub".data
(topic_name, partition, partition_offset, payload)
SELECT
$1, partition_offset.partition, partition_offset.next_offset, $2
FROM partition_offset
`, topic, payload)
			return err
		}

		// sub makes sure the group row exists, picks a random partition of
		// topic, and fetches the message stored at the group's committed
		// offset for that partition (offset 0 when nothing is committed yet).
		//
		// It returns the partition, the offset and the decoded JSON payload.
		// When no message is available it returns partition -1 and offset -1
		// with a nil payload and a nil error.
		sub := func(topic, group string) (int, int, any, error) {
			if _, err := pg.ExecContext(ctx, `
INSERT INTO "pubsub".group (name)
VALUES ($1)
ON CONFLICT DO NOTHING;
`, group); err != nil {
				return 0, 0, nil, err
			}
			rows, err := pg.QueryContext(ctx, `
WITH topic AS (
SELECT
name,
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
FROM "pubsub".topic
WHERE name=$1
)
, group_topic_partition_offset AS (
SELECT
topic.name AS topic_name,
topic.partition AS partition,
COALESCE(group_topic_partition_offset.partition_offset, 0) AS partition_offset
FROM topic
LEFT JOIN "pubsub".group_topic_partition_offset group_topic_partition_offset
ON topic.name=group_topic_partition_offset.topic_name
AND topic.partition=group_topic_partition_offset.partition
AND group_topic_partition_offset.group_name=$2
)
SELECT
group_offset.partition,
group_offset.partition_offset,
data.payload
FROM group_topic_partition_offset group_offset
JOIN "pubsub".data data
ON group_offset.topic_name=data.topic_name
AND group_offset.partition=data.partition
AND group_offset.partition_offset=data.partition_offset
`, topic, group)
			if err != nil {
				return 0, 0, nil, err
			}
			// Release the connection on every path, including an early return
			// on a Scan error.
			defer rows.Close()

			partition, offset := -1, -1
			var raw []byte
			for rows.Next() {
				if err := rows.Scan(&partition, &offset, &raw); err != nil {
					return 0, 0, nil, err
				}
			}
			if err := rows.Err(); err != nil {
				return 0, 0, nil, err
			}

			var payload any
			if len(raw) > 0 {
				if err := json.Unmarshal(raw, &payload); err != nil {
					return 0, 0, nil, err
				}
			}
			return partition, offset, payload, nil
		}

		// commit records offset+1 as the next offset the group will read from
		// the given topic partition, inserting or updating the row.
		commit := func(topic, group string, partition, offset int) error {
			_, err := pg.ExecContext(ctx, `
INSERT INTO "pubsub".group_topic_partition_offset
(topic_name, group_name, partition, partition_offset)
VALUES
($1, $2, $3, $4)
ON CONFLICT (group_name, topic_name, partition) DO UPDATE
SET topic_name=$1, group_name=$2, partition=$3, partition_offset=$4
`, topic, group, partition, offset+1)
			return err
		}

		started := time.Now()
		var pubs, subs atomic.Uint64
		// lastCommit is written by every subscriber goroutine and read by the
		// stats goroutine, so it is held in an atomic.Value. It is seeded with
		// "" so the first Load always yields a string.
		var lastCommit atomic.Value
		lastCommit.Store("")

		// Periodic throughput report.
		go with.Every(ctx, 2*time.Second, func() {
			pubbed := pubs.Load()
			subbed := subs.Load()
			seconds := uint64(time.Since(started).Seconds())
			if seconds == 0 {
				// Avoid dividing by zero during the first second.
				seconds = 1
			}
			log.Printf("pubbed %v per second (%v), subbed %v per second (%v) over %vs, last committed %s",
				pubbed/seconds, pubbed,
				subbed/seconds, subbed,
				seconds, lastCommit.Load().(string),
			)
		})

		// One publisher per topic.
		// NOTE(review): the interval passed to with.Every is the untyped
		// constant 1 (presumably 1ns, i.e. "as fast as possible") - confirm
		// against with.Every.
		for i := 0; i < 2; i++ {
			topic := fmt.Sprintf("topic_%d", i)
			go with.Every(ctx, 1, func() {
				if err := pub(topic, 1); err != nil {
					log.Printf("failed pub: %v", err)
					return
				}
				pubs.Add(1)
			})
		}

		// consume performs one sub+commit round and updates the counters when
		// a message was actually consumed.
		consume := func(topic, group string) error {
			partition, offset, _, err := sub(topic, group)
			if err != nil {
				return fmt.Errorf("failed sub: %w", err)
			}
			if partition == -1 {
				// Nothing to read at the current offset.
				return nil
			}
			if err := commit(topic, group, partition, offset); err != nil {
				return fmt.Errorf("failed commit: %w", err)
			}
			subs.Add(1)
			lastCommit.Store(fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset))
			return nil
		}

		// Two subscribers per topic.
		// NOTE(review): the group name is derived from i, not j, so both
		// subscribers of a topic share one group - presumably intentional
		// (competing consumers); confirm.
		for i := 0; i < 2; i++ {
			topic := fmt.Sprintf("topic_%d", i)
			for j := 0; j < 2; j++ {
				group := fmt.Sprintf("group_%d", i)
				go with.Every(ctx, 1, func() {
					if err := consume(topic, group); err != nil {
						log.Printf("failed subs: %v", err)
					}
				})
			}
		}

		<-ctx.Done()
		return ctx.Err()
	})
}