MyTinyTodo2/mytinytodo/buffer.go

177 lines
3.9 KiB
Go

package mytinytodo
import (
"fmt"
"local/mytinytodoclient/mytinytodo/remote"
"local/rproxy3/storage"
"log"
"strings"
"sync"
"time"
"github.com/google/uuid"
)
// Namespaces and keys under which Buffer keeps its data in the local store.
const (
	// nsQueue holds the pending operations, each under its own UUID key;
	// keyQueue, in the same namespace, holds the ordered list of those keys.
	nsQueue  = "delta"
	keyQueue = "queue"

	// nsLists/keyLists locate the cached copy of the remote lists.
	nsLists  = "lists"
	keyLists = "lists"

	// nsTasks holds the cached tasks, one entry per list ID. keyTasks is not
	// referenced by the code in this part of the file.
	nsTasks  = "tasks"
	keyTasks = "tasks"
)
// Buffer queues operations destined for the remote server in a local store
// and keeps a local copy of the remote lists and tasks. Two background loops
// started by NewBuffer periodically run Dequeue and RefreshLocal until Close
// is called.
type Buffer struct {
// config supplies the settings passed to remote.NewClient (via config.Config).
config *Config
// db is the local store for queued operations and cached lists/tasks.
db storage.DB
// dbLock guards db; the methods in this file take it as a write lock.
dbLock *sync.RWMutex
// done is closed by Close to stop the background loops.
done chan struct{}
// interval is the delay between two runs of a background callback.
interval time.Duration
}
// NewBuffer creates a Buffer backed by a fresh storage.NewMap store and starts
// its two background loops, which run Dequeue and RefreshLocal every ten
// seconds until Close is called. The returned error is always nil.
func NewBuffer(config *Config) (*Buffer, error) {
	b := &Buffer{
		config:   config,
		db:       storage.NewMap(),
		dbLock:   new(sync.RWMutex),
		done:     make(chan struct{}),
		interval: 10 * time.Second,
	}

	// One goroutine per periodic job; both stop when b.done is closed.
	for _, job := range []func(){b.Dequeue, b.RefreshLocal} {
		go b.notDoneCallback(job)
	}
	return b, nil
}
// Close closes the done channel, which tells the background loops to return
// the next time they select on it. It does not wait for them to exit.
// Calling Close more than once panics, because the channel would be closed
// twice.
func (b *Buffer) Close() {
	close(b.done)
}
// Enqueue stores an operation locally so that a later Dequeue pass can replay
// it against the remote server.
//
// The operation is written to the nsQueue namespace under a freshly generated
// random UUID, and that UUID is appended to the ordered key list kept at
// nsQueue/keyQueue. A missing key list (storage.ErrNotFound) is treated as an
// empty queue.
//
// It returns an error if the UUID cannot be generated or if any read or write
// of the local store fails.
func (b *Buffer) Enqueue(op remote.Op, listID, taskTitle string, taskTags ...string) error {
	// Generate the key before taking the lock. The error must not be ignored:
	// on failure the UUID is the zero value, so every such op would be stored
	// under the same key and overwrite the previous one.
	id, err := uuid.NewRandom()
	if err != nil {
		return fmt.Errorf("generating queue key: %w", err)
	}
	key := id.String()

	qop := &QueuedOp{
		Op:       op,
		List:     remote.List{ID: listID},
		Task:     remote.Task{Title: taskTitle},
		TaskTags: taskTags,
	}

	b.dbLock.Lock()
	defer b.dbLock.Unlock()

	todo := NewStringArray()
	if err := b.db.Get(nsQueue, keyQueue, todo); err != nil && err != storage.ErrNotFound {
		return err
	}
	keys := append(todo.StringArray(), key)

	// Write the op before the key list, so the list never names an op that
	// has not been stored.
	if err := b.db.Set(nsQueue, key, qop); err != nil {
		return err
	}
	if err := b.db.Set(nsQueue, keyQueue, NewStringArray(keys...)); err != nil {
		return err
	}

	log.Printf("enqueued task %v as %v, %vth in line", qop, key, len(keys))
	return nil
}
// Dequeue replays the queued operations against the remote server, in queue
// order, and then rewrites the key list at nsQueue/keyQueue.
//
// It first creates a client and calls client.Lists; if either fails it logs
// and returns without reading the queue. A queue that has never been written
// (storage.ErrNotFound) is treated as empty, matching Enqueue.
//
// For each queued key:
//   - if the stored op cannot be read, the key stays in the queue;
//   - if the remote call fails, or the op kind is unknown, the failure is
//     logged and the op is dropped (it is not retried) and its stored payload
//     is deleted;
//   - if the remote call succeeds, the stored payload is deleted; should that
//     delete fail, the key stays in the queue.
//
// The store lock is held for the whole pass, remote calls included, so a
// concurrent Enqueue blocks until the pass is finished. All failures are
// logged; nothing is returned.
func (b *Buffer) Dequeue() {
	client, err := remote.NewClient(b.config.Config)
	if err != nil {
		log.Printf("cannot create client: %v", err)
		return
	}
	// Check the remote answers before touching the queue.
	if _, err := client.Lists(); err != nil {
		log.Printf("cannot client.lists: %v", err)
		return
	}

	b.dbLock.Lock()
	defer b.dbLock.Unlock()

	todo := NewStringArray()
	if err := b.db.Get(nsQueue, keyQueue, todo); err != nil {
		// ErrNotFound only means nothing was ever enqueued; that is not
		// worth a log line on every pass.
		if err != storage.ErrNotFound {
			log.Printf("cannot get %v.%v: %v", nsQueue, keyQueue, err)
		}
		return
	}

	pending := todo.StringArray()
	remaining := make([]string, 0, len(pending))
	for _, key := range pending {
		qop := &QueuedOp{}
		if err := b.db.Get(nsQueue, key, qop); err != nil {
			log.Printf("cannot get %v.%v: %v", nsQueue, key, err)
			remaining = append(remaining, key)
			continue
		}

		var opErr error
		switch qop.Op {
		case remote.NEW:
			opErr = client.NewTask(qop.List, qop.Task, strings.Join(qop.TaskTags, ","))
		case remote.CLOSE:
			opErr = client.CloseTask(qop.Task)
		case remote.OPEN:
			opErr = client.OpenTask(qop.Task)
		default:
			opErr = fmt.Errorf("cannot dequeue op %v", qop.Op)
		}

		if opErr != nil {
			log.Printf("failed op %v: %v", qop.Op, opErr)
			// The op leaves the queue without a retry, so remove its payload
			// as well; otherwise it would stay in the store with no key list
			// entry pointing at it.
			if err := b.db.Set(nsQueue, key, nil); err != nil {
				log.Printf("cannot unset %v.%v: %v", nsQueue, key, err)
			}
			continue
		}

		// Setting nil is how this file unsets a stored op.
		if err := b.db.Set(nsQueue, key, nil); err != nil {
			log.Printf("cannot unset %v.%v: %v", nsQueue, key, err)
			remaining = append(remaining, key)
		}
	}

	if err := b.db.Set(nsQueue, keyQueue, NewStringArray(remaining...)); err != nil {
		log.Printf("cannot update queue: %v", err)
	}
}
// RefreshLocal copies the remote lists and their tasks into the local store.
//
// It creates a client and fetches the lists; if either step fails it logs and
// returns without changing the store. Otherwise the lists are written to
// nsLists/keyLists and, for every list, its tasks are fetched and written to
// nsTasks under the list's ID. A list whose tasks cannot be fetched or stored
// is logged and skipped; the remaining lists are still processed.
//
// The store lock is taken after the lists have been fetched and is held while
// the per-list tasks are fetched, so a concurrent Enqueue blocks until the
// refresh is finished. All failures are logged; nothing is returned.
func (b *Buffer) RefreshLocal() {
	client, err := remote.NewClient(b.config.Config)
	if err != nil {
		log.Printf("cannot create client: %v", err)
		return
	}
	lists, err := client.Lists()
	if err != nil {
		log.Printf("cannot client.lists: %v", err)
		return
	}

	b.dbLock.Lock()
	defer b.dbLock.Unlock()

	if err := b.db.Set(nsLists, keyLists, &lists); err != nil {
		log.Printf("cannot set lists: %v", err)
		return
	}
	for _, list := range lists {
		tasks, err := client.Tasks(list)
		if err != nil {
			log.Printf("cannot client.tasks(%v): %v", list, err)
			continue
		}
		if err := b.db.Set(nsTasks, list.ID, &tasks); err != nil {
			// Report the storage error itself, not the task payload.
			log.Printf("cannot set tasks(%v): %v", list.ID, err)
		}
	}
}
// notDoneCallback runs foo repeatedly until the done channel is closed.
//
// It waits for the buffer's interval, calls foo, then waits for the interval
// again, so the delay is measured from the end of one call to the start of
// the next. The done channel is only checked while waiting: a call to foo
// that is already running is not interrupted.
func (b *Buffer) notDoneCallback(foo func()) {
	// One reusable timer instead of a new time.After timer per iteration;
	// stopping it on return releases it as soon as done is closed.
	timer := time.NewTimer(b.interval)
	defer timer.Stop()

	for {
		select {
		case <-b.done:
			return
		case <-timer.C:
			foo()
			// The timer has fired and its channel was just drained, so it
			// can be re-armed directly.
			timer.Reset(b.interval)
		}
	}
}