spoc-bot-vr/queue.go

63 lines
1.1 KiB
Go

package main
import (
"context"
"time"
"github.com/go-errors/errors"
)
type Queue struct {
driver Driver
}
func NewTestQueue() Queue {
return Queue{driver: NewTestDB()}
}
func NewQueue(driver Driver) Queue {
return Queue{driver: driver}
}
func (q Queue) Push(ctx context.Context, m Message) error {
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
}
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
for {
m, err := q.peekFirst(ctx)
if err != nil {
return m, err
}
if !m.Empty() {
return m, nil
}
select {
case <-ctx.Done():
return Message{}, ctx.Err()
case <-time.After(time.Second):
}
}
}
func (q Queue) Ack(ctx context.Context, id string) error {
return q.driver.Set(ctx, "q", id, nil)
}
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
var m Message
subctx, subcan := context.WithCancel(ctx)
defer subcan()
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
m = MustDeserialize(value)
subcan()
return nil
})
if errors.Is(err, subctx.Err()) {
err = nil
}
return m, err
}