impl pg-pulse

main
Bel LaPointe 2025-10-31 12:20:40 -06:00
parent 51307a7424
commit b8d8f318ab
1 changed files with 114 additions and 0 deletions

114
cmd/pg-pulse/main.go Normal file
View File

@ -0,0 +1,114 @@
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/lib/pq"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
d := fs.Duration("d", time.Second, "interval")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
log.Println("opening...")
pg, err := sql.Open("postgres", *c)
if err != nil {
panic(err)
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
case <-time.After(time.Second * 5):
}
}
}()
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
panic(err)
}
log.Println("spamming...")
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := 0
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
log.Println("\n! failed nonzero insert:", err)
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
n += 1
select {
case <-pticker.C:
log.Println(n)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}