some type stubs
This commit is contained in:
62
queue.go
Normal file
62
queue.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
driver Driver
|
||||
}
|
||||
|
||||
func NewTestQueue() Queue {
|
||||
return Queue{driver: NewTestDB()}
|
||||
}
|
||||
|
||||
func NewQueue(driver Driver) Queue {
|
||||
return Queue{driver: driver}
|
||||
}
|
||||
|
||||
func (q Queue) Push(ctx context.Context, m Message) error {
|
||||
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
|
||||
}
|
||||
|
||||
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
|
||||
for {
|
||||
m, err := q.peekFirst(ctx)
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
||||
if !m.Empty() {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return Message{}, ctx.Err()
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q Queue) Ack(ctx context.Context, id string) error {
|
||||
return q.driver.Set(ctx, "q", id, nil)
|
||||
}
|
||||
|
||||
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
|
||||
var m Message
|
||||
subctx, subcan := context.WithCancel(ctx)
|
||||
defer subcan()
|
||||
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
|
||||
m = MustDeserialize(value)
|
||||
subcan()
|
||||
return nil
|
||||
})
|
||||
if errors.Is(err, subctx.Err()) {
|
||||
err = nil
|
||||
}
|
||||
return m, err
|
||||
}
|
||||
Reference in New Issue
Block a user