package main import ( "context" "time" "github.com/go-errors/errors" ) type Queue struct { driver Driver } func NewQueue(driver Driver) Queue { return Queue{driver: driver} } func (q Queue) Push(ctx context.Context, m Message) error { return q.driver.Set(ctx, "q", m.ID, m.Serialize()) } func (q Queue) PeekFirst(ctx context.Context) (Message, error) { for { m, err := q.peekFirst(ctx) if err != nil { return m, err } if !m.Empty() { return m, nil } select { case <-ctx.Done(): return Message{}, ctx.Err() case <-time.After(time.Second): } } } func (q Queue) Ack(ctx context.Context, id string) error { return q.driver.Set(ctx, "q", id, nil) } func (q Queue) peekFirst(ctx context.Context) (Message, error) { var m Message subctx, subcan := context.WithCancel(ctx) defer subcan() err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error { m = MustDeserialize(value) subcan() return nil }) if errors.Is(err, subctx.Err()) { err = nil } return m, err }