monitor writes to db

Former-commit-id: 5aa70ad9d835c607abb1c19dc620f6c45d31a866
master
bel 2019-06-22 14:40:57 -06:00
parent 5e87f1659d
commit 093d468f87
8 changed files with 58 additions and 17 deletions

View File

@ -27,7 +27,10 @@ func main() {
} }
go InterruptAfter(s.Run, sigc) go InterruptAfter(s.Run, sigc)
m := monitor.New() m, err := monitor.New()
if err != nil {
panic(err)
}
go InterruptAfter(m.Run, sigc) go InterruptAfter(m.Run, sigc)
signal.Notify(sigc, signal.Notify(sigc,

View File

@ -12,12 +12,13 @@ type Monitor struct {
config.Stoppable config.Stoppable
} }
func New() *Monitor { func New() (*Monitor, error) {
q, err := newQueue()
return &Monitor{ return &Monitor{
queue: newQueue(), queue: q,
Incoming: make(chan *Item), Incoming: make(chan *Item),
Outgoing: make(chan *Item), Outgoing: make(chan *Item),
} }, err
} }
func (m *Monitor) Run() error { func (m *Monitor) Run() error {

View File

@ -23,7 +23,7 @@ func TestMonitorNew(t *testing.T) {
func TestMonitorTrigger(t *testing.T) { func TestMonitorTrigger(t *testing.T) {
initMonitor() initMonitor()
m := New() m, _ := New()
i, _ := NewItem("MonitorTrigger", time.Second) i, _ := NewItem("MonitorTrigger", time.Second)
i.setLast(never) i.setLast(never)
if time.Now().After(i.Last()) { if time.Now().After(i.Last()) {
@ -46,7 +46,7 @@ func TestMonitorTrigger(t *testing.T) {
func TestMonitorTriggered(t *testing.T) { func TestMonitorTriggered(t *testing.T) {
initMonitor() initMonitor()
m := New() m, _ := New()
i, _ := NewItem("MonitorTriggered", time.Second) i, _ := NewItem("MonitorTriggered", time.Second)
i.setLast(time.Now().Add(time.Hour * -1)) i.setLast(time.Now().Add(time.Hour * -1))
m.queue.Push(i) m.queue.Push(i)
@ -64,7 +64,7 @@ func TestMonitorTriggered(t *testing.T) {
func TestMonitorEnqueued(t *testing.T) { func TestMonitorEnqueued(t *testing.T) {
initMonitor() initMonitor()
m := New() m, _ := New()
m.Incoming = make(chan *Item, 1) m.Incoming = make(chan *Item, 1)
m.Incoming <- &Item{Key: "MonitorEnqueued"} m.Incoming <- &Item{Key: "MonitorEnqueued"}
@ -81,8 +81,8 @@ func TestMonitorEnqueued(t *testing.T) {
func TestMonitorEnqueue(t *testing.T) { func TestMonitorEnqueue(t *testing.T) {
initMonitor() initMonitor()
m := New() m, _ := New()
m.enqueue(nil) m.enqueue(&Item{})
if m.queue.Len() != 1 { if m.queue.Len() != 1 {
t.Error(m.queue.Len()) t.Error(m.queue.Len())
@ -92,7 +92,7 @@ func TestMonitorEnqueue(t *testing.T) {
func TestMonitorRun(t *testing.T) { func TestMonitorRun(t *testing.T) {
initMonitor() initMonitor()
m := New() m, _ := New()
m.Outgoing = make(chan *Item, 1) m.Outgoing = make(chan *Item, 1)
go m.Run() go m.Run()
defer config.Values().Can() defer config.Values().Can()

View File

@ -1,19 +1,55 @@
package monitor package monitor
import "github.com/golang-collections/go-datastructures/queue" import (
"local/rssmon3/config"
"log"
"github.com/golang-collections/go-datastructures/queue"
)
const nsQueued = "nsQueued"
type Queue struct { type Queue struct {
queue *queue.PriorityQueue queue *queue.PriorityQueue
} }
func newQueue() *Queue { func newQueue() (*Queue, error) {
return &Queue{ q := queue.NewPriorityQueue(1)
queue: queue.NewPriorityQueue(1), db := config.Values().DB
keys, err := db.List([]string{nsQueued})
if err != nil {
return nil, err
} }
for _, key := range keys {
b, err := db.Get(key, nsQueued)
if err != nil {
return nil, err
}
i := &Item{}
if err := i.Decode(b); err != nil {
return nil, err
}
q.Put(i)
}
return &Queue{
queue: q,
}, nil
} }
func (q *Queue) Push(i *Item) { func (q *Queue) Push(i *Item) {
if i == nil {
return
}
q.queue.Put(i) q.queue.Put(i)
b, err := i.Encode()
if err != nil {
log.Println(err)
return
}
if err := config.Values().DB.Set(i.Key, b, nsQueued); err != nil {
log.Println(err)
return
}
} }
func (q *Queue) Pop() *Item { func (q *Queue) Pop() *Item {

View File

@ -12,7 +12,7 @@ func initQueue() {
func TestQueue(t *testing.T) { func TestQueue(t *testing.T) {
initQueue() initQueue()
q := newQueue() q, _ := newQueue()
if i := q.Peek(); i != nil { if i := q.Peek(); i != nil {
t.Error(i) t.Error(i)
@ -33,7 +33,7 @@ func TestQueue(t *testing.T) {
func TestQueuePriority(t *testing.T) { func TestQueuePriority(t *testing.T) {
initQueue() initQueue()
q := newQueue() q, _ := newQueue()
i, _ := NewItem("iQueuePriority", time.Second) i, _ := NewItem("iQueuePriority", time.Second)
j, _ := NewItem("jQueuePriority", time.Hour) j, _ := NewItem("jQueuePriority", time.Hour)

View File

@ -1 +1 @@
0d46529517022e32f10ee1e18ee21760de34fecf cc79df3e2bb33d684975d23cd008eae93e7dffc2

View File

@ -105,4 +105,5 @@ func (s *Server) feed(w http.ResponseWriter, r *http.Request) {
} }
log.Println(i) log.Println(i)
s.error(w, r, errors.New("not impl")) s.error(w, r, errors.New("not impl"))
// TODO
} }