refactoring

main
Bel LaPointe 2025-11-19 12:14:20 -07:00
parent 9fabbcb761
commit 5b11ae1648
4 changed files with 331 additions and 147 deletions

147
cmd/pg-lo-demo/main.go Normal file
View File

@ -0,0 +1,147 @@
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"time"
"pg/src/with"
_ "github.com/lib/pq"
)
func main() {
if err := with.Context(run); err != nil {
panic(err)
}
}
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
d := fs.Duration("d", time.Second, "interval")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS "pg-pulse";
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
return err
}
log.Println("spamming...")
var downtime time.Duration
okc := make(chan bool)
defer close(okc)
go func() {
lastOK := true
wasOK := time.Now()
for ok := range okc {
if isNewlyOK := ok && !lastOK; isNewlyOK {
downtime += time.Since(wasOK)
}
lastOK = ok
if ok {
wasOK = time.Now()
}
}
}()
pushOK := func(v bool) {
select {
case okc <- v:
default:
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
return ctx.Err()
})
}

View File

@ -7,17 +7,20 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"pg/src/with"
_ "github.com/lib/pq"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := with.Context(run); err != nil {
panic(err)
}
}
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
d := fs.Duration("d", time.Second, "interval")
@ -25,154 +28,120 @@ func main() {
panic(err)
}
log.Println("opening...")
pg, err := sql.Open("postgres", *c)
if err != nil {
panic(err)
}
defer func() {
log.Println("closed:", pg.Close())
}()
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS "pg-pulse";
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
return err
}
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
log.Println("spamming...")
var downtime time.Duration
okc := make(chan bool)
defer close(okc)
go func() {
lastOK := true
wasOK := time.Now()
for ok := range okc {
if isNewlyOK := ok && !lastOK; isNewlyOK {
downtime += time.Since(wasOK)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
lastOK = ok
if ok {
return
wasOK = time.Now()
}
case <-time.After(time.Second * 5):
}
}
}()
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS "pg-pulse";
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
panic(err)
}
log.Println("spamming...")
var downtime time.Duration
okc := make(chan bool)
defer close(okc)
go func() {
lastOK := true
wasOK := time.Now()
for ok := range okc {
if isNewlyOK := ok && !lastOK; isNewlyOK {
downtime += time.Since(wasOK)
}
lastOK = ok
if ok {
wasOK = time.Now()
}
}
}()
pushOK := func(v bool) {
select {
case okc <- v:
default:
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
pushOK := func(v bool) {
select {
case okc <- v:
default:
}
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
return ctx.Err()
})
}

17
src/with/ctx.go Normal file
View File

@ -0,0 +1,17 @@
package with
import (
"context"
"os/signal"
"syscall"
)
func Context(foo func(context.Context) error) error {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := foo(ctx); err != nil && ctx.Err() == nil {
return err
}
return nil
}

51
src/with/pg.go Normal file
View File

@ -0,0 +1,51 @@
package with
import (
"context"
"database/sql"
"log"
"time"
_ "github.com/lib/pq"
)
func PSQL(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
log.Println("opening...")
pg, err := sql.Open("postgres", conn)
if err != nil {
return err
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
case <-time.After(time.Second * 5):
}
}
}()
return foo(pg)
}