From 9fabbcb761a04b47cbeb62a0e0250e2d4928ad2e Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Fri, 31 Oct 2025 12:54:43 -0600 Subject: [PATCH] write lost writes and downtime --- cmd/pg-pulse/main.go | 70 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/cmd/pg-pulse/main.go b/cmd/pg-pulse/main.go index 361d920..abc33f0 100644 --- a/cmd/pg-pulse/main.go +++ b/cmd/pg-pulse/main.go @@ -64,17 +64,44 @@ func main() { log.Println("staging...") if _, err := pg.ExecContext(ctx, ` + DROP TABLE IF EXISTS "pg-pulse"; CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT); `); err != nil { panic(err) } log.Println("spamming...") + + var downtime time.Duration + okc := make(chan bool) + defer close(okc) + go func() { + lastOK := true + wasOK := time.Now() + for ok := range okc { + if isNewlyOK := ok && !lastOK; isNewlyOK { + downtime += time.Since(wasOK) + } + lastOK = ok + if ok { + wasOK = time.Now() + } + } + }() + pushOK := func(v bool) { + select { + case okc <- v: + default: + } + } + pticker := time.NewTicker(5 * time.Second) defer pticker.Stop() ticker := time.NewTicker(*d) defer ticker.Stop() - n := 0 + n := uint(0) + isHA := false + isHA0 := false for ctx.Err() == nil { func() { ctx, can := context.WithCancel(ctx) @@ -87,16 +114,53 @@ func main() { if _, err := pg.ExecContext(ctx, fmt.Sprintf(` INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) `)); err != nil { + pushOK(false) log.Println("\n! failed nonzero insert:", err) + } else { + n += 1 + pushOK(true) + } + + var lostWrites uint + ha, ha1 := false, false + row := pg.QueryRowContext(ctx, ` + SELECT + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha__' + ) AS is_ha, + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha_0' + ) AS is_ha1, + ( + SELECT $1 - COUNT(*) + FROM "pg-pulse" + ) AS lost_writes + `, n) + if err := row.Err(); err != nil { + log.Println("\n! failed getting ha-stuff:", err) + } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { + log.Println("\n! failed scanning ha-stuff:", err) + } else { + isHA = isHA || ha + isHA0 = isHA && !ha1 } ctx, can = context.WithTimeout(ctx, time.Second*5) defer can() - n += 1 select { case <-pticker.C: - log.Println(n) + using := "" + if isHA && isHA0 { + using = "using ha0" + } else if isHA && !isHA0 { + using = "using ha1" + } + log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) default: } }()