up to q
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
/slack-bot-vr
|
||||||
|
**/*.sw*
|
||||||
27
driver.go
27
driver.go
@@ -9,9 +9,15 @@ import (
|
|||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Driver interface {
|
||||||
|
Close() error
|
||||||
|
ForEach(context.Context, string, func(string, []byte) error) error
|
||||||
|
Get(context.Context, string, string) ([]byte, error)
|
||||||
|
Set(context.Context, string, string, []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
type BBolt struct {
|
type BBolt struct {
|
||||||
db *bbolt.DB
|
db *bbolt.DB
|
||||||
bkt string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestDB() BBolt {
|
func NewTestDB() BBolt {
|
||||||
@@ -37,9 +43,9 @@ func (bb BBolt) Close() error {
|
|||||||
return bb.db.Close()
|
return bb.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) error {
|
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
|
||||||
return bb.db.View(func(tx *bbolt.Tx) error {
|
return bb.db.View(func(tx *bbolt.Tx) error {
|
||||||
bkt := tx.Bucket([]byte(bb.bkt))
|
bkt := tx.Bucket([]byte(db))
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -55,10 +61,10 @@ func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) erro
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) {
|
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
|
||||||
var b []byte
|
var b []byte
|
||||||
err := bb.db.View(func(tx *bbolt.Tx) error {
|
err := bb.db.View(func(tx *bbolt.Tx) error {
|
||||||
bkt := tx.Bucket([]byte(bb.bkt))
|
bkt := tx.Bucket([]byte(db))
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -69,17 +75,20 @@ func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) {
|
|||||||
return b, err
|
return b, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bb BBolt) Set(_ context.Context, id string, value []byte) error {
|
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
|
||||||
return bb.db.Update(func(tx *bbolt.Tx) error {
|
return bb.db.Update(func(tx *bbolt.Tx) error {
|
||||||
bkt := tx.Bucket([]byte(bb.bkt))
|
bkt := tx.Bucket([]byte(db))
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
var err error
|
var err error
|
||||||
bkt, err = tx.CreateBucket([]byte(bb.bkt))
|
bkt, err = tx.CreateBucket([]byte(db))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if value == nil {
|
||||||
|
return bkt.Delete([]byte(id))
|
||||||
|
}
|
||||||
return bkt.Put([]byte(id), value)
|
return bkt.Put([]byte(id), value)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
5
go.mod
5
go.mod
@@ -2,6 +2,9 @@ module gitea/render/slack-bot-vr
|
|||||||
|
|
||||||
go 1.20
|
go 1.20
|
||||||
|
|
||||||
require go.etcd.io/bbolt v1.3.9
|
require (
|
||||||
|
github.com/go-errors/errors v1.5.1
|
||||||
|
go.etcd.io/bbolt v1.3.9
|
||||||
|
)
|
||||||
|
|
||||||
require golang.org/x/sys v0.4.0 // indirect
|
require golang.org/x/sys v0.4.0 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -1,4 +1,6 @@
|
|||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
|
||||||
|
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
|
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
|
||||||
|
|||||||
@@ -14,6 +14,10 @@ type Message struct {
|
|||||||
AssetID string
|
AssetID string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m Message) Empty() bool {
|
||||||
|
return m == (Message{})
|
||||||
|
}
|
||||||
|
|
||||||
func (m Message) Serialize() []byte {
|
func (m Message) Serialize() []byte {
|
||||||
b, err := json.Marshal(m)
|
b, err := json.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
62
queue.go
Normal file
62
queue.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-errors/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Queue struct {
|
||||||
|
driver Driver
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestQueue() Queue {
|
||||||
|
return Queue{driver: NewTestDB()}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueue(driver Driver) Queue {
|
||||||
|
return Queue{driver: driver}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) Push(ctx context.Context, m Message) error {
|
||||||
|
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
|
||||||
|
for {
|
||||||
|
m, err := q.peekFirst(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return m, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !m.Empty() {
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return Message{}, ctx.Err()
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) Ack(ctx context.Context, id string) error {
|
||||||
|
return q.driver.Set(ctx, "q", id, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
|
||||||
|
var m Message
|
||||||
|
subctx, subcan := context.WithCancel(ctx)
|
||||||
|
defer subcan()
|
||||||
|
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
|
||||||
|
m = MustDeserialize(value)
|
||||||
|
subcan()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if errors.Is(err, subctx.Err()) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
return m, err
|
||||||
|
}
|
||||||
13
storage.go
13
storage.go
@@ -6,13 +6,6 @@ type Storage struct {
|
|||||||
driver Driver
|
driver Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
type Driver interface {
|
|
||||||
Close() error
|
|
||||||
ForEach(context.Context, func(string, []byte) error) error
|
|
||||||
Get(context.Context, string) ([]byte, error)
|
|
||||||
Set(context.Context, string, []byte) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestStorage() Storage {
|
func NewTestStorage() Storage {
|
||||||
return Storage{driver: NewTestDB()}
|
return Storage{driver: NewTestDB()}
|
||||||
}
|
}
|
||||||
@@ -20,3 +13,9 @@ func NewTestStorage() Storage {
|
|||||||
func NewStorage(driver Driver) Storage {
|
func NewStorage(driver Driver) Storage {
|
||||||
return Storage{driver: driver}
|
return Storage{driver: driver}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) Enqueue(ctx context.Context, m Message) error {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) Dequeue(ctx context.Context, m Message) error {
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user