pulse more boilerplate refactoring

main
Bel LaPointe 2025-11-19 12:31:17 -07:00
parent 5b11ae1648
commit 39123044b2
4 changed files with 85 additions and 191 deletions

3
.gitignore vendored
View File

@ -1 +1,4 @@
**/*.sw*
/cmd/pg-lo-demo/pg-lo-demo
/cmd/pg-pulse/pg-pulse
/cmd/pg-walspam/pg-walspam

View File

@ -4,8 +4,6 @@ import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"time"
@ -29,119 +27,6 @@ func run(ctx context.Context) error {
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS "pg-pulse";
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
return err
}
log.Println("spamming...")
var downtime time.Duration
okc := make(chan bool)
defer close(okc)
go func() {
lastOK := true
wasOK := time.Now()
for ok := range okc {
if isNewlyOK := ok && !lastOK; isNewlyOK {
downtime += time.Since(wasOK)
}
lastOK = ok
if ok {
wasOK = time.Now()
}
}
}()
pushOK := func(v bool) {
select {
case okc <- v:
default:
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
return ctx.Err()
})
}

View File

@ -62,86 +62,58 @@ func run(ctx context.Context) error {
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
var lostWrites uint
go with.Every(ctx, 5*time.Second, func() {
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
})
with.GoEvery(ctx, *d, func() {
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
pushOK(false)
log.Println("\n! failed nonzero insert:", err)
} else {
n += 1
pushOK(true)
}
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha__'
) AS is_ha,
(
SELECT COUNT(*) > 0
FROM pg_replication_slots
WHERE slot_name LIKE 'dpg_%_a_ha_0'
) AS is_ha1,
(
SELECT $1 - COUNT(*)
FROM "pg-pulse"
) AS lost_writes
`, n)
if err := row.Err(); err != nil {
log.Println("\n! failed getting ha-stuff:", err)
} else if err := row.Scan(&ha, &ha1, &lostWrites); err != nil {
log.Println("\n! failed scanning ha-stuff:", err)
} else {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
})
return ctx.Err()
})
}

34
src/with/every.go Normal file
View File

@ -0,0 +1,34 @@
package with
import (
"context"
"time"
)
func GoEvery(ctx context.Context, d time.Duration, foo func()) {
every(ctx, d, foo, true)
}
func Every(ctx context.Context, d time.Duration, foo func()) {
every(ctx, d, foo, false)
}
func every(ctx context.Context, d time.Duration, foo func(), async bool) {
ticker := time.NewTicker(d)
defer ticker.Stop()
for ctx.Err() == nil {
everyTry(ctx, foo, async)
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}
func everyTry(ctx context.Context, foo func(), async bool) {
if async {
go foo()
} else {
foo()
}
}