59 lines
1.0 KiB
Go
59 lines
1.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/go-errors/errors"
|
|
)
|
|
|
|
type Queue struct {
|
|
driver Driver
|
|
}
|
|
|
|
func NewQueue(driver Driver) Queue {
|
|
return Queue{driver: driver}
|
|
}
|
|
|
|
func (q Queue) Push(ctx context.Context, m Message) error {
|
|
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
|
|
}
|
|
|
|
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
|
|
for {
|
|
m, err := q.peekFirst(ctx)
|
|
if err != nil {
|
|
return m, err
|
|
}
|
|
|
|
if !m.Empty() {
|
|
return m, nil
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return Message{}, ctx.Err()
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q Queue) Ack(ctx context.Context, id string) error {
|
|
return q.driver.Set(ctx, "q", id, nil)
|
|
}
|
|
|
|
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
|
|
var m Message
|
|
subctx, subcan := context.WithCancel(ctx)
|
|
defer subcan()
|
|
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
|
|
m = MustDeserialize(value)
|
|
subcan()
|
|
return nil
|
|
})
|
|
if errors.Is(err, subctx.Err()) {
|
|
err = nil
|
|
}
|
|
return m, err
|
|
}
|