diff --git a/driver.go b/driver.go new file mode 100644 index 0000000..04dc1b6 --- /dev/null +++ b/driver.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "io/ioutil" + "os" + "time" + + "go.etcd.io/bbolt" +) + +type Driver interface { + Close() error + ForEach(context.Context, string, func(string, []byte) error) error + Get(context.Context, string, string) ([]byte, error) + Set(context.Context, string, string, []byte) error +} + +type BBolt struct { + db *bbolt.DB +} + +func NewTestDB() BBolt { + d, err := ioutil.TempDir(os.TempDir(), "test-db-*") + if err != nil { + panic(err) + } + db, err := NewDB(d) + if err != nil { + panic(err) + } + return db +} + +func NewDB(p string) (BBolt, error) { + db, err := bbolt.Open(p, 0600, &bbolt.Options{ + Timeout: time.Second, + }) + return BBolt{db: db}, err +} + +func (bb BBolt) Close() error { + return bb.db.Close() +} + +func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error { + return bb.db.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte(db)) + if bkt == nil { + return nil + } + + c := bkt.Cursor() + for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() { + if err := cb(string(k), v); err != nil { + return err + } + } + + return ctx.Err() + }) +} + +func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) { + var b []byte + err := bb.db.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte(db)) + if bkt == nil { + return nil + } + + b = bkt.Get([]byte(id)) + return nil + }) + return b, err +} + +func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error { + return bb.db.Update(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte(db)) + if bkt == nil { + var err error + bkt, err = tx.CreateBucket([]byte(db)) + if err != nil { + return err + } + } + + if value == nil { + return bkt.Delete([]byte(id)) + } + return bkt.Put([]byte(id), value) + }) +} diff --git a/go.mod b/go.mod index cd4b6ab..0fec412 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,10 @@ module github.com/breel-render/spoc-bot-vr go 1.22.1 + +require ( + github.com/go-errors/errors v1.5.1 + go.etcd.io/bbolt v1.3.9 +) + +require golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fe9ccb0 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= +github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/message.go b/message.go new file mode 100644 index 0000000..031ea28 --- /dev/null +++ b/message.go @@ -0,0 +1,41 @@ +package main + +import "encoding/json" + +type Message struct { + ID string + TS uint64 + Plain string + Source string + Channel string + Thread string + EventName string + EventID string + AssetID string +} + +func (m Message) Empty() bool { + return m == (Message{}) +} + +func (m Message) Serialize() []byte { + b, err := json.Marshal(m) + if err != nil { + panic(err) + } + return b +} + +func MustDeserialize(b []byte) Message { + m, err := Deserialize(b) + if err != nil { + panic(err) + } + return m +} + +func Deserialize(b []byte) (Message, error) { + var m Message + err := json.Unmarshal(b, &m) + return m, err +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..09512c7 --- /dev/null +++ b/queue.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "time" + + "github.com/go-errors/errors" +) + +type Queue struct { + driver Driver +} + +func NewTestQueue() Queue { + return Queue{driver: NewTestDB()} +} + +func NewQueue(driver Driver) Queue { + return Queue{driver: driver} +} + +func (q Queue) Push(ctx context.Context, m Message) error { + return q.driver.Set(ctx, "q", m.ID, m.Serialize()) +} + +func (q Queue) PeekFirst(ctx context.Context) (Message, error) { + for { + m, err := q.peekFirst(ctx) + if err != nil { + return m, err + } + + if !m.Empty() { + return m, nil + } + + select { + case <-ctx.Done(): + return Message{}, ctx.Err() + case <-time.After(time.Second): + } + } +} + +func (q Queue) Ack(ctx context.Context, id string) error { + return q.driver.Set(ctx, "q", id, nil) +} + +func (q Queue) peekFirst(ctx context.Context) (Message, error) { + var m Message + subctx, subcan := context.WithCancel(ctx) + defer subcan() + err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error { + m = MustDeserialize(value) + subcan() + return nil + }) + if errors.Is(err, subctx.Err()) { + err = nil + } + return m, err +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..c37a1fc --- /dev/null +++ b/storage.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "errors" +) + +var ( + ErrNotFound = errors.New("not found") +) + +type Storage struct { + driver Driver +} + +func NewTestStorage() Storage { + return Storage{driver: NewTestDB()} +} + +func NewStorage(driver Driver) Storage { + return Storage{driver: driver} +} + +func (s Storage) Upsert(ctx context.Context, m Message) error { + return s.driver.Set(ctx, "storage", m.ID, m.Serialize()) +} + +func (s Storage) Get(ctx context.Context, id string) (Message, error) { + b, err := s.driver.Get(ctx, "storage", id) + if err != nil { + return Message{}, err + } + if b == nil { + return Message{}, ErrNotFound + } + return MustDeserialize(b), nil +} diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..fb613a7 --- /dev/null +++ b/writer.go @@ -0,0 +1,3 @@ +package main + +type Writer struct{}