Create monitor for item acceptance/publishing
parent
49d95c150e
commit
45465680ae
|
|
@ -0,0 +1,44 @@
|
||||||
|
package monitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang-collections/go-datastructures/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Item struct {
|
||||||
|
url string
|
||||||
|
interval time.Duration
|
||||||
|
next time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewItem(url string, interval time.Duration) *Item {
|
||||||
|
return &Item{
|
||||||
|
url: url,
|
||||||
|
interval: interval,
|
||||||
|
next: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (item *Item) Compare(other queue.Item) int {
|
||||||
|
j, ok := other.(*Item)
|
||||||
|
if !ok {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if item.next.Before(j.next) {
|
||||||
|
return -1
|
||||||
|
} else if item.next.After(j.next) {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
if item.interval < j.interval {
|
||||||
|
return -1
|
||||||
|
} else if item.interval > j.interval {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (item *Item) increment() *Item {
|
||||||
|
item.next = time.Now().Add(item.interval)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
package monitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_Item(t *testing.T) {
|
||||||
|
a := NewItem("localhost:12345", time.Second)
|
||||||
|
b := NewItem("localhost:54321", time.Minute)
|
||||||
|
if a.Compare(b) != -1 {
|
||||||
|
t.Errorf("incorrect item comparison")
|
||||||
|
}
|
||||||
|
was := b.next
|
||||||
|
if was == b.increment().next {
|
||||||
|
t.Errorf("increment didn't apply")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_ItemCompare(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
inter [2]time.Duration
|
||||||
|
nexts [2]time.Time
|
||||||
|
result int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
nexts: [2]time.Time{time.Unix(0, 0), time.Unix(10, 10)},
|
||||||
|
result: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nexts: [2]time.Time{time.Unix(10, 10), time.Unix(0, 0)},
|
||||||
|
result: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nexts: [2]time.Time{time.Unix(10, 10), time.Unix(10, 10)},
|
||||||
|
result: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
nexts: [2]time.Time{time.Unix(0, 0), time.Unix(10, 10)},
|
||||||
|
result: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
inter: [2]time.Duration{time.Second, time.Minute},
|
||||||
|
result: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
inter: [2]time.Duration{time.Minute, time.Second},
|
||||||
|
result: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
inter: [2]time.Duration{time.Second, time.Second},
|
||||||
|
result: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
comparison := (&Item{next: c.nexts[0], interval: c.inter[0]}).Compare(&Item{next: c.nexts[1], interval: c.inter[1]})
|
||||||
|
if comparison != c.result {
|
||||||
|
t.Errorf("failed to compare %v: got %v, expected %v", c.nexts, comparison, c.result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,83 @@
|
||||||
|
package monitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang-collections/go-datastructures/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Monitor struct {
|
||||||
|
newItems chan Item
|
||||||
|
triggeredItems chan Item
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(newItems, triggeredItems chan Item) (*Monitor, error) {
|
||||||
|
return &Monitor{
|
||||||
|
newItems: newItems,
|
||||||
|
triggeredItems: triggeredItems,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (monitor *Monitor) Start() error {
|
||||||
|
queue := queue.NewPriorityQueue(1)
|
||||||
|
active := make(map[string]*Item)
|
||||||
|
nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case newItem := <-monitor.newItems:
|
||||||
|
if newItem == (Item{}) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if oldItem, ok := active[newItem.url]; ok {
|
||||||
|
*oldItem = newItem
|
||||||
|
} else {
|
||||||
|
queue.Put(&newItem)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if nextEvent, err = nextEventTime(queue); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case <-time.After(nextEvent.Sub(time.Now())):
|
||||||
|
items, err := queue.Get(1)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(items) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
item, ok := items[0].(*Item)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("queue contains illegal item")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case monitor.triggeredItems <- *item:
|
||||||
|
item.increment()
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
}
|
||||||
|
queue.Put(item)
|
||||||
|
if nextEvent, err = nextEventTime(queue); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func nextEventTime(q *queue.PriorityQueue) (time.Time, error) {
|
||||||
|
qitem := q.Peek()
|
||||||
|
item, ok := qitem.(*Item)
|
||||||
|
if !ok {
|
||||||
|
return time.Unix(0, 0), errors.New("non-item in priority queue")
|
||||||
|
}
|
||||||
|
return item.next, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (monitor *Monitor) Stop() error {
|
||||||
|
select {
|
||||||
|
case monitor.newItems <- Item{}:
|
||||||
|
return nil
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
}
|
||||||
|
return errors.New("could not stop monitor")
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,65 @@
|
||||||
|
package monitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang-collections/go-datastructures/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_Monitor(t *testing.T) {
|
||||||
|
itemsNew := make(chan Item, 1)
|
||||||
|
itemsDone := make(chan Item, 1)
|
||||||
|
m, err := New(itemsNew, itemsDone)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot create new monitor: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := make(chan error)
|
||||||
|
go func() {
|
||||||
|
errs <- m.Start()
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case err := <-errs:
|
||||||
|
t.Fatalf("monitor stopped early: %v", err)
|
||||||
|
case <-time.After(time.Second * 1):
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10)
|
||||||
|
select {
|
||||||
|
case itemsNew <- *item:
|
||||||
|
case <-time.After(time.Second * 1):
|
||||||
|
t.Fatalf("could not add new item in time limit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case triggered := <-itemsDone:
|
||||||
|
if triggered.url != "item"+strconv.Itoa(i) {
|
||||||
|
t.Fatalf("wrong item done order: %d was %v", i, triggered)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second * 3):
|
||||||
|
t.Fatalf("could not get done item in time limit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.Stop(); err != nil {
|
||||||
|
t.Fatalf("could not stop monitor: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_NextEvent(t *testing.T) {
|
||||||
|
q := queue.NewPriorityQueue(1)
|
||||||
|
a := NewItem("a", time.Second*5)
|
||||||
|
b := NewItem("b", time.Second*500)
|
||||||
|
q.Put(a)
|
||||||
|
q.Put(b)
|
||||||
|
if time, err := nextEventTime(q); err != nil {
|
||||||
|
t.Fatalf("could not get next event time: %v", err)
|
||||||
|
} else if time != a.next {
|
||||||
|
t.Fatalf("got wrong next event time: %v, expected %v", time, a.next)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue