From c5b6ad08e4f601dc244504533f4137203bca8c4f Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Thu, 14 Mar 2019 15:20:23 -0600 Subject: [PATCH] Implement simple client --- client/client.go | 89 ++++++++++++++++++++++++++++++++++++++++++ client/client_test.go | 58 +++++++++++++++++++++++++++ server/serve/new.go | 4 +- server/serve/routes.go | 4 +- server/serve/server.go | 2 +- 5 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 client/client.go create mode 100644 client/client_test.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..442c256 --- /dev/null +++ b/client/client.go @@ -0,0 +1,89 @@ +package client + +import ( + "bytes" + "errors" + "io/ioutil" + "net" + "net/http" + "strings" + + "github.com/buraksezer/consistent" + "github.com/cespare/xxhash" +) + +type Client struct { + hash *consistent.Consistent +} + +type hasher struct{} + +func (h hasher) Sum64(data []byte) uint64 { + return xxhash.Sum64(data) +} + +type netAddr string + +func (na netAddr) String() string { + return string(na) +} + +func (na netAddr) Network() string { + return "tcp" +} + +func New(addrs ...string) (*Client, error) { + cfg := consistent.Config{ + PartitionCount: 71, + ReplicationFactor: 20, + Load: 1.25, + Hasher: hasher{}, + } + hash := consistent.New(nil, cfg) + for _, addr := range addrs { + hash.Add(netAddr(addr)) + } + for _, addr := range hash.GetMembers() { + conn, err := net.Dial("tcp", strings.TrimPrefix(strings.TrimPrefix(addr.String(), "http://"), "https://")) + if err != nil { + return nil, err + } + conn.Close() + } + return &Client{ + hash: hash, + }, nil +} + +func (c *Client) Get(key string) ([]byte, error) { + addr := c.hash.LocateKey([]byte(key)).String() + resp, err := http.Get(addr + "/" + key) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return nil, nil + } + if resp.StatusCode != http.StatusOK { + return nil, errors.New("bad status on get") + } + return ioutil.ReadAll(resp.Body) +} + +func (c *Client) Set(key string, value []byte) error { + addr := c.hash.LocateKey([]byte(key)).String() + r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value)) + if err != nil { + return err + } + resp, err := (&http.Client{}).Do(r) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errors.New("bad status on set") + } + return nil +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..a1389f6 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,58 @@ +package client + +import ( + "local/dynamodb/server/config" + "local/dynamodb/server/serve" + "net/http/httptest" + "os" + "strings" + "testing" +) + +func TestAll(t *testing.T) { + validKey := "key" + validValue := "value" + + servers := []*serve.Server{} + addresses := []string{} + for i := 0; i < 10; i++ { + s := newServer(t) + servers = append(servers, s) + addresses = append(addresses, "http://localhost"+s.Port) + go s.Run() + } + + client, err := New(addresses...) + if err != nil { + t.Fatalf("cannot make client: %v", err) + } + + if err := client.Set(validKey, []byte(validValue)); err != nil { + t.Fatalf("cannot set with client: %v", err) + } + + if v, err := client.Get(validKey); err != nil { + t.Fatalf("cannot get with client: %v", err) + } else if string(v) != validValue { + t.Fatalf("wrong get with client: got %q, want %q", v, validValue) + } +} + +func newServer(t *testing.T) *serve.Server { + os.Setenv("DB", "MAP") + os.Setenv("PORT", getPort()) + if err := config.New(); err != nil { + t.Fatal(err) + } + s := serve.New() + if err := s.Routes(); err != nil { + t.Fatal(err) + } + return s +} + +func getPort() string { + s := httptest.NewServer(nil) + s.Close() + return s.URL[strings.LastIndex(s.URL, ":"):] +} diff --git a/server/serve/new.go b/server/serve/new.go index 12d6446..0a47913 100644 --- a/server/serve/new.go +++ b/server/serve/new.go @@ -7,14 +7,14 @@ import ( ) type Server struct { - port string + Port string router *router.Router } func New() *Server { config := config.Values() return &Server{ - port: ":" + strings.TrimPrefix(config.Port, ":"), + Port: ":" + strings.TrimPrefix(config.Port, ":"), router: router.New(), } } diff --git a/server/serve/routes.go b/server/serve/routes.go index a91d7cc..47cacd1 100644 --- a/server/serve/routes.go +++ b/server/serve/routes.go @@ -32,14 +32,14 @@ func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) { func (s *Server) get(w http.ResponseWriter, r *http.Request) { config := config.Values() key := strings.Split(r.URL.Path, "/")[1] - if v, err := config.DB.Get(key); err == storage.ErrNotFound { + if value, err := config.DB.Get(key); err == storage.ErrNotFound { w.WriteHeader(http.StatusNotFound) return } else if err != nil { w.WriteHeader(http.StatusInternalServerError) return } else { - w.Write(v) + w.Write(value) } } diff --git a/server/serve/server.go b/server/serve/server.go index 7e2a66a..aa2e661 100644 --- a/server/serve/server.go +++ b/server/serve/server.go @@ -5,7 +5,7 @@ import ( ) func (s *Server) Run() error { - return http.ListenAndServe(s.port, s) + return http.ListenAndServe(s.Port, s) } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {