diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..35877d4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/slack-bot-vr +**/*.sw* diff --git a/driver.go b/driver.go index 053e7a1..04dc1b6 100644 --- a/driver.go +++ b/driver.go @@ -9,9 +9,15 @@ import ( "go.etcd.io/bbolt" ) +type Driver interface { + Close() error + ForEach(context.Context, string, func(string, []byte) error) error + Get(context.Context, string, string) ([]byte, error) + Set(context.Context, string, string, []byte) error +} + type BBolt struct { - db *bbolt.DB - bkt string + db *bbolt.DB } func NewTestDB() BBolt { @@ -37,9 +43,9 @@ func (bb BBolt) Close() error { return bb.db.Close() } -func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) error { +func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error { return bb.db.View(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(bb.bkt)) + bkt := tx.Bucket([]byte(db)) if bkt == nil { return nil } @@ -55,10 +61,10 @@ func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) erro }) } -func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) { +func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) { var b []byte err := bb.db.View(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(bb.bkt)) + bkt := tx.Bucket([]byte(db)) if bkt == nil { return nil } @@ -69,17 +75,20 @@ func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) { return b, err } -func (bb BBolt) Set(_ context.Context, id string, value []byte) error { +func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error { return bb.db.Update(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(bb.bkt)) + bkt := tx.Bucket([]byte(db)) if bkt == nil { var err error - bkt, err = tx.CreateBucket([]byte(bb.bkt)) + bkt, err = tx.CreateBucket([]byte(db)) if err != nil { return err } } + if value == nil { + return bkt.Delete([]byte(id)) + } return bkt.Put([]byte(id), value) }) } diff --git a/go.mod b/go.mod index 0668477..f051867 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,9 @@ module gitea/render/slack-bot-vr go 1.20 -require go.etcd.io/bbolt v1.3.9 +require ( + github.com/go-errors/errors v1.5.1 + go.etcd.io/bbolt v1.3.9 +) require golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum index 2745d0e..bc17cbf 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= +github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= diff --git a/message.go b/message.go index 8417fea..031ea28 100644 --- a/message.go +++ b/message.go @@ -14,6 +14,10 @@ type Message struct { AssetID string } +func (m Message) Empty() bool { + return m == (Message{}) +} + func (m Message) Serialize() []byte { b, err := json.Marshal(m) if err != nil { diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..09512c7 --- /dev/null +++ b/queue.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "time" + + "github.com/go-errors/errors" +) + +type Queue struct { + driver Driver +} + +func NewTestQueue() Queue { + return Queue{driver: NewTestDB()} +} + +func NewQueue(driver Driver) Queue { + return Queue{driver: driver} +} + +func (q Queue) Push(ctx context.Context, m Message) error { + return q.driver.Set(ctx, "q", m.ID, m.Serialize()) +} + +func (q Queue) PeekFirst(ctx context.Context) (Message, error) { + for { + m, err := q.peekFirst(ctx) + if err != nil { + return m, err + } + + if !m.Empty() { + return m, nil + } + + select { + case <-ctx.Done(): + return Message{}, ctx.Err() + case <-time.After(time.Second): + } + } +} + +func (q Queue) Ack(ctx context.Context, id string) error { + return q.driver.Set(ctx, "q", id, nil) +} + +func (q Queue) peekFirst(ctx context.Context) (Message, error) { + var m Message + subctx, subcan := context.WithCancel(ctx) + defer subcan() + err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error { + m = MustDeserialize(value) + subcan() + return nil + }) + if errors.Is(err, subctx.Err()) { + err = nil + } + return m, err +} diff --git a/storage.go b/storage.go index 7645adc..aaac7d0 100644 --- a/storage.go +++ b/storage.go @@ -6,13 +6,6 @@ type Storage struct { driver Driver } -type Driver interface { - Close() error - ForEach(context.Context, func(string, []byte) error) error - Get(context.Context, string) ([]byte, error) - Set(context.Context, string, []byte) error -} - func NewTestStorage() Storage { return Storage{driver: NewTestDB()} } @@ -20,3 +13,9 @@ func NewTestStorage() Storage { func NewStorage(driver Driver) Storage { return Storage{driver: driver} } + +func (s Storage) Enqueue(ctx context.Context, m Message) error { +} + +func (s Storage) Dequeue(ctx context.Context, m Message) error { +}