Remove monitor server and instead use callback

master
Bel LaPointe 2018-10-09 08:16:18 -06:00
parent b5ff055505
commit 236ca1d603
4 changed files with 32 additions and 100 deletions

View File

@ -12,6 +12,10 @@ type Duration struct {
time.Duration time.Duration
} }
func (d *Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.Duration)
}
func (d *Duration) UnmarshalJSON(b []byte) error { func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{} var v interface{}
if err := json.Unmarshal(b, &v); err != nil { if err := json.Unmarshal(b, &v); err != nil {

View File

@ -1,6 +1,7 @@
package monitor package monitor
import ( import (
"encoding/json"
"testing" "testing"
"time" "time"
) )
@ -59,3 +60,15 @@ func Test_ItemCompare(t *testing.T) {
} }
} }
} }
func Test_ItemJSON(t *testing.T) {
item := NewItem("any", time.Minute)
var itemB Item
if b, err := json.Marshal(item); err != nil {
t.Fatalf("cannot marshal item: %v", err)
} else if err := json.Unmarshal(b, &itemB); err != nil {
t.Fatalf("cannot unmarshal item: %v", err)
} else if item.next = itemB.next; itemB != *item {
t.Fatalf("unmarshaled item does not match original: %v, expected %v", itemB, *item)
}
}

View File

@ -1,13 +1,8 @@
package monitor package monitor
import ( import (
"context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"local1/logger"
"net/http"
"time" "time"
"github.com/golang-collections/go-datastructures/queue" "github.com/golang-collections/go-datastructures/queue"
@ -17,7 +12,6 @@ type Monitor struct {
newItems chan Item newItems chan Item
trigger func(string) trigger func(string)
port string port string
server *http.Server
} }
func New(port string, trigger func(string)) (*Monitor, error) { func New(port string, trigger func(string)) (*Monitor, error) {
@ -29,58 +23,8 @@ func New(port string, trigger func(string)) (*Monitor, error) {
}, nil }, nil
} }
func (monitor *Monitor) listen() error {
monitor.server = &http.Server{
Addr: monitor.port,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/newfeed" {
http.NotFound(w, r)
logger.Logf("bad path: %q", r.URL.Path)
return
}
if r.Method != "PUT" && r.Method != "POST" {
http.NotFound(w, r)
logger.Logf("bad method: %q", r.Method)
return
}
b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil || len(b) == 0 {
logger.Log(len(b), err)
w.WriteHeader(http.StatusBadRequest)
return
}
var item Item
if err := json.Unmarshal(b, &item); err != nil {
logger.Log(err)
w.WriteHeader(http.StatusBadRequest)
return
}
select {
case monitor.newItems <- item:
w.WriteHeader(http.StatusOK)
case <-time.After(time.Second * 10):
w.WriteHeader(http.StatusInternalServerError)
}
}),
}
if err := monitor.server.ListenAndServe(); err != http.ErrServerClosed && err != nil {
return err
}
return nil
}
func (monitor *Monitor) Start() error { func (monitor *Monitor) Start() error {
errs := make(chan error) errs := make(chan error)
go func() {
if err := monitor.listen(); err != nil {
select {
case errs <- err:
case <-time.After(time.Second * 5):
panic(err)
}
}
}()
go func() { go func() {
if err := monitor.loop(); err != nil { if err := monitor.loop(); err != nil {
select { select {
@ -92,7 +36,7 @@ func (monitor *Monitor) Start() error {
}() }()
select { select {
case err := <-errs: case err := <-errs:
return fmt.Errorf("%s: %v", "monitor server quit early", err) return fmt.Errorf("%s: %v", "monitor loop quit early", err)
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
} }
return nil return nil
@ -159,7 +103,5 @@ func (monitor *Monitor) Stop() error {
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
return errors.New("could not stop monitor") return errors.New("could not stop monitor")
} }
ctx, can := context.WithTimeout(context.Background(), time.Second*10) return nil
defer can()
return monitor.server.Shutdown(ctx)
} }

View File

@ -1,8 +1,6 @@
package monitor package monitor
import ( import (
"bytes"
"net/http"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -12,43 +10,10 @@ import (
const testmport = ":13152" const testmport = ":13152"
func Test_MonitorListen(t *testing.T) {
m, err := New(testmport, func(string) {})
if err != nil {
t.Fatalf("cannot create new monitor: %v", err)
}
if err := m.Start(); err != nil {
t.Fatalf("cannot start monitor: %v", err)
}
defer m.Stop()
if resp, err := http.Get("http://localhost" + testmport + "/mia"); err != nil {
t.Fatalf("GET error: %v", err)
} else if resp.StatusCode != http.StatusNotFound {
t.Errorf("GET /mia didn't 404: got %v", resp.StatusCode)
}
if resp, err := http.Get("http://localhost" + testmport + "/newfeed"); err != nil {
t.Fatalf("GET error: %v", err)
} else if resp.StatusCode != http.StatusNotFound {
t.Errorf("GET /newfeed didn't 404: got %v", resp.StatusCode)
}
if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(""))); err != nil {
t.Fatalf("POST error: %v", err)
} else if resp.StatusCode != http.StatusBadRequest {
t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode)
}
if resp, err := http.Post("http://localhost"+testmport+"/newfeed", "application/json", bytes.NewBuffer([]byte(`{"URL":"hello", "Interval":"5m"}`))); err != nil {
t.Fatalf("POST error: %v", err)
} else if resp.StatusCode != http.StatusOK {
t.Errorf("POST /newfeed didn't 200: got %v", resp.StatusCode)
}
}
func Test_Monitor(t *testing.T) { func Test_Monitor(t *testing.T) {
m, err := New(testmport, func(string) {}) numItems := 2
completed := make(chan struct{}, numItems)
m, err := New(testmport, func(string) { completed <- struct{}{} })
if err != nil { if err != nil {
t.Fatalf("cannot create new monitor: %v", err) t.Fatalf("cannot create new monitor: %v", err)
} }
@ -59,8 +24,8 @@ func Test_Monitor(t *testing.T) {
t.Fatalf("cannot start monitor: %v", err) t.Fatalf("cannot start monitor: %v", err)
} }
for i := 0; i < 2; i++ { for i := 0; i < numItems; i++ {
item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*10) item := NewItem("item"+strconv.Itoa(i), time.Second+time.Second*time.Duration(i)*1)
select { select {
case itemsNew <- *item: case itemsNew <- *item:
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
@ -68,6 +33,14 @@ func Test_Monitor(t *testing.T) {
} }
} }
for i := 0; i < numItems; i++ {
select {
case <-completed:
case <-time.After(time.Second * 5):
t.Errorf("did not complete item %d", i)
}
}
if err := m.Stop(); err != nil { if err := m.Stop(); err != nil {
t.Fatalf("could not stop monitor: %v", err) t.Fatalf("could not stop monitor: %v", err)
} }