Callback rather than output channel
parent
eefb3d4163
commit
3339d7754f
|
|
@ -14,19 +14,18 @@ import (
|
|||
)
|
||||
|
||||
type Monitor struct {
|
||||
newItems chan Item
|
||||
triggeredItems chan Item
|
||||
port string
|
||||
server *http.Server
|
||||
newItems chan Item
|
||||
trigger func(string)
|
||||
port string
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
func New(port string) (*Monitor, error) {
|
||||
func New(port string, trigger func(string)) (*Monitor, error) {
|
||||
newItems := make(chan Item)
|
||||
triggeredItems := make(chan Item)
|
||||
return &Monitor{
|
||||
newItems: newItems,
|
||||
triggeredItems: triggeredItems,
|
||||
port: port,
|
||||
newItems: newItems,
|
||||
trigger: trigger,
|
||||
port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -77,7 +76,7 @@ func (monitor *Monitor) Start() error {
|
|||
if err := monitor.listen(); err != nil {
|
||||
select {
|
||||
case errs <- err:
|
||||
case <-time.After(time.Second * 10):
|
||||
case <-time.After(time.Second * 5):
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
|
@ -86,7 +85,7 @@ func (monitor *Monitor) Start() error {
|
|||
if err := monitor.loop(); err != nil {
|
||||
select {
|
||||
case errs <- err:
|
||||
case <-time.After(time.Second * 10):
|
||||
case <-time.After(time.Second * 5):
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
|
@ -94,7 +93,7 @@ func (monitor *Monitor) Start() error {
|
|||
select {
|
||||
case err := <-errs:
|
||||
return fmt.Errorf("%s: %v", "monitor server quit early", err)
|
||||
case <-time.After(time.Second * 5):
|
||||
case <-time.After(time.Second * 2):
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -130,11 +129,8 @@ func (monitor *Monitor) loop() error {
|
|||
if !ok {
|
||||
return errors.New("queue contains illegal item")
|
||||
}
|
||||
select {
|
||||
case monitor.triggeredItems <- *item:
|
||||
item.increment()
|
||||
case <-time.After(time.Second * 10):
|
||||
}
|
||||
monitor.trigger(item.URL)
|
||||
item.increment()
|
||||
queue.Put(item)
|
||||
if nextEvent, err = nextEventTime(queue); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
const testmport = ":13152"
|
||||
|
||||
func Test_MonitorListen(t *testing.T) {
|
||||
m, err := New(testmport)
|
||||
m, err := New(testmport, func(string) {})
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new monitor: %v", err)
|
||||
}
|
||||
|
|
@ -48,14 +48,12 @@ func Test_MonitorListen(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Monitor(t *testing.T) {
|
||||
m, err := New(testmport)
|
||||
m, err := New(testmport, func(string) {})
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new monitor: %v", err)
|
||||
}
|
||||
itemsNew := make(chan Item, 1)
|
||||
itemsDone := make(chan Item, 1)
|
||||
m.newItems = itemsNew
|
||||
m.triggeredItems = itemsDone
|
||||
|
||||
if err := m.Start(); err != nil {
|
||||
t.Fatalf("cannot start monitor: %v", err)
|
||||
|
|
@ -70,17 +68,6 @@ func Test_Monitor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case triggered := <-itemsDone:
|
||||
if triggered.URL != "item"+strconv.Itoa(i) {
|
||||
t.Fatalf("wrong item done order: %d was %v", i, triggered)
|
||||
}
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("could not get done item in time limit")
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.Stop(); err != nil {
|
||||
t.Fatalf("could not stop monitor: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue