Compare commits

...

5 Commits

Author SHA1 Message Date
Bel LaPointe c16f222e84 kinda slow, even with an unlogged consumer group table 2025-12-10 13:40:23 -07:00
Bel LaPointe 24de57aba0 logs imply it works neat 2025-12-10 13:29:13 -07:00
Bel LaPointe d2061ef2b1 i think pub works 2025-12-10 13:09:24 -07:00
Bel LaPointe 6f0b268321 i think i like tables 2025-12-10 12:47:00 -07:00
Bel LaPointe 43738a1d78 restub i do it myself 2025-12-10 12:33:13 -07:00
1 changed files with 191 additions and 92 deletions

View File

@ -7,9 +7,8 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
"pg/src/with"
@ -31,124 +30,224 @@ func run(ctx context.Context) error {
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("creating tables...")
for _, sql := range []string{
`CREATE SCHEMA IF NOT EXISTS "pubsub"`,
log.Println("setup...")
if _, err := pg.ExecContext(ctx, `
DROP SCHEMA IF EXISTS "pubsub" CASCADE;
CREATE SCHEMA IF NOT EXISTS "pubsub";
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a" (
partition INT PRIMARY KEY,
partition_offset BIGINT NOT NULL DEFAULT 0
)`,
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_0" (
CREATE TABLE IF NOT EXISTS "pubsub".topic (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
partition_offset BIGINT UNIQUE NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)`,
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_1" (
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
partitions INTEGER NOT NULL DEFAULT 10,
name TEXT UNIQUE
);
CREATE TABLE IF NOT EXISTS "pubsub".group (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
partition_offset BIGINT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
name TEXT UNIQUE
);
CREATE UNLOGGED TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset (
group_name TEXT REFERENCES "pubsub".group(name) ON DELETE CASCADE,
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
partition INTEGER NOT NULL DEFAULT 0,
partition_offset INTEGER NOT NULL,
PRIMARY KEY (group_name, topic_name, partition)
);
CREATE TABLE IF NOT EXISTS "pubsub".data (
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
partition INTEGER NOT NULL DEFAULT 0,
partition_offset INTEGER NOT NULL DEFAULT 0,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)`,
PRIMARY KEY (topic_name, partition, partition_offset)
) PARTITION BY HASH (topic_name, partition);
`INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (0, 0) ON CONFLICT DO NOTHING`,
`INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (1, 0) ON CONFLICT DO NOTHING`,
`CREATE TABLE IF NOT EXISTS "pubsub"."consumer_partition_offset" (
consumer_group TEXT NOT NULL,
partition INT REFERENCES "pubsub"."topic_a"("partition"),
partition_offset BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (consumer_group, partition)
)`,
} {
if _, err := pg.ExecContext(ctx, sql); err != nil {
return fmt.Errorf("failed setup sql %s: %w", sql, err)
}
}
pub := func(a any) error {
b, _ := json.Marshal(a)
partition := rand.Int() % 2
partitionS := strconv.Itoa(partition)
_, err := pg.ExecContext(ctx, `
WITH reserve AS (
UPDATE "pubsub"."topic_a"
SET partition_offset=partition_offset+1
WHERE partition=`+partitionS+`
RETURNING (partition_offset-1) AS offset
)
INSERT INTO "pubsub"."topic_a_partition_`+partitionS+`" (partition_offset, payload)
SELECT reserve.offset, $1 FROM reserve
`, b)
CREATE TABLE IF NOT EXISTS "pubsub".data_0
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 0);
CREATE TABLE IF NOT EXISTS "pubsub".data_1
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 1);
CREATE TABLE IF NOT EXISTS "pubsub".data_2
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 2);
CREATE TABLE IF NOT EXISTS "pubsub".data_3
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 3);
CREATE TABLE IF NOT EXISTS "pubsub".data_4
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 4);
CREATE TABLE IF NOT EXISTS "pubsub".data_5
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 5);
CREATE TABLE IF NOT EXISTS "pubsub".data_6
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 6);
CREATE TABLE IF NOT EXISTS "pubsub".data_7
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 7);
CREATE TABLE IF NOT EXISTS "pubsub".data_8
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 8);
CREATE TABLE IF NOT EXISTS "pubsub".data_9
PARTITION OF "pubsub".data
FOR VALUES WITH (modulus 10, remainder 9);
`); err != nil {
return err
}
sub := func(group string) (any, error) {
partition := rand.Int() % 2
partitionS := strconv.Itoa(partition)
pub := func(topic string, a any) error {
payload, _ := json.Marshal(a)
if _, err := pg.ExecContext(ctx, `
INSERT INTO
"pubsub"."consumer_partition_offset"
(consumer_group, partition)
VALUES
($1, $2)
ON CONFLICT DO NOTHING
`, group, partition); err != nil {
return nil, err
INSERT INTO "pubsub".topic (name)
VALUES ($1)
ON CONFLICT DO NOTHING;
`, topic); err != nil {
return err
}
_, err := pg.ExecContext(ctx, `
WITH topic AS (
SELECT
name,
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
FROM "pubsub".topic
WHERE name=$1
)
, partition_offset AS (
SELECT
topic.partition AS partition,
COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset
FROM topic
LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition
GROUP BY topic.partition
)
INSERT INTO "pubsub".data
(topic_name, partition, partition_offset, payload)
SELECT
$1, partition_offset.partition, partition_offset.next_offset, $2
FROM partition_offset
`, topic, payload)
return err
}
sub := func(topic, group string) (int, int, any, error) {
if _, err := pg.ExecContext(ctx, `
INSERT INTO "pubsub".group (name)
VALUES ($1)
ON CONFLICT DO NOTHING;
`, group); err != nil {
return 0, 0, nil, err
}
row := pg.QueryRowContext(ctx, `
WITH tip AS (
SELECT partition_offset-1
FROM "pubsub"."topic_a"
WHERE partition=`+partitionS+`
rows, err := pg.QueryContext(ctx, `
WITH topic AS (
SELECT
name,
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
FROM "pubsub".topic
WHERE name=$1
)
, to_claim AS (
SELECT partition_offset
FROM "pubsub"."consumer_partition_offset"
WHERE consumer_group=$1 AND partition=$2
FOR UPDATE SKIP LOCKED
, group_topic_partition_offset AS (
SELECT
topic.name AS topic_name,
topic.partition AS partition,
COALESCE(group_topic_partition_offset.partition_offset, 0) AS partition_offset
FROM topic
LEFT JOIN "pubsub".group_topic_partition_offset group_topic_partition_offset
ON topic.name=group_topic_partition_offset.topic_name
AND topic.partition=group_topic_partition_offset.partition
AND group_topic_partition_offset.group_name=$2
)
, claimed AS (
UPDATE "pubsub"."consumer_partition_offset" consumer_partition_offset
SET consumer_partition_offset.partition_offset=to_claim.partition_offset+1
FROM to_claim
WHERE consumer_partition_offset.consumer_group=$1 AND consumer_partition_offset.partition=$2
RETURNING consumer_partition_offset.partition_offset
)
SELECT partition_offset FROM claimed
`, group, partition)
if err := row.Err(); err != nil {
return nil, err
SELECT
group_offset.partition,
group_offset.partition_offset,
data.payload
FROM group_topic_partition_offset group_offset
JOIN "pubsub".data data
ON group_offset.topic_name=data.topic_name
AND group_offset.partition=data.partition
AND group_offset.partition_offset=data.partition_offset
`, topic, group)
if err != nil {
return 0, 0, nil, err
}
var result any
if err := row.Scan(&result); err != nil {
return nil, err
partition := -1
offset := -1
var b []byte
for rows.Next() {
if err := rows.Scan(&partition, &offset, &b); err != nil {
return 0, 0, nil, err
}
}
return result, row.Err()
var payload any
if len(b) == 0 {
} else if err := json.Unmarshal(b, &payload); err != nil {
return 0, 0, nil, err
}
return partition, offset, payload, rows.Err()
}
commit := func(topic, group string, partition, offset int) error {
_, err := pg.ExecContext(ctx, `
INSERT INTO "pubsub".group_topic_partition_offset
(topic_name, group_name, partition, partition_offset)
VALUES
($1, $2, $3, $4)
ON CONFLICT (group_name, topic_name, partition) DO UPDATE
SET topic_name=$1, group_name=$2, partition=$3, partition_offset=$4
`, topic, group, partition, offset+1)
return err
}
go with.Every(ctx, time.Second, func() {
started := time.Now()
var pubs atomic.Uint64
var subs atomic.Uint64
var lastCommit string
go with.Every(ctx, 2*time.Second, func() {
pubs := pubs.Load()
subs := subs.Load()
seconds := uint64(time.Since(started).Seconds())
if seconds == 0 {
seconds = 1
}
log.Printf("pubbed %v per second (%v), subbed %v per second (%v) over %vs, last committed %s",
pubs/seconds, pubs,
subs/seconds, subs,
seconds, lastCommit,
)
})
go with.Every(ctx, 1, func() {
for i := 0; i < 2; i++ {
if err := pub(1); err != nil {
topic := fmt.Sprintf("topic_%d", i)
if err := pub(topic, 1); err != nil {
log.Printf("failed pub: %v", err)
} else {
log.Printf("pubbed")
pubs.Add(1)
}
}
})
with.Every(ctx, time.Second, func() {
with.Every(ctx, 1, func() {
if err := func() error {
for i := 0; i < 2; i++ {
group := fmt.Sprintf("group_%d", i)
if v, err := sub(group); err != nil {
return err
} else {
log.Printf("%s => %v", group, v)
topic := fmt.Sprintf("topic_%d", i)
for j := 0; j < 2; j++ {
group := fmt.Sprintf("group_%d", i)
if partition, offset, _, err := sub(topic, group); err != nil {
return fmt.Errorf("failed sub: %w", err)
} else if partition == -1 {
} else if err := commit(topic, group, partition, offset); err != nil {
return fmt.Errorf("failed commit: %w", err)
} else {
subs.Add(1)
lastCommit = fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset)
}
}
}
return nil