Convert monitor to http server

master
Bel LaPointe 2018-10-08 19:34:34 -06:00
parent 1c3ff9a8d2
commit 4946e53b57
4 changed files with 176 additions and 28 deletions

View File

@ -1,21 +1,48 @@
package monitor package monitor
import ( import (
"encoding/json"
"errors"
"time" "time"
"github.com/golang-collections/go-datastructures/queue" "github.com/golang-collections/go-datastructures/queue"
) )
type Duration struct {
time.Duration
}
func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
d.Duration = time.Duration(value)
return nil
case string:
var err error
d.Duration, err = time.ParseDuration(value)
if err != nil {
return err
}
return nil
default:
return errors.New("invalid duration")
}
}
type Item struct { type Item struct {
url string URL string
interval time.Duration Interval Duration
next time.Time next time.Time
} }
func NewItem(url string, interval time.Duration) *Item { func NewItem(URL string, Interval time.Duration) *Item {
return &Item{ return &Item{
url: url, URL: URL,
interval: interval, Interval: Duration{Interval},
next: time.Now(), next: time.Now(),
} }
} }
@ -30,15 +57,15 @@ func (item *Item) Compare(other queue.Item) int {
} else if item.next.After(j.next) { } else if item.next.After(j.next) {
return 1 return 1
} }
if item.interval < j.interval { if item.Interval.Duration < j.Interval.Duration {
return -1 return -1
} else if item.interval > j.interval { } else if item.Interval.Duration > j.Interval.Duration {
return 1 return 1
} }
return 0 return 0
} }
func (item *Item) increment() *Item { func (item *Item) increment() *Item {
item.next = time.Now().Add(item.interval) item.next = time.Now().Add(item.Interval.Duration)
return item return item
} }

View File

@ -53,7 +53,7 @@ func Test_ItemCompare(t *testing.T) {
}, },
} }
for _, c := range cases { for _, c := range cases {
comparison := (&Item{next: c.nexts[0], interval: c.inter[0]}).Compare(&Item{next: c.nexts[1], interval: c.inter[1]}) comparison := (&Item{next: c.nexts[0], Interval: Duration{c.inter[0]}}).Compare(&Item{next: c.nexts[1], Interval: Duration{c.inter[1]}})
if comparison != c.result { if comparison != c.result {
t.Errorf("failed to compare %v: got %v, expected %v", c.nexts, comparison, c.result) t.Errorf("failed to compare %v: got %v, expected %v", c.nexts, comparison, c.result)
} }

View File

@ -1,7 +1,13 @@
package monitor package monitor
import ( import (
"context"
"encoding/json"
"errors" "errors"
"fmt"
"io/ioutil"
"local1/logger"
"net/http"
"time" "time"
"github.com/golang-collections/go-datastructures/queue" "github.com/golang-collections/go-datastructures/queue"
@ -10,16 +16,90 @@ import (
type Monitor struct { type Monitor struct {
newItems chan Item newItems chan Item
triggeredItems chan Item triggeredItems chan Item
port string
server *http.Server
} }
func New(newItems, triggeredItems chan Item) (*Monitor, error) { func New(port string) (*Monitor, error) {
newItems := make(chan Item)
triggeredItems := make(chan Item)
return &Monitor{ return &Monitor{
newItems: newItems, newItems: newItems,
triggeredItems: triggeredItems, triggeredItems: triggeredItems,
port: port,
}, nil }, nil
} }
func (monitor *Monitor) listen() error {
monitor.server = &http.Server{
Addr: monitor.port,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/newfeed" {
http.NotFound(w, r)
logger.Logf("bad path: %q", r.URL.Path)
return
}
if r.Method != "PUT" && r.Method != "POST" {
http.NotFound(w, r)
logger.Logf("bad method: %q", r.Method)
return
}
b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil || len(b) == 0 {
logger.Log(len(b), err)
w.WriteHeader(http.StatusBadRequest)
return
}
var item Item
if err := json.Unmarshal(b, &item); err != nil {
logger.Log(err)
w.WriteHeader(http.StatusBadRequest)
return
}
select {
case monitor.newItems <- item:
w.WriteHeader(http.StatusOK)
case <-time.After(time.Second * 10):
w.WriteHeader(http.StatusInternalServerError)
}
}),
}
if err := monitor.server.ListenAndServe(); err != http.ErrServerClosed && err != nil {
return err
}
return nil
}
func (monitor *Monitor) Start() error { func (monitor *Monitor) Start() error {
errs := make(chan error)
go func() {
if err := monitor.listen(); err != nil {
select {
case errs <- err:
case <-time.After(time.Second * 10):
panic(err)
}
}
}()
go func() {
if err := monitor.loop(); err != nil {
select {
case errs <- err:
case <-time.After(time.Second * 10):
panic(err)
}
}
}()
select {
case err := <-errs:
return fmt.Errorf("%s: %v", "monitor server quit early", err)
case <-time.After(time.Second * 5):
}
return nil
}
func (monitor *Monitor) loop() error {
queue := queue.NewPriorityQueue(1) queue := queue.NewPriorityQueue(1)
active := make(map[string]*Item) active := make(map[string]*Item)
nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC) nextEvent := time.Date(2099, time.January, 1, 1, 1, 1, 1, time.UTC)
@ -29,7 +109,7 @@ func (monitor *Monitor) Start() error {
if newItem == (Item{}) { if newItem == (Item{}) {
return nil return nil
} }
if oldItem, ok := active[newItem.url]; ok { if oldItem, ok := active[newItem.URL]; ok {
*oldItem = newItem *oldItem = newItem
} else { } else {
queue.Put(&newItem) queue.Put(&newItem)
@ -73,11 +153,17 @@ func nextEventTime(q *queue.PriorityQueue) (time.Time, error) {
return item.next, nil return item.next, nil
} }
func (monitor *Monitor) Close() error {
return monitor.Stop()
}
func (monitor *Monitor) Stop() error { func (monitor *Monitor) Stop() error {
select { select {
case monitor.newItems <- Item{}: case monitor.newItems <- Item{}:
return nil
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
return errors.New("could not stop monitor")
} }
return errors.New("could not stop monitor") ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
return monitor.server.Shutdown(ctx)
} }

View File

@ -1,6 +1,8 @@
package monitor package monitor
import ( import (
"bytes"
"net/http"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -8,29 +10,62 @@ import (
"github.com/golang-collections/go-datastructures/queue" "github.com/golang-collections/go-datastructures/queue"
) )
func Test_Monitor(t *testing.T) { const testmport = ":13152"
itemsNew := make(chan Item, 1)
itemsDone := make(chan Item, 1) func Test_MonitorListen(t *testing.T) {
m, err := New(itemsNew, itemsDone) m, err := New(testmport)
if err != nil { if err != nil {
t.Fatalf("cannot create new monitor: %v", err) t.Fatalf("cannot create new monitor: %v", err)
} }
if err := m.Start(); err != nil {
t.Fatalf("cannot start monitor: %v", err)
}
defer m.Stop()
errs := make(chan error) if resp, err := http.Get("http://localhost" + testmport + "/mia"); err != nil {
go func() { t.Fatalf("GET error: %v", err)
errs <- m.Start() } else if resp.StatusCode != http.StatusNotFound {
}() t.Errorf("GET /mia didn't 404: got %v", resp.StatusCode)
select { }
case err := <-errs:
t.Fatalf("monitor stopped early: %v", err) if resp, err := http.Get("http://localhost" + testmport + "/newfeed"); err != nil {
case <-time.After(time.Second * 1): t.Fatalf("GET error: %v", err)
} else if resp.StatusCode != http.StatusNotFound {
t.Errorf("GET /newfeed didn't 404: got %v", resp.StatusCode)
}
if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(""))); err != nil {
t.Fatalf("POST error: %v", err)
} else if resp.StatusCode != http.StatusBadRequest {
t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode)
}
if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(`{"URL":"hello", "Interval":"5m"}`))); err != nil {
t.Fatalf("POST error: %v", err)
} else if resp.StatusCode != http.StatusOK {
t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode)
}
}
func Test_Monitor(t *testing.T) {
m, err := New(testmport)
if err != nil {
t.Fatalf("cannot create new monitor: %v", err)
}
itemsNew := make(chan Item, 1)
itemsDone := make(chan Item, 1)
m.newItems = itemsNew
m.triggeredItems = itemsDone
if err := m.Start(); err != nil {
t.Fatalf("cannot start monitor: %v", err)
} }
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10) item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10)
select { select {
case itemsNew <- *item: case itemsNew <- *item:
case <-time.After(time.Second * 1): case <-time.After(time.Second * 5):
t.Fatalf("could not add new item in time limit") t.Fatalf("could not add new item in time limit")
} }
} }
@ -38,10 +73,10 @@ func Test_Monitor(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
select { select {
case triggered := <-itemsDone: case triggered := <-itemsDone:
if triggered.url != "item"+strconv.Itoa(i) { if triggered.URL != "item"+strconv.Itoa(i) {
t.Fatalf("wrong item done order: %d was %v", i, triggered) t.Fatalf("wrong item done order: %d was %v", i, triggered)
} }
case <-time.After(time.Second * 3): case <-time.After(time.Second * 5):
t.Fatalf("could not get done item in time limit") t.Fatalf("could not get done item in time limit")
} }
} }