package main import ( "context" "fmt" ) type MessageToPersistence struct { pipeline Pipeline } func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { reader, err := NewQueue(ctx, "new_message", cfg.driver) if err != nil { return Pipeline{}, err } writer, err := NewQueue(ctx, "new_persistence", cfg.driver) if err != nil { return Pipeline{}, err } return Pipeline{ writer: writer, reader: reader, process: newMessageToPersistenceProcess(cfg.driver), }, nil } func newMessageToPersistenceProcess(driver Driver) processFunc { return func(ctx context.Context, msg []byte) ([]byte, error) { m, err := Deserialize(msg) if err != nil { return nil, err } if result, err := driver.ExecContext(ctx, ` CREATE TABLE IF NOT EXISTS messages (id TEXT UNIQUE, v TEXT); INSERT INTO messages (id, v) VALUES (?, ?) ON CONFLICT(id) DO UPDATE set v = ?; `, m.ID, msg, msg); err != nil { return nil, err } else if n, err := result.RowsAffected(); err != nil { return nil, err } else if n != 1 { return nil, fmt.Errorf("upserting event to persistence modified %v rows", n) } return msg, nil } }