i think pub works
parent
6f0b268321
commit
d2061ef2b1
|
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
@ -74,7 +75,37 @@ func run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub := func(topic string, a any) error {
|
pub := func(topic string, a any) error {
|
||||||
return io.EOF
|
payload, _ := json.Marshal(a)
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO "pubsub".topic (name)
|
||||||
|
VALUES ($1)
|
||||||
|
ON CONFLICT DO NOTHING;
|
||||||
|
`, topic); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err := pg.ExecContext(ctx, `
|
||||||
|
WITH topic AS (
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
|
||||||
|
FROM "pubsub".topic
|
||||||
|
WHERE name=$1
|
||||||
|
)
|
||||||
|
, partition_offset AS (
|
||||||
|
SELECT
|
||||||
|
topic.partition AS partition,
|
||||||
|
COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset
|
||||||
|
FROM topic
|
||||||
|
LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition
|
||||||
|
GROUP BY topic.partition
|
||||||
|
)
|
||||||
|
INSERT INTO "pubsub".data
|
||||||
|
(topic_name, partition, partition_offset, payload)
|
||||||
|
SELECT
|
||||||
|
$1, partition_offset.partition, partition_offset.next_offset, $2
|
||||||
|
FROM partition_offset
|
||||||
|
`, topic, payload)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
sub := func(topic, group string) (int, any, error) {
|
sub := func(topic, group string) (int, any, error) {
|
||||||
return 0, nil, io.EOF
|
return 0, nil, io.EOF
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue