package main import ( "bytes" "errors" "fmt" "io/ioutil" "net" "net/http" "strings" "github.com/buraksezer/consistent" "github.com/cespare/xxhash" ) type Client struct { hash *consistent.Consistent wConcern int rConcern int } type hasher struct{} func (h hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) } type netAddr string func (na netAddr) String() string { return string(na) } func (na netAddr) Network() string { return "tcp" } type result struct { b []byte err error } func (r result) String() string { return fmt.Sprintf("%v : %v", r.err, r.b) } func New(rConcern, wConcern int, addrs ...string) (*Client, error) { if rConcern > len(addrs)-1 { rConcern = len(addrs) - 1 } if wConcern > len(addrs)-1 { wConcern = len(addrs) - 1 } if rConcern > wConcern { rConcern = wConcern } if rConcern < 1 { rConcern = 1 } if wConcern < 1 { wConcern = 1 } cfg := consistent.Config{ PartitionCount: 71, ReplicationFactor: 20, Load: 1.25, Hasher: hasher{}, } hash := consistent.New(nil, cfg) for _, addr := range addrs { hash.Add(netAddr(addr)) } for _, addr := range hash.GetMembers() { conn, err := net.Dial("tcp", strings.TrimPrefix(strings.TrimPrefix(addr.String(), "http://"), "https://")) if err != nil { return nil, err } conn.Close() } return &Client{ hash: hash, wConcern: wConcern, rConcern: rConcern, }, nil } func (c *Client) scatterGather(key string, forEach func(addr string, each chan result), try, need int) ([]byte, error) { final := make(chan result) each := make(chan result) members, err := c.hash.GetClosestN([]byte(key), try) if err != nil { return nil, err } for _, member := range members { go forEach(member.String(), each) } go func() { out := make(map[string]int) done := false for i := 0; i < len(members); i++ { one := <-each if done { continue } if _, ok := out[one.String()]; !ok { out[one.String()] = 0 } out[one.String()] += 1 if out[one.String()] >= need { final <- one done = true } } if !done { final <- result{err: errors.New("no consensus")} } }() consensus := <-final return consensus.b, consensus.err } func (c *Client) Get(key string) ([]byte, error) { return c.scatterGather(key, func(addr string, each chan result) { b, err := c.get(addr, key) each <- result{b: b, err: err} }, c.wConcern, c.rConcern) } func (c *Client) get(addr, key string) ([]byte, error) { resp, err := http.Get(addr + "/" + key) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { return nil, errors.New("not found") } if resp.StatusCode != http.StatusOK { return nil, errors.New("bad status on get") } b, err := ioutil.ReadAll(resp.Body) return b, err } func (c *Client) Set(key string, value []byte) error { _, err := c.scatterGather(key, func(addr string, each chan result) { err := c.set(addr, key, value) each <- result{err: err} }, c.wConcern, c.rConcern) return err } func (c *Client) set(addr, key string, value []byte) error { r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value)) if err != nil { return err } resp, err := (&http.Client{}).Do(r) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return errors.New("bad status on set") } return nil }