restub i do it myself

main
Bel LaPointe 2025-12-10 12:33:13 -07:00
parent faa128400f
commit 43738a1d78
1 changed files with 23 additions and 104 deletions

View File

@ -3,13 +3,11 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/rand"
"os"
"strconv"
"time"
"pg/src/with"
@ -31,109 +29,25 @@ func run(ctx context.Context) error {
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("creating tables...")
for _, sql := range []string{
`CREATE SCHEMA IF NOT EXISTS "pubsub"`,
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a" (
partition INT PRIMARY KEY,
partition_offset BIGINT NOT NULL DEFAULT 0
)`,
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_0" (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
partition_offset BIGINT UNIQUE NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)`,
`CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_1" (
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
partition_offset BIGINT UNIQUE NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)`,
`INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (0, 0) ON CONFLICT DO NOTHING`,
`INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (1, 0) ON CONFLICT DO NOTHING`,
`CREATE TABLE IF NOT EXISTS "pubsub"."consumer_partition_offset" (
consumer_group TEXT NOT NULL,
partition INT REFERENCES "pubsub"."topic_a"("partition"),
partition_offset BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (consumer_group, partition)
)`,
} {
if _, err := pg.ExecContext(ctx, sql); err != nil {
return fmt.Errorf("failed setup sql %s: %w", sql, err)
}
}
pub := func(a any) error {
b, _ := json.Marshal(a)
partition := rand.Int() % 2
partitionS := strconv.Itoa(partition)
_, err := pg.ExecContext(ctx, `
WITH reserve AS (
UPDATE "pubsub"."topic_a"
SET partition_offset=partition_offset+1
WHERE partition=`+partitionS+`
RETURNING (partition_offset-1) AS offset
)
INSERT INTO "pubsub"."topic_a_partition_`+partitionS+`" (partition_offset, payload)
SELECT reserve.offset, $1 FROM reserve
`, b)
if _, err := pg.ExecContext(ctx, `
`); err != nil {
return err
}
sub := func(group string) (any, error) {
partition := rand.Int() % 2
partitionS := strconv.Itoa(partition)
if _, err := pg.ExecContext(ctx, `
INSERT INTO
"pubsub"."consumer_partition_offset"
(consumer_group, partition)
VALUES
($1, $2)
ON CONFLICT DO NOTHING
`, group, partition); err != nil {
return nil, err
}
row := pg.QueryRowContext(ctx, `
WITH tip AS (
SELECT partition_offset-1
FROM "pubsub"."topic_a"
WHERE partition=`+partitionS+`
)
, to_claim AS (
SELECT partition_offset
FROM "pubsub"."consumer_partition_offset"
WHERE consumer_group=$1 AND partition=$2
FOR UPDATE SKIP LOCKED
)
, claimed AS (
UPDATE "pubsub"."consumer_partition_offset" consumer_partition_offset
SET consumer_partition_offset.partition_offset=to_claim.partition_offset+1
FROM to_claim
WHERE consumer_partition_offset.consumer_group=$1 AND consumer_partition_offset.partition=$2
RETURNING consumer_partition_offset.partition_offset
)
SELECT partition_offset FROM claimed
`, group, partition)
if err := row.Err(); err != nil {
return nil, err
}
var result any
if err := row.Scan(&result); err != nil {
return nil, err
}
return result, row.Err()
pub := func(topic string, a any) error {
return io.EOF
}
sub := func(topic, group string) (int, any, error) {
return 0, nil, io.EOF
}
commit := func(topic, group string, offset int) error {
return io.EOF
}
go with.Every(ctx, time.Second, func() {
for i := 0; i < 2; i++ {
if err := pub(1); err != nil {
topic := fmt.Sprintf("topic_%d", i)
if err := pub(topic, 1); err != nil {
log.Printf("failed pub: %v", err)
} else {
log.Printf("pubbed")
@ -144,11 +58,16 @@ func run(ctx context.Context) error {
with.Every(ctx, time.Second, func() {
if err := func() error {
for i := 0; i < 2; i++ {
group := fmt.Sprintf("group_%d", i)
if v, err := sub(group); err != nil {
return err
} else {
log.Printf("%s => %v", group, v)
topic := fmt.Sprintf("topic_%d", i)
for j := 0; j < 2; j++ {
group := fmt.Sprintf("group_%d", i)
if offset, v, err := sub(topic, group); err != nil {
return err
} else if err := commit(topic, group, offset); err != nil {
return err
} else {
log.Printf("subbed/committed offset %d: %v", offset, v)
}
}
}
return nil