package monitor

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/golang-collections/go-datastructures/queue"
	"local1/logger"
)

const (
	// feedPath is the only URL path the HTTP handler accepts.
	feedPath = "/newfeed"

	// handoffTimeout bounds every blocking channel send in this file
	// (handler -> loop, loop -> consumer, goroutine -> Start, Stop -> loop).
	handoffTimeout = 10 * time.Second

	// shutdownTimeout bounds the graceful HTTP server shutdown in Stop.
	shutdownTimeout = 10 * time.Second

	// startupGrace is how long Start waits for an early failure before
	// reporting success.
	startupGrace = 5 * time.Second
)

// Monitor accepts items over HTTP, keeps them in a priority queue and
// emits each one on triggeredItems when its scheduled time arrives.
type Monitor struct {
	newItems       chan Item    // handler -> loop; the zero Item is the stop sentinel
	triggeredItems chan Item    // loop -> consumer of due items
	port           string       // passed verbatim as http.Server.Addr
	server         *http.Server // nil until listen has run
}

// New returns a Monitor that will listen on port once Start is called.
// port is used as the server's Addr. The returned error is always nil.
func New(port string) (*Monitor, error) {
	return &Monitor{
		newItems:       make(chan Item),
		triggeredItems: make(chan Item),
		port:           port,
	}, nil
}

// handleNewFeed is the HTTP handler: it accepts a PUT or POST on feedPath
// whose body is a JSON-encoded Item and hands that item to the scheduling
// loop.
//
// Responses: 404 for a wrong path or method, 400 for an unreadable, empty or
// undecodable body, 200 once the loop has accepted the item, and 500 if the
// loop did not accept it within handoffTimeout.
func (m *Monitor) handleNewFeed(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != feedPath {
		http.NotFound(w, r)
		logger.Logf("bad path: %q", r.URL.Path)
		return
	}
	if r.Method != "PUT" && r.Method != "POST" {
		http.NotFound(w, r)
		logger.Logf("bad method: %q", r.Method)
		return
	}

	defer r.Body.Close()
	body, err := ioutil.ReadAll(r.Body)
	if err != nil || len(body) == 0 {
		logger.Log(len(body), err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var item Item
	if err := json.Unmarshal(body, &item); err != nil {
		logger.Log(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	select {
	case m.newItems <- item:
		w.WriteHeader(http.StatusOK)
	case <-time.After(handoffTimeout):
		w.WriteHeader(http.StatusInternalServerError)
	}
}

// listen creates the HTTP server and serves until it is shut down.
// It returns nil when the server was closed via Shutdown/Close
// (http.ErrServerClosed) and the underlying error otherwise.
func (m *Monitor) listen() error {
	m.server = &http.Server{
		Addr:    m.port,
		Handler: http.HandlerFunc(m.handleNewFeed),
	}
	if err := m.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		return err
	}
	return nil
}

// Start launches the HTTP listener and the scheduling loop in their own
// goroutines and then waits startupGrace for either of them to fail.
// It returns the wrapped failure if one arrives in that window and nil
// otherwise.
//
// Once Start has returned nothing receives from the error channel any more,
// so a later failure of either goroutine panics after handoffTimeout.
func (m *Monitor) Start() error {
	errs := make(chan error)

	// run executes fn and reports a non-nil result on errs, panicking if
	// nobody picks the error up in time.
	run := func(fn func() error) {
		err := fn()
		if err == nil {
			return
		}
		select {
		case errs <- err:
		case <-time.After(handoffTimeout):
			panic(err)
		}
	}
	go run(m.listen)
	go run(m.loop)

	select {
	case err := <-errs:
		return fmt.Errorf("%s: %v", "monitor server quit early", err)
	case <-time.After(startupGrace):
	}
	return nil
}

// loop is the scheduler. It alternates between taking new items from
// newItems and waiting for the head of the priority queue to become due.
// A due item is offered on triggeredItems for up to handoffTimeout,
// incremented if it was delivered, and put back on the queue either way.
//
// loop returns nil when it receives the zero Item (the stop sentinel sent by
// Stop) and an error if the queue fails or yields something that is not an
// *Item.
func (m *Monitor) loop() error {
	pq := queue.NewPriorityQueue(1)
	// active maps an item's URL to the pointer stored in pq, so that a
	// re-submitted URL overwrites the existing entry instead of queueing a
	// duplicate.
	active := make(map[string]*Item)
	// With nothing queued there is nothing to wait for; park far in the future.
	nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC)

	for {
		select {
		case newItem := <-m.newItems:
			if newItem == (Item{}) {
				return nil
			}
			if oldItem, ok := active[newItem.URL]; ok {
				// NOTE(review): the overwrite happens in place and does not
				// reposition the entry inside pq — confirm that is acceptable
				// for Item's ordering.
				*oldItem = newItem
			} else {
				entry := &newItem
				pq.Put(entry)
				// Remember the queued pointer; without this the lookup above
				// can never succeed.
				active[newItem.URL] = entry
			}
			var err error
			if nextEvent, err = nextEventTime(pq); err != nil {
				return err
			}

		case <-time.After(time.Until(nextEvent)):
			items, err := pq.Get(1)
			if err != nil {
				return err
			}
			if len(items) == 0 {
				continue
			}
			item, ok := items[0].(*Item)
			if !ok {
				return errors.New("queue contains illegal item")
			}
			select {
			case m.triggeredItems <- *item:
				// Only a delivered item is incremented; an undelivered one is
				// re-queued unchanged.
				item.increment()
			case <-time.After(handoffTimeout):
			}
			pq.Put(item)
			if nextEvent, err = nextEventTime(pq); err != nil {
				return err
			}
		}
	}
}

// nextEventTime returns the next field of the item at the head of q.
// It returns an error when the head is not an *Item.
func nextEventTime(q *queue.PriorityQueue) (time.Time, error) {
	head, ok := q.Peek().(*Item)
	if !ok {
		return time.Unix(0, 0), errors.New("non-item in priority queue")
	}
	return head.next, nil
}

// Close is an alias for Stop.
func (m *Monitor) Close() error {
	return m.Stop()
}

// Stop asks the scheduling loop to exit by sending it the zero Item and then
// shuts the HTTP server down, each step bounded by its own timeout.
//
// If the loop does not accept the sentinel within handoffTimeout Stop still
// shuts the server down, so the listener is not left open, and returns
// "could not stop monitor". Otherwise it returns the result of the server
// shutdown. A Monitor whose server was never created skips the shutdown step.
func (m *Monitor) Stop() error {
	var loopErr error
	select {
	case m.newItems <- Item{}:
	case <-time.After(handoffTimeout):
		loopErr = errors.New("could not stop monitor")
	}

	// listen assigns the server; if it never ran there is nothing to shut down.
	if m.server == nil {
		return loopErr
	}

	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := m.server.Shutdown(ctx); err != nil && loopErr == nil {
		return err
	}
	return loopErr
}