DynamoDB/client/client.go

170 lines
3.4 KiB
Go

package main
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
)
// Client talks to a set of storage nodes arranged on a consistent-hash ring.
type Client struct {
	// hash is the ring used to pick which members handle a given key.
	hash *consistent.Consistent
	// wConcern is how many ring members each Get and Set is sent to;
	// rConcern is how many matching responses are required before an
	// operation returns.
	wConcern, rConcern int
}
// hasher is the stateless hash implementation handed to the ring's
// configuration as its Hasher.
type hasher struct{}

// Sum64 returns the 64-bit xxhash digest of data.
func (hasher) Sum64(data []byte) uint64 {
	digest := xxhash.Sum64(data)
	return digest
}
// netAddr is a node address stored as a plain string. Its String and Network
// methods give it the same method set as net.Addr.
type netAddr string

// String returns the address exactly as it was supplied.
func (a netAddr) String() string {
	s := string(a)
	return s
}

// Network always reports "tcp".
func (netAddr) Network() string {
	const network = "tcp"
	return network
}
// result is the outcome of one request to one node: the payload it returned
// and the error, if any.
type result struct {
	b   []byte
	err error
}

// String renders the result as "<err> : <payload>", with both parts formatted
// by %v (so the payload appears as a list of decimal byte values).
func (res result) String() string {
	const layout = "%v : %v"
	return fmt.Sprintf(layout, res.err, res.b)
}
// New builds a Client over the storage nodes listed in addrs.
//
// rConcern and wConcern are normalised before use: each is capped at
// len(addrs)-1, rConcern is then capped at wConcern, and finally both are
// raised to at least 1. Every address is added to a consistent-hash ring and
// each ring member is then probed once with a TCP dial (a leading "http://"
// or "https://" is stripped first). The first dial failure is returned
// unchanged as the error.
//
// New returns an error when addrs is empty, because a ring with no members
// has nothing to route requests to.
func New(rConcern, wConcern int, addrs ...string) (*Client, error) {
	if len(addrs) == 0 {
		return nil, errors.New("no addresses given")
	}

	// The order of these clamps matters: the upper bounds are applied first
	// and the floor of 1 last, so a single-address client ends up with 1/1.
	limit := len(addrs) - 1
	if rConcern > limit {
		rConcern = limit
	}
	if wConcern > limit {
		wConcern = limit
	}
	if rConcern > wConcern {
		rConcern = wConcern
	}
	if rConcern < 1 {
		rConcern = 1
	}
	if wConcern < 1 {
		wConcern = 1
	}

	cfg := consistent.Config{
		PartitionCount:    71,
		ReplicationFactor: 20,
		Load:              1.25,
		Hasher:            hasher{},
	}
	hash := consistent.New(nil, cfg)
	for _, addr := range addrs {
		hash.Add(netAddr(addr))
	}

	// Fail fast if any member is unreachable. Only reachability matters here,
	// so the probe connection is closed straight away.
	for _, member := range hash.GetMembers() {
		hostPort := strings.TrimPrefix(member.String(), "http://")
		hostPort = strings.TrimPrefix(hostPort, "https://")
		conn, err := net.Dial("tcp", hostPort)
		if err != nil {
			return nil, err
		}
		conn.Close()
	}

	return &Client{
		hash:     hash,
		wConcern: wConcern,
		rConcern: rConcern,
	}, nil
}
// scatterGather sends one request to each of the try members that
// c.hash.GetClosestN selects for key, and returns the first result that need
// of them agree on.
//
// forEach is started in its own goroutine once per selected member and must
// send exactly one result on the channel it is given. Results are grouped by
// their String() form, so two responses agree only when both the error text
// and the payload match; an error can therefore be the agreed result.
//
// If GetClosestN fails, its error is returned. If every member has answered
// and no group reached need, the error is "no consensus".
func (c *Client) scatterGather(key string, forEach func(addr string, each chan result), try, need int) ([]byte, error) {
	members, err := c.hash.GetClosestN([]byte(key), try)
	if err != nil {
		return nil, err
	}

	// Buffered to len(members) so every worker can deliver its result and
	// exit even after this function has already returned on an early
	// consensus; no separate draining goroutine is needed.
	each := make(chan result, len(members))
	for _, member := range members {
		go forEach(member.String(), each)
	}

	votes := make(map[string]int, len(members))
	for range members {
		one := <-each
		// Format once per result: String() renders the entire payload.
		k := one.String()
		votes[k]++
		if votes[k] >= need {
			return one.b, one.err
		}
	}
	return nil, errors.New("no consensus")
}
// Get looks key up on c.wConcern ring members and returns the first outcome
// that c.rConcern of them agree on. The agreed outcome may itself be an error
// (for example when enough members report the key as missing).
func (c *Client) Get(key string) ([]byte, error) {
	fetch := func(addr string, each chan result) {
		var r result
		r.b, r.err = c.get(addr, key)
		each <- r
	}
	return c.scatterGather(key, fetch, c.wConcern, c.rConcern)
}
// get fetches the value stored under key from the single node at addr by
// issuing GET addr+"/"+key.
//
// A 404 response yields the error "not found", any other non-200 status
// yields "bad status on get", and on 200 the whole response body is returned.
// The error texts are fixed strings on purpose: scatterGather groups results
// from different nodes by their formatted error and payload.
func (c *Client) get(addr, key string) ([]byte, error) {
	log.Printf("GET %s FROM %s", key, addr)
	resp, err := http.Get(addr + "/" + key)
	if err != nil {
		return nil, err
	}
	defer func() {
		// Read any unread remainder before closing; the transport may not
		// reuse a keep-alive connection whose body was not read to EOF.
		// This is best effort, so the drain error is intentionally ignored.
		_, _ = ioutil.ReadAll(resp.Body)
		resp.Body.Close()
	}()

	switch resp.StatusCode {
	case http.StatusOK:
		return ioutil.ReadAll(resp.Body)
	case http.StatusNotFound:
		return nil, errors.New("not found")
	default:
		return nil, errors.New("bad status on get")
	}
}
// Set writes value under key to c.wConcern ring members and returns the error
// part of the first outcome that c.rConcern of them agree on (nil when that
// many writes succeeded).
func (c *Client) Set(key string, value []byte) error {
	store := func(addr string, each chan result) {
		each <- result{err: c.set(addr, key, value)}
	}
	_, err := c.scatterGather(key, store, c.wConcern, c.rConcern)
	return err
}
// set stores value under key on the single node at addr by issuing
// PUT addr+"/"+key with value as the request body.
//
// It returns nil on a 200 response and the error "bad status on set" for any
// other status. Request-construction and transport errors are returned
// unchanged.
func (c *Client) set(addr, key string, value []byte) error {
	log.Printf("SET %s TO %s", key, addr)
	req, err := http.NewRequest(http.MethodPut, addr+"/"+key, bytes.NewReader(value))
	if err != nil {
		return err
	}
	// Use the shared default client (as http.Get does for reads) rather than
	// constructing a new http.Client for every write.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Read any response payload before closing; the transport may not
		// reuse a keep-alive connection whose body was not read to EOF.
		// This is best effort, so the drain error is intentionally ignored.
		_, _ = ioutil.ReadAll(resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		return errors.New("bad status on set")
	}
	return nil
}