192 lines
4.3 KiB
Go
192 lines
4.3 KiB
Go
package mytinytodo
|
|
|
|
import (
|
|
"fmt"
|
|
"local/mytinytodoclient/mytinytodo/remote"
|
|
"local/rproxy3/storage"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Namespace/key pairs used to address entries in the Buffer's storage.
const (
	// nsQueue is the namespace holding queued operations; keyQueue is the
	// key within it under which the ordered list of queued operation keys
	// is stored.
	nsQueue  = "delta"
	keyQueue = "queue"

	// nsLists/keyLists address the locally cached copy of the remote lists.
	nsLists  = "lists"
	keyLists = "lists"

	// nsTasks is the namespace holding cached tasks, keyed by list ID.
	// keyTasks is not referenced by the code visible in this file.
	nsTasks  = "tasks"
	keyTasks = "tasks"
)
|
|
|
|
type Buffer struct {
|
|
config *Config
|
|
db storage.DB
|
|
dbLock *sync.RWMutex
|
|
done chan struct{}
|
|
interval time.Duration
|
|
}
|
|
|
|
func NewBuffer(config *Config) (*Buffer, error) {
|
|
db := storage.NewMap()
|
|
b := &Buffer{
|
|
config: config,
|
|
db: db,
|
|
done: make(chan struct{}),
|
|
dbLock: &sync.RWMutex{},
|
|
interval: time.Second * 10,
|
|
}
|
|
go b.notDoneCallback(b.Dequeue)
|
|
go b.notDoneCallback(b.RefreshLocal)
|
|
return b, nil
|
|
}
|
|
|
|
func (buffer *Buffer) Close() {
|
|
close(buffer.done)
|
|
}
|
|
|
|
func (buffer *Buffer) Enqueue(op remote.Op, list remote.List, task remote.Task, taskTags ...string) error {
|
|
buffer.dbLock.Lock()
|
|
defer buffer.dbLock.Unlock()
|
|
qop := &QueuedOp{
|
|
Op: op,
|
|
List: list,
|
|
Task: task,
|
|
TaskTags: taskTags,
|
|
}
|
|
uuid, _ := uuid.NewRandom()
|
|
key := uuid.String()
|
|
|
|
todo := NewStringArray()
|
|
if err := buffer.db.Get(nsQueue, keyQueue, todo); err != nil && err != storage.ErrNotFound {
|
|
return err
|
|
}
|
|
|
|
sa := todo.StringArray()
|
|
sa = append(sa, key)
|
|
todo = NewStringArray(sa...)
|
|
if err := buffer.db.Set(nsQueue, key, qop); err != nil {
|
|
return err
|
|
}
|
|
if err := buffer.db.Set(nsQueue, keyQueue, todo); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("enqueued task %v as %v, %vth in line", qop, key, len(sa))
|
|
return nil
|
|
}
|
|
|
|
func (buffer *Buffer) Dequeue() {
|
|
client, err := remote.NewClient(buffer.config.Config)
|
|
if err != nil {
|
|
log.Printf("cannot create client: %v", err)
|
|
return
|
|
}
|
|
if _, err := client.Lists(); err != nil {
|
|
log.Printf("cannot client.lists: %v", err)
|
|
return
|
|
}
|
|
|
|
buffer.dbLock.Lock()
|
|
defer buffer.dbLock.Unlock()
|
|
todo := NewStringArray()
|
|
if err := buffer.db.Get(nsQueue, keyQueue, todo); err != nil {
|
|
log.Printf("cannot get %v.%v: %v", nsQueue, keyQueue, err)
|
|
return
|
|
}
|
|
|
|
sa := todo.StringArray()
|
|
nsa := []string{}
|
|
for i := range sa {
|
|
qop := &QueuedOp{}
|
|
if err := buffer.db.Get(nsQueue, sa[i], qop); err != nil {
|
|
log.Printf("cannot get %v.%v: %v", nsQueue, sa[i], err)
|
|
nsa = append(nsa, sa[i])
|
|
continue
|
|
}
|
|
var err error
|
|
switch qop.Op {
|
|
case remote.NEW:
|
|
err = client.NewTask(qop.List, qop.Task, strings.Join(qop.TaskTags, ","))
|
|
log.Printf("dequeue NEW(%v, %v, %v): %v", qop.List, qop.Task, qop.TaskTags, err)
|
|
case remote.CLOSE:
|
|
err = client.CloseTask(qop.Task)
|
|
log.Printf("dequeue CLOSE(%v): %v", qop.Task, err)
|
|
case remote.OPEN:
|
|
err = client.OpenTask(qop.Task)
|
|
log.Printf("dequeue OPEN(%v): %v", qop.Task, err)
|
|
default:
|
|
err = fmt.Errorf("cannot dequeue op %v", qop.Op)
|
|
}
|
|
if err != nil {
|
|
log.Printf("failed op %v: %v", qop.Op, err)
|
|
continue
|
|
}
|
|
if err := buffer.db.Set(nsQueue, sa[i], nil); err != nil {
|
|
log.Printf("cannot unset %v.%v: %v", nsQueue, sa[i], err)
|
|
nsa = append(nsa, sa[i])
|
|
continue
|
|
}
|
|
}
|
|
|
|
if err := buffer.db.Set(nsQueue, keyQueue, NewStringArray(nsa...)); err != nil {
|
|
log.Printf("cannot update queue: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (buffer *Buffer) RefreshLocal() {
|
|
client, err := remote.NewClient(buffer.config.Config)
|
|
if err != nil {
|
|
log.Printf("cannot create client: %v", err)
|
|
return
|
|
}
|
|
lists, err := client.Lists()
|
|
if err != nil {
|
|
log.Printf("cannot client.lists: %v", err)
|
|
return
|
|
}
|
|
|
|
buffer.dbLock.Lock()
|
|
defer buffer.dbLock.Unlock()
|
|
if err := buffer.db.Set(nsLists, keyLists, &lists); err != nil {
|
|
log.Printf("cannot set lists: %v", err)
|
|
return
|
|
}
|
|
|
|
for _, list := range lists {
|
|
tasks, err := client.Tasks(list)
|
|
if err != nil {
|
|
log.Printf("cannot client.tasks(%v): %v", list, err)
|
|
continue
|
|
}
|
|
if err := buffer.db.Set(nsTasks, list.ID, &tasks); err != nil {
|
|
log.Printf("cannot set tasks(%v): %v", list.ID, tasks)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func (buffer *Buffer) Lists() (remote.Lists, error) {
|
|
var lists remote.Lists
|
|
err := buffer.db.Get(nsLists, keyLists, &lists)
|
|
return lists, err
|
|
}
|
|
|
|
func (buffer *Buffer) Tasks(list remote.List) (remote.Tasks, error) {
|
|
var tasks remote.Tasks
|
|
err := buffer.db.Get(nsTasks, list.ID, &tasks)
|
|
return tasks, err
|
|
}
|
|
|
|
func (buffer *Buffer) notDoneCallback(foo func()) {
|
|
for {
|
|
select {
|
|
case <-buffer.done:
|
|
return
|
|
case <-time.After(buffer.interval):
|
|
foo()
|
|
}
|
|
}
|
|
}
|