118 lines
2.3 KiB
Go
118 lines
2.3 KiB
Go
package monitor
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/golang-collections/go-datastructures/queue"
|
|
)
|
|
|
|
type Monitor struct {
|
|
newItems chan Item
|
|
trigger func(string)
|
|
}
|
|
|
|
func New(trigger func(string)) (*Monitor, error) {
|
|
newItems := make(chan Item)
|
|
return &Monitor{
|
|
newItems: newItems,
|
|
trigger: trigger,
|
|
}, nil
|
|
}
|
|
|
|
func (monitor *Monitor) Submit(url string, interval time.Duration) error {
|
|
select {
|
|
case monitor.newItems <- Item{
|
|
URL: url,
|
|
Interval: Duration{interval},
|
|
}:
|
|
case <-time.After(time.Second * 5):
|
|
return errors.New("timeout submitting new item")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (monitor *Monitor) Start() error {
|
|
errs := make(chan error)
|
|
go func() {
|
|
if err := monitor.loop(); err != nil {
|
|
select {
|
|
case errs <- err:
|
|
case <-time.After(time.Second * 5):
|
|
panic(err)
|
|
}
|
|
}
|
|
}()
|
|
select {
|
|
case err := <-errs:
|
|
return fmt.Errorf("%s: %v", "monitor loop quit early", err)
|
|
case <-time.After(time.Second * 2):
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (monitor *Monitor) loop() error {
|
|
queue := queue.NewPriorityQueue(1)
|
|
active := make(map[string]*Item)
|
|
nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC)
|
|
for {
|
|
select {
|
|
case newItem := <-monitor.newItems:
|
|
if newItem == (Item{}) {
|
|
return nil
|
|
}
|
|
if oldItem, ok := active[newItem.URL]; ok {
|
|
*oldItem = newItem
|
|
} else {
|
|
queue.Put(&newItem)
|
|
}
|
|
var err error
|
|
if nextEvent, err = nextEventTime(queue); err != nil {
|
|
return err
|
|
}
|
|
case <-time.After(nextEvent.Sub(time.Now())):
|
|
items, err := queue.Get(1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(items) == 0 {
|
|
continue
|
|
}
|
|
item, ok := items[0].(*Item)
|
|
if !ok {
|
|
return errors.New("queue contains illegal item")
|
|
}
|
|
monitor.trigger(item.URL)
|
|
item.increment()
|
|
queue.Put(item)
|
|
if nextEvent, err = nextEventTime(queue); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func nextEventTime(q *queue.PriorityQueue) (time.Time, error) {
|
|
qitem := q.Peek()
|
|
item, ok := qitem.(*Item)
|
|
if !ok {
|
|
return time.Unix(0, 0), errors.New("non-item in priority queue")
|
|
}
|
|
return item.next, nil
|
|
}
|
|
|
|
func (monitor *Monitor) Close() error {
|
|
return monitor.Stop()
|
|
}
|
|
|
|
func (monitor *Monitor) Stop() error {
|
|
select {
|
|
case monitor.newItems <- Item{}:
|
|
case <-time.After(time.Second * 10):
|
|
return errors.New("could not stop monitor")
|
|
}
|
|
return nil
|
|
}
|