diff --git a/monitor/item.go b/monitor/item.go new file mode 100644 index 0000000..a41caf9 --- /dev/null +++ b/monitor/item.go @@ -0,0 +1,44 @@ +package monitor + +import ( + "time" + + "github.com/golang-collections/go-datastructures/queue" +) + +type Item struct { + url string + interval time.Duration + next time.Time +} + +func NewItem(url string, interval time.Duration) *Item { + return &Item{ + url: url, + interval: interval, + next: time.Now(), + } +} + +func (item *Item) Compare(other queue.Item) int { + j, ok := other.(*Item) + if !ok { + return 0 + } + if item.next.Before(j.next) { + return -1 + } else if item.next.After(j.next) { + return 1 + } + if item.interval < j.interval { + return -1 + } else if item.interval > j.interval { + return 1 + } + return 0 +} + +func (item *Item) increment() *Item { + item.next = time.Now().Add(item.interval) + return item +} diff --git a/monitor/item_test.go b/monitor/item_test.go new file mode 100644 index 0000000..e2b55ba --- /dev/null +++ b/monitor/item_test.go @@ -0,0 +1,61 @@ +package monitor + +import ( + "testing" + "time" +) + +func Test_Item(t *testing.T) { + a := NewItem("localhost:12345", time.Second) + b := NewItem("localhost:54321", time.Minute) + if a.Compare(b) != -1 { + t.Errorf("incorrect item comparison") + } + was := b.next + if was == b.increment().next { + t.Errorf("increment didn't apply") + } +} + +func Test_ItemCompare(t *testing.T) { + cases := []struct { + inter [2]time.Duration + nexts [2]time.Time + result int + }{ + { + nexts: [2]time.Time{time.Unix(0, 0), time.Unix(10, 10)}, + result: -1, + }, + { + nexts: [2]time.Time{time.Unix(10, 10), time.Unix(0, 0)}, + result: 1, + }, + { + nexts: [2]time.Time{time.Unix(10, 10), time.Unix(10, 10)}, + result: 0, + }, + { + nexts: [2]time.Time{time.Unix(0, 0), time.Unix(10, 10)}, + result: -1, + }, + { + inter: [2]time.Duration{time.Second, time.Minute}, + result: -1, + }, + { + inter: [2]time.Duration{time.Minute, time.Second}, + result: 1, + }, + { + inter: [2]time.Duration{time.Second, time.Second}, + result: 0, + }, + } + for _, c := range cases { + comparison := (&Item{next: c.nexts[0], interval: c.inter[0]}).Compare(&Item{next: c.nexts[1], interval: c.inter[1]}) + if comparison != c.result { + t.Errorf("failed to compare %v: got %v, expected %v", c.nexts, comparison, c.result) + } + } +} diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 0000000..35bea62 --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,83 @@ +package monitor + +import ( + "errors" + "time" + + "github.com/golang-collections/go-datastructures/queue" +) + +type Monitor struct { + newItems chan Item + triggeredItems chan Item +} + +func New(newItems, triggeredItems chan Item) (*Monitor, error) { + return &Monitor{ + newItems: newItems, + triggeredItems: triggeredItems, + }, nil +} + +func (monitor *Monitor) Start() error { + queue := queue.NewPriorityQueue(1) + active := make(map[string]*Item) + nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC) + for { + select { + case newItem := <-monitor.newItems: + if newItem == (Item{}) { + return nil + } + if oldItem, ok := active[newItem.url]; ok { + *oldItem = newItem + } else { + queue.Put(&newItem) + } + var err error + if nextEvent, err = nextEventTime(queue); err != nil { + return err + } + case <-time.After(nextEvent.Sub(time.Now())): + items, err := queue.Get(1) + if err != nil { + return err + } + if len(items) == 0 { + continue + } + item, ok := items[0].(*Item) + if !ok { + return errors.New("queue contains illegal item") + } + select { + case monitor.triggeredItems <- *item: + item.increment() + case <-time.After(time.Second * 10): + } + queue.Put(item) + if nextEvent, err = nextEventTime(queue); err != nil { + return err + } + } + } + return nil +} + +func nextEventTime(q *queue.PriorityQueue) (time.Time, error) { + qitem := q.Peek() + item, ok := qitem.(*Item) + if !ok { + return time.Unix(0, 0), errors.New("non-item in priority queue") + } + return item.next, nil +} + +func (monitor *Monitor) Stop() error { + select { + case monitor.newItems <- Item{}: + return nil + case <-time.After(time.Second * 10): + } + return errors.New("could not stop monitor") +} diff --git a/monitor/monitor_test.go b/monitor/monitor_test.go new file mode 100644 index 0000000..1f23de0 --- /dev/null +++ b/monitor/monitor_test.go @@ -0,0 +1,65 @@ +package monitor + +import ( + "strconv" + "testing" + "time" + + "github.com/golang-collections/go-datastructures/queue" +) + +func Test_Monitor(t *testing.T) { + itemsNew := make(chan Item, 1) + itemsDone := make(chan Item, 1) + m, err := New(itemsNew, itemsDone) + if err != nil { + t.Fatalf("cannot create new monitor: %v", err) + } + + errs := make(chan error) + go func() { + errs <- m.Start() + }() + select { + case err := <-errs: + t.Fatalf("monitor stopped early: %v", err) + case <-time.After(time.Second * 1): + } + + for i := 0; i < 2; i++ { + item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10) + select { + case itemsNew <- *item: + case <-time.After(time.Second * 1): + t.Fatalf("could not add new item in time limit") + } + } + + for i := 0; i < 2; i++ { + select { + case triggered := <-itemsDone: + if triggered.url != "item"+strconv.Itoa(i) { + t.Fatalf("wrong item done order: %d was %v", i, triggered) + } + case <-time.After(time.Second * 3): + t.Fatalf("could not get done item in time limit") + } + } + + if err := m.Stop(); err != nil { + t.Fatalf("could not stop monitor: %v", err) + } +} + +func Test_NextEvent(t *testing.T) { + q := queue.NewPriorityQueue(1) + a := NewItem("a", time.Second*5) + b := NewItem("b", time.Second*500) + q.Put(a) + q.Put(b) + if time, err := nextEventTime(q); err != nil { + t.Fatalf("could not get next event time: %v", err) + } else if time != a.next { + t.Fatalf("got wrong next event time: %v, expected %v", time, a.next) + } +}