Compare commits
32 Commits
9fabbcb761
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ccfe60f0ac | ||
|
|
3ae725edad | ||
|
|
46e247a9e1 | ||
|
|
558d2f7f9d | ||
|
|
3f69a2d5d5 | ||
|
|
352d8e1fbc | ||
|
|
5f39cdad19 | ||
|
|
ab05743d4e | ||
|
|
90a3256b7b | ||
|
|
959e326329 | ||
|
|
77ef938408 | ||
|
|
c16f222e84 | ||
|
|
24de57aba0 | ||
|
|
d2061ef2b1 | ||
|
|
6f0b268321 | ||
|
|
43738a1d78 | ||
|
|
faa128400f | ||
|
|
0db7008aac | ||
|
|
18d8d9af77 | ||
|
|
8740f890da | ||
|
|
67879cf7b0 | ||
|
|
8b748280a9 | ||
|
|
72a63afdee | ||
|
|
7252c96dab | ||
|
|
bb75883800 | ||
|
|
47be0a513a | ||
|
|
633128e5c0 | ||
|
|
0298eeaa84 | ||
|
|
de8cb91f84 | ||
|
|
f7c25c6603 | ||
|
|
39123044b2 | ||
|
|
5b11ae1648 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1 +1,5 @@
|
|||||||
**/*.sw*
|
**/*.sw*
|
||||||
|
/cmd/pg-lo-demo/pg-lo-demo
|
||||||
|
/cmd/pg-pulse/pg-pulse
|
||||||
|
/cmd/pg-walspam/pg-walspam
|
||||||
|
/cmd/pg-template
|
||||||
|
|||||||
43
cmd/.template/main.go
Normal file
43
cmd/.template/main.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"flag"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "sqlite://", "conn string")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.SQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
|
with.Every(ctx, 5*time.Second, func() {
|
||||||
|
row := pg.QueryRowContext(ctx, `SELECT 1`)
|
||||||
|
var n int
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
log.Println("query err:", err)
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
log.Println("scan err:", err)
|
||||||
|
} else {
|
||||||
|
log.Println("pinged")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return ctx.Err()
|
||||||
|
})
|
||||||
|
}
|
||||||
3
cmd/generate.go
Normal file
3
cmd/generate.go
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
//go:generate sh -c "if ! [ -d ./pg-template ]; then cp -r ./.template ./pg-template; fi"
|
||||||
73
cmd/pg-fill/main.go
Normal file
73
cmd/pg-fill/main.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||||
|
n := fs.Int("n", 12, "mb to generate")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
|
presently := func() int {
|
||||||
|
dbname := path.Base(*c)
|
||||||
|
|
||||||
|
var n2 int
|
||||||
|
if row := pg.QueryRowContext(ctx, `SELECT pg_database_size('`+dbname+`')/1024/1024`); row.Err() != nil {
|
||||||
|
return 0
|
||||||
|
} else if err := row.Scan(&n2); err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return n2
|
||||||
|
}
|
||||||
|
report := func() {
|
||||||
|
log.Printf("filled %vMiB of requested %vMiB", presently(), *n)
|
||||||
|
}
|
||||||
|
|
||||||
|
go with.Every(ctx, 5*time.Second, func() {
|
||||||
|
report()
|
||||||
|
})
|
||||||
|
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS fill_with_data(x BIGINT, y DOUBLE PRECISION, z DOUBLE PRECISION);
|
||||||
|
`); err != nil {
|
||||||
|
return fmt.Errorf("failed initial table setup: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://gist.github.com/ololobus/5b25c432f208d7eb31051a5f238dffff
|
||||||
|
// 2e6=1GB, so 2e6/8=12MB
|
||||||
|
for ctx.Err() == nil && presently() < *n {
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO fill_with_data (x, y, z)
|
||||||
|
SELECT ROUND(RANDOM()), RANDOM(), RANDOM()
|
||||||
|
FROM generate_series(1, 2e6/8)
|
||||||
|
`); err != nil {
|
||||||
|
return fmt.Errorf("failed lo_from_bytea: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
report()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
86
cmd/pg-lo-demo/main.go
Normal file
86
cmd/pg-lo-demo/main.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
|
cleanup := func() error {
|
||||||
|
rows, err := pg.QueryContext(ctx, `SELECT oid FROM oids`)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
for rows.Next() {
|
||||||
|
var oid any
|
||||||
|
if err := rows.Scan(&oid); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pg.ExecContext(ctx, `SELECT lo_unlink($1)`, oid)
|
||||||
|
}
|
||||||
|
return rows.Err()
|
||||||
|
}
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
if err := cleanup(); err != nil {
|
||||||
|
return fmt.Errorf("failed initial cleanup: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
DROP TABLE IF EXISTS oids;
|
||||||
|
CREATE TABLE IF NOT EXISTS oids (id INTEGER PRIMARY KEY, oid OID);
|
||||||
|
`); err != nil {
|
||||||
|
return fmt.Errorf("failed initial table setup: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO oids (
|
||||||
|
id,
|
||||||
|
oid
|
||||||
|
)
|
||||||
|
VALUES (
|
||||||
|
1,
|
||||||
|
lo_from_bytea(
|
||||||
|
0,
|
||||||
|
$1
|
||||||
|
)
|
||||||
|
) ON CONFLICT DO NOTHING
|
||||||
|
`, []byte("hello world")); err != nil {
|
||||||
|
return fmt.Errorf("failed lo_from_bytea: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var got []byte
|
||||||
|
row := pg.QueryRowContext(ctx, `SELECT lo_get((SELECT oid FROM oids WHERE id=1))`)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
} else if err := row.Scan(&got); err != nil {
|
||||||
|
return err
|
||||||
|
} else if !bytes.Equal(got, []byte("hello world")) {
|
||||||
|
return fmt.Errorf("weird lo_get: %q", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cleanup()
|
||||||
|
})
|
||||||
|
}
|
||||||
7
cmd/pg-lockless-fifo-demo/README.md
Normal file
7
cmd/pg-lockless-fifo-demo/README.md
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
https://trychroma.com/engineering/wal3
|
||||||
|
|
||||||
|
(generally with S3 but this is fine)
|
||||||
|
|
||||||
|
1. GET (current_node, cksum) FROM head_pointer
|
||||||
|
1. INSERT node (previous_node) VALUES (head_pointer) RETURNING node.id
|
||||||
|
1. UPDATE head_pointer (current_node, cksum) VALUES (node.id, blockchain) IF head_pointer.cksum=cksum
|
||||||
188
cmd/pg-lockless-fifo-demo/main.go
Normal file
188
cmd/pg-lockless-fifo-demo/main.go
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "sqlite://", "conn string")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.SQL(ctx, *c, func(db *sql.DB) error {
|
||||||
|
if _, err := db.ExecContext(ctx, `
|
||||||
|
DROP TABLE IF EXISTS lockless_fifo_node;
|
||||||
|
DROP TABLE IF EXISTS lockless_fifo_head;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS lockless_fifo_node (
|
||||||
|
id BIGSERIAL NOT NULL PRIMARY KEY,
|
||||||
|
payload BYTEA,
|
||||||
|
previous_id BIGSERIAL,
|
||||||
|
cksum BYTEA
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS lockless_fifo_head (
|
||||||
|
id BIGSERIAL NOT NULL PRIMARY KEY,
|
||||||
|
node_id BIGSERIAL NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
INSERT INTO lockless_fifo_node (id, payload) VALUES (0, NULL) ON CONFLICT DO NOTHING;
|
||||||
|
INSERT INTO lockless_fifo_head (id, node_id) VALUES (0, 0) ON CONFLICT DO NOTHING;
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
head := func() (int64, []byte, error) {
|
||||||
|
var idBefore int64
|
||||||
|
var cksumBefore []byte
|
||||||
|
row := db.QueryRowContext(ctx, `
|
||||||
|
SELECT lockless_fifo_node.id, lockless_fifo_node.cksum
|
||||||
|
FROM lockless_fifo_node
|
||||||
|
JOIN lockless_fifo_head ON lockless_fifo_head.node_id=lockless_fifo_node.id
|
||||||
|
`)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to query head: %w", err)
|
||||||
|
}
|
||||||
|
err := row.Scan(&idBefore, &cksumBefore)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to scan head: %w", err)
|
||||||
|
}
|
||||||
|
return idBefore, cksumBefore, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
push := func(a any) error {
|
||||||
|
payload, _ := json.Marshal(a)
|
||||||
|
|
||||||
|
idBefore, cksumBefore, err := head()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := md5.New()
|
||||||
|
hash.Write(cksumBefore)
|
||||||
|
cksumAfter := hash.Sum(payload)
|
||||||
|
|
||||||
|
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
|
||||||
|
defer tx.Rollback()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if _, err := tx.ExecContext(ctx, `
|
||||||
|
INSERT INTO lockless_fifo_node (
|
||||||
|
id,
|
||||||
|
payload,
|
||||||
|
previous_id,
|
||||||
|
cksum
|
||||||
|
) VALUES (
|
||||||
|
$1,
|
||||||
|
$2,
|
||||||
|
$3,
|
||||||
|
$4
|
||||||
|
)
|
||||||
|
`, idBefore+1, payload, idBefore, cksumAfter); err != nil {
|
||||||
|
return err
|
||||||
|
} else if _, err := tx.ExecContext(ctx, `
|
||||||
|
UPDATE lockless_fifo_head
|
||||||
|
SET node_id=(
|
||||||
|
SELECT id
|
||||||
|
FROM lockless_fifo_node
|
||||||
|
WHERE cksum=$1
|
||||||
|
)
|
||||||
|
`, cksumAfter); err != nil {
|
||||||
|
return err
|
||||||
|
} else if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
peek := func() (any, error) {
|
||||||
|
curId, _, err := head()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
row := db.QueryRowContext(ctx, `
|
||||||
|
SELECT previous_id
|
||||||
|
FROM lockless_fifo_node
|
||||||
|
WHERE id=$1
|
||||||
|
`, curId)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to select node %d: %w", curId, err)
|
||||||
|
}
|
||||||
|
var previousId sql.NullInt64
|
||||||
|
if err := row.Scan(&previousId); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to scan node %d: %w", curId, err)
|
||||||
|
} else if prev := previousId.Int64; !previousId.Valid || prev == curId {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
curId = prev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
row := db.QueryRowContext(ctx, `
|
||||||
|
SELECT payload
|
||||||
|
FROM lockless_fifo_node
|
||||||
|
WHERE id=$1
|
||||||
|
`, curId)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var payload []byte
|
||||||
|
if err := row.Scan(&payload); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to scan tail: %w", err)
|
||||||
|
}
|
||||||
|
var a any
|
||||||
|
if len(payload) > 0 {
|
||||||
|
err = json.Unmarshal(payload, &a)
|
||||||
|
}
|
||||||
|
return a, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var pushes atomic.Int64
|
||||||
|
go with.Every(ctx, time.Second, func() {
|
||||||
|
id, cksum, err := head()
|
||||||
|
if err == nil {
|
||||||
|
log.Printf("HEAD id=%d vs expected %d (cksum=%s)", id, pushes.Load(), base64.StdEncoding.EncodeToString(cksum))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
go with.Every(ctx, 10*time.Millisecond, func() {
|
||||||
|
if err := push(time.Now()); err != nil {
|
||||||
|
//log.Printf("failed push: %v", err)
|
||||||
|
} else {
|
||||||
|
pushes.Add(1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
with.Every(ctx, time.Second, func() {
|
||||||
|
if _, err := peek(); err != nil {
|
||||||
|
log.Printf("failed peek: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return ctx.Err()
|
||||||
|
})
|
||||||
|
}
|
||||||
1
cmd/pg-pubsub-demo/README.md
Normal file
1
cmd/pg-pubsub-demo/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks
|
||||||
261
cmd/pg-pubsub-demo/main.go
Normal file
261
cmd/pg-pubsub-demo/main.go
Normal file
@@ -0,0 +1,261 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
|
log.Println("setup...")
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
CREATE SCHEMA IF NOT EXISTS "pubsub";
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".topic (
|
||||||
|
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
|
||||||
|
created_at TIMESTAMP DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMP DEFAULT NOW(),
|
||||||
|
partitions INTEGER NOT NULL DEFAULT 10,
|
||||||
|
name TEXT UNIQUE
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".group (
|
||||||
|
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
|
||||||
|
created_at TIMESTAMP DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMP DEFAULT NOW(),
|
||||||
|
name TEXT UNIQUE
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNLOGGED TABLE IF NOT EXISTS "pubsub".group_topic_partition_offset (
|
||||||
|
group_name TEXT REFERENCES "pubsub".group(name) ON DELETE CASCADE,
|
||||||
|
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
|
||||||
|
partition INTEGER NOT NULL DEFAULT 0,
|
||||||
|
partition_offset INTEGER NOT NULL,
|
||||||
|
PRIMARY KEY (group_name, topic_name, partition)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data (
|
||||||
|
topic_name TEXT REFERENCES "pubsub".topic(name) ON DELETE CASCADE,
|
||||||
|
partition INTEGER NOT NULL DEFAULT 0,
|
||||||
|
partition_offset INTEGER NOT NULL DEFAULT 0,
|
||||||
|
payload BYTEA NOT NULL,
|
||||||
|
PRIMARY KEY (topic_name, partition, partition_offset)
|
||||||
|
) PARTITION BY HASH (topic_name, partition);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_0
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 0);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_1
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 1);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_2
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 2);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_3
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 3);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_4
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 4);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_5
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 5);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_6
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 6);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_7
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 7);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_8
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 8);
|
||||||
|
CREATE TABLE IF NOT EXISTS "pubsub".data_9
|
||||||
|
PARTITION OF "pubsub".data
|
||||||
|
FOR VALUES WITH (modulus 10, remainder 9);
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
pub := func(topic string, a any) error {
|
||||||
|
payload, _ := json.Marshal(a)
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO "pubsub".topic (name)
|
||||||
|
VALUES ($1)
|
||||||
|
ON CONFLICT DO NOTHING;
|
||||||
|
`, topic); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err := pg.ExecContext(ctx, `
|
||||||
|
WITH topic AS (
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
|
||||||
|
FROM "pubsub".topic
|
||||||
|
WHERE name=$1
|
||||||
|
)
|
||||||
|
, partition_offset AS (
|
||||||
|
SELECT
|
||||||
|
topic.partition AS partition,
|
||||||
|
COALESCE(MAX(data.partition_offset), -1)+1 AS next_offset
|
||||||
|
FROM topic
|
||||||
|
LEFT JOIN "pubsub".data data ON data.topic_name=topic.name AND data.partition=topic.partition
|
||||||
|
GROUP BY topic.partition
|
||||||
|
)
|
||||||
|
INSERT INTO "pubsub".data
|
||||||
|
(topic_name, partition, partition_offset, payload)
|
||||||
|
SELECT
|
||||||
|
$1, partition_offset.partition, partition_offset.next_offset, $2
|
||||||
|
FROM partition_offset
|
||||||
|
`, topic, payload)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sub := func(topic, group string) (int, int, any, error) {
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO "pubsub".group (name)
|
||||||
|
VALUES ($1)
|
||||||
|
ON CONFLICT DO NOTHING;
|
||||||
|
`, group); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := pg.QueryContext(ctx, `
|
||||||
|
WITH topic AS (
|
||||||
|
SELECT
|
||||||
|
name,
|
||||||
|
FLOOR(100 * RANDOM())::INTEGER % partitions AS partition
|
||||||
|
FROM "pubsub".topic
|
||||||
|
WHERE name=$1
|
||||||
|
)
|
||||||
|
, group_topic_partition_offset AS (
|
||||||
|
SELECT
|
||||||
|
topic.name AS topic_name,
|
||||||
|
topic.partition AS partition,
|
||||||
|
COALESCE(group_topic_partition_offset.partition_offset, 0) AS partition_offset
|
||||||
|
FROM topic
|
||||||
|
LEFT JOIN "pubsub".group_topic_partition_offset group_topic_partition_offset
|
||||||
|
ON topic.name=group_topic_partition_offset.topic_name
|
||||||
|
AND topic.partition=group_topic_partition_offset.partition
|
||||||
|
AND group_topic_partition_offset.group_name=$2
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
group_offset.partition,
|
||||||
|
group_offset.partition_offset,
|
||||||
|
data.payload
|
||||||
|
FROM group_topic_partition_offset group_offset
|
||||||
|
JOIN "pubsub".data data
|
||||||
|
ON group_offset.topic_name=data.topic_name
|
||||||
|
AND group_offset.partition=data.partition
|
||||||
|
AND group_offset.partition_offset=data.partition_offset
|
||||||
|
`, topic, group)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
partition := -1
|
||||||
|
offset := -1
|
||||||
|
var b []byte
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&partition, &offset, &b); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload any
|
||||||
|
if len(b) == 0 {
|
||||||
|
} else if err := json.Unmarshal(b, &payload); err != nil {
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return partition, offset, payload, rows.Err()
|
||||||
|
}
|
||||||
|
commit := func(topic, group string, partition, offset int) error {
|
||||||
|
_, err := pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO "pubsub".group_topic_partition_offset
|
||||||
|
(topic_name, group_name, partition, partition_offset)
|
||||||
|
VALUES
|
||||||
|
($1, $2, $3, $4)
|
||||||
|
ON CONFLICT (group_name, topic_name, partition) DO UPDATE
|
||||||
|
SET topic_name=$1, group_name=$2, partition=$3, partition_offset=$4
|
||||||
|
`, topic, group, partition, offset+1)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
started := time.Now()
|
||||||
|
var pubs atomic.Uint64
|
||||||
|
var subs atomic.Uint64
|
||||||
|
var lastCommit string
|
||||||
|
go with.Every(ctx, 2*time.Second, func() {
|
||||||
|
pubs := pubs.Load()
|
||||||
|
subs := subs.Load()
|
||||||
|
seconds := uint64(time.Since(started).Seconds())
|
||||||
|
if seconds == 0 {
|
||||||
|
seconds = 1
|
||||||
|
}
|
||||||
|
log.Printf("pubbed %v per second (%v), subbed %v per second (%v) over %vs, last committed %s",
|
||||||
|
pubs/seconds, pubs,
|
||||||
|
subs/seconds, subs,
|
||||||
|
seconds, lastCommit,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
topic := fmt.Sprintf("topic_%d", i)
|
||||||
|
go with.Every(ctx, 1, func() {
|
||||||
|
if err := pub(topic, 1); err != nil {
|
||||||
|
log.Printf("failed pub: %v", err)
|
||||||
|
} else {
|
||||||
|
pubs.Add(1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
topic := fmt.Sprintf("topic_%d", i)
|
||||||
|
for j := 0; j < 2; j++ {
|
||||||
|
group := fmt.Sprintf("group_%d", i)
|
||||||
|
go with.Every(ctx, 1, func() {
|
||||||
|
if err := func() error {
|
||||||
|
if partition, offset, _, err := sub(topic, group); err != nil {
|
||||||
|
return fmt.Errorf("failed sub: %w", err)
|
||||||
|
} else if partition == -1 {
|
||||||
|
} else if err := commit(topic, group, partition, offset); err != nil {
|
||||||
|
return fmt.Errorf("failed commit: %w", err)
|
||||||
|
} else {
|
||||||
|
subs.Add(1)
|
||||||
|
lastCommit = fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
log.Printf("failed subs: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
return ctx.Err()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -7,17 +7,20 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
|
if err := with.Context(run); err != nil {
|
||||||
defer can()
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||||
d := fs.Duration("d", time.Second, "interval")
|
d := fs.Duration("d", time.Second, "interval")
|
||||||
@@ -25,154 +28,92 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("opening...")
|
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
pg, err := sql.Open("postgres", *c)
|
log.Println("staging...")
|
||||||
if err != nil {
|
if _, err := pg.ExecContext(ctx, `
|
||||||
panic(err)
|
DROP TABLE IF EXISTS "pg-pulse";
|
||||||
}
|
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
|
||||||
defer func() {
|
`); err != nil {
|
||||||
log.Println("closed:", pg.Close())
|
return err
|
||||||
}()
|
}
|
||||||
|
|
||||||
func() {
|
log.Println("spamming...")
|
||||||
pinged := make(chan bool)
|
|
||||||
defer close(pinged)
|
var downtime time.Duration
|
||||||
for {
|
okc := make(chan bool)
|
||||||
log.Println("pinging...")
|
defer close(okc)
|
||||||
go func() {
|
go func() {
|
||||||
err := pg.PingContext(ctx)
|
lastOK := true
|
||||||
if err != nil {
|
wasOK := time.Now()
|
||||||
log.Println("!", err)
|
for ok := range okc {
|
||||||
|
if isNewlyOK := ok && !lastOK; isNewlyOK {
|
||||||
|
downtime += time.Since(wasOK)
|
||||||
}
|
}
|
||||||
select {
|
lastOK = ok
|
||||||
case pinged <- err == nil:
|
|
||||||
case <-ctx.Done():
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case ok := <-pinged:
|
|
||||||
if ok {
|
if ok {
|
||||||
return
|
wasOK = time.Now()
|
||||||
}
|
}
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Println("staging...")
|
|
||||||
if _, err := pg.ExecContext(ctx, `
|
|
||||||
DROP TABLE IF EXISTS "pg-pulse";
|
|
||||||
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
|
|
||||||
`); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("spamming...")
|
|
||||||
|
|
||||||
var downtime time.Duration
|
|
||||||
okc := make(chan bool)
|
|
||||||
defer close(okc)
|
|
||||||
go func() {
|
|
||||||
lastOK := true
|
|
||||||
wasOK := time.Now()
|
|
||||||
for ok := range okc {
|
|
||||||
if isNewlyOK := ok && !lastOK; isNewlyOK {
|
|
||||||
downtime += time.Since(wasOK)
|
|
||||||
}
|
|
||||||
lastOK = ok
|
|
||||||
if ok {
|
|
||||||
wasOK = time.Now()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
pushOK := func(v bool) {
|
|
||||||
select {
|
|
||||||
case okc <- v:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer pticker.Stop()
|
|
||||||
ticker := time.NewTicker(*d)
|
|
||||||
defer ticker.Stop()
|
|
||||||
n := uint(0)
|
|
||||||
isHA := false
|
|
||||||
isHA0 := false
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
func() {
|
|
||||||
ctx, can := context.WithCancel(ctx)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
|
|
||||||
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
|
|
||||||
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
|
|
||||||
`)); err != nil {
|
|
||||||
pushOK(false)
|
|
||||||
log.Println("\n! failed nonzero insert:", err)
|
|
||||||
} else {
|
|
||||||
n += 1
|
|
||||||
pushOK(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
var lostWrites uint
|
|
||||||
ha, ha1 := false, false
|
|
||||||
row := pg.QueryRowContext(ctx, `
|
|
||||||
SELECT
|
|
||||||
(
|
|
||||||
SELECT COUNT(*) > 0
|
|
||||||
FROM pg_replication_slots
|
|
||||||
WHERE slot_name LIKE 'dpg_%_a_ha__'
|
|
||||||
) AS is_ha,
|
|
||||||
(
|
|
||||||
SELECT COUNT(*) > 0
|
|
||||||
FROM pg_replication_slots
|
|
||||||
WHERE slot_name LIKE 'dpg_%_a_ha_0'
|
|
||||||
) AS is_ha1,
|
|
||||||
(
|
|
||||||
SELECT $1 - COUNT(*)
|
|
||||||
FROM "pg-pulse"
|
|
||||||
) AS lost_writes
|
|
||||||
`, n)
|
|
||||||
if err := row.Err(); err != nil {
|
|
||||||
log.Println("\n! failed getting ha-stuff:", err)
|
|
||||||
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
|
|
||||||
log.Println("\n! failed scanning ha-stuff:", err)
|
|
||||||
} else {
|
|
||||||
isHA = isHA || ha
|
|
||||||
isHA0 = isHA && !ha1
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, can = context.WithTimeout(ctx, time.Second*5)
|
|
||||||
defer can()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-pticker.C:
|
|
||||||
using := ""
|
|
||||||
if isHA && isHA0 {
|
|
||||||
using = "using ha0"
|
|
||||||
} else if isHA && !isHA0 {
|
|
||||||
using = "using ha1"
|
|
||||||
}
|
|
||||||
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case <-done:
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
select {
|
pushOK := func(v bool) {
|
||||||
case <-ctx.Done():
|
select {
|
||||||
case <-ticker.C:
|
case okc <- v:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
n := uint(0)
|
||||||
|
isHA := false
|
||||||
|
isHA0 := false
|
||||||
|
var lostWrites uint
|
||||||
|
go with.Every(ctx, 5*time.Second, func() {
|
||||||
|
using := ""
|
||||||
|
if isHA && isHA0 {
|
||||||
|
using = "using ha0"
|
||||||
|
} else if isHA && !isHA0 {
|
||||||
|
using = "using ha1"
|
||||||
|
}
|
||||||
|
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
|
||||||
|
})
|
||||||
|
|
||||||
|
with.GoEvery(ctx, *d, func() {
|
||||||
|
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
|
||||||
|
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
|
||||||
|
`)); err != nil {
|
||||||
|
pushOK(false)
|
||||||
|
log.Println("\n! failed nonzero insert:", err)
|
||||||
|
} else {
|
||||||
|
n += 1
|
||||||
|
pushOK(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
ha, ha1 := false, false
|
||||||
|
row := pg.QueryRowContext(ctx, `
|
||||||
|
SELECT
|
||||||
|
(
|
||||||
|
SELECT COUNT(*) > 0
|
||||||
|
FROM pg_replication_slots
|
||||||
|
WHERE slot_name LIKE 'dpg_%_a_ha__'
|
||||||
|
) AS is_ha,
|
||||||
|
(
|
||||||
|
SELECT COUNT(*) > 0
|
||||||
|
FROM pg_replication_slots
|
||||||
|
WHERE slot_name LIKE 'dpg_%_a_ha_0'
|
||||||
|
) AS is_ha1,
|
||||||
|
(
|
||||||
|
SELECT $1 - COUNT(*)
|
||||||
|
FROM "pg-pulse"
|
||||||
|
) AS lost_writes
|
||||||
|
`, n)
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
log.Println("\n! failed getting ha-stuff:", err)
|
||||||
|
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
|
||||||
|
log.Println("\n! failed scanning ha-stuff:", err)
|
||||||
|
} else {
|
||||||
|
isHA = isHA || ha
|
||||||
|
isHA0 = isHA && !ha1
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return ctx.Err()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
1
cmd/pg-queue-demo/README.md
Normal file
1
cmd/pg-queue-demo/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks
|
||||||
164
cmd/pg-queue-demo/main.go
Normal file
164
cmd/pg-queue-demo/main.go
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"pg/src/with"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if err := with.Context(run); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context) error {
|
||||||
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
|
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
|
||||||
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
|
||||||
|
log.Println("creating tables...")
|
||||||
|
if _, err := pg.ExecContext(ctx, `
|
||||||
|
CREATE SCHEMA IF NOT EXISTS "queue";
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "queue"."queue" (
|
||||||
|
id UUID NOT NULL DEFAULT GEN_RANDOM_UUID() PRIMARY KEY,
|
||||||
|
payload BYTEA NOT NULL,
|
||||||
|
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||||
|
attempts INTEGER DEFAULT 0,
|
||||||
|
reservation UUID,
|
||||||
|
checked_out_at TIMESTAMP
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS "queue"."archive" (
|
||||||
|
id UUID NOT NULL PRIMARY KEY,
|
||||||
|
payload BYTEA NOT NULL,
|
||||||
|
created_at TIMESTAMP NOT NULL,
|
||||||
|
archived_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue := func(a any) error {
|
||||||
|
b, _ := json.Marshal(a)
|
||||||
|
_, err := pg.ExecContext(ctx, `INSERT INTO "queue"."queue" (payload) VALUES ($1)`, b)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mustCheckout := func() (string, []byte, error) {
|
||||||
|
rows, err := pg.QueryContext(ctx, `
|
||||||
|
UPDATE "queue"."queue"
|
||||||
|
SET
|
||||||
|
reservation = GEN_RANDOM_UUID(),
|
||||||
|
attempts = attempts+1,
|
||||||
|
checked_out_at = NOW()
|
||||||
|
WHERE
|
||||||
|
id IN (
|
||||||
|
SELECT id
|
||||||
|
FROM "queue"."queue"
|
||||||
|
WHERE
|
||||||
|
attempts < 100
|
||||||
|
AND (reservation IS NULL OR NOW() - checked_out_at > INTERVAL '1 minute')
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
RETURNING reservation, payload
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var reservation string
|
||||||
|
var payload []byte
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&reservation, &payload); err != nil {
|
||||||
|
return "", nil, fmt.Errorf("failed to scan reservation,payload: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return reservation, payload, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve := func(reservation string) error {
|
||||||
|
_, err := pg.ExecContext(ctx, `
|
||||||
|
WITH deleted AS (
|
||||||
|
DELETE FROM "queue"."queue"
|
||||||
|
WHERE
|
||||||
|
reservation=$1
|
||||||
|
AND NOW() - checked_out_at < INTERVAL '1 minute'
|
||||||
|
RETURNING id, payload, created_at
|
||||||
|
)
|
||||||
|
INSERT INTO "queue"."archive" (
|
||||||
|
id, payload, created_at
|
||||||
|
) SELECT * FROM deleted
|
||||||
|
`, reservation)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
var enqueues atomic.Uint64
|
||||||
|
var dequeues atomic.Uint64
|
||||||
|
go with.Every(ctx, 2*time.Second, func() {
|
||||||
|
seconds := uint64(time.Since(start).Seconds())
|
||||||
|
if seconds < 1 {
|
||||||
|
seconds = 1
|
||||||
|
}
|
||||||
|
e := enqueues.Load()
|
||||||
|
d := dequeues.Load()
|
||||||
|
log.Printf("%d enqueues (%d per second), %d dequeues (%d per second) over %d seconds",
|
||||||
|
e,
|
||||||
|
e/seconds,
|
||||||
|
d,
|
||||||
|
d/seconds,
|
||||||
|
seconds,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
go with.Every(ctx, 1, func() {
|
||||||
|
if err := enqueue(1); err != nil {
|
||||||
|
log.Printf("failed to enqueue: %v", err)
|
||||||
|
} else {
|
||||||
|
enqueues.Add(1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
with.Every(ctx, 1, func() {
|
||||||
|
reservation, payload, err := mustCheckout()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("failed checkout:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if reservation == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(payload) != "1" {
|
||||||
|
log.Printf("bad checkout: unexpected payload %q", payload)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := resolve(reservation); err != nil {
|
||||||
|
log.Printf("failed to resolve: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dequeues.Add(1)
|
||||||
|
})
|
||||||
|
return ctx.Err()
|
||||||
|
})
|
||||||
|
}
|
||||||
18
go.mod
18
go.mod
@@ -2,4 +2,20 @@ module pg
|
|||||||
|
|
||||||
go 1.24.2
|
go 1.24.2
|
||||||
|
|
||||||
require github.com/lib/pq v1.10.9
|
require (
|
||||||
|
github.com/lib/pq v1.10.9
|
||||||
|
modernc.org/sqlite v1.40.1
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
|
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
||||||
|
golang.org/x/sys v0.36.0 // indirect
|
||||||
|
modernc.org/libc v1.66.10 // indirect
|
||||||
|
modernc.org/mathutil v1.7.1 // indirect
|
||||||
|
modernc.org/memory v1.11.0 // indirect
|
||||||
|
)
|
||||||
|
|||||||
49
go.sum
49
go.sum
@@ -1,2 +1,51 @@
|
|||||||
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||||
|
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
|
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||||
|
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
||||||
|
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
||||||
|
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
|
||||||
|
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
|
||||||
|
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
|
||||||
|
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||||
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
|
||||||
|
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||||
|
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
|
||||||
|
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
|
||||||
|
modernc.org/cc/v4 v4.26.5 h1:xM3bX7Mve6G8K8b+T11ReenJOT+BmVqQj0FY5T4+5Y4=
|
||||||
|
modernc.org/cc/v4 v4.26.5/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||||
|
modernc.org/ccgo/v4 v4.28.1 h1:wPKYn5EC/mYTqBO373jKjvX2n+3+aK7+sICCv4Fjy1A=
|
||||||
|
modernc.org/ccgo/v4 v4.28.1/go.mod h1:uD+4RnfrVgE6ec9NGguUNdhqzNIeeomeXf6CL0GTE5Q=
|
||||||
|
modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA=
|
||||||
|
modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
|
||||||
|
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
||||||
|
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
||||||
|
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
|
||||||
|
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
|
||||||
|
modernc.org/libc v1.66.10 h1:yZkb3YeLx4oynyR+iUsXsybsX4Ubx7MQlSYEw4yj59A=
|
||||||
|
modernc.org/libc v1.66.10/go.mod h1:8vGSEwvoUoltr4dlywvHqjtAqHBaw0j1jI7iFBTAr2I=
|
||||||
|
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||||
|
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||||
|
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
||||||
|
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||||
|
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
||||||
|
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||||
|
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||||
|
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||||
|
modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY=
|
||||||
|
modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE=
|
||||||
|
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||||
|
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||||
|
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||||
|
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||||
|
|||||||
17
src/with/ctx.go
Normal file
17
src/with/ctx.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
package with
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Context(foo func(context.Context) error) error {
|
||||||
|
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
if err := foo(ctx); err != nil && ctx.Err() == nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
34
src/with/every.go
Normal file
34
src/with/every.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package with
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GoEvery(ctx context.Context, d time.Duration, foo func()) {
|
||||||
|
every(ctx, d, foo, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Every(ctx context.Context, d time.Duration, foo func()) {
|
||||||
|
every(ctx, d, foo, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func every(ctx context.Context, d time.Duration, foo func(), async bool) {
|
||||||
|
ticker := time.NewTicker(d)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
everyTry(ctx, foo, async)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func everyTry(ctx context.Context, foo func(), async bool) {
|
||||||
|
if async {
|
||||||
|
go foo()
|
||||||
|
} else {
|
||||||
|
foo()
|
||||||
|
}
|
||||||
|
}
|
||||||
12
src/with/pg.go
Normal file
12
src/with/pg.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package with
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func PSQL(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
|
||||||
|
return _sql(ctx, "postgres", conn, foo)
|
||||||
|
}
|
||||||
71
src/with/sql.go
Normal file
71
src/with/sql.go
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
package with
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "modernc.org/sqlite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SQL(ctx context.Context, conn string, foo func(*sql.DB) error) error {
|
||||||
|
u, err := url.Parse(conn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch u.Scheme {
|
||||||
|
case "sqlite":
|
||||||
|
return Sqlite(ctx, conn, foo)
|
||||||
|
case "postgres", "postgresql":
|
||||||
|
return PSQL(ctx, conn, foo)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unknown sql scheme %q", u.Scheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _sql(ctx context.Context, engine, conn string, foo func(db *sql.DB) error) error {
|
||||||
|
log.Printf("opening %s %s...", engine, conn)
|
||||||
|
db, err := sql.Open(engine, conn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
log.Println("closed:", db.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
func() {
|
||||||
|
pinged := make(chan bool)
|
||||||
|
defer close(pinged)
|
||||||
|
for {
|
||||||
|
log.Println("pinging...")
|
||||||
|
go func() {
|
||||||
|
err := db.PingContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("!", err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case pinged <- err == nil:
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case ok := <-pinged:
|
||||||
|
if ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
log.Println("connected")
|
||||||
|
|
||||||
|
return foo(db)
|
||||||
|
}
|
||||||
14
src/with/sqlite.go
Normal file
14
src/with/sqlite.go
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
package with
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
_ "modernc.org/sqlite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Sqlite(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
|
||||||
|
conn = strings.TrimPrefix(conn, "sqlite://")
|
||||||
|
return _sql(ctx, "sqlite", conn, foo)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user