Rssmon2/monitor/monitor.go

114 lines
2.4 KiB
Go

package monitor
import (
"errors"
"local1/logger"
"time"
"github.com/golang-collections/go-datastructures/queue"
)
// Monitor schedules URLs and invokes a callback each time one of them
// becomes due. Items reach the scheduling goroutine through a channel.
type Monitor struct {
// newItems carries submitted items to the loop goroutine. New creates it
// unbuffered, so a send only completes once the loop receives the item.
// A zero-value Item sent on this channel tells the loop to exit.
newItems chan Item
// trigger is called by the loop, in a goroutine of its own, with the URL
// of each item that has come due.
trigger func(string)
}
// New builds a Monitor that will call trigger with the URL of every item
// that comes due. The item channel is unbuffered. The returned error is
// always nil; no goroutine is started until Start is called.
func New(trigger func(string)) (*Monitor, error) {
	m := new(Monitor)
	m.trigger = trigger
	m.newItems = make(chan Item)
	return m, nil
}
// Submit hands a URL and its polling interval to the monitor loop.
//
// The send blocks until the loop goroutine receives the item; if that does
// not happen within five seconds an error is returned.
//
// An empty url is rejected: an item with an empty URL and a zero interval is
// identical to the zero-value Item that Stop sends to shut the loop down, so
// accepting it could terminate the monitor by accident.
func (monitor *Monitor) Submit(url string, interval time.Duration) error {
	if url == "" {
		return errors.New("cannot submit item with empty url")
	}

	// An explicit timer (instead of time.After) is released as soon as the
	// send succeeds rather than lingering until it expires.
	timeout := time.NewTimer(time.Second * 5)
	defer timeout.Stop()

	select {
	case monitor.newItems <- Item{
		URL:      url,
		Interval: Duration{interval},
	}:
		return nil
	case <-timeout.C:
		return errors.New("timeout submitting new item")
	}
}
// Start runs the monitor loop in a new goroutine and returns nil right away.
// Should the loop ever return a non-nil error, that goroutine panics with it.
// Every call starts another loop goroutine.
func (monitor *Monitor) Start() error {
	run := func() {
		err := monitor.loop()
		if err == nil {
			return
		}
		panic(err)
	}
	go run()
	return nil
}
// loop is the scheduling goroutine. It keeps a priority queue of items and
// waits for whichever comes first:
//
//   - an item arriving on newItems: a zero-value Item ends the loop (nil is
//     returned); a URL that is already scheduled gets its interval updated;
//     any other item is added to the queue;
//   - the head of the queue becoming due: the item is removed, trigger is
//     called with its URL in a separate goroutine, the item is advanced with
//     increment and put back.
//
// After either event the wake-up time is recomputed from the queue head. The
// only way out of the loop is the stop sentinel, so the returned error is
// always nil.
func (monitor *Monitor) loop() error {
	// Named "pending" so the imported queue package is not shadowed.
	pending := queue.NewPriorityQueue(1)
	// active indexes every queued item by URL so that re-submitting a URL
	// updates the existing entry instead of queueing a duplicate.
	active := make(map[string]*Item)
	// Far-future placeholder: with nothing queued the timer must not fire.
	nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC)

	// reschedule returns the due time of the queue head, or one hour from
	// now when the head cannot be determined.
	reschedule := func() time.Time {
		next, err := nextEventTime(pending)
		if err != nil {
			logger.Log("no next event time", err)
			return time.Now().Add(time.Minute * 60)
		}
		return next
	}

	for {
		// A fresh timer per iteration that is stopped when the other case
		// wins; time.After would leave each abandoned timer alive until its
		// (possibly far-future) deadline.
		timer := time.NewTimer(nextEvent.Sub(time.Now()))

		select {
		case newItem := <-monitor.newItems:
			timer.Stop()
			if newItem == (Item{}) {
				// Stop sentinel.
				return nil
			}
			if oldItem, ok := active[newItem.URL]; ok {
				// Only the interval is copied: overwriting the whole item
				// would reset its due time without re-ordering the queue.
				oldItem.Interval = newItem.Interval
			} else {
				entry := &newItem
				if err := pending.Put(entry); err != nil {
					logger.Log("can't queue item", err)
				} else {
					active[entry.URL] = entry
				}
			}
			nextEvent = reschedule()

		case <-timer.C:
			// Get blocks while the queue is empty, which would stall the
			// whole loop, so only call it when something is queued.
			var items []queue.Item
			if !pending.Empty() {
				var err error
				if items, err = pending.Get(1); err != nil {
					logger.Fatal("can't get item")
				}
			}
			if len(items) == 0 {
				logger.Log("no items in queue")
				nextEvent = time.Now().Add(time.Minute * 60)
				continue
			}
			item, ok := items[0].(*Item)
			if !ok {
				logger.Fatal("queue contains illegal item")
			}
			// Run the callback concurrently so a slow trigger cannot delay
			// the schedule.
			go monitor.trigger(item.URL)
			item.increment()
			if err := pending.Put(item); err != nil {
				logger.Log("can't queue item", err)
				// The item is no longer scheduled; forget it so that a
				// later Submit can add it again.
				delete(active, item.URL)
			}
			nextEvent = reschedule()
		}
	}
}
// nextEventTime reports the next field of the item at the head of q, without
// removing it. When Peek does not yield a *Item, the Unix epoch is returned
// together with an error.
func nextEventTime(q *queue.PriorityQueue) (time.Time, error) {
	if head, isItem := q.Peek().(*Item); isItem {
		return head.next, nil
	}
	return time.Unix(0, 0), errors.New("non-item in priority queue")
}
// Close is an alias for Stop; it returns whatever Stop returns.
func (monitor *Monitor) Close() error {
	err := monitor.Stop()
	return err
}
// Stop asks the loop goroutine to exit by sending it a zero-value Item.
// It returns nil once the loop has received the sentinel, or an error if
// nothing receives it within ten seconds.
func (monitor *Monitor) Stop() error {
	deadline := time.After(time.Second * 10)
	select {
	case monitor.newItems <- Item{}:
		return nil
	case <-deadline:
		return errors.New("could not stop monitor")
	}
}