69 lines
1.1 KiB
Go
69 lines
1.1 KiB
Go
package monitor
|
|
|
|
import (
|
|
"local/rssmon3/config"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type Monitor struct {
|
|
queue *Queue
|
|
Incoming chan *Item
|
|
Outgoing chan *Item
|
|
config.Stoppable
|
|
}
|
|
|
|
func New() (*Monitor, error) {
|
|
q, err := newQueue()
|
|
return &Monitor{
|
|
queue: q,
|
|
Incoming: make(chan *Item),
|
|
Outgoing: make(chan *Item),
|
|
}, err
|
|
}
|
|
|
|
func (m *Monitor) Run() error {
|
|
for {
|
|
select {
|
|
case <-m.Stopped():
|
|
return nil
|
|
case i := <-m.enqueued():
|
|
m.enqueue(i)
|
|
continue
|
|
case <-m.triggered():
|
|
m.trigger()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Monitor) enqueued() chan *Item {
|
|
return m.Incoming
|
|
}
|
|
|
|
func (m *Monitor) enqueue(i *Item) {
|
|
m.queue.Push(i)
|
|
}
|
|
|
|
func (m *Monitor) triggered() <-chan time.Time {
|
|
if m.queue.Len() < 1 {
|
|
return nil
|
|
}
|
|
top := m.queue.Peek()
|
|
if top == nil {
|
|
return nil
|
|
}
|
|
block := time.Until(top.Last().Add(top.Interval()))
|
|
log.Printf("blocking %v until next task", block)
|
|
return time.After(time.Until(top.Last().Add(top.Interval())))
|
|
}
|
|
|
|
func (m *Monitor) trigger() {
|
|
i := m.queue.Pop()
|
|
if i == nil {
|
|
return
|
|
}
|
|
m.Outgoing <- i
|
|
i.Mark()
|
|
m.queue.Push(i)
|
|
}
|