From 39123044b2af40eba21e79c58041fba4f612df02 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Wed, 19 Nov 2025 12:31:17 -0700 Subject: [PATCH] pulse more boilerplate refactoring --- .gitignore | 3 + cmd/pg-lo-demo/main.go | 115 -------------------------------------- cmd/pg-pulse/main.go | 124 ++++++++++++++++------------------------- src/with/every.go | 34 +++++++++++ 4 files changed, 85 insertions(+), 191 deletions(-) create mode 100644 src/with/every.go diff --git a/.gitignore b/.gitignore index 78fd378..5559324 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ **/*.sw* +/cmd/pg-lo-demo/pg-lo-demo +/cmd/pg-pulse/pg-pulse +/cmd/pg-walspam/pg-walspam diff --git a/cmd/pg-lo-demo/main.go b/cmd/pg-lo-demo/main.go index 4cae220..1994c75 100644 --- a/cmd/pg-lo-demo/main.go +++ b/cmd/pg-lo-demo/main.go @@ -4,8 +4,6 @@ import ( "context" "database/sql" "flag" - "fmt" - "log" "os" "time" @@ -29,119 +27,6 @@ func run(ctx context.Context) error { } return with.PSQL(ctx, *c, func(pg *sql.DB) error { - log.Println("staging...") - if _, err := pg.ExecContext(ctx, ` - DROP TABLE IF EXISTS "pg-pulse"; - CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT); - `); err != nil { - return err - } - - log.Println("spamming...") - - var downtime time.Duration - okc := make(chan bool) - defer close(okc) - go func() { - lastOK := true - wasOK := time.Now() - for ok := range okc { - if isNewlyOK := ok && !lastOK; isNewlyOK { - downtime += time.Since(wasOK) - } - lastOK = ok - if ok { - wasOK = time.Now() - } - } - }() - pushOK := func(v bool) { - select { - case okc <- v: - default: - } - } - - pticker := time.NewTicker(5 * time.Second) - defer pticker.Stop() - ticker := time.NewTicker(*d) - defer ticker.Stop() - n := uint(0) - isHA := false - isHA0 := false - for ctx.Err() == nil { - func() { - ctx, can := context.WithCancel(ctx) - defer can() - - done := make(chan struct{}) - go func() { - defer close(done) - - if _, err := pg.ExecContext(ctx, fmt.Sprintf(` - INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) - `)); err != nil { - pushOK(false) - log.Println("\n! failed nonzero insert:", err) - } else { - n += 1 - pushOK(true) - } - - var lostWrites uint - ha, ha1 := false, false - row := pg.QueryRowContext(ctx, ` - SELECT - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha__' - ) AS is_ha, - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha_0' - ) AS is_ha1, - ( - SELECT $1 - COUNT(*) - FROM "pg-pulse" - ) AS lost_writes - `, n) - if err := row.Err(); err != nil { - log.Println("\n! failed getting ha-stuff:", err) - } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { - log.Println("\n! failed scanning ha-stuff:", err) - } else { - isHA = isHA || ha - isHA0 = isHA && !ha1 - } - - ctx, can = context.WithTimeout(ctx, time.Second*5) - defer can() - - select { - case <-pticker.C: - using := "" - if isHA && isHA0 { - using = "using ha0" - } else if isHA && !isHA0 { - using = "using ha1" - } - log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) - default: - } - }() - - select { - case <-ctx.Done(): - case <-done: - } - }() - select { - case <-ctx.Done(): - case <-ticker.C: - } - } return ctx.Err() }) } diff --git a/cmd/pg-pulse/main.go b/cmd/pg-pulse/main.go index 4cae220..e14cb8d 100644 --- a/cmd/pg-pulse/main.go +++ b/cmd/pg-pulse/main.go @@ -62,86 +62,58 @@ func run(ctx context.Context) error { } } - pticker := time.NewTicker(5 * time.Second) - defer pticker.Stop() - ticker := time.NewTicker(*d) - defer ticker.Stop() n := uint(0) isHA := false isHA0 := false - for ctx.Err() == nil { - func() { - ctx, can := context.WithCancel(ctx) - defer can() - - done := make(chan struct{}) - go func() { - defer close(done) - - if _, err := pg.ExecContext(ctx, fmt.Sprintf(` - INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) - `)); err != nil { - pushOK(false) - log.Println("\n! failed nonzero insert:", err) - } else { - n += 1 - pushOK(true) - } - - var lostWrites uint - ha, ha1 := false, false - row := pg.QueryRowContext(ctx, ` - SELECT - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha__' - ) AS is_ha, - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha_0' - ) AS is_ha1, - ( - SELECT $1 - COUNT(*) - FROM "pg-pulse" - ) AS lost_writes - `, n) - if err := row.Err(); err != nil { - log.Println("\n! failed getting ha-stuff:", err) - } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { - log.Println("\n! failed scanning ha-stuff:", err) - } else { - isHA = isHA || ha - isHA0 = isHA && !ha1 - } - - ctx, can = context.WithTimeout(ctx, time.Second*5) - defer can() - - select { - case <-pticker.C: - using := "" - if isHA && isHA0 { - using = "using ha0" - } else if isHA && !isHA0 { - using = "using ha1" - } - log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) - default: - } - }() - - select { - case <-ctx.Done(): - case <-done: - } - }() - select { - case <-ctx.Done(): - case <-ticker.C: + var lostWrites uint + go with.Every(ctx, 5*time.Second, func() { + using := "" + if isHA && isHA0 { + using = "using ha0" + } else if isHA && !isHA0 { + using = "using ha1" } - } + log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) + }) + + with.GoEvery(ctx, *d, func() { + if _, err := pg.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) + `)); err != nil { + pushOK(false) + log.Println("\n! failed nonzero insert:", err) + } else { + n += 1 + pushOK(true) + } + + ha, ha1 := false, false + row := pg.QueryRowContext(ctx, ` + SELECT + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha__' + ) AS is_ha, + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha_0' + ) AS is_ha1, + ( + SELECT $1 - COUNT(*) + FROM "pg-pulse" + ) AS lost_writes + `, n) + if err := row.Err(); err != nil { + log.Println("\n! failed getting ha-stuff:", err) + } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { + log.Println("\n! failed scanning ha-stuff:", err) + } else { + isHA = isHA || ha + isHA0 = isHA && !ha1 + } + }) return ctx.Err() }) } diff --git a/src/with/every.go b/src/with/every.go new file mode 100644 index 0000000..25bce14 --- /dev/null +++ b/src/with/every.go @@ -0,0 +1,34 @@ +package with + +import ( + "context" + "time" +) + +func GoEvery(ctx context.Context, d time.Duration, foo func()) { + every(ctx, d, foo, true) +} + +func Every(ctx context.Context, d time.Duration, foo func()) { + every(ctx, d, foo, false) +} + +func every(ctx context.Context, d time.Duration, foo func(), async bool) { + ticker := time.NewTicker(d) + defer ticker.Stop() + for ctx.Err() == nil { + everyTry(ctx, foo, async) + select { + case <-ctx.Done(): + case <-ticker.C: + } + } +} + +func everyTry(ctx context.Context, foo func(), async bool) { + if async { + go foo() + } else { + foo() + } +}