package main import ( "context" "database/sql" "encoding/json" "flag" "fmt" "log" "math/rand" "os" "strconv" "time" "pg/src/with" _ "github.com/lib/pq" ) func main() { if err := with.Context(run); err != nil { panic(err) } } func run(ctx context.Context) error { fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } return with.PSQL(ctx, *c, func(pg *sql.DB) error { log.Println("creating tables...") for _, sql := range []string{ `CREATE SCHEMA IF NOT EXISTS "pubsub"`, `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a" ( partition INT PRIMARY KEY, partition_offset BIGINT NOT NULL DEFAULT 0 )`, `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_0" ( id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, partition_offset BIGINT UNIQUE NOT NULL, payload BYTEA NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() )`, `CREATE TABLE IF NOT EXISTS "pubsub"."topic_a_partition_1" ( id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY, partition_offset BIGINT UNIQUE NOT NULL, payload BYTEA NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() )`, `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (0, 0) ON CONFLICT DO NOTHING`, `INSERT INTO "pubsub"."topic_a" (partition, partition_offset) VALUES (1, 0) ON CONFLICT DO NOTHING`, `CREATE TABLE IF NOT EXISTS "pubsub"."consumer_partition_offset" ( consumer_group TEXT NOT NULL, partition INT REFERENCES "pubsub"."topic_a"("partition"), partition_offset BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (consumer_group, partition) )`, } { if _, err := pg.ExecContext(ctx, sql); err != nil { return fmt.Errorf("failed setup sql %s: %w", sql, err) } } pub := func(a any) error { b, _ := json.Marshal(a) partition := rand.Int() % 2 partitionS := strconv.Itoa(partition) _, err := pg.ExecContext(ctx, ` WITH reserve AS ( UPDATE "pubsub"."topic_a" SET partition_offset=partition_offset+1 WHERE partition=`+partitionS+` RETURNING (partition_offset-1) AS offset ) INSERT INTO "pubsub"."topic_a_partition_`+partitionS+`" (partition_offset, payload) SELECT reserve.offset, $1 FROM reserve `, b) return err } sub := func(group string) (any, error) { partition := rand.Int() % 2 partitionS := strconv.Itoa(partition) if _, err := pg.ExecContext(ctx, ` INSERT INTO "pubsub"."consumer_partition_offset" (consumer_group, partition) VALUES ($1, $2) ON CONFLICT DO NOTHING `, group, partition); err != nil { return nil, err } row := pg.QueryRowContext(ctx, ` WITH tip AS ( SELECT partition_offset-1 FROM "pubsub"."topic_a" WHERE partition=`+partitionS+` ) , to_claim AS ( SELECT partition_offset FROM "pubsub"."consumer_partition_offset" WHERE consumer_group=$1 AND partition=$2 FOR UPDATE SKIP LOCKED ) , claimed AS ( UPDATE "pubsub"."consumer_partition_offset" consumer_partition_offset SET consumer_partition_offset.partition_offset=to_claim.partition_offset+1 FROM to_claim WHERE consumer_partition_offset.consumer_group=$1 AND consumer_partition_offset.partition=$2 RETURNING consumer_partition_offset.partition_offset ) SELECT partition_offset FROM claimed `, group, partition) if err := row.Err(); err != nil { return nil, err } var result any if err := row.Scan(&result); err != nil { return nil, err } return result, row.Err() } go with.Every(ctx, time.Second, func() { for i := 0; i < 2; i++ { if err := pub(1); err != nil { log.Printf("failed pub: %v", err) } else { log.Printf("pubbed") } } }) with.Every(ctx, time.Second, func() { if err := func() error { for i := 0; i < 2; i++ { group := fmt.Sprintf("group_%d", i) if v, err := sub(group); err != nil { return err } else { log.Printf("%s => %v", group, v) } } return nil }(); err != nil { log.Printf("failed subs: %v", err) } }) return ctx.Err() }) }