main
Bel LaPointe 2025-10-31 12:16:51 -06:00
commit 51307a7424
4 changed files with 198 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
**/*.sw*

190
cmd/pg-walspam/main.go Normal file
View File

@ -0,0 +1,190 @@
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
_ "github.com/lib/pq"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
d := fs.Duration("d", time.Second, "interval")
u := fs.Bool("unsafe", false, "do not assert safe to spam")
s := fs.Bool("superuser", true, "assume superuser else do less effective stuff")
zero := fs.Int("z", 0, "num of rand string spam")
q := fs.Int("q", 100, "size of queue in mb")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
log.Println("opening...")
pg, err := sql.Open("postgres", *c)
if err != nil {
panic(err)
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
case <-time.After(time.Second * 5):
}
}
}()
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS walspam_t;
CREATE TABLE IF NOT EXISTS walspam_t (i INT);
CREATE TABLE IF NOT EXISTS "pg-walspam" (k TEXT);
ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS v TEXT;
ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS x TEXT;
ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS y TEXT;
CREATE INDEX IF NOT EXISTS "pg-walspam_idx" ON "pg-walspam" (k);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_v" ON "pg-walspam" (k, v);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_x" ON "pg-walspam" (k, x);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_y" ON "pg-walspam" (k, y);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_k" ON "pg-walspam" (v, k);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_x" ON "pg-walspam" (v, x);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_y" ON "pg-walspam" (v, y);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_k" ON "pg-walspam" (x, k);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_v" ON "pg-walspam" (x, v);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_y" ON "pg-walspam" (x, y);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_k" ON "pg-walspam" (y, k);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_v" ON "pg-walspam" (y, v);
CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_x" ON "pg-walspam" (y, x);
`); err != nil {
panic(err)
}
log.Println("spamming...")
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := 0
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if *zero > 0 {
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-walspam" (k, v, x, y)
SELECT
substr(md5(random()::text), 1, 25),
substr(md5(random()::text), 1, 25),
substr(md5(random()::text), 1, 25),
substr(md5(random()::text), 1, 25)
FROM generate_series(0, %d);
`, *zero)); err != nil {
log.Println("\n! failed nonzero insert:", err)
}
}
once := &sync.Once{}
diskRoom := func() bool {
row := pg.QueryRowContext(ctx, `
SELECT COUNT(*)
FROM pg_ls_dir('pg_wal/archive_status')
WHERE pg_ls_dir LIKE '%.ready'
`)
var todo int
if err := row.Err(); err != nil {
log.Println("\n! cannot confirm archiving ok: failed select:", err)
return false
}
if err := row.Scan(&todo); err != nil {
log.Println("\n! cannot confirm archiving ok: failed scan:", err)
return false
}
if todo*16 > *q {
once.Do(func() {
log.Printf("waiting for pg_wal to shrink: pg_wal has %v wal files (~%dMB) in disk upload queue", todo, todo*16)
})
return false
}
return true
}
for *s && !*u {
if diskRoom() {
break
}
select {
case <-ctx.Done():
case <-ticker.C:
}
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
if !*s {
} else if _, err := pg.ExecContext(ctx, `
INSERT INTO walspam_t (i) VALUES (1);
SELECT pg_create_restore_point('test');
SELECT pg_switch_wal();
CHECKPOINT;
`); err != nil {
log.Println("\n! failed insert+walspam:", err)
return
}
n += 1
select {
case <-pticker.C:
log.Println(n)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}

5
go.mod Normal file
View File

@ -0,0 +1,5 @@
module pg
go 1.24.2
require github.com/lib/pq v1.10.9

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=