commit 51307a742401c4bbee58f83d6077571e532a1254 Author: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Fri Oct 31 12:16:51 2025 -0600 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..78fd378 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +**/*.sw* diff --git a/cmd/pg-walspam/main.go b/cmd/pg-walspam/main.go new file mode 100644 index 0000000..551ef86 --- /dev/null +++ b/cmd/pg-walspam/main.go @@ -0,0 +1,190 @@ +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + _ "github.com/lib/pq" +) + +func main() { + ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) + defer can() + + fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") + d := fs.Duration("d", time.Second, "interval") + u := fs.Bool("unsafe", false, "do not assert safe to spam") + s := fs.Bool("superuser", true, "assume superuser else do less effective stuff") + zero := fs.Int("z", 0, "num of rand string spam") + q := fs.Int("q", 100, "size of queue in mb") + if err := fs.Parse(os.Args[1:]); err != nil { + panic(err) + } + + log.Println("opening...") + pg, err := sql.Open("postgres", *c) + if err != nil { + panic(err) + } + defer func() { + log.Println("closed:", pg.Close()) + }() + + func() { + pinged := make(chan bool) + defer close(pinged) + for { + log.Println("pinging...") + go func() { + err := pg.PingContext(ctx) + if err != nil { + log.Println("!", err) + } + select { + case pinged <- err == nil: + case <-ctx.Done(): + case <-time.After(time.Second * 5): + } + }() + select { + case <-ctx.Done(): + return + case ok := <-pinged: + if ok { + return + } + case <-time.After(time.Second * 5): + } + } + }() + + log.Println("staging...") + if _, err := pg.ExecContext(ctx, ` + DROP TABLE IF EXISTS walspam_t; + CREATE TABLE IF NOT EXISTS walspam_t (i INT); + CREATE TABLE IF NOT EXISTS "pg-walspam" (k TEXT); + ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS v TEXT; + ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS x TEXT; + ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS y TEXT; + CREATE INDEX IF NOT EXISTS "pg-walspam_idx" ON "pg-walspam" (k); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_v" ON "pg-walspam" (k, v); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_x" ON "pg-walspam" (k, x); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_y" ON "pg-walspam" (k, y); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_k" ON "pg-walspam" (v, k); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_x" ON "pg-walspam" (v, x); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_y" ON "pg-walspam" (v, y); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_k" ON "pg-walspam" (x, k); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_v" ON "pg-walspam" (x, v); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_y" ON "pg-walspam" (x, y); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_k" ON "pg-walspam" (y, k); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_v" ON "pg-walspam" (y, v); + CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_x" ON "pg-walspam" (y, x); + `); err != nil { + panic(err) + } + + log.Println("spamming...") + pticker := time.NewTicker(5 * time.Second) + defer pticker.Stop() + ticker := time.NewTicker(*d) + defer ticker.Stop() + n := 0 + for ctx.Err() == nil { + func() { + ctx, can := context.WithCancel(ctx) + defer can() + + done := make(chan struct{}) + go func() { + defer close(done) + + if *zero > 0 { + if _, err := pg.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO "pg-walspam" (k, v, x, y) + SELECT + substr(md5(random()::text), 1, 25), + substr(md5(random()::text), 1, 25), + substr(md5(random()::text), 1, 25), + substr(md5(random()::text), 1, 25) + FROM generate_series(0, %d); + `, *zero)); err != nil { + log.Println("\n! failed nonzero insert:", err) + } + } + + once := &sync.Once{} + diskRoom := func() bool { + row := pg.QueryRowContext(ctx, ` + SELECT COUNT(*) + FROM pg_ls_dir('pg_wal/archive_status') + WHERE pg_ls_dir LIKE '%.ready' + `) + var todo int + if err := row.Err(); err != nil { + log.Println("\n! cannot confirm archiving ok: failed select:", err) + return false + } + if err := row.Scan(&todo); err != nil { + log.Println("\n! cannot confirm archiving ok: failed scan:", err) + return false + } + if todo*16 > *q { + once.Do(func() { + log.Printf("waiting for pg_wal to shrink: pg_wal has %v wal files (~%dMB) in disk upload queue", todo, todo*16) + }) + return false + } + return true + } + for *s && !*u { + if diskRoom() { + break + } + select { + case <-ctx.Done(): + case <-ticker.C: + } + } + + ctx, can = context.WithTimeout(ctx, time.Second*5) + defer can() + + if !*s { + } else if _, err := pg.ExecContext(ctx, ` + INSERT INTO walspam_t (i) VALUES (1); + SELECT pg_create_restore_point('test'); + SELECT pg_switch_wal(); + CHECKPOINT; + `); err != nil { + log.Println("\n! failed insert+walspam:", err) + return + } + + n += 1 + select { + case <-pticker.C: + log.Println(n) + default: + } + }() + + select { + case <-ctx.Done(): + case <-done: + } + }() + select { + case <-ctx.Done(): + case <-ticker.C: + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a908944 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module pg + +go 1.24.2 + +require github.com/lib/pq v1.10.9 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..aeddeae --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=