package main import ( "context" "database/sql" "encoding/json" "flag" "fmt" "log" "os" "sync/atomic" "time" "pg/src/with" _ "github.com/lib/pq" ) func main() { if err := with.Context(run); err != nil { panic(err) } } func run(ctx context.Context) error { fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } return with.PSQL(ctx, *c, func(pg *sql.DB) error { log.Println("setup...") if _, err := pg.ExecContext(ctx, ` DROP SCHEMA IF EXISTS "pubsub" CASCADE; CREATE SCHEMA IF NOT EXISTS "pubsub"; CREATE TABLE IF NOT EXISTS "pubsub".topic ( id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW(), partitions INTEGER NOT NULL DEFAULT 10, name TEXT UNIQUE ); CREATE TABLE IF NOT EXISTS "pubsub".group ( id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW(), name TEXT UNIQUE ); CREATE UNLOGGED TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset ( group_name TEXT REFERENCES "pubsub".group(name) ON DELETE CASCADE, topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE, partition INTEGER NOT NULL DEFAULT 0, partition_offset INTEGER NOT NULL, PRIMARY KEY (group_name, topic_name, partition) ); CREATE TABLE IF NOT EXISTS "pubsub".data ( topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE, partition INTEGER NOT NULL DEFAULT 0, partition_offset INTEGER NOT NULL DEFAULT 0, payload BYTEA NOT NULL, PRIMARY KEY (topic_name, partition, partition_offset) ) PARTITION BY HASH (topic_name, partition); CREATE TABLE IF NOT EXISTS "pubsub".data_0 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 0); CREATE TABLE IF NOT EXISTS "pubsub".data_1 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 1); CREATE TABLE IF NOT EXISTS "pubsub".data_2 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 2); CREATE TABLE IF NOT EXISTS "pubsub".data_3 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 3); CREATE TABLE IF NOT EXISTS "pubsub".data_4 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 4); CREATE TABLE IF NOT EXISTS "pubsub".data_5 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 5); CREATE TABLE IF NOT EXISTS "pubsub".data_6 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 6); CREATE TABLE IF NOT EXISTS "pubsub".data_7 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 7); CREATE TABLE IF NOT EXISTS "pubsub".data_8 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 8); CREATE TABLE IF NOT EXISTS "pubsub".data_9 PARTITION OF "pubsub".data FOR VALUES WITH (modulus 10, remainder 9); `); err != nil { return err } pub := func(topic string, a any) error { payload, _ := json.Marshal(a) if _, err := pg.ExecContext(ctx, ` INSERT INTO "pubsub".topic (name) VALUES ($1) ON CONFLICT DO NOTHING; `, topic); err != nil { return err } _, err := pg.ExecContext(ctx, ` WITH topic AS ( SELECT name, FLOOR(100 * RANDOM())::INTEGER % partitions AS partition FROM "pubsub".topic WHERE name=$1 ) , partition_offset AS ( SELECT topic.partition AS partition, COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset FROM topic LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition GROUP BY topic.partition ) INSERT INTO "pubsub".data (topic_name, partition, partition_offset, payload) SELECT $1, partition_offset.partition, partition_offset.next_offset, $2 FROM partition_offset `, topic, payload) return err } sub := func(topic, group string) (int, int, any, error) { if _, err := pg.ExecContext(ctx, ` INSERT INTO "pubsub".group (name) VALUES ($1) ON CONFLICT DO NOTHING; `, group); err != nil { return 0, 0, nil, err } rows, err := pg.QueryContext(ctx, ` WITH topic AS ( SELECT name, FLOOR(100 * RANDOM())::INTEGER % partitions AS partition FROM "pubsub".topic WHERE name=$1 ) , group_topic_partition_offset AS ( SELECT topic.name AS topic_name, topic.partition AS partition, COALESCE(group_topic_partition_offset.partition_offset, 0) AS partition_offset FROM topic LEFT JOIN "pubsub".group_topic_partition_offset group_topic_partition_offset ON topic.name=group_topic_partition_offset.topic_name AND topic.partition=group_topic_partition_offset.partition AND group_topic_partition_offset.group_name=$2 ) SELECT group_offset.partition, group_offset.partition_offset, data.payload FROM group_topic_partition_offset group_offset JOIN "pubsub".data data ON group_offset.topic_name=data.topic_name AND group_offset.partition=data.partition AND group_offset.partition_offset=data.partition_offset `, topic, group) if err != nil { return 0, 0, nil, err } partition := -1 offset := -1 var b []byte for rows.Next() { if err := rows.Scan(&partition, &offset, &b); err != nil { return 0, 0, nil, err } } var payload any if len(b) == 0 { } else if err := json.Unmarshal(b, &payload); err != nil { return 0, 0, nil, err } return partition, offset, payload, rows.Err() } commit := func(topic, group string, partition, offset int) error { _, err := pg.ExecContext(ctx, ` INSERT INTO "pubsub".group_topic_partition_offset (topic_name, group_name, partition, partition_offset) VALUES ($1, $2, $3, $4) ON CONFLICT (group_name, topic_name, partition) DO UPDATE SET topic_name=$1, group_name=$2, partition=$3, partition_offset=$4 `, topic, group, partition, offset+1) return err } started := time.Now() var pubs atomic.Uint64 var subs atomic.Uint64 var lastCommit string go with.Every(ctx, 2*time.Second, func() { pubs := pubs.Load() subs := subs.Load() seconds := uint64(time.Since(started).Seconds()) if seconds == 0 { seconds = 1 } log.Printf("pubbed %v per second (%v), subbed %v per second (%v) over %vs, last committed %s", pubs/seconds, pubs, subs/seconds, subs, seconds, lastCommit, ) }) go with.Every(ctx, 1, func() { for i := 0; i < 2; i++ { topic := fmt.Sprintf("topic_%d", i) if err := pub(topic, 1); err != nil { log.Printf("failed pub: %v", err) } else { pubs.Add(1) } } }) with.Every(ctx, 1, func() { if err := func() error { for i := 0; i < 2; i++ { topic := fmt.Sprintf("topic_%d", i) for j := 0; j < 2; j++ { group := fmt.Sprintf("group_%d", i) if partition, offset, _, err := sub(topic, group); err != nil { return fmt.Errorf("failed sub: %w", err) } else if partition == -1 { } else if err := commit(topic, group, partition, offset); err != nil { return fmt.Errorf("failed commit: %w", err) } else { subs.Add(1) lastCommit = fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset) } } } return nil }(); err != nil { log.Printf("failed subs: %v", err) } }) return ctx.Err() }) }