Implement simple client
parent
1f679f4c06
commit
c5b6ad08e4
|
|
@ -0,0 +1,89 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/buraksezer/consistent"
|
||||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
hash *consistent.Consistent
|
||||
}
|
||||
|
||||
type hasher struct{}
|
||||
|
||||
func (h hasher) Sum64(data []byte) uint64 {
|
||||
return xxhash.Sum64(data)
|
||||
}
|
||||
|
||||
type netAddr string
|
||||
|
||||
func (na netAddr) String() string {
|
||||
return string(na)
|
||||
}
|
||||
|
||||
func (na netAddr) Network() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
func New(addrs ...string) (*Client, error) {
|
||||
cfg := consistent.Config{
|
||||
PartitionCount: 71,
|
||||
ReplicationFactor: 20,
|
||||
Load: 1.25,
|
||||
Hasher: hasher{},
|
||||
}
|
||||
hash := consistent.New(nil, cfg)
|
||||
for _, addr := range addrs {
|
||||
hash.Add(netAddr(addr))
|
||||
}
|
||||
for _, addr := range hash.GetMembers() {
|
||||
conn, err := net.Dial("tcp", strings.TrimPrefix(strings.TrimPrefix(addr.String(), "http://"), "https://"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
return &Client{
|
||||
hash: hash,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Get(key string) ([]byte, error) {
|
||||
addr := c.hash.LocateKey([]byte(key)).String()
|
||||
resp, err := http.Get(addr + "/" + key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, errors.New("bad status on get")
|
||||
}
|
||||
return ioutil.ReadAll(resp.Body)
|
||||
}
|
||||
|
||||
func (c *Client) Set(key string, value []byte) error {
|
||||
addr := c.hash.LocateKey([]byte(key)).String()
|
||||
r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := (&http.Client{}).Do(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.New("bad status on set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"local/dynamodb/server/config"
|
||||
"local/dynamodb/server/serve"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAll(t *testing.T) {
|
||||
validKey := "key"
|
||||
validValue := "value"
|
||||
|
||||
servers := []*serve.Server{}
|
||||
addresses := []string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
s := newServer(t)
|
||||
servers = append(servers, s)
|
||||
addresses = append(addresses, "http://localhost"+s.Port)
|
||||
go s.Run()
|
||||
}
|
||||
|
||||
client, err := New(addresses...)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot make client: %v", err)
|
||||
}
|
||||
|
||||
if err := client.Set(validKey, []byte(validValue)); err != nil {
|
||||
t.Fatalf("cannot set with client: %v", err)
|
||||
}
|
||||
|
||||
if v, err := client.Get(validKey); err != nil {
|
||||
t.Fatalf("cannot get with client: %v", err)
|
||||
} else if string(v) != validValue {
|
||||
t.Fatalf("wrong get with client: got %q, want %q", v, validValue)
|
||||
}
|
||||
}
|
||||
|
||||
func newServer(t *testing.T) *serve.Server {
|
||||
os.Setenv("DB", "MAP")
|
||||
os.Setenv("PORT", getPort())
|
||||
if err := config.New(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := serve.New()
|
||||
if err := s.Routes(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func getPort() string {
|
||||
s := httptest.NewServer(nil)
|
||||
s.Close()
|
||||
return s.URL[strings.LastIndex(s.URL, ":"):]
|
||||
}
|
||||
|
|
@ -7,14 +7,14 @@ import (
|
|||
)
|
||||
|
||||
type Server struct {
|
||||
port string
|
||||
Port string
|
||||
router *router.Router
|
||||
}
|
||||
|
||||
func New() *Server {
|
||||
config := config.Values()
|
||||
return &Server{
|
||||
port: ":" + strings.TrimPrefix(config.Port, ":"),
|
||||
Port: ":" + strings.TrimPrefix(config.Port, ":"),
|
||||
router: router.New(),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,14 +32,14 @@ func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) {
|
|||
func (s *Server) get(w http.ResponseWriter, r *http.Request) {
|
||||
config := config.Values()
|
||||
key := strings.Split(r.URL.Path, "/")[1]
|
||||
if v, err := config.DB.Get(key); err == storage.ErrNotFound {
|
||||
if value, err := config.DB.Get(key); err == storage.ErrNotFound {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
} else {
|
||||
w.Write(v)
|
||||
w.Write(value)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
)
|
||||
|
||||
func (s *Server) Run() error {
|
||||
return http.ListenAndServe(s.port, s)
|
||||
return http.ListenAndServe(s.Port, s)
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue