Compare commits

...

7 Commits

Author SHA1 Message Date
Bel LaPointe 3ae725edad underwhelming and locks wouldve been faster BUT this is sql instead of s3 so maybe thats different 2025-12-11 11:05:17 -07:00
Bel LaPointe 46e247a9e1 cp template 2025-12-11 10:13:16 -07:00
Bel LaPointe 558d2f7f9d edit template 2025-12-11 10:13:02 -07:00
Bel LaPointe 3f69a2d5d5 pg-lockless-fifo-demo defaults to sqlite 2025-12-11 10:11:48 -07:00
Bel LaPointe 352d8e1fbc sqlite conn string ok 2025-12-11 10:11:35 -07:00
Bel LaPointe 5f39cdad19 lockless-fifo accepts sqlite gomod 2025-12-11 10:00:47 -07:00
Bel LaPointe ab05743d4e lockless-fifo accepts sqlite 2025-12-11 10:00:25 -07:00
8 changed files with 352 additions and 52 deletions

View File

@ -21,13 +21,13 @@ func main() {
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "postgresql://pulsegres:pulsegres@localhost:15432", "conn string")
c := fs.String("c", "sqlite://", "conn string")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
return with.PSQL(ctx, *c, func(pg *sql.DB) error {
go with.Every(ctx, 5*time.Second, func() {
return with.SQL(ctx, *c, func(pg *sql.DB) error {
with.Every(ctx, 5*time.Second, func() {
row := pg.QueryRowContext(ctx, `SELECT 1`)
var n int
if err := row.Err(); err != nil {
@ -35,11 +35,9 @@ func run(ctx context.Context) error {
} else if err := row.Scan(&n); err != nil {
log.Println("scan err:", err)
} else {
log.Println("ping...")
log.Println("pinged")
}
})
<-ctx.Done()
return nil
return ctx.Err()
})
}

View File

@ -0,0 +1,7 @@
https://trychroma.com/engineering/wal3
(generally with S3 but this is fine)
1. GET (current_node, cksum) FROM head_pointer
1. INSERT node (previous_node) VALUES (head_pointer) RETURNING node.id
1. UPDATE head_pointer (current_node, cksum) VALUES (node.id, blockchain) IF head_pointer.cksum=cksum

View File

@ -0,0 +1,188 @@
package main
import (
"context"
"crypto/md5"
"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"sync/atomic"
"time"
"pg/src/with"
)
func main() {
if err := with.Context(run); err != nil {
panic(err)
}
}
func run(ctx context.Context) error {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
c := fs.String("c", "sqlite://", "conn string")
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
return with.SQL(ctx, *c, func(db *sql.DB) error {
if _, err := db.ExecContext(ctx, `
DROP TABLE IF EXISTS lockless_fifo_node;
DROP TABLE IF EXISTS lockless_fifo_head;
CREATE TABLE IF NOT EXISTS lockless_fifo_node (
id BIGSERIAL NOT NULL PRIMARY KEY,
payload BYTEA,
previous_id BIGSERIAL,
cksum BYTEA
);
CREATE TABLE IF NOT EXISTS lockless_fifo_head (
id BIGSERIAL NOT NULL PRIMARY KEY,
node_id BIGSERIAL NOT NULL
);
INSERT INTO lockless_fifo_node (id, payload) VALUES (0, NULL) ON CONFLICT DO NOTHING;
INSERT INTO lockless_fifo_head (id, node_id) VALUES (0, 0) ON CONFLICT DO NOTHING;
`); err != nil {
return err
}
head := func() (int64, []byte, error) {
var idBefore int64
var cksumBefore []byte
row := db.QueryRowContext(ctx, `
SELECT lockless_fifo_node.id, lockless_fifo_node.cksum
FROM lockless_fifo_node
JOIN lockless_fifo_head ON lockless_fifo_head.node_id=lockless_fifo_node.id
`)
if err := row.Err(); err != nil {
return 0, nil, fmt.Errorf("failed to query head: %w", err)
}
err := row.Scan(&idBefore, &cksumBefore)
if err != nil {
return 0, nil, fmt.Errorf("failed to scan head: %w", err)
}
return idBefore, cksumBefore, nil
}
push := func(a any) error {
payload, _ := json.Marshal(a)
idBefore, cksumBefore, err := head()
if err != nil {
return err
}
hash := md5.New()
hash.Write(cksumBefore)
cksumAfter := hash.Sum(payload)
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
defer tx.Rollback()
if err != nil {
return err
} else if _, err := tx.ExecContext(ctx, `
INSERT INTO lockless_fifo_node (
id,
payload,
previous_id,
cksum
) VALUES (
$1,
$2,
$3,
$4
)
`, idBefore+1, payload, idBefore, cksumAfter); err != nil {
return err
} else if _, err := tx.ExecContext(ctx, `
UPDATE lockless_fifo_head
SET node_id=(
SELECT id
FROM lockless_fifo_node
WHERE cksum=$1
)
`, cksumAfter); err != nil {
return err
} else if err := tx.Commit(); err != nil {
return err
}
return nil
}
pop := func() (any, error) {
curId, _, err := head()
if err != nil {
return nil, err
}
for ctx.Err() == nil {
row := db.QueryRowContext(ctx, `
SELECT previous_id
FROM lockless_fifo_node
WHERE id=$1
`, curId)
if err := row.Err(); err != nil {
return nil, fmt.Errorf("failed to select node %d: %w", curId, err)
}
var previousId sql.NullInt64
if err := row.Scan(&previousId); err != nil {
return nil, fmt.Errorf("failed to scan node %d: %w", curId, err)
} else if prev := previousId.Int64; !previousId.Valid || prev == curId {
break
} else {
curId = prev
}
}
row := db.QueryRowContext(ctx, `
SELECT payload
FROM lockless_fifo_node
WHERE id=$1
`, curId)
if err := row.Err(); err != nil {
return nil, err
}
var payload []byte
if err := row.Scan(&payload); err != nil {
return nil, fmt.Errorf("failed to scan tail: %w", err)
}
var a any
if len(payload) > 0 {
err = json.Unmarshal(payload, &a)
}
return a, err
}
var pushes atomic.Int64
go with.Every(ctx, time.Second, func() {
id, cksum, err := head()
if err == nil {
log.Printf("HEAD id=%d vs expected %d (cksum=%s)", id, pushes.Load(), base64.StdEncoding.EncodeToString(cksum))
}
})
for i := 0; i < 20; i++ {
go with.Every(ctx, 10*time.Millisecond, func() {
if err := push(time.Now()); err != nil {
//log.Printf("failed push: %v", err)
} else {
pushes.Add(1)
}
})
}
with.Every(ctx, time.Second, func() {
if _, err := pop(); err != nil {
log.Printf("failed pop: %v", err)
}
})
return ctx.Err()
})
}

18
go.mod
View File

@ -2,4 +2,20 @@ module pg
go 1.24.2
require github.com/lib/pq v1.10.9
require (
github.com/lib/pq v1.10.9
modernc.org/sqlite v1.40.1
)
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
golang.org/x/sys v0.36.0 // indirect
modernc.org/libc v1.66.10 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
)

49
go.sum
View File

@ -1,2 +1,51 @@
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
modernc.org/cc/v4 v4.26.5 h1:xM3bX7Mve6G8K8b+T11ReenJOT+BmVqQj0FY5T4+5Y4=
modernc.org/cc/v4 v4.26.5/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.28.1 h1:wPKYn5EC/mYTqBO373jKjvX2n+3+aK7+sICCv4Fjy1A=
modernc.org/ccgo/v4 v4.28.1/go.mod h1:uD+4RnfrVgE6ec9NGguUNdhqzNIeeomeXf6CL0GTE5Q=
modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA=
modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.66.10 h1:yZkb3YeLx4oynyR+iUsXsybsX4Ubx7MQlSYEw4yj59A=
modernc.org/libc v1.66.10/go.mod h1:8vGSEwvoUoltr4dlywvHqjtAqHBaw0j1jI7iFBTAr2I=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY=
modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

View File

@ -3,53 +3,10 @@ package with
import (
"context"
"database/sql"
"log"
"time"
_ "github.com/lib/pq"
)
func PSQL(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
log.Println("opening...")
pg, err := sql.Open("postgres", conn)
if err != nil {
return err
}
defer func() {
log.Println("closed:", pg.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := pg.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
}
select {
case <-ctx.Done():
case <-time.After(time.Second):
}
}
}()
log.Println("connected")
return foo(pg)
return _sql(ctx, "postgres", conn, foo)
}

71
src/with/sql.go Normal file
View File

@ -0,0 +1,71 @@
package with
import (
"context"
"database/sql"
"fmt"
"log"
"net/url"
"time"
_ "modernc.org/sqlite"
)
func SQL(ctx context.Context, conn string, foo func(*sql.DB) error) error {
u, err := url.Parse(conn)
if err != nil {
return err
}
switch u.Scheme {
case "sqlite":
return Sqlite(ctx, conn, foo)
case "postgres", "postgresql":
return PSQL(ctx, conn, foo)
}
return fmt.Errorf("unknown sql scheme %q", u.Scheme)
}
func _sql(ctx context.Context, engine, conn string, foo func(db *sql.DB) error) error {
log.Printf("opening %s %s...", engine, conn)
db, err := sql.Open(engine, conn)
if err != nil {
return err
}
defer func() {
log.Println("closed:", db.Close())
}()
func() {
pinged := make(chan bool)
defer close(pinged)
for {
log.Println("pinging...")
go func() {
err := db.PingContext(ctx)
if err != nil {
log.Println("!", err)
}
select {
case pinged <- err == nil:
case <-ctx.Done():
case <-time.After(time.Second * 5):
}
}()
select {
case <-ctx.Done():
return
case ok := <-pinged:
if ok {
return
}
}
select {
case <-ctx.Done():
case <-time.After(time.Second):
}
}
}()
log.Println("connected")
return foo(db)
}

14
src/with/sqlite.go Normal file
View File

@ -0,0 +1,14 @@
package with
import (
"context"
"database/sql"
"strings"
_ "modernc.org/sqlite"
)
func Sqlite(ctx context.Context, conn string, foo func(db *sql.DB) error) error {
conn = strings.TrimPrefix(conn, "sqlite://")
return _sql(ctx, "sqlite", conn, foo)
}