Compare commits
10 Commits
1f679f4c06
...
c70508b3fb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c70508b3fb | ||
|
|
b1466a3e48 | ||
|
|
f912272912 | ||
|
|
aa1fdd367d | ||
|
|
73423064a8 | ||
|
|
b6baf90dc4 | ||
|
|
232fa86c61 | ||
|
|
648ce19313 | ||
|
|
b7771dee17 | ||
|
|
c5b6ad08e4 |
72
client/cli.go
Normal file
72
client/cli.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"local/args"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func main() {
|
||||
as := args.NewArgSet()
|
||||
as.Append(args.STRING, "addr", "csv key;addr(s) of server(s)", "key;http://localhost:21412")
|
||||
as.Append(args.INT, "r", "read concern", 1)
|
||||
as.Append(args.INT, "w", "write concern", 1)
|
||||
if err := as.Parse(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rConcern := as.Get("r").GetInt()
|
||||
wConcern := as.Get("w").GetInt()
|
||||
addrs := as.Get("addr").GetString()
|
||||
client, err := New(rConcern, wConcern, strings.Split(addrs, ",")...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
for {
|
||||
fmt.Print("> ")
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
do(client, strings.TrimSpace(line))
|
||||
}
|
||||
}
|
||||
|
||||
func do(client *Client, line string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Printf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
switch strings.ToLower(strings.Split(line, " ")[0]) {
|
||||
case "help":
|
||||
log.Printf("get key1")
|
||||
log.Printf("set key1 value value value...")
|
||||
case "get":
|
||||
get(client, line)
|
||||
case "set":
|
||||
set(client, line)
|
||||
case "put":
|
||||
set(client, line)
|
||||
default:
|
||||
log.Printf("unknown command %v", line)
|
||||
}
|
||||
}
|
||||
|
||||
func get(client *Client, line string) {
|
||||
words := strings.Split(line, " ")
|
||||
key := words[1]
|
||||
v, err := client.Get(key)
|
||||
log.Printf("get: %v: %s", err, v)
|
||||
}
|
||||
|
||||
func set(client *Client, line string) {
|
||||
words := strings.Split(line, " ")
|
||||
key := words[1]
|
||||
value := strings.Join(words[2:], " ")
|
||||
err := client.Set(key, []byte(value))
|
||||
log.Printf("set: %v", err)
|
||||
}
|
||||
195
client/client.go
Normal file
195
client/client.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/buraksezer/consistent"
|
||||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
hash *consistent.Consistent
|
||||
wConcern int
|
||||
rConcern int
|
||||
}
|
||||
|
||||
type hasher struct{}
|
||||
|
||||
func (h hasher) Sum64(data []byte) uint64 {
|
||||
return xxhash.Sum64(data)
|
||||
}
|
||||
|
||||
type netAddr string
|
||||
|
||||
func (na netAddr) String() string {
|
||||
return strings.Split(string(na), ";")[0]
|
||||
}
|
||||
|
||||
func (na netAddr) Addr() string {
|
||||
ind := strings.Index(string(na), ";")
|
||||
if ind < 0 {
|
||||
ind = 0
|
||||
} else {
|
||||
ind += 1
|
||||
}
|
||||
return string(na)[ind:]
|
||||
}
|
||||
|
||||
func (na netAddr) Network() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
type result struct {
|
||||
b []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func (r result) String() string {
|
||||
return fmt.Sprintf("%v : %v", r.err, r.b)
|
||||
}
|
||||
|
||||
func New(rConcern, wConcern int, addrs ...string) (*Client, error) {
|
||||
if rConcern > len(addrs)-1 {
|
||||
rConcern = len(addrs) - 1
|
||||
}
|
||||
if wConcern > len(addrs)-1 {
|
||||
wConcern = len(addrs) - 1
|
||||
}
|
||||
if rConcern > wConcern {
|
||||
rConcern = wConcern
|
||||
}
|
||||
if rConcern < 1 {
|
||||
rConcern = 1
|
||||
}
|
||||
if wConcern < 1 {
|
||||
wConcern = 1
|
||||
}
|
||||
cfg := consistent.Config{
|
||||
PartitionCount: 71,
|
||||
ReplicationFactor: 20,
|
||||
Load: 1.25,
|
||||
Hasher: hasher{},
|
||||
}
|
||||
hash := consistent.New(nil, cfg)
|
||||
for _, addr := range addrs {
|
||||
hash.Add(netAddr(addr))
|
||||
}
|
||||
for _, addr := range hash.GetMembers() {
|
||||
conn, err := net.Dial("tcp", strings.TrimPrefix(strings.TrimPrefix(addr.(netAddr).Addr(), "http://"), "https://"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
return &Client{
|
||||
hash: hash,
|
||||
wConcern: wConcern,
|
||||
rConcern: rConcern,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) scatterGather(key string, forEach func(addr string, each chan result), try, need int) ([]byte, error) {
|
||||
final := make(chan result)
|
||||
each := make(chan result)
|
||||
var members []consistent.Member
|
||||
var err error
|
||||
if try > 1 {
|
||||
members, err = c.hash.GetClosestN([]byte(key), try)
|
||||
} else {
|
||||
members = append(members, c.hash.LocateKey([]byte(key)))
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, member := range members {
|
||||
go forEach(member.(netAddr).Addr(), each)
|
||||
}
|
||||
go func() {
|
||||
out := make(map[string]int)
|
||||
done := false
|
||||
for i := 0; i < len(members); i++ {
|
||||
var one result
|
||||
select {
|
||||
case <-time.After(time.Second * 10):
|
||||
continue
|
||||
case one = <-each:
|
||||
}
|
||||
if done {
|
||||
continue
|
||||
}
|
||||
if _, ok := out[one.String()]; !ok {
|
||||
out[one.String()] = 0
|
||||
}
|
||||
out[one.String()] += 1
|
||||
if out[one.String()] >= need {
|
||||
final <- one
|
||||
done = true
|
||||
}
|
||||
}
|
||||
close(each)
|
||||
if !done {
|
||||
final <- result{err: errors.New("no consensus")}
|
||||
}
|
||||
close(final)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(time.Second * 10):
|
||||
return nil, errors.New("timeout on consensus")
|
||||
case consensus := <-final:
|
||||
return consensus.b, consensus.err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Get(key string) ([]byte, error) {
|
||||
return c.scatterGather(key, func(addr string, each chan result) {
|
||||
b, err := c.get(addr, key)
|
||||
each <- result{b: b, err: err}
|
||||
}, c.wConcern, c.rConcern)
|
||||
}
|
||||
|
||||
func (c *Client) get(addr, key string) ([]byte, error) {
|
||||
resp, err := http.Get(addr + "/" + key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, errors.New("bad status on get")
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
return b, err
|
||||
}
|
||||
|
||||
func (c *Client) Set(key string, value []byte) error {
|
||||
_, err := c.scatterGather(key, func(addr string, each chan result) {
|
||||
err := c.set(addr, key, value)
|
||||
each <- result{err: err}
|
||||
}, c.wConcern, c.rConcern)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) set(addr, key string, value []byte) error {
|
||||
r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := (&http.Client{}).Do(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.New("bad status on set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
61
client/client_test.go
Normal file
61
client/client_test.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"local/dynamodb/server/config"
|
||||
"local/dynamodb/server/serve"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAll(t *testing.T) {
|
||||
validKey := "key"
|
||||
validValue := "value"
|
||||
|
||||
addresses := []string{}
|
||||
for i := 0; i < 10; i++ {
|
||||
s := newServer(t)
|
||||
addresses = append(addresses, "http://localhost"+s.Port)
|
||||
go s.Run()
|
||||
}
|
||||
|
||||
client, err := New(1, 1, addresses...)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot make client: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
key := fmt.Sprintf("%s-%d", validKey, i)
|
||||
value := validValue + string([]byte{byte(i) + 'a'})
|
||||
if err := client.Set(key, []byte(value)); err != nil {
|
||||
t.Fatalf("cannot set with client: %v", err)
|
||||
}
|
||||
|
||||
if v, err := client.Get(key); err != nil {
|
||||
t.Fatalf("cannot get with client: %v", err)
|
||||
} else if string(v) != value {
|
||||
t.Fatalf("wrong get with client: got %q, want %q", v, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newServer(t *testing.T) *serve.Server {
|
||||
os.Setenv("DB", "MAP")
|
||||
os.Setenv("PORT", getPort())
|
||||
if err := config.New(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := serve.New()
|
||||
if err := s.Routes(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func getPort() string {
|
||||
s := httptest.NewServer(nil)
|
||||
s.Close()
|
||||
return s.URL[strings.LastIndex(s.URL, ":"):]
|
||||
}
|
||||
9
notes/consistent_and_namespaces
Normal file
9
notes/consistent_and_namespaces
Normal file
@@ -0,0 +1,9 @@
|
||||
a key goes to a consistent hash to determine a partition id. Look up the addr which holds that partition ID.
|
||||
end:1 key -> client
|
||||
client:1 key -> 1+ partitionID
|
||||
client:1 partitionID -> serverAddr
|
||||
client:partition,key,serverAddr -> server
|
||||
x server:partition,key -> value
|
||||
1+ server:value -> client
|
||||
client:1+ value -> 1 end:value
|
||||
|
||||
@@ -10,12 +10,13 @@ var config Config
|
||||
var lock = &sync.RWMutex{}
|
||||
|
||||
type Config struct {
|
||||
db string
|
||||
DB storage.DB
|
||||
Port string
|
||||
Addr string
|
||||
Username string
|
||||
Password string
|
||||
db string
|
||||
DB storage.DB
|
||||
Port string
|
||||
Addr string
|
||||
Username string
|
||||
Password string
|
||||
DefaultNamespace string
|
||||
}
|
||||
|
||||
func Values() Config {
|
||||
|
||||
@@ -1,18 +1,31 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"local/args"
|
||||
"local/storage"
|
||||
"os"
|
||||
)
|
||||
|
||||
func New() error {
|
||||
config = Config{
|
||||
db: orEnv(storage.MAP.String(), "DB", "DATABASE"),
|
||||
Addr: orEnv("", "ADDR", "FILE"),
|
||||
Username: orEnv("", "USER", "USERNAME"),
|
||||
Password: orEnv("", "PASS", "PASSWORD"),
|
||||
Port: orEnv("21412", "PORT", "LISTEN"),
|
||||
as := args.NewArgSet()
|
||||
as.Append(args.STRING, "addr", "address/path to database/file", "")
|
||||
as.Append(args.STRING, "user", "username to database", "")
|
||||
as.Append(args.STRING, "pass", "password to database", "")
|
||||
as.Append(args.STRING, "port", "port to listen on", "21412")
|
||||
as.Append(args.STRING, "db", "database type code", storage.MAP.String())
|
||||
as.Append(args.STRING, "ns", "namespace", storage.DefaultNamespace)
|
||||
if err := as.Parse(); err != nil {
|
||||
return err
|
||||
}
|
||||
config = Config{
|
||||
db: as.Get("db").GetString(),
|
||||
Addr: as.Get("addr").GetString(),
|
||||
Username: as.Get("user").GetString(),
|
||||
Password: as.Get("pass").GetString(),
|
||||
Port: as.Get("port").GetString(),
|
||||
DefaultNamespace: as.Get("ns").GetString(),
|
||||
}
|
||||
storage.DefaultNamespace = config.DefaultNamespace
|
||||
DB, err := storage.New(storage.TypeFromString(config.db), config.Addr, config.Username, config.Password)
|
||||
config.DB = DB
|
||||
return err
|
||||
|
||||
@@ -2,19 +2,19 @@ package serve
|
||||
|
||||
import (
|
||||
"local/dynamodb/server/config"
|
||||
"local/s2sa/s2sa/server/router"
|
||||
"local/router"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
port string
|
||||
Port string
|
||||
router *router.Router
|
||||
}
|
||||
|
||||
func New() *Server {
|
||||
config := config.Values()
|
||||
return &Server{
|
||||
port: ":" + strings.TrimPrefix(config.Port, ":"),
|
||||
Port: ":" + strings.TrimPrefix(config.Port, ":"),
|
||||
router: router.New(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,20 +3,30 @@ package serve
|
||||
import (
|
||||
"io/ioutil"
|
||||
"local/dynamodb/server/config"
|
||||
"local/s2sa/s2sa/server/router"
|
||||
"local/router"
|
||||
"local/storage"
|
||||
"log"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (s *Server) Routes() error {
|
||||
if err := s.router.Add("/"+router.Wildcard, s.CatchAll); err != nil {
|
||||
if err := s.router.Add("/"+router.Wildcard, s.SimpleCatchAll); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.router.Add("/"+router.Wildcard+"/"+router.Wildcard, s.NSCatchAll); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) {
|
||||
func (s *Server) SimpleCatchAll(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path = path.Join("/"+storage.DefaultNamespace, r.URL.Path)
|
||||
s.NSCatchAll(w, r)
|
||||
}
|
||||
|
||||
func (s *Server) NSCatchAll(w http.ResponseWriter, r *http.Request) {
|
||||
foo := http.NotFound
|
||||
switch strings.ToLower(r.Method) {
|
||||
case "get":
|
||||
@@ -30,22 +40,31 @@ func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (s *Server) get(w http.ResponseWriter, r *http.Request) {
|
||||
config := config.Values()
|
||||
key := strings.Split(r.URL.Path, "/")[1]
|
||||
if v, err := config.DB.Get(key); err == storage.ErrNotFound {
|
||||
db := config.Values().DB
|
||||
ns := strings.Split(r.URL.Path, "/")[1]
|
||||
var key string
|
||||
if len(strings.Split(r.URL.Path, "/")) < 3 {
|
||||
key = ns
|
||||
ns = ""
|
||||
} else {
|
||||
key = strings.Split(r.URL.Path, "/")[2]
|
||||
}
|
||||
if value, err := db.Get(key, ns); err == storage.ErrNotFound {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Println(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
} else {
|
||||
w.Write(v)
|
||||
w.Write(value)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) put(w http.ResponseWriter, r *http.Request) {
|
||||
config := config.Values()
|
||||
key := strings.Split(r.URL.Path, "/")[1]
|
||||
db := config.Values().DB
|
||||
ns := strings.Split(r.URL.Path, "/")[1]
|
||||
key := strings.Split(r.URL.Path, "/")[2]
|
||||
if r == nil || r.Body == nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
@@ -55,7 +74,8 @@ func (s *Server) put(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if err := config.DB.Set(key, value); err != nil {
|
||||
if err := db.Set(key, value, ns); err != nil {
|
||||
log.Println(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
var validNS = "ns"
|
||||
var validKey = "key"
|
||||
var validValue = "value"
|
||||
|
||||
@@ -112,7 +113,7 @@ func TestPutGet(t *testing.T) {
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
r, err := http.NewRequest("PUT", "/"+validKey, strings.NewReader(validValue))
|
||||
r, err := http.NewRequest("PUT", "/"+validNS+"/"+validKey, strings.NewReader(validValue))
|
||||
if err != nil {
|
||||
t.Fatalf("err making put request: %v", err)
|
||||
}
|
||||
@@ -122,7 +123,17 @@ func TestPutGet(t *testing.T) {
|
||||
}
|
||||
|
||||
w = httptest.NewRecorder()
|
||||
r, err = http.NewRequest("GET", "/"+validKey, nil)
|
||||
r, err = http.NewRequest("GET", "/not_"+validNS+"/"+validKey, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err making get request: %v", err)
|
||||
}
|
||||
s.get(w, r)
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("err status on bad get: %v", w.Code)
|
||||
}
|
||||
|
||||
w = httptest.NewRecorder()
|
||||
r, err = http.NewRequest("GET", "/"+validNS+"/"+validKey, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err making get request: %v", err)
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) Run() error {
|
||||
return http.ListenAndServe(s.port, s)
|
||||
return http.ListenAndServe(s.Port, s)
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
Reference in New Issue
Block a user