package main import ( "context" "database/sql" "flag" "fmt" "log" "os" "os/signal" "sync" "syscall" "time" _ "github.com/lib/pq" ) func main() { ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) defer can() fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") d := fs.Duration("d", time.Second, "interval") u := fs.Bool("unsafe", false, "do not assert safe to spam") s := fs.Bool("superuser", true, "assume superuser else do less effective stuff") zero := fs.Int("z", 0, "num of rand string spam") q := fs.Int("q", 100, "size of queue in mb") if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } log.Println("opening...") pg, err := sql.Open("postgres", *c) if err != nil { panic(err) } defer func() { log.Println("closed:", pg.Close()) }() func() { pinged := make(chan bool) defer close(pinged) for { log.Println("pinging...") go func() { err := pg.PingContext(ctx) if err != nil { log.Println("!", err) } select { case pinged <- err == nil: case <-ctx.Done(): case <-time.After(time.Second * 5): } }() select { case <-ctx.Done(): return case ok := <-pinged: if ok { return } case <-time.After(time.Second * 5): } } }() log.Println("staging...") if _, err := pg.ExecContext(ctx, ` DROP TABLE IF EXISTS walspam_t; CREATE TABLE IF NOT EXISTS walspam_t (i INT); CREATE TABLE IF NOT EXISTS "pg-walspam" (k TEXT); ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS v TEXT; ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS x TEXT; ALTER TABLE "pg-walspam" ADD COLUMN IF NOT EXISTS y TEXT; CREATE INDEX IF NOT EXISTS "pg-walspam_idx" ON "pg-walspam" (k); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_v" ON "pg-walspam" (k, v); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_x" ON "pg-walspam" (k, x); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_k_y" ON "pg-walspam" (k, y); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_k" ON "pg-walspam" (v, k); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_x" ON "pg-walspam" (v, x); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_v_y" ON "pg-walspam" (v, y); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_k" ON "pg-walspam" (x, k); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_v" ON "pg-walspam" (x, v); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_x_y" ON "pg-walspam" (x, y); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_k" ON "pg-walspam" (y, k); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_v" ON "pg-walspam" (y, v); CREATE INDEX IF NOT EXISTS "pg-walspam_idx_y_x" ON "pg-walspam" (y, x); `); err != nil { panic(err) } log.Println("spamming...") pticker := time.NewTicker(5 * time.Second) defer pticker.Stop() ticker := time.NewTicker(*d) defer ticker.Stop() n := 0 for ctx.Err() == nil { func() { ctx, can := context.WithCancel(ctx) defer can() done := make(chan struct{}) go func() { defer close(done) if *zero > 0 { if _, err := pg.ExecContext(ctx, fmt.Sprintf(` INSERT INTO "pg-walspam" (k, v, x, y) SELECT substr(md5(random()::text), 1, 25), substr(md5(random()::text), 1, 25), substr(md5(random()::text), 1, 25), substr(md5(random()::text), 1, 25) FROM generate_series(0, %d); `, *zero)); err != nil { log.Println("\n! failed nonzero insert:", err) } } once := &sync.Once{} diskRoom := func() bool { row := pg.QueryRowContext(ctx, ` SELECT COUNT(*) FROM pg_ls_dir('pg_wal/archive_status') WHERE pg_ls_dir LIKE '%.ready' `) var todo int if err := row.Err(); err != nil { log.Println("\n! cannot confirm archiving ok: failed select:", err) return false } if err := row.Scan(&todo); err != nil { log.Println("\n! cannot confirm archiving ok: failed scan:", err) return false } if todo*16 > *q { once.Do(func() { log.Printf("waiting for pg_wal to shrink: pg_wal has %v wal files (~%dMB) in disk upload queue", todo, todo*16) }) return false } return true } for *s && !*u { if diskRoom() { break } select { case <-ctx.Done(): case <-ticker.C: } } ctx, can = context.WithTimeout(ctx, time.Second*5) defer can() if !*s { } else if _, err := pg.ExecContext(ctx, ` INSERT INTO walspam_t (i) VALUES (1); SELECT pg_create_restore_point('test'); SELECT pg_switch_wal(); CHECKPOINT; `); err != nil { log.Println("\n! failed insert+walspam:", err) return } n += 1 select { case <-pticker.C: log.Println(n) default: } }() select { case <-ctx.Done(): case <-done: } }() select { case <-ctx.Done(): case <-ticker.C: } } }