Compare commits

...

4 Commits

Author SHA1 Message Date
Bel LaPointe de8cb91f84 wip 2025-11-20 09:29:51 -07:00
Bel LaPointe f7c25c6603 drop unused 2025-11-19 13:11:56 -07:00
Bel LaPointe 39123044b2 pulse more boilerplate refactoring 2025-11-19 12:31:17 -07:00
Bel LaPointe 5b11ae1648 refactoring 2025-11-19 12:14:20 -07:00
6 changed files with 225 additions and 147 deletions

3
.gitignore vendored
View File

@ -1 +1,4 @@
**/*.sw*
/cmd/pg-lo-demo/pg-lo-demo
/cmd/pg-pulse/pg-pulse
/cmd/pg-walspam/pg-walspam

32
cmd/pg-lo-demo/main.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"context"
"database/sql"
"flag"
"os"
"pg/src/with"
_ "github.com/lib/pq"
)
func main() {
if err := with.Context(run); err != nil {
panic(err)
}
}
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
// SELECT lo_creat(-1);
// SELECT pg_catalog.lowrite(0, '\\037\\213\\010\\000\\000\\000\\000\\000\\000\\000\\245\\220;n\\3030\\014')
return ctx.Err()
})
}

View File

@ -7,17 +7,20 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"pg/src/with"
_ "github.com/lib/pq"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := with.Context(run); err != nil {
panic(err)
}
}
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
d := fs.Duration("d", time.Second, "interval")
@ -25,49 +28,13 @@ func main() {
panic(err)
}
log.Println("opening...")
pg, err := sql.Open("postgres", *c)
if err != nil {
panic(err)
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
case <-time.After(time.Second * 5):
}
}
}()
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
log.Println("staging...")
if _, err := pg.ExecContext(ctx, `
DROP TABLE IF EXISTS "pg-pulse";
CREATE TABLE IF NOT EXISTS "pg-pulse" (k TEXT);
`); err != nil {
panic(err)
return err
}
log.Println("spamming...")
@ -95,22 +62,21 @@ func main() {
}
}
pticker := time.NewTicker(5 * time.Second)
defer pticker.Stop()
ticker := time.NewTicker(*d)
defer ticker.Stop()
n := uint(0)
isHA := false
isHA0 := false
for ctx.Err() == nil {
func() {
ctx, can := context.WithCancel(ctx)
defer can()
done := make(chan struct{})
go func() {
defer close(done)
var lostWrites uint
go with.Every(ctx, 5*time.Second, func() {
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
})
with.GoEvery(ctx, *d, func() {
if _, err := pg.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO "pg-pulse" (k) SELECT substr(md5(random()::text), 1, 25)
`)); err != nil {
@ -121,7 +87,6 @@ func main() {
pushOK(true)
}
var lostWrites uint
ha, ha1 := false, false
row := pg.QueryRowContext(ctx, `
SELECT
@ -148,31 +113,7 @@ func main() {
isHA = isHA || ha
isHA0 = isHA && !ha1
}
ctx, can = context.WithTimeout(ctx, time.Second*5)
defer can()
select {
case <-pticker.C:
using := ""
if isHA && isHA0 {
using = "using ha0"
} else if isHA && !isHA0 {
using = "using ha1"
}
log.Printf("%v writes (%v lost) (%v down) %s", n, lostWrites, downtime.Round(time.Second), using)
default:
}
}()
select {
case <-ctx.Done():
case <-done:
}
}()
select {
case <-ctx.Done():
case <-ticker.C:
}
}
})
return ctx.Err()
})
}

17
src/with/ctx.go Normal file
View File

@ -0,0 +1,17 @@
package with
import (
"context"
"os/signal"
"syscall"
)
func Context(foo func(context.Context) error) error {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := foo(ctx); err != nil && ctx.Err() == nil {
return err
}
return nil
}

34
src/with/every.go Normal file
View File

@ -0,0 +1,34 @@
package with
import (
"context"
"time"
)
func GoEvery(ctx context.Context, d time.Duration, foo func()) {
every(ctx, d, foo, true)
}
func Every(ctx context.Context, d time.Duration, foo func()) {
every(ctx, d, foo, false)
}
func every(ctx context.Context, d time.Duration, foo func(), async bool) {
ticker := time.NewTicker(d)
defer ticker.Stop()
for ctx.Err() == nil {
everyTry(ctx, foo, async)
select {
case <-ctx.Done():
case <-ticker.C:
}
}
}
func everyTry(ctx context.Context, foo func(), async bool) {
if async {
go foo()
} else {
foo()
}
}

51
src/with/pg.go Normal file
View File

@ -0,0 +1,51 @@
package with
import (
"context"
"database/sql"
"log"
"time"
_ "github.com/lib/pq"
)
func PSQL(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
log.Println("opening...")
pg, err := sql.Open("postgres", conn)
if err != nil {
return err
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
case <-time.After(time.Second * 5):
}
}
}()
return foo(pg)
}