Compare commits

...

2 Commits

Author SHA1 Message Date
bel
13a1b2e3cd dumb storage 2024-04-11 23:04:25 -06:00
bel
ea902cef86 up to q 2024-04-11 23:01:40 -06:00
7 changed files with 115 additions and 18 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/slack-bot-vr
**/*.sw*

View File

@@ -9,9 +9,15 @@ import (
"go.etcd.io/bbolt"
)
type Driver interface {
Close() error
ForEach(context.Context, string, func(string, []byte) error) error
Get(context.Context, string, string) ([]byte, error)
Set(context.Context, string, string, []byte) error
}
type BBolt struct {
db *bbolt.DB
bkt string
db *bbolt.DB
}
func NewTestDB() BBolt {
@@ -37,9 +43,9 @@ func (bb BBolt) Close() error {
return bb.db.Close()
}
func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) error {
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
return bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(bb.bkt))
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
@@ -55,10 +61,10 @@ func (bb BBolt) ForEach(ctx context.Context, cb func(string, []byte) error) erro
})
}
func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) {
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
var b []byte
err := bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(bb.bkt))
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
@@ -69,17 +75,20 @@ func (bb BBolt) Get(_ context.Context, id string) ([]byte, error) {
return b, err
}
func (bb BBolt) Set(_ context.Context, id string, value []byte) error {
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
return bb.db.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(bb.bkt))
bkt := tx.Bucket([]byte(db))
if bkt == nil {
var err error
bkt, err = tx.CreateBucket([]byte(bb.bkt))
bkt, err = tx.CreateBucket([]byte(db))
if err != nil {
return err
}
}
if value == nil {
return bkt.Delete([]byte(id))
}
return bkt.Put([]byte(id), value)
})
}

5
go.mod
View File

@@ -2,6 +2,9 @@ module gitea/render/slack-bot-vr
go 1.20
require go.etcd.io/bbolt v1.3.9
require (
github.com/go-errors/errors v1.5.1
go.etcd.io/bbolt v1.3.9
)
require golang.org/x/sys v0.4.0 // indirect

2
go.sum
View File

@@ -1,4 +1,6 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=

View File

@@ -14,6 +14,10 @@ type Message struct {
AssetID string
}
func (m Message) Empty() bool {
return m == (Message{})
}
func (m Message) Serialize() []byte {
b, err := json.Marshal(m)
if err != nil {

62
queue.go Normal file
View File

@@ -0,0 +1,62 @@
package main
import (
"context"
"time"
"github.com/go-errors/errors"
)
type Queue struct {
driver Driver
}
func NewTestQueue() Queue {
return Queue{driver: NewTestDB()}
}
func NewQueue(driver Driver) Queue {
return Queue{driver: driver}
}
func (q Queue) Push(ctx context.Context, m Message) error {
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
}
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
for {
m, err := q.peekFirst(ctx)
if err != nil {
return m, err
}
if !m.Empty() {
return m, nil
}
select {
case <-ctx.Done():
return Message{}, ctx.Err()
case <-time.After(time.Second):
}
}
}
func (q Queue) Ack(ctx context.Context, id string) error {
return q.driver.Set(ctx, "q", id, nil)
}
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
var m Message
subctx, subcan := context.WithCancel(ctx)
defer subcan()
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
m = MustDeserialize(value)
subcan()
return nil
})
if errors.Is(err, subctx.Err()) {
err = nil
}
return m, err
}

View File

@@ -1,18 +1,18 @@
package main
import "context"
import (
"context"
"errors"
)
var (
ErrNotFound = errors.New("not found")
)
type Storage struct {
driver Driver
}
type Driver interface {
Close() error
ForEach(context.Context, func(string, []byte) error) error
Get(context.Context, string) ([]byte, error)
Set(context.Context, string, []byte) error
}
func NewTestStorage() Storage {
return Storage{driver: NewTestDB()}
}
@@ -20,3 +20,18 @@ func NewTestStorage() Storage {
func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "storage", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "storage", id)
if err != nil {
return Message{}, err
}
if b == nil {
return Message{}, ErrNotFound
}
return MustDeserialize(b), nil
}