diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index 38f3d93..744348d 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -8,6 +8,7 @@ import ( "fmt" "log" "os" + "sync/atomic" "time" "pg/src/with" @@ -31,8 +32,7 @@ func run(ctx context.Context) error { return with.PSQL(ctx, *c, func(pg *sql.DB) error { log.Println("setup...") if _, err := pg.ExecContext(ctx, ` - DROP SCHEMA "pubsub" CASCADE; - + DROP SCHEMA IF EXISTS "pubsub" CASCADE; CREATE SCHEMA IF NOT EXISTS "pubsub"; CREATE TABLE IF NOT EXISTS "pubsub".topic ( @@ -50,7 +50,7 @@ func run(ctx context.Context) error { name TEXT UNIQUE ); - CREATE TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset ( + CREATE UNLOGGED TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset ( group_name TEXT REFERENCES "pubsub".group(name) ON DELETE CASCADE, topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE, partition INTEGER NOT NULL DEFAULT 0, @@ -68,13 +68,34 @@ func run(ctx context.Context) error { CREATE TABLE IF NOT EXISTS "pubsub".data_0 PARTITION OF "pubsub".data - FOR VALUES WITH (modulus 3, remainder 0); + FOR VALUES WITH (modulus 10, remainder 0); CREATE TABLE IF NOT EXISTS "pubsub".data_1 PARTITION OF "pubsub".data - FOR VALUES WITH (modulus 3, remainder 1); + FOR VALUES WITH (modulus 10, remainder 1); CREATE TABLE IF NOT EXISTS "pubsub".data_2 PARTITION OF "pubsub".data - FOR VALUES WITH (modulus 3, remainder 2); + FOR VALUES WITH (modulus 10, remainder 2); + CREATE TABLE IF NOT EXISTS "pubsub".data_3 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 3); + CREATE TABLE IF NOT EXISTS "pubsub".data_4 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 4); + CREATE TABLE IF NOT EXISTS "pubsub".data_5 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 5); + CREATE TABLE IF NOT EXISTS "pubsub".data_6 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 6); + CREATE TABLE IF NOT EXISTS "pubsub".data_7 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 7); + CREATE TABLE IF NOT EXISTS "pubsub".data_8 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 8); + CREATE TABLE IF NOT EXISTS "pubsub".data_9 + PARTITION OF "pubsub".data + FOR VALUES WITH (modulus 10, remainder 9); `); err != nil { return err } @@ -183,30 +204,49 @@ func run(ctx context.Context) error { return err } - go with.Every(ctx, 100*time.Millisecond, func() { + started := time.Now() + var pubs atomic.Uint64 + var subs atomic.Uint64 + var lastCommit string + go with.Every(ctx, 2*time.Second, func() { + pubs := pubs.Load() + subs := subs.Load() + seconds := uint64(time.Since(started).Seconds()) + if seconds == 0 { + seconds = 1 + } + log.Printf("pubbed %v per second (%v), subbed %v per second (%v) over %vs, last committed %s", + pubs/seconds, pubs, + subs/seconds, subs, + seconds, lastCommit, + ) + }) + + go with.Every(ctx, 1, func() { for i := 0; i < 2; i++ { topic := fmt.Sprintf("topic_%d", i) if err := pub(topic, 1); err != nil { log.Printf("failed pub: %v", err) } else { - log.Printf("pubbed") + pubs.Add(1) } } }) - with.Every(ctx, time.Second, func() { + with.Every(ctx, 1, func() { if err := func() error { for i := 0; i < 2; i++ { topic := fmt.Sprintf("topic_%d", i) for j := 0; j < 2; j++ { group := fmt.Sprintf("group_%d", i) - if partition, offset, v, err := sub(topic, group); err != nil { + if partition, offset, _, err := sub(topic, group); err != nil { return fmt.Errorf("failed sub: %w", err) } else if partition == -1 { } else if err := commit(topic, group, partition, offset); err != nil { return fmt.Errorf("failed commit: %w", err) } else { - log.Printf("committed %s/%s/%d offset %d: %v", topic, group, partition, offset, v) + subs.Add(1) + lastCommit = fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset) } } }