Create fetch http server
parent
27479df8c7
commit
1c3ff9a8d2
|
|
@ -9,23 +9,27 @@ import (
|
||||||
const cdbpath = "DBPath"
|
const cdbpath = "DBPath"
|
||||||
const port = "port"
|
const port = "port"
|
||||||
const mport = "mport"
|
const mport = "mport"
|
||||||
|
const fport = "fport"
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
DBPath string
|
DBPath string
|
||||||
Port string
|
Port string
|
||||||
MonitorPort string
|
MonitorPort string
|
||||||
|
FetchPort string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *Config {
|
func New() *Config {
|
||||||
lookups := make(map[string]*string)
|
lookups := make(map[string]*string)
|
||||||
add(cdbpath, "./db", lookups)
|
add(cdbpath, "./db", lookups)
|
||||||
add(port, "9101", lookups)
|
add(port, ":9101", lookups)
|
||||||
add(mport, "9102", lookups)
|
add(mport, ":9102", lookups)
|
||||||
|
add(fport, ":9103", lookups)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
return &Config{
|
return &Config{
|
||||||
DBPath: *lookups[cdbpath],
|
DBPath: *lookups[cdbpath],
|
||||||
Port: *lookups[port],
|
Port: *lookups[port],
|
||||||
MonitorPort: *lookups[mport],
|
MonitorPort: *lookups[mport],
|
||||||
|
FetchPort: *lookups[fport],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,26 @@
|
||||||
package fetch
|
package fetch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"local1/logger"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Fetch struct {
|
type Fetch struct {
|
||||||
process func(string, []byte) error
|
process func(string, []byte) error
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
server *http.Server
|
||||||
|
port string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(process func(string, []byte) error) (*Fetch, error) {
|
func New(port string, process func(string, []byte) error) (*Fetch, error) {
|
||||||
return &Fetch{
|
return &Fetch{
|
||||||
process: process,
|
process: process,
|
||||||
client: &http.Client{},
|
client: &http.Client{},
|
||||||
|
port: port,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -36,3 +43,65 @@ func (fetcher *Fetch) FetchProcess(url string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fetcher *Fetch) listen() error {
|
||||||
|
fetcher.server = &http.Server{
|
||||||
|
Addr: fetcher.port,
|
||||||
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/fetchfeed" {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
logger.Logf("bad path: %q", r.URL.Path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r.Method != "POST" && r.Method != "PUT" {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
logger.Logf("bad method: %q", r.Method)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b, err := ioutil.ReadAll(r.Body)
|
||||||
|
defer r.Body.Close()
|
||||||
|
if err != nil || len(b) == 0 {
|
||||||
|
logger.Log(len(b), err)
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := fetcher.FetchProcess(string(b)); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
if err := fetcher.server.ListenAndServe(); err != http.ErrServerClosed && err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *Fetch) Start() error {
|
||||||
|
errs := make(chan error)
|
||||||
|
go func() {
|
||||||
|
if err := fetcher.listen(); err != nil {
|
||||||
|
select {
|
||||||
|
case errs <- err:
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case err := <-errs:
|
||||||
|
return fmt.Errorf("%s: %v", "fetcher server quit early", err)
|
||||||
|
case <-time.After(time.Second * 10):
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *Fetch) Close() error {
|
||||||
|
return fetcher.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *Fetch) Stop() error {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer can()
|
||||||
|
return fetcher.server.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const testfport = ":13153"
|
||||||
|
|
||||||
func Test_Fetch(t *testing.T) {
|
func Test_Fetch(t *testing.T) {
|
||||||
s := mockRemote()
|
s := mockRemote()
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
@ -28,7 +30,7 @@ func Test_Fetch(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
f, err := New(c.process)
|
f, err := New(testfport, c.process)
|
||||||
if err != nil && err != c.err {
|
if err != nil && err != c.err {
|
||||||
t.Errorf("cannot create new fetcher: %v", err)
|
t.Errorf("cannot create new fetcher: %v", err)
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue