monitor a good
Former-commit-id: 5a37bfdfe208d3d1a71e6b4924209d0c8e6f53a0
This commit is contained in:
124
monitor/item.go
Normal file
124
monitor/item.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"local/rssmon3/config"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/golang-collections/go-datastructures/queue"
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Key string
|
||||
}
|
||||
|
||||
const nsLast = "nsLast"
|
||||
const nsInterval = "nsInterval"
|
||||
|
||||
var never = time.Date(2999, time.January, 1, 1, 1, 1, 1, time.UTC)
|
||||
var forever = time.Duration(time.Hour * 99999)
|
||||
|
||||
func NewItem(key string, interval time.Duration) (*Item, error) {
|
||||
i := &Item{
|
||||
Key: key,
|
||||
}
|
||||
|
||||
if err := i.setInterval(interval); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := i.setLast(time.Now().Add(-2 * interval)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Item{
|
||||
Key: key,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *Item) Compare(other queue.Item) int {
|
||||
j := other.(*Item)
|
||||
iNext := i.Last().Add(i.Interval())
|
||||
jNext := j.Last().Add(j.Interval())
|
||||
if iNext.Before(jNext) {
|
||||
return 1
|
||||
} else if jNext.Before(iNext) {
|
||||
return -1
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Item) Interval() time.Duration {
|
||||
t, err := i.getInterval()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return forever
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func (i *Item) getInterval() (time.Duration, error) {
|
||||
t := time.Duration(0)
|
||||
b, err := config.Values().DB.Get(i.Key, nsInterval)
|
||||
if err != nil {
|
||||
return forever, err
|
||||
}
|
||||
buff := bytes.NewBuffer(b)
|
||||
dec := gob.NewDecoder(buff)
|
||||
err = dec.Decode(&t)
|
||||
return t, err
|
||||
}
|
||||
|
||||
func (i *Item) setInterval(t time.Duration) error {
|
||||
buff := bytes.NewBuffer(nil)
|
||||
enc := gob.NewEncoder(buff)
|
||||
if err := enc.Encode(t); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := config.Values().DB.Set(i.Key, buff.Bytes(), nsInterval); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Item) Last() time.Time {
|
||||
t, err := i.getLast()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func (i *Item) Mark() {
|
||||
if err := i.setLast(time.Now()); err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Item) setLast(t time.Time) error {
|
||||
buff := bytes.NewBuffer(nil)
|
||||
enc := gob.NewEncoder(buff)
|
||||
if err := enc.Encode(t); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := config.Values().DB.Set(i.Key, buff.Bytes(), nsLast); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Item) getLast() (time.Time, error) {
|
||||
t := time.Now()
|
||||
b, err := config.Values().DB.Get(i.Key, nsLast)
|
||||
if err != nil {
|
||||
return never, err
|
||||
}
|
||||
buff := bytes.NewBuffer(b)
|
||||
dec := gob.NewDecoder(buff)
|
||||
if err := dec.Decode(&t); err != nil {
|
||||
return never, err
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
123
monitor/item_test.go
Normal file
123
monitor/item_test.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"local/rssmon3/config"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func initItem() {
|
||||
os.Args = []string{"a", "-db", "map"}
|
||||
if err := config.New(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemNewItem(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i, err := NewItem("NewItem", time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if i.Key != "NewItem" {
|
||||
t.Fatal(i.Key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemCompare(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i, _ := NewItem("iCompare", time.Second)
|
||||
j, _ := NewItem("jCompare", 10*time.Second)
|
||||
if c := i.Compare(j); c != -1 {
|
||||
t.Fatal(c)
|
||||
}
|
||||
|
||||
i, _ = NewItem("iCompare", time.Second)
|
||||
j, _ = NewItem("iCompare", time.Second)
|
||||
if c := i.Compare(j); c != 0 {
|
||||
t.Fatal(c)
|
||||
}
|
||||
|
||||
i, _ = NewItem("iCompare", 10*time.Second)
|
||||
j, _ = NewItem("jCompare", time.Second)
|
||||
if c := i.Compare(j); c != 1 {
|
||||
t.Fatal(c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemInterval(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i, _ := NewItem("iInterval", 10*time.Second)
|
||||
if i := i.Interval(); i != 10*time.Second {
|
||||
t.Fatal(i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemGetSetInterval(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i := &Item{Key: "GetSetInterval"}
|
||||
|
||||
if i, err := i.getInterval(); err == nil {
|
||||
t.Errorf("get interval of unset passes")
|
||||
} else if i != forever {
|
||||
t.Errorf("failing get interval is not forever")
|
||||
}
|
||||
|
||||
if err := i.setInterval(time.Minute); err != nil {
|
||||
t.Errorf("failing set interval: %v", err)
|
||||
}
|
||||
|
||||
if i, err := i.getInterval(); err != nil {
|
||||
t.Errorf("get interval of set fails: %v", err)
|
||||
} else if i != time.Minute {
|
||||
t.Errorf("passing get interval is not time.Minute")
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemLastMark(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i, _ := NewItem("iLastMark", 100*time.Millisecond)
|
||||
|
||||
first := i.Last()
|
||||
if first.Sub(time.Now().Add(-100*time.Millisecond)) > time.Millisecond*200 {
|
||||
t.Errorf("default last is wrong: %v", first)
|
||||
}
|
||||
|
||||
i.Mark()
|
||||
second := i.Last()
|
||||
if first == second {
|
||||
t.Errorf("mark() didn't change last")
|
||||
}
|
||||
if second.Sub(time.Now()) > time.Millisecond*100 {
|
||||
t.Errorf("marked last is wrong: %v", second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemGetSetLast(t *testing.T) {
|
||||
initItem()
|
||||
|
||||
i := &Item{Key: "iGetSetLast"}
|
||||
|
||||
if i, err := i.getLast(); err == nil {
|
||||
t.Errorf("get last of unset passes")
|
||||
} else if i != never {
|
||||
t.Errorf("failing get last is not never")
|
||||
}
|
||||
|
||||
stamp := time.Now()
|
||||
if err := i.setLast(stamp); err != nil {
|
||||
t.Errorf("failing set last: %v", err)
|
||||
}
|
||||
|
||||
if i, err := i.getLast(); err != nil {
|
||||
t.Errorf("get last of set fails: %v", err)
|
||||
} else if i.Sub(stamp) > time.Millisecond*10 {
|
||||
t.Errorf("passing get last is not %v: got %v (+%v)", stamp, i, i.Sub(stamp))
|
||||
}
|
||||
}
|
||||
67
monitor/monitor.go
Normal file
67
monitor/monitor.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"local/rssmon3/config"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Monitor struct {
|
||||
queue *Queue
|
||||
Incoming chan *Item
|
||||
Outgoing chan *Item
|
||||
}
|
||||
|
||||
func New() *Monitor {
|
||||
return &Monitor{
|
||||
queue: newQueue(),
|
||||
Incoming: make(chan *Item),
|
||||
Outgoing: make(chan *Item),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) Run() error {
|
||||
for {
|
||||
select {
|
||||
case <-m.stopped():
|
||||
return nil
|
||||
case i := <-m.enqueued():
|
||||
m.enqueue(i)
|
||||
continue
|
||||
case <-m.triggered():
|
||||
m.trigger()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Monitor) stopped() <-chan struct{} {
|
||||
return config.Values().Ctx.Done()
|
||||
}
|
||||
|
||||
func (m *Monitor) enqueued() chan *Item {
|
||||
return m.Incoming
|
||||
}
|
||||
|
||||
func (m *Monitor) enqueue(i *Item) {
|
||||
m.queue.Push(i)
|
||||
}
|
||||
|
||||
func (m *Monitor) triggered() <-chan time.Time {
|
||||
if m.queue.Len() < 1 {
|
||||
return nil
|
||||
}
|
||||
top := m.queue.Peek()
|
||||
if top == nil {
|
||||
return nil
|
||||
}
|
||||
return time.After(time.Until(top.Last().Add(top.Interval())))
|
||||
}
|
||||
|
||||
func (m *Monitor) trigger() {
|
||||
i := m.queue.Pop()
|
||||
if i == nil {
|
||||
return
|
||||
}
|
||||
m.Outgoing <- i
|
||||
i.Mark()
|
||||
m.queue.Push(i)
|
||||
}
|
||||
110
monitor/monitor_test.go
Normal file
110
monitor/monitor_test.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"local/rssmon3/config"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func initMonitor() {
|
||||
os.Args = []string{"a", "-db", "map"}
|
||||
if err := config.New(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorNew(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
New()
|
||||
}
|
||||
|
||||
func TestMonitorTrigger(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
m := New()
|
||||
i, _ := NewItem("MonitorTrigger", time.Second)
|
||||
i.setLast(never)
|
||||
if time.Now().After(i.Last()) {
|
||||
t.Error(i.Last())
|
||||
}
|
||||
|
||||
m.Outgoing = make(chan *Item, 1)
|
||||
m.queue.Push(i)
|
||||
m.trigger()
|
||||
|
||||
j := <-m.Outgoing
|
||||
if i.Key != j.Key {
|
||||
t.Errorf("popped %v != %v", j, i)
|
||||
}
|
||||
if time.Since(j.Last()) > time.Second {
|
||||
t.Error(j.Last())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorTriggered(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
m := New()
|
||||
i, _ := NewItem("MonitorTriggered", time.Second)
|
||||
i.setLast(time.Now().Add(time.Hour * -1))
|
||||
m.queue.Push(i)
|
||||
|
||||
start := time.Now()
|
||||
select {
|
||||
case <-m.triggered():
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
if time.Since(start) > time.Second {
|
||||
t.Errorf("failed to triggered")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorEnqueued(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
m := New()
|
||||
m.Incoming = make(chan *Item, 1)
|
||||
m.Incoming <- &Item{Key: "MonitorEnqueued"}
|
||||
|
||||
select {
|
||||
case i := <-m.enqueued():
|
||||
if i.Key != "MonitorEnqueued" {
|
||||
t.Error(i)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Errorf("enqueued didnt pop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorEnqueue(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
m := New()
|
||||
m.enqueue(nil)
|
||||
|
||||
if m.queue.Len() != 1 {
|
||||
t.Error(m.queue.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorRun(t *testing.T) {
|
||||
initMonitor()
|
||||
|
||||
m := New()
|
||||
m.Outgoing = make(chan *Item, 1)
|
||||
go m.Run()
|
||||
defer config.Values().Can()
|
||||
|
||||
i, _ := NewItem("MonitorRun", time.Second*5)
|
||||
m.Incoming <- i
|
||||
time.Sleep(time.Millisecond * 250)
|
||||
for j := 0; j < 5; j++ {
|
||||
if m.queue.Len() == 1 {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
t.Errorf("incoming item didnt enter/reenter queue")
|
||||
}
|
||||
43
monitor/queue.go
Normal file
43
monitor/queue.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package monitor
|
||||
|
||||
import "github.com/golang-collections/go-datastructures/queue"
|
||||
|
||||
type Queue struct {
|
||||
queue *queue.PriorityQueue
|
||||
}
|
||||
|
||||
func newQueue() *Queue {
|
||||
return &Queue{
|
||||
queue: queue.NewPriorityQueue(1),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queue) Push(i *Item) {
|
||||
q.queue.Put(i)
|
||||
}
|
||||
|
||||
func (q *Queue) Pop() *Item {
|
||||
is, err := q.queue.Get(1)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if len(is) < 1 {
|
||||
return nil
|
||||
}
|
||||
if is[0] == nil {
|
||||
return nil
|
||||
}
|
||||
return is[0].(*Item)
|
||||
}
|
||||
|
||||
func (q *Queue) Peek() *Item {
|
||||
i := q.queue.Peek()
|
||||
if i == nil {
|
||||
return nil
|
||||
}
|
||||
return i.(*Item)
|
||||
}
|
||||
|
||||
func (q *Queue) Len() int {
|
||||
return q.queue.Len()
|
||||
}
|
||||
49
monitor/queue_test.go
Normal file
49
monitor/queue_test.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func initQueue() {
|
||||
initItem()
|
||||
}
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
initQueue()
|
||||
|
||||
q := newQueue()
|
||||
|
||||
if i := q.Peek(); i != nil {
|
||||
t.Error(i)
|
||||
}
|
||||
|
||||
i := &Item{Key: "k"}
|
||||
q.Push(i)
|
||||
|
||||
if i := q.Peek(); i.Key != "k" {
|
||||
t.Error(i)
|
||||
} else if j := q.Pop(); i != j {
|
||||
t.Error(j)
|
||||
} else if j.Key != "k" {
|
||||
t.Error(j)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueuePriority(t *testing.T) {
|
||||
initQueue()
|
||||
|
||||
q := newQueue()
|
||||
|
||||
i, _ := NewItem("iQueuePriority", time.Second)
|
||||
j, _ := NewItem("jQueuePriority", time.Hour)
|
||||
q.Push(i)
|
||||
q.Push(j)
|
||||
if k := q.Peek(); k.Key != "iQueuePriority" {
|
||||
t.Errorf("compare is backwards")
|
||||
}
|
||||
|
||||
if q.Len() != 2 {
|
||||
t.Error(q.Len())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user