Call monitor via callback in server
parent
772058abbc
commit
8f5630353e
13
main.go
13
main.go
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"local3/rssmon2/rss"
|
"local3/rssmon2/rss"
|
||||||
"local3/rssmon2/server"
|
"local3/rssmon2/server"
|
||||||
"local3/rssmon2/store"
|
"local3/rssmon2/store"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const nsForFeeds = "FEEDS"
|
const nsForFeeds = "FEEDS"
|
||||||
|
|
@ -30,28 +31,34 @@ func main() {
|
||||||
if !ok {
|
if !ok {
|
||||||
feed, items, err = rss.New(url, "Blue", "<img.*?/(img)?>")
|
feed, items, err = rss.New(url, "Blue", "<img.*?/(img)?>")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Log("can't create new RSS %q: %v", url, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
items, err = allFeeds[url].Update()
|
items, err = allFeeds[url].Update()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Log("can't update old RSS %q: %v", url, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
b, err := feed.Serialize()
|
b, err := feed.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Log("can't serialize to save RSS %q: %v", url, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := sclient.Set(nsForFeeds, feed.ID(), b); err != nil {
|
if err := sclient.Set(nsForFeeds, feed.ID(), b); err != nil {
|
||||||
|
logger.Log("can't save RSS %q.%q: %v", nsForFeeds, feed.ID(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Log("Saved feed", feed)
|
logger.Log("Saved feed", feed)
|
||||||
for i := range items {
|
for i := range items {
|
||||||
b, err := items[i].Serialize()
|
b, err := items[i].Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Log("can't save rss item %q.%q: %v", url, items[i].Link, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := sclient.Set(feed.ID(), items[i].ID(), b); err != nil {
|
if err := sclient.Set(feed.ID(), items[i].ID(), b); err != nil {
|
||||||
|
logger.Log("can't save rss item %q.%q: %v", feed.ID(), items[i].ID(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Log("Saved feed item", feed.ID(), items[i].ID(), items[i])
|
logger.Log("Saved feed item", feed.ID(), items[i].ID(), items[i])
|
||||||
|
|
@ -77,7 +84,11 @@ func main() {
|
||||||
}()
|
}()
|
||||||
*/
|
*/
|
||||||
|
|
||||||
server, err := server.New(config.Port)
|
server, err := server.New(config.Port, func(url string, interval time.Duration) {
|
||||||
|
if err := mon.Submit(url, interval); err != nil {
|
||||||
|
logger.Log("Cannot accept new feed %q: %v", url, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,18 @@ func New(port string, trigger func(string)) (*Monitor, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (monitor *Monitor) Submit(url string, interval time.Duration) error {
|
||||||
|
select {
|
||||||
|
case monitor.newItems <- Item{
|
||||||
|
URL: url,
|
||||||
|
Interval: Duration{interval},
|
||||||
|
}:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
return errors.New("timeout submitting new item")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (monitor *Monitor) Start() error {
|
func (monitor *Monitor) Start() error {
|
||||||
errs := make(chan error)
|
errs := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
||||||
|
|
@ -17,19 +17,14 @@ func Test_Monitor(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot create new monitor: %v", err)
|
t.Fatalf("cannot create new monitor: %v", err)
|
||||||
}
|
}
|
||||||
itemsNew := make(chan Item, 1)
|
|
||||||
m.newItems = itemsNew
|
|
||||||
|
|
||||||
if err := m.Start(); err != nil {
|
if err := m.Start(); err != nil {
|
||||||
t.Fatalf("cannot start monitor: %v", err)
|
t.Fatalf("cannot start monitor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < numItems; i++ {
|
for i := 0; i < numItems; i++ {
|
||||||
item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*1)
|
if err := m.Submit("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)); err != nil {
|
||||||
select {
|
t.Errorf("failed to submit item %d: %v", i, err)
|
||||||
case itemsNew <- *item:
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
t.Fatalf("could not add new item in time limit")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,18 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
addr string
|
addr string
|
||||||
|
newItemHandler func(string, time.Duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(addr string) (*Server, error) {
|
func New(addr string, newItemHandler func(string, time.Duration)) (*Server, error) {
|
||||||
return &Server{
|
return &Server{
|
||||||
addr: addr,
|
addr: addr,
|
||||||
|
newItemHandler: newItemHandler,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue