diff --git a/config/config.go b/config/config.go index 5e01198..49b139d 100644 --- a/config/config.go +++ b/config/config.go @@ -9,23 +9,27 @@ import ( const cdbpath = "DBPath" const port = "port" const mport = "mport" +const fport = "fport" type Config struct { DBPath string Port string MonitorPort string + FetchPort string } func New() *Config { lookups := make(map[string]*string) add(cdbpath, "./db", lookups) - add(port, "9101", lookups) - add(mport, "9102", lookups) + add(port, ":9101", lookups) + add(mport, ":9102", lookups) + add(fport, ":9103", lookups) flag.Parse() return &Config{ DBPath: *lookups[cdbpath], Port: *lookups[port], MonitorPort: *lookups[mport], + FetchPort: *lookups[fport], } } diff --git a/fetch/fetch.go b/fetch/fetch.go index e0df643..7a6aec3 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -1,19 +1,26 @@ package fetch import ( + "context" + "fmt" "io/ioutil" + "local1/logger" "net/http" + "time" ) type Fetch struct { process func(string, []byte) error client *http.Client + server *http.Server + port string } -func New(process func(string, []byte) error) (*Fetch, error) { +func New(port string, process func(string, []byte) error) (*Fetch, error) { return &Fetch{ process: process, client: &http.Client{}, + port: port, }, nil } @@ -36,3 +43,65 @@ func (fetcher *Fetch) FetchProcess(url string) error { } return nil } + +func (fetcher *Fetch) listen() error { + fetcher.server = &http.Server{ + Addr: fetcher.port, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/fetchfeed" { + http.NotFound(w, r) + logger.Logf("bad path: %q", r.URL.Path) + return + } + if r.Method != "POST" && r.Method != "PUT" { + http.NotFound(w, r) + logger.Logf("bad method: %q", r.Method) + return + } + b, err := ioutil.ReadAll(r.Body) + defer r.Body.Close() + if err != nil || len(b) == 0 { + logger.Log(len(b), err) + w.WriteHeader(http.StatusBadRequest) + return + } + if err := fetcher.FetchProcess(string(b)); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + }), + } + if err := fetcher.server.ListenAndServe(); err != http.ErrServerClosed && err != nil { + return err + } + return nil +} + +func (fetcher *Fetch) Start() error { + errs := make(chan error) + go func() { + if err := fetcher.listen(); err != nil { + select { + case errs <- err: + case <-time.After(time.Second * 10): + panic(err) + } + } + }() + select { + case err := <-errs: + return fmt.Errorf("%s: %v", "fetcher server quit early", err) + case <-time.After(time.Second * 10): + } + return nil +} + +func (fetcher *Fetch) Close() error { + return fetcher.Stop() +} + +func (fetcher *Fetch) Stop() error { + ctx, can := context.WithTimeout(context.Background(), time.Second*10) + defer can() + return fetcher.server.Shutdown(ctx) +} diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index ee12216..553fad1 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -7,6 +7,8 @@ import ( "testing" ) +const testfport = ":13153" + func Test_Fetch(t *testing.T) { s := mockRemote() defer s.Close() @@ -28,7 +30,7 @@ func Test_Fetch(t *testing.T) { }, } for _, c := range cases { - f, err := New(c.process) + f, err := New(testfport, c.process) if err != nil && err != c.err { t.Errorf("cannot create new fetcher: %v", err) } else if err == nil {