MyTinyTodo2/mytinytodo/buffer.go

124 lines
2.6 KiB
Go

package mytinytodo
import (
"local/mytinytodoclient/mytinytodo/remote"
"local/rproxy3/storage"
"log"
"sync"
"time"
"github.com/google/uuid"
)
// Storage coordinates used by Buffer. The queue index and the queued
// operations are currently stored under the same namespace value.
const (
	// nsQueue is the namespace that holds the list of pending operation keys.
	nsQueue = "delta"
	// keyQueue is the key, inside nsQueue, under which that list is stored.
	keyQueue = "queue"
	// nsTasks is the namespace that holds the individual queued operations.
	nsTasks = "delta"
)
// Buffer queues operations in a local store and runs periodic background
// loops over that store until Close is called.
type Buffer struct {
	// config supplies the settings (config.Config) passed to
	// remote.NewClient when the queue is processed.
	config *Config

	// db stores both the list of pending keys and the queued operations.
	db storage.DB
	// dbLock serializes every access to db.
	dbLock *sync.RWMutex

	// done is closed by Close to stop the background loops.
	done chan struct{}
	// interval is the wait between two runs of a background loop.
	interval time.Duration
}
// NewBuffer creates a Buffer backed by a fresh storage.NewMap store and
// starts its two background loops, Dequeue and RefreshLocal, each on its own
// goroutine. The loops fire every ten seconds and stop once Close is called.
//
// The returned error is currently always nil.
func NewBuffer(config *Config) (*Buffer, error) {
	buffer := new(Buffer)
	buffer.config = config
	buffer.db = storage.NewMap()
	buffer.dbLock = new(sync.RWMutex)
	buffer.done = make(chan struct{})
	buffer.interval = 10 * time.Second

	go buffer.Dequeue()
	go buffer.RefreshLocal()

	return buffer, nil
}
// Close tells the background loops to stop by closing the done channel.
//
// Calling Close again after an earlier call has returned is a no-op rather
// than a "close of closed channel" panic. Concurrent first calls are not
// synchronized against each other.
func (buffer *Buffer) Close() {
	select {
	case <-buffer.done:
		// Already closed by a previous call; nothing left to do.
	default:
		close(buffer.done)
	}
}
// Enqueue records an operation for later processing.
//
// The operation is stored under a freshly generated random UUID in nsTasks,
// and that UUID is appended to the list of pending keys kept at
// nsQueue/keyQueue. Both writes happen while holding dbLock.
//
// It returns the error from UUID generation or from the first failing store
// call. If the task write succeeds but the queue write fails, the task entry
// is left in the store without being referenced by the queue.
func (buffer *Buffer) Enqueue(op remote.Op, listID, taskName string, taskTags ...string) error {
	// Do the work that does not touch the store before taking the lock.
	// The error must not be ignored: on failure the zero UUID would be used
	// as the key and every such operation would overwrite the previous one.
	id, err := uuid.NewRandom()
	if err != nil {
		return err
	}
	key := id.String()
	qop := &QueuedOp{
		Op:       op,
		ListID:   listID,
		TaskName: taskName,
		TaskTags: taskTags,
	}

	buffer.dbLock.Lock()
	defer buffer.dbLock.Unlock()

	// NOTE(review): if db.Get reports an error for a key that was never
	// written, the very first Enqueue on an empty store fails here; confirm
	// against the storage package.
	todo := NewStringArray()
	if err := buffer.db.Get(nsQueue, keyQueue, todo); err != nil {
		return err
	}
	sa := append(todo.StringArray(), key)

	// Write the task before the queue so the queue never names a missing task.
	if err := buffer.db.Set(nsTasks, key, qop); err != nil {
		return err
	}
	if err := buffer.db.Set(nsQueue, keyQueue, NewStringArray(sa...)); err != nil {
		return err
	}

	log.Printf("enqueued task %v as %v, %vth in line", qop, key, len(sa))
	return nil
}
// Dequeue periodically walks the queue of pending operations until Close is
// called. NewBuffer starts it on its own goroutine.
//
// Each pass first creates a remote client and calls Lists on it; if either
// step fails, the pass is skipped and the queue is left untouched. Otherwise,
// while holding dbLock, it loads the list of pending keys and looks up each
// queued operation. Operations that load successfully are logged and dropped
// from the list; keys whose lookup fails are kept for the next pass. The
// resulting list is then written back.
func (buffer *Buffer) Dequeue() {
	pass := func() {
		client, err := remote.NewClient(buffer.config.Config)
		if err != nil {
			log.Printf("cannot create client: %v", err)
			return
		}
		// The result of Lists is discarded; only its error is checked.
		if _, err := client.Lists(); err != nil {
			log.Printf("cannot client.lists: %v", err)
			return
		}

		buffer.dbLock.Lock()
		defer buffer.dbLock.Unlock()

		pending := NewStringArray()
		if err := buffer.db.Get(nsQueue, keyQueue, pending); err != nil {
			log.Printf("cannot get %v.%v: %v", nsQueue, keyQueue, err)
			return
		}

		// retry collects the keys that could not be loaded this time.
		retry := []string{}
		for _, key := range pending.StringArray() {
			qop := new(QueuedOp)
			if err := buffer.db.Get(nsTasks, key, qop); err != nil {
				log.Printf("cannot get %v.%v: %v", nsTasks, key, err)
				retry = append(retry, key)
				continue
			}
			log.Printf("UPSERT %v", qop)
		}

		if err := buffer.db.Set(nsQueue, keyQueue, NewStringArray(retry...)); err != nil {
			log.Printf("cannot update queue: %v", err)
		}
	}

	buffer.notDoneCallback(pass)
}
// RefreshLocal runs a periodic pass until Close is called. NewBuffer starts
// it on its own goroutine.
//
// At present each pass only acquires and releases dbLock; no data is read
// or written.
func (buffer *Buffer) RefreshLocal() {
	pass := func() {
		buffer.dbLock.Lock()
		defer buffer.dbLock.Unlock()
	}

	buffer.notDoneCallback(pass)
}
// notDoneCallback calls foo repeatedly, waiting buffer.interval before the
// first call and again after each call returns, until the done channel is
// closed. It blocks the calling goroutine for that whole time.
//
// A single timer is reused for every wait and stopped on return, so no
// timer is allocated per iteration and none is left pending after shutdown.
func (buffer *Buffer) notDoneCallback(foo func()) {
	timer := time.NewTimer(buffer.interval)
	defer timer.Stop()

	for {
		select {
		case <-buffer.done:
			return
		case <-timer.C:
			foo()
			// The timer has fired and its channel was just drained, so it
			// can be re-armed directly. Re-arming after foo keeps the wait
			// measured from the end of one call to the start of the next.
			timer.Reset(buffer.interval)
		}
	}
}