From 4946e53b57661596afce24a1a0185546ee9e127e Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Mon, 8 Oct 2018 19:34:34 -0600 Subject: [PATCH] Convert monitor to http server --- monitor/item.go | 43 +++++++++++++++---- monitor/item_test.go | 2 +- monitor/monitor.go | 94 +++++++++++++++++++++++++++++++++++++++-- monitor/monitor_test.go | 65 +++++++++++++++++++++------- 4 files changed, 176 insertions(+), 28 deletions(-) diff --git a/monitor/item.go b/monitor/item.go index a41caf9..f211a22 100644 --- a/monitor/item.go +++ b/monitor/item.go @@ -1,21 +1,48 @@ package monitor import ( + "encoding/json" + "errors" "time" "github.com/golang-collections/go-datastructures/queue" ) +type Duration struct { + time.Duration +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + d.Duration = time.Duration(value) + return nil + case string: + var err error + d.Duration, err = time.ParseDuration(value) + if err != nil { + return err + } + return nil + default: + return errors.New("invalid duration") + } +} + type Item struct { - url string - interval time.Duration + URL string + Interval Duration next time.Time } -func NewItem(url string, interval time.Duration) *Item { +func NewItem(URL string, Interval time.Duration) *Item { return &Item{ - url: url, - interval: interval, + URL: URL, + Interval: Duration{Interval}, next: time.Now(), } } @@ -30,15 +57,15 @@ func (item *Item) Compare(other queue.Item) int { } else if item.next.After(j.next) { return 1 } - if item.interval < j.interval { + if item.Interval.Duration < j.Interval.Duration { return -1 - } else if item.interval > j.interval { + } else if item.Interval.Duration > j.Interval.Duration { return 1 } return 0 } func (item *Item) increment() *Item { - item.next = time.Now().Add(item.interval) + item.next = time.Now().Add(item.Interval.Duration) return item } diff --git a/monitor/item_test.go b/monitor/item_test.go index e2b55ba..cbec746 100644 --- a/monitor/item_test.go +++ b/monitor/item_test.go @@ -53,7 +53,7 @@ func Test_ItemCompare(t *testing.T) { }, } for _, c := range cases { - comparison := (&Item{next: c.nexts[0], interval: c.inter[0]}).Compare(&Item{next: c.nexts[1], interval: c.inter[1]}) + comparison := (&Item{next: c.nexts[0], Interval: Duration{c.inter[0]}}).Compare(&Item{next: c.nexts[1], Interval: Duration{c.inter[1]}}) if comparison != c.result { t.Errorf("failed to compare %v: got %v, expected %v", c.nexts, comparison, c.result) } diff --git a/monitor/monitor.go b/monitor/monitor.go index 35bea62..69cdce9 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -1,7 +1,13 @@ package monitor import ( + "context" + "encoding/json" "errors" + "fmt" + "io/ioutil" + "local1/logger" + "net/http" "time" "github.com/golang-collections/go-datastructures/queue" @@ -10,16 +16,90 @@ import ( type Monitor struct { newItems chan Item triggeredItems chan Item + port string + server *http.Server } -func New(newItems, triggeredItems chan Item) (*Monitor, error) { +func New(port string) (*Monitor, error) { + newItems := make(chan Item) + triggeredItems := make(chan Item) return &Monitor{ newItems: newItems, triggeredItems: triggeredItems, + port: port, }, nil } +func (monitor *Monitor) listen() error { + monitor.server = &http.Server{ + Addr: monitor.port, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/newfeed" { + http.NotFound(w, r) + logger.Logf("bad path: %q", r.URL.Path) + return + } + if r.Method != "PUT" && r.Method != "POST" { + http.NotFound(w, r) + logger.Logf("bad method: %q", r.Method) + return + } + b, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil || len(b) == 0 { + logger.Log(len(b), err) + w.WriteHeader(http.StatusBadRequest) + return + } + var item Item + if err := json.Unmarshal(b, &item); err != nil { + logger.Log(err) + w.WriteHeader(http.StatusBadRequest) + return + } + select { + case monitor.newItems <- item: + w.WriteHeader(http.StatusOK) + case <-time.After(time.Second * 10): + w.WriteHeader(http.StatusInternalServerError) + } + }), + } + if err := monitor.server.ListenAndServe(); err != http.ErrServerClosed && err != nil { + return err + } + return nil +} + func (monitor *Monitor) Start() error { + errs := make(chan error) + go func() { + if err := monitor.listen(); err != nil { + select { + case errs <- err: + case <-time.After(time.Second * 10): + panic(err) + } + } + }() + go func() { + if err := monitor.loop(); err != nil { + select { + case errs <- err: + case <-time.After(time.Second * 10): + panic(err) + } + } + }() + select { + case err := <-errs: + return fmt.Errorf("%s: %v", "monitor server quit early", err) + case <-time.After(time.Second * 5): + } + return nil +} + +func (monitor *Monitor) loop() error { queue := queue.NewPriorityQueue(1) active := make(map[string]*Item) nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC) @@ -29,7 +109,7 @@ func (monitor *Monitor) Start() error { if newItem == (Item{}) { return nil } - if oldItem, ok := active[newItem.url]; ok { + if oldItem, ok := active[newItem.URL]; ok { *oldItem = newItem } else { queue.Put(&newItem) @@ -73,11 +153,17 @@ func nextEventTime(q *queue.PriorityQueue) (time.Time, error) { return item.next, nil } +func (monitor *Monitor) Close() error { + return monitor.Stop() +} + func (monitor *Monitor) Stop() error { select { case monitor.newItems <- Item{}: - return nil case <-time.After(time.Second * 10): + return errors.New("could not stop monitor") } - return errors.New("could not stop monitor") + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + return monitor.server.Shutdown(ctx) } diff --git a/monitor/monitor_test.go b/monitor/monitor_test.go index 1f23de0..35a3b20 100644 --- a/monitor/monitor_test.go +++ b/monitor/monitor_test.go @@ -1,6 +1,8 @@ package monitor import ( + "bytes" + "net/http" "strconv" "testing" "time" @@ -8,29 +10,62 @@ import ( "github.com/golang-collections/go-datastructures/queue" ) -func Test_Monitor(t *testing.T) { - itemsNew := make(chan Item, 1) - itemsDone := make(chan Item, 1) - m, err := New(itemsNew, itemsDone) +const testmport = ":13152" + +func Test_MonitorListen(t *testing.T) { + m, err := New(testmport) if err != nil { t.Fatalf("cannot create new monitor: %v", err) } + if err := m.Start(); err != nil { + t.Fatalf("cannot start monitor: %v", err) + } + defer m.Stop() - errs := make(chan error) - go func() { - errs <- m.Start() - }() - select { - case err := <-errs: - t.Fatalf("monitor stopped early: %v", err) - case <-time.After(time.Second * 1): + if resp, err := http.Get("http://localhost" + testmport + "/mia"); err != nil { + t.Fatalf("GET error: %v", err) + } else if resp.StatusCode != http.StatusNotFound { + t.Errorf("GET /mia didn't 404: got %v", resp.StatusCode) + } + + if resp, err := http.Get("http://localhost" + testmport + "/newfeed"); err != nil { + t.Fatalf("GET error: %v", err) + } else if resp.StatusCode != http.StatusNotFound { + t.Errorf("GET /newfeed didn't 404: got %v", resp.StatusCode) + } + + if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(""))); err != nil { + t.Fatalf("POST error: %v", err) + } else if resp.StatusCode != http.StatusBadRequest { + t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode) + } + + if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(`{"URL":"hello", "Interval":"5m"}`))); err != nil { + t.Fatalf("POST error: %v", err) + } else if resp.StatusCode != http.StatusOK { + t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode) + } +} + +func Test_Monitor(t *testing.T) { + m, err := New(testmport) + if err != nil { + t.Fatalf("cannot create new monitor: %v", err) + } + itemsNew := make(chan Item, 1) + itemsDone := make(chan Item, 1) + m.newItems = itemsNew + m.triggeredItems = itemsDone + + if err := m.Start(); err != nil { + t.Fatalf("cannot start monitor: %v", err) } for i := 0; i < 2; i++ { item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10) select { case itemsNew <- *item: - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): t.Fatalf("could not add new item in time limit") } } @@ -38,10 +73,10 @@ func Test_Monitor(t *testing.T) { for i := 0; i < 2; i++ { select { case triggered := <-itemsDone: - if triggered.url != "item"+strconv.Itoa(i) { + if triggered.URL != "item"+strconv.Itoa(i) { t.Fatalf("wrong item done order: %d was %v", i, triggered) } - case <-time.After(time.Second * 3): + case <-time.After(time.Second * 5): t.Fatalf("could not get done item in time limit") } }