diff --git a/monitor/item.go b/monitor/item.go index f211a22..7dbbf9f 100644 --- a/monitor/item.go +++ b/monitor/item.go @@ -12,6 +12,10 @@ type Duration struct { time.Duration } +func (d *Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(d.Duration) +} + func (d *Duration) UnmarshalJSON(b []byte) error { var v interface{} if err := json.Unmarshal(b, &v); err != nil { diff --git a/monitor/item_test.go b/monitor/item_test.go index cbec746..c925bbb 100644 --- a/monitor/item_test.go +++ b/monitor/item_test.go @@ -1,6 +1,7 @@ package monitor import ( + "encoding/json" "testing" "time" ) @@ -59,3 +60,15 @@ func Test_ItemCompare(t *testing.T) { } } } + +func Test_ItemJSON(t *testing.T) { + item := NewItem("any", time.Minute) + var itemB Item + if b, err := json.Marshal(item); err != nil { + t.Fatalf("cannot marshal item: %v", err) + } else if err := json.Unmarshal(b, &itemB); err != nil { + t.Fatalf("cannot unmarshal item: %v", err) + } else if item.next = itemB.next; itemB != *item { + t.Fatalf("unmarshaled item does not match original: %v, expected %v", itemB, *item) + } +} diff --git a/monitor/monitor.go b/monitor/monitor.go index fb6d502..58dd670 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -1,13 +1,8 @@ package monitor import ( - "context" - "encoding/json" "errors" "fmt" - "io/ioutil" - "local1/logger" - "net/http" "time" "github.com/golang-collections/go-datastructures/queue" @@ -17,7 +12,6 @@ type Monitor struct { newItems chan Item trigger func(string) port string - server *http.Server } func New(port string, trigger func(string)) (*Monitor, error) { @@ -29,58 +23,8 @@ func New(port string, trigger func(string)) (*Monitor, error) { }, nil } -func (monitor *Monitor) listen() error { - monitor.server = &http.Server{ - Addr: monitor.port, - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/newfeed" { - http.NotFound(w, r) - logger.Logf("bad path: %q", r.URL.Path) - return - } - if r.Method != "PUT" && r.Method != "POST" { - http.NotFound(w, r) - logger.Logf("bad method: %q", r.Method) - return - } - b, err := ioutil.ReadAll(r.Body) - defer r.Body.Close() - if err != nil || len(b) == 0 { - logger.Log(len(b), err) - w.WriteHeader(http.StatusBadRequest) - return - } - var item Item - if err := json.Unmarshal(b, &item); err != nil { - logger.Log(err) - w.WriteHeader(http.StatusBadRequest) - return - } - select { - case monitor.newItems <- item: - w.WriteHeader(http.StatusOK) - case <-time.After(time.Second * 10): - w.WriteHeader(http.StatusInternalServerError) - } - }), - } - if err := monitor.server.ListenAndServe(); err != http.ErrServerClosed && err != nil { - return err - } - return nil -} - func (monitor *Monitor) Start() error { errs := make(chan error) - go func() { - if err := monitor.listen(); err != nil { - select { - case errs <- err: - case <-time.After(time.Second * 5): - panic(err) - } - } - }() go func() { if err := monitor.loop(); err != nil { select { @@ -92,7 +36,7 @@ func (monitor *Monitor) Start() error { }() select { case err := <-errs: - return fmt.Errorf("%s: %v", "monitor server quit early", err) + return fmt.Errorf("%s: %v", "monitor loop quit early", err) case <-time.After(time.Second * 2): } return nil @@ -159,7 +103,5 @@ func (monitor *Monitor) Stop() error { case <-time.After(time.Second * 10): return errors.New("could not stop monitor") } - ctx, can := context.WithTimeout(context.Background(), time.Second*10) - defer can() - return monitor.server.Shutdown(ctx) + return nil } diff --git a/monitor/monitor_test.go b/monitor/monitor_test.go index 18c98ea..16850f5 100644 --- a/monitor/monitor_test.go +++ b/monitor/monitor_test.go @@ -1,8 +1,6 @@ package monitor import ( - "bytes" - "net/http" "strconv" "testing" "time" @@ -12,43 +10,10 @@ import ( const testmport = ":13152" -func Test_MonitorListen(t *testing.T) { - m, err := New(testmport, func(string) {}) - if err != nil { - t.Fatalf("cannot create new monitor: %v", err) - } - if err := m.Start(); err != nil { - t.Fatalf("cannot start monitor: %v", err) - } - defer m.Stop() - - if resp, err := http.Get("http://localhost" + testmport + "/mia"); err != nil { - t.Fatalf("GET error: %v", err) - } else if resp.StatusCode != http.StatusNotFound { - t.Errorf("GET /mia didn't 404: got %v", resp.StatusCode) - } - - if resp, err := http.Get("http://localhost" + testmport + "/newfeed"); err != nil { - t.Fatalf("GET error: %v", err) - } else if resp.StatusCode != http.StatusNotFound { - t.Errorf("GET /newfeed didn't 404: got %v", resp.StatusCode) - } - - if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(""))); err != nil { - t.Fatalf("POST error: %v", err) - } else if resp.StatusCode != http.StatusBadRequest { - t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode) - } - - if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(`{"URL":"hello", "Interval":"5m"}`))); err != nil { - t.Fatalf("POST error: %v", err) - } else if resp.StatusCode != http.StatusOK { - t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode) - } -} - func Test_Monitor(t *testing.T) { - m, err := New(testmport, func(string) {}) + numItems := 2 + completed := make(chan struct{}, numItems) + m, err := New(testmport, func(string) { completed <- struct{}{} }) if err != nil { t.Fatalf("cannot create new monitor: %v", err) } @@ -59,8 +24,8 @@ func Test_Monitor(t *testing.T) { t.Fatalf("cannot start monitor: %v", err) } - for i := 0; i < 2; i++ { - item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10) + for i := 0; i < numItems; i++ { + item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*1) select { case itemsNew <- *item: case <-time.After(time.Second * 5): @@ -68,6 +33,14 @@ func Test_Monitor(t *testing.T) { } } + for i := 0; i < numItems; i++ { + select { + case <-completed: + case <-time.After(time.Second * 5): + t.Errorf("did not complete item %d", i) + } + } + if err := m.Stop(); err != nil { t.Fatalf("could not stop monitor: %v", err) }