From 232fa86c61d3980dc4420b9cf5eacfd66e0ae751 Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Fri, 15 Mar 2019 11:42:31 -0600 Subject: [PATCH] Client scatter gathers by rw concern --- client/cli.go | 22 ++++++++++- client/client.go | 87 ++++++++++++++++++++++++++++++++++++++++--- client/client_test.go | 2 +- 3 files changed, 103 insertions(+), 8 deletions(-) diff --git a/client/cli.go b/client/cli.go index 6223826..b5396cf 100644 --- a/client/cli.go +++ b/client/cli.go @@ -5,14 +5,32 @@ import ( "fmt" "log" "os" + "strconv" "strings" ) func main() { - client, err := New(strings.Split(os.Getenv("ADDR"), ",")...) + rConcernS, ok := os.LookupEnv("RCONCERN") + if !ok { + rConcernS = "1" + } + wConcernS, ok := os.LookupEnv("WCONCERN") + if !ok { + wConcernS = "1" + } + rConcern, err := strconv.Atoi(rConcernS) if err != nil { panic(err) } + wConcern, err := strconv.Atoi(wConcernS) + if err != nil { + panic(err) + } + client, err := New(rConcern, wConcern, strings.Split(os.Getenv("ADDR"), ",")...) + if err != nil { + panic(err) + } + log.Printf("client: %v", client) reader := bufio.NewReader(os.Stdin) for { fmt.Print("> ") @@ -46,7 +64,7 @@ func get(client *Client, line string) { words := strings.Split(line, " ") key := words[1] v, err := client.Get(key) - log.Printf("set: %v: %s", err, v) + log.Printf("get: %v: %s", err, v) } func set(client *Client, line string) { diff --git a/client/client.go b/client/client.go index 0e17351..be025fb 100644 --- a/client/client.go +++ b/client/client.go @@ -3,6 +3,7 @@ package main import ( "bytes" "errors" + "fmt" "io/ioutil" "log" "net" @@ -14,7 +15,9 @@ import ( ) type Client struct { - hash *consistent.Consistent + hash *consistent.Consistent + wConcern int + rConcern int } type hasher struct{} @@ -33,7 +36,31 @@ func (na netAddr) Network() string { return "tcp" } -func New(addrs ...string) (*Client, error) { +type result struct { + b []byte + err error +} + +func (r result) String() string { + return fmt.Sprintf("%v : %v", r.err, r.b) +} + +func New(rConcern, wConcern int, addrs ...string) (*Client, error) { + if rConcern > len(addrs)-1 { + rConcern = len(addrs) - 1 + } + if wConcern > len(addrs)-1 { + wConcern = len(addrs) - 1 + } + if rConcern > wConcern { + rConcern = wConcern + } + if rConcern < 1 { + rConcern = 1 + } + if wConcern < 1 { + wConcern = 1 + } cfg := consistent.Config{ PartitionCount: 71, ReplicationFactor: 20, @@ -52,12 +79,55 @@ func New(addrs ...string) (*Client, error) { conn.Close() } return &Client{ - hash: hash, + hash: hash, + wConcern: wConcern, + rConcern: rConcern, }, nil } +func (c *Client) scatterGather(key string, forEach func(addr string, each chan result), try, need int) ([]byte, error) { + final := make(chan result) + each := make(chan result) + members, err := c.hash.GetClosestN([]byte(key), try) + if err != nil { + return nil, err + } + for _, member := range members { + go forEach(member.String(), each) + } + go func() { + out := make(map[string]int) + done := false + for i := 0; i < len(members); i++ { + one := <-each + if done { + continue + } + if _, ok := out[one.String()]; !ok { + out[one.String()] = 0 + } + out[one.String()] += 1 + if out[one.String()] >= need { + final <- one + done = true + } + } + if !done { + final <- result{err: errors.New("no consensus")} + } + }() + consensus := <-final + return consensus.b, consensus.err +} + func (c *Client) Get(key string) ([]byte, error) { - addr := c.hash.LocateKey([]byte(key)).String() + return c.scatterGather(key, func(addr string, each chan result) { + b, err := c.get(addr, key) + each <- result{b: b, err: err} + }, c.wConcern, c.rConcern) +} + +func (c *Client) get(addr, key string) ([]byte, error) { log.Printf("GET %s FROM %s", key, addr) resp, err := http.Get(addr + "/" + key) if err != nil { @@ -74,7 +144,14 @@ func (c *Client) Get(key string) ([]byte, error) { } func (c *Client) Set(key string, value []byte) error { - addr := c.hash.LocateKey([]byte(key)).String() + _, err := c.scatterGather(key, func(addr string, each chan result) { + err := c.set(addr, key, value) + each <- result{err: err} + }, c.wConcern, c.rConcern) + return err +} + +func (c *Client) set(addr, key string, value []byte) error { log.Printf("SET %s FROM %s", key, addr) r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value)) if err != nil { diff --git a/client/client_test.go b/client/client_test.go index daad964..5d25ab8 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -23,7 +23,7 @@ func TestAll(t *testing.T) { go s.Run() } - client, err := New(addresses...) + client, err := New(1, 1, addresses...) if err != nil { t.Fatalf("cannot make client: %v", err) }