From 3ae725edadf99c5efc43c46527d8c6d087b34083 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Thu, 11 Dec 2025 11:05:17 -0700 Subject: [PATCH] underwhelming and locks wouldve been faster BUT this is sql instead of s3 so maybe thats different --- cmd/pg-lockless-fifo-demo/main.go | 163 ++++++++++++++++++++++++++++-- 1 file changed, 155 insertions(+), 8 deletions(-) diff --git a/cmd/pg-lockless-fifo-demo/main.go b/cmd/pg-lockless-fifo-demo/main.go index 15afeb0..13f28c5 100644 --- a/cmd/pg-lockless-fifo-demo/main.go +++ b/cmd/pg-lockless-fifo-demo/main.go @@ -2,10 +2,15 @@ package main import ( "context" + "crypto/md5" "database/sql" + "encoding/base64" + "encoding/json" "flag" + "fmt" "log" "os" + "sync/atomic" "time" "pg/src/with" @@ -25,17 +30,159 @@ func run(ctx context.Context) error { } return with.SQL(ctx, *c, func(db *sql.DB) error { - with.Every(ctx, 5*time.Second, func() { - row := db.QueryRowContext(ctx, `SELECT 1`) - var n int + if _, err := db.ExecContext(ctx, ` + DROP TABLE IF EXISTS lockless_fifo_node; + DROP TABLE IF EXISTS lockless_fifo_head; + + CREATE TABLE IF NOT EXISTS lockless_fifo_node ( + id BIGSERIAL NOT NULL PRIMARY KEY, + payload BYTEA, + previous_id BIGSERIAL, + cksum BYTEA + ); + + CREATE TABLE IF NOT EXISTS lockless_fifo_head ( + id BIGSERIAL NOT NULL PRIMARY KEY, + node_id BIGSERIAL NOT NULL + ); + + INSERT INTO lockless_fifo_node (id, payload) VALUES (0, NULL) ON CONFLICT DO NOTHING; + INSERT INTO lockless_fifo_head (id, node_id) VALUES (0, 0) ON CONFLICT DO NOTHING; + `); err != nil { + return err + } + + head := func() (int64, []byte, error) { + var idBefore int64 + var cksumBefore []byte + row := db.QueryRowContext(ctx, ` + SELECT lockless_fifo_node.id, lockless_fifo_node.cksum + FROM lockless_fifo_node + JOIN lockless_fifo_head ON lockless_fifo_head.node_id=lockless_fifo_node.id + `) if err := row.Err(); err != nil { - log.Println("query err:", err) - } else if err := row.Scan(&n); err != nil { - log.Println("scan err:", err) - } else { - log.Println("pinged") + return 0, nil, fmt.Errorf("failed to query head: %w", err) + } + err := row.Scan(&idBefore, &cksumBefore) + if err != nil { + return 0, nil, fmt.Errorf("failed to scan head: %w", err) + } + return idBefore, cksumBefore, nil + } + + push := func(a any) error { + payload, _ := json.Marshal(a) + + idBefore, cksumBefore, err := head() + if err != nil { + return err + } + + hash := md5.New() + hash.Write(cksumBefore) + cksumAfter := hash.Sum(payload) + + tx, err := db.BeginTx(ctx, &sql.TxOptions{}) + defer tx.Rollback() + if err != nil { + return err + } else if _, err := tx.ExecContext(ctx, ` + INSERT INTO lockless_fifo_node ( + id, + payload, + previous_id, + cksum + ) VALUES ( + $1, + $2, + $3, + $4 + ) + `, idBefore+1, payload, idBefore, cksumAfter); err != nil { + return err + } else if _, err := tx.ExecContext(ctx, ` + UPDATE lockless_fifo_head + SET node_id=( + SELECT id + FROM lockless_fifo_node + WHERE cksum=$1 + ) + `, cksumAfter); err != nil { + return err + } else if err := tx.Commit(); err != nil { + return err + } + return nil + } + + pop := func() (any, error) { + curId, _, err := head() + if err != nil { + return nil, err + } + + for ctx.Err() == nil { + row := db.QueryRowContext(ctx, ` + SELECT previous_id + FROM lockless_fifo_node + WHERE id=$1 + `, curId) + if err := row.Err(); err != nil { + return nil, fmt.Errorf("failed to select node %d: %w", curId, err) + } + var previousId sql.NullInt64 + if err := row.Scan(&previousId); err != nil { + return nil, fmt.Errorf("failed to scan node %d: %w", curId, err) + } else if prev := previousId.Int64; !previousId.Valid || prev == curId { + break + } else { + curId = prev + } + } + + row := db.QueryRowContext(ctx, ` + SELECT payload + FROM lockless_fifo_node + WHERE id=$1 + `, curId) + if err := row.Err(); err != nil { + return nil, err + } + var payload []byte + if err := row.Scan(&payload); err != nil { + return nil, fmt.Errorf("failed to scan tail: %w", err) + } + var a any + if len(payload) > 0 { + err = json.Unmarshal(payload, &a) + } + return a, err + } + + var pushes atomic.Int64 + go with.Every(ctx, time.Second, func() { + id, cksum, err := head() + if err == nil { + log.Printf("HEAD id=%d vs expected %d (cksum=%s)", id, pushes.Load(), base64.StdEncoding.EncodeToString(cksum)) } }) + + for i := 0; i < 20; i++ { + go with.Every(ctx, 10*time.Millisecond, func() { + if err := push(time.Now()); err != nil { + //log.Printf("failed push: %v", err) + } else { + pushes.Add(1) + } + }) + } + + with.Every(ctx, time.Second, func() { + if _, err := pop(); err != nil { + log.Printf("failed pop: %v", err) + } + }) + return ctx.Err() }) }