From 5b11ae1648f462c5f10c57adb379fd9ff03f849b Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Wed, 19 Nov 2025 12:14:20 -0700 Subject: [PATCH] refactoring --- cmd/pg-lo-demo/main.go | 147 +++++++++++++++++++++++ cmd/pg-pulse/main.go | 263 ++++++++++++++++++----------------------- src/with/ctx.go | 17 +++ src/with/pg.go | 51 ++++++++ 4 files changed, 331 insertions(+), 147 deletions(-) create mode 100644 cmd/pg-lo-demo/main.go create mode 100644 src/with/ctx.go create mode 100644 src/with/pg.go diff --git a/cmd/pg-lo-demo/main.go b/cmd/pg-lo-demo/main.go new file mode 100644 index 0000000..4cae220 --- /dev/null +++ b/cmd/pg-lo-demo/main.go @@ -0,0 +1,147 @@ +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "log" + "os" + "time" + + "pg/src/with" + + _ "github.com/lib/pq" +) + +func main() { + if err := with.Context(run); err != nil { + panic(err) + } +} + +func run(ctx context.Context) error { + fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") + d := fs.Duration("d", time.Second, "interval") + if err := fs.Parse(os.Args[1:]); err != nil { + panic(err) + } + + return with.PSQL(ctx, *c, func(pg *sql.DB) error { + log.Println("staging...") + if _, err := pg.ExecContext(ctx, ` + DROP TABLE IF EXISTS "pg-pulse"; + CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT); + `); err != nil { + return err + } + + log.Println("spamming...") + + var downtime time.Duration + okc := make(chan bool) + defer close(okc) + go func() { + lastOK := true + wasOK := time.Now() + for ok := range okc { + if isNewlyOK := ok && !lastOK; isNewlyOK { + downtime += time.Since(wasOK) + } + lastOK = ok + if ok { + wasOK = time.Now() + } + } + }() + pushOK := func(v bool) { + select { + case okc <- v: + default: + } + } + + pticker := time.NewTicker(5 * time.Second) + defer pticker.Stop() + ticker := time.NewTicker(*d) + defer ticker.Stop() + n := uint(0) + isHA := false + isHA0 := false + for ctx.Err() == nil { + func() { + ctx, can := context.WithCancel(ctx) + defer can() + + done := make(chan struct{}) + go func() { + defer close(done) + + if _, err := pg.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) + `)); err != nil { + pushOK(false) + log.Println("\n! failed nonzero insert:", err) + } else { + n += 1 + pushOK(true) + } + + var lostWrites uint + ha, ha1 := false, false + row := pg.QueryRowContext(ctx, ` + SELECT + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha__' + ) AS is_ha, + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha_0' + ) AS is_ha1, + ( + SELECT $1 - COUNT(*) + FROM "pg-pulse" + ) AS lost_writes + `, n) + if err := row.Err(); err != nil { + log.Println("\n! failed getting ha-stuff:", err) + } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { + log.Println("\n! failed scanning ha-stuff:", err) + } else { + isHA = isHA || ha + isHA0 = isHA && !ha1 + } + + ctx, can = context.WithTimeout(ctx, time.Second*5) + defer can() + + select { + case <-pticker.C: + using := "" + if isHA && isHA0 { + using = "using ha0" + } else if isHA && !isHA0 { + using = "using ha1" + } + log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) + default: + } + }() + + select { + case <-ctx.Done(): + case <-done: + } + }() + select { + case <-ctx.Done(): + case <-ticker.C: + } + } + return ctx.Err() + }) +} diff --git a/cmd/pg-pulse/main.go b/cmd/pg-pulse/main.go index abc33f0..4cae220 100644 --- a/cmd/pg-pulse/main.go +++ b/cmd/pg-pulse/main.go @@ -7,17 +7,20 @@ import ( "fmt" "log" "os" - "os/signal" - "syscall" "time" + "pg/src/with" + _ "github.com/lib/pq" ) func main() { - ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) - defer can() + if err := with.Context(run); err != nil { + panic(err) + } +} +func run(ctx context.Context) error { fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string") d := fs.Duration("d", time.Second, "interval") @@ -25,154 +28,120 @@ func main() { panic(err) } - log.Println("opening...") - pg, err := sql.Open("postgres", *c) - if err != nil { - panic(err) - } - defer func() { - log.Println("closed:", pg.Close()) - }() + return with.PSQL(ctx, *c, func(pg *sql.DB) error { + log.Println("staging...") + if _, err := pg.ExecContext(ctx, ` + DROP TABLE IF EXISTS "pg-pulse"; + CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT); + `); err != nil { + return err + } - func() { - pinged := make(chan bool) - defer close(pinged) - for { - log.Println("pinging...") - go func() { - err := pg.PingContext(ctx) - if err != nil { - log.Println("!", err) + log.Println("spamming...") + + var downtime time.Duration + okc := make(chan bool) + defer close(okc) + go func() { + lastOK := true + wasOK := time.Now() + for ok := range okc { + if isNewlyOK := ok && !lastOK; isNewlyOK { + downtime += time.Since(wasOK) } - select { - case pinged <- err == nil: - case <-ctx.Done(): - case <-time.After(time.Second * 5): - } - }() - select { - case <-ctx.Done(): - return - case ok := <-pinged: + lastOK = ok if ok { - return + wasOK = time.Now() } - case <-time.After(time.Second * 5): - } - } - }() - - log.Println("staging...") - if _, err := pg.ExecContext(ctx, ` - DROP TABLE IF EXISTS "pg-pulse"; - CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT); - `); err != nil { - panic(err) - } - - log.Println("spamming...") - - var downtime time.Duration - okc := make(chan bool) - defer close(okc) - go func() { - lastOK := true - wasOK := time.Now() - for ok := range okc { - if isNewlyOK := ok && !lastOK; isNewlyOK { - downtime += time.Since(wasOK) - } - lastOK = ok - if ok { - wasOK = time.Now() - } - } - }() - pushOK := func(v bool) { - select { - case okc <- v: - default: - } - } - - pticker := time.NewTicker(5 * time.Second) - defer pticker.Stop() - ticker := time.NewTicker(*d) - defer ticker.Stop() - n := uint(0) - isHA := false - isHA0 := false - for ctx.Err() == nil { - func() { - ctx, can := context.WithCancel(ctx) - defer can() - - done := make(chan struct{}) - go func() { - defer close(done) - - if _, err := pg.ExecContext(ctx, fmt.Sprintf(` - INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) - `)); err != nil { - pushOK(false) - log.Println("\n! failed nonzero insert:", err) - } else { - n += 1 - pushOK(true) - } - - var lostWrites uint - ha, ha1 := false, false - row := pg.QueryRowContext(ctx, ` - SELECT - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha__' - ) AS is_ha, - ( - SELECT COUNT(*) > 0 - FROM pg_replication_slots - WHERE slot_name LIKE 'dpg_%_a_ha_0' - ) AS is_ha1, - ( - SELECT $1 - COUNT(*) - FROM "pg-pulse" - ) AS lost_writes - `, n) - if err := row.Err(); err != nil { - log.Println("\n! failed getting ha-stuff:", err) - } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { - log.Println("\n! failed scanning ha-stuff:", err) - } else { - isHA = isHA || ha - isHA0 = isHA && !ha1 - } - - ctx, can = context.WithTimeout(ctx, time.Second*5) - defer can() - - select { - case <-pticker.C: - using := "" - if isHA && isHA0 { - using = "using ha0" - } else if isHA && !isHA0 { - using = "using ha1" - } - log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) - default: - } - }() - - select { - case <-ctx.Done(): - case <-done: } }() - select { - case <-ctx.Done(): - case <-ticker.C: + pushOK := func(v bool) { + select { + case okc <- v: + default: + } } - } + + pticker := time.NewTicker(5 * time.Second) + defer pticker.Stop() + ticker := time.NewTicker(*d) + defer ticker.Stop() + n := uint(0) + isHA := false + isHA0 := false + for ctx.Err() == nil { + func() { + ctx, can := context.WithCancel(ctx) + defer can() + + done := make(chan struct{}) + go func() { + defer close(done) + + if _, err := pg.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25) + `)); err != nil { + pushOK(false) + log.Println("\n! failed nonzero insert:", err) + } else { + n += 1 + pushOK(true) + } + + var lostWrites uint + ha, ha1 := false, false + row := pg.QueryRowContext(ctx, ` + SELECT + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha__' + ) AS is_ha, + ( + SELECT COUNT(*) > 0 + FROM pg_replication_slots + WHERE slot_name LIKE 'dpg_%_a_ha_0' + ) AS is_ha1, + ( + SELECT $1 - COUNT(*) + FROM "pg-pulse" + ) AS lost_writes + `, n) + if err := row.Err(); err != nil { + log.Println("\n! failed getting ha-stuff:", err) + } else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil { + log.Println("\n! failed scanning ha-stuff:", err) + } else { + isHA = isHA || ha + isHA0 = isHA && !ha1 + } + + ctx, can = context.WithTimeout(ctx, time.Second*5) + defer can() + + select { + case <-pticker.C: + using := "" + if isHA && isHA0 { + using = "using ha0" + } else if isHA && !isHA0 { + using = "using ha1" + } + log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using) + default: + } + }() + + select { + case <-ctx.Done(): + case <-done: + } + }() + select { + case <-ctx.Done(): + case <-ticker.C: + } + } + return ctx.Err() + }) } diff --git a/src/with/ctx.go b/src/with/ctx.go new file mode 100644 index 0000000..480dd6a --- /dev/null +++ b/src/with/ctx.go @@ -0,0 +1,17 @@ +package with + +import ( + "context" + "os/signal" + "syscall" +) + +func Context(foo func(context.Context) error) error { + ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) + defer can() + + if err := foo(ctx); err != nil && ctx.Err() == nil { + return err + } + return nil +} diff --git a/src/with/pg.go b/src/with/pg.go new file mode 100644 index 0000000..0a43611 --- /dev/null +++ b/src/with/pg.go @@ -0,0 +1,51 @@ +package with + +import ( + "context" + "database/sql" + "log" + "time" + + _ "github.com/lib/pq" +) + +func PSQL(ctx context.Context, conn string, foo func(db *sql.DB) error) error { + log.Println("opening...") + pg, err := sql.Open("postgres", conn) + if err != nil { + return err + } + defer func() { + log.Println("closed:", pg.Close()) + }() + + func() { + pinged := make(chan bool) + defer close(pinged) + for { + log.Println("pinging...") + go func() { + err := pg.PingContext(ctx) + if err != nil { + log.Println("!", err) + } + select { + case pinged <- err == nil: + case <-ctx.Done(): + case <-time.After(time.Second * 5): + } + }() + select { + case <-ctx.Done(): + return + case ok := <-pinged: + if ok { + return + } + case <-time.After(time.Second * 5): + } + } + }() + + return foo(pg) +}