package main import ( "context" "crypto/md5" "database/sql" "encoding/base64" "encoding/json" "flag" "fmt" "log" "os" "sync/atomic" "time" "pg/src/with" ) func main() { if err := with.Context(run); err != nil { panic(err) } } func run(ctx context.Context) error { fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) c := fs.String("c", "sqlite://", "conn string") if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } return with.SQL(ctx, *c, func(db *sql.DB) error { if _, err := db.ExecContext(ctx, ` DROP TABLE IF EXISTS lockless_fifo_node; DROP TABLE IF EXISTS lockless_fifo_head; CREATE TABLE IF NOT EXISTS lockless_fifo_node ( id BIGSERIAL NOT NULL PRIMARY KEY, payload BYTEA, previous_id BIGSERIAL, cksum BYTEA ); CREATE TABLE IF NOT EXISTS lockless_fifo_head ( id BIGSERIAL NOT NULL PRIMARY KEY, node_id BIGSERIAL NOT NULL ); INSERT INTO lockless_fifo_node (id, payload) VALUES (0, NULL) ON CONFLICT DO NOTHING; INSERT INTO lockless_fifo_head (id, node_id) VALUES (0, 0) ON CONFLICT DO NOTHING; `); err != nil { return err } head := func() (int64, []byte, error) { var idBefore int64 var cksumBefore []byte row := db.QueryRowContext(ctx, ` SELECT lockless_fifo_node.id, lockless_fifo_node.cksum FROM lockless_fifo_node JOIN lockless_fifo_head ON lockless_fifo_head.node_id=lockless_fifo_node.id `) if err := row.Err(); err != nil { return 0, nil, fmt.Errorf("failed to query head: %w", err) } err := row.Scan(&idBefore, &cksumBefore) if err != nil { return 0, nil, fmt.Errorf("failed to scan head: %w", err) } return idBefore, cksumBefore, nil } push := func(a any) error { payload, _ := json.Marshal(a) idBefore, cksumBefore, err := head() if err != nil { return err } hash := md5.New() hash.Write(cksumBefore) cksumAfter := hash.Sum(payload) tx, err := db.BeginTx(ctx, &sql.TxOptions{}) defer tx.Rollback() if err != nil { return err } else if _, err := tx.ExecContext(ctx, ` INSERT INTO lockless_fifo_node ( id, payload, previous_id, cksum ) VALUES ( $1, $2, $3, $4 ) `, idBefore+1, payload, idBefore, cksumAfter); err != nil { return err } else if _, err := tx.ExecContext(ctx, ` UPDATE lockless_fifo_head SET node_id=( SELECT id FROM lockless_fifo_node WHERE cksum=$1 ) `, cksumAfter); err != nil { return err } else if err := tx.Commit(); err != nil { return err } return nil } pop := func() (any, error) { curId, _, err := head() if err != nil { return nil, err } for ctx.Err() == nil { row := db.QueryRowContext(ctx, ` SELECT previous_id FROM lockless_fifo_node WHERE id=$1 `, curId) if err := row.Err(); err != nil { return nil, fmt.Errorf("failed to select node %d: %w", curId, err) } var previousId sql.NullInt64 if err := row.Scan(&previousId); err != nil { return nil, fmt.Errorf("failed to scan node %d: %w", curId, err) } else if prev := previousId.Int64; !previousId.Valid || prev == curId { break } else { curId = prev } } row := db.QueryRowContext(ctx, ` SELECT payload FROM lockless_fifo_node WHERE id=$1 `, curId) if err := row.Err(); err != nil { return nil, err } var payload []byte if err := row.Scan(&payload); err != nil { return nil, fmt.Errorf("failed to scan tail: %w", err) } var a any if len(payload) > 0 { err = json.Unmarshal(payload, &a) } return a, err } var pushes atomic.Int64 go with.Every(ctx, time.Second, func() { id, cksum, err := head() if err == nil { log.Printf("HEAD id=%d vs expected %d (cksum=%s)", id, pushes.Load(), base64.StdEncoding.EncodeToString(cksum)) } }) for i := 0; i < 20; i++ { go with.Every(ctx, 10*time.Millisecond, func() { if err := push(time.Now()); err != nil { //log.Printf("failed push: %v", err) } else { pushes.Add(1) } }) } with.Every(ctx, time.Second, func() { if _, err := pop(); err != nil { log.Printf("failed pop: %v", err) } }) return ctx.Err() }) }