Compare commits

..

10 Commits

Author SHA1 Message Date
Bel LaPointe
c70508b3fb Error logging and support no namespace 2019-06-19 12:02:01 -06:00
Bel LaPointe
b1466a3e48 I got it. Hash shouldnt be tied to addr. 2019-04-24 14:03:13 -06:00
Bel LaPointe
f912272912 enable single servers with cli/client 2019-04-23 18:55:48 -06:00
Bel LaPointe
aa1fdd367d nevermind on ns lookup, because thats just configsvr on mongo 2019-04-23 14:41:38 -06:00
Bel LaPointe
73423064a8 Implement optional namespace for server 2019-03-20 10:00:19 -06:00
Bel LaPointe
b6baf90dc4 nevermind unittest multi servers because config global shares the db 2019-03-15 12:54:31 -06:00
Bel LaPointe
232fa86c61 Client scatter gathers by rw concern 2019-03-15 11:42:31 -06:00
Bel LaPointe
648ce19313 works for r=1 w=1 2019-03-15 09:45:25 -06:00
Bel LaPointe
b7771dee17 Add multiple keys to client test 2019-03-15 09:33:00 -06:00
Bel LaPointe
c5b6ad08e4 Implement simple client 2019-03-14 15:20:23 -06:00
10 changed files with 410 additions and 28 deletions

72
client/cli.go Normal file
View File

@@ -0,0 +1,72 @@
package main
import (
"bufio"
"fmt"
"local/args"
"log"
"os"
"strings"
)
func main() {
as := args.NewArgSet()
as.Append(args.STRING, "addr", "csv key;addr(s) of server(s)", "key;http://localhost:21412")
as.Append(args.INT, "r", "read concern", 1)
as.Append(args.INT, "w", "write concern", 1)
if err := as.Parse(); err != nil {
panic(err)
}
rConcern := as.Get("r").GetInt()
wConcern := as.Get("w").GetInt()
addrs := as.Get("addr").GetString()
client, err := New(rConcern, wConcern, strings.Split(addrs, ",")...)
if err != nil {
panic(err)
}
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("> ")
line, err := reader.ReadString('\n')
if err != nil {
panic(err)
}
do(client, strings.TrimSpace(line))
}
}
func do(client *Client, line string) {
defer func() {
if err := recover(); err != nil {
log.Printf("err: %v", err)
}
}()
switch strings.ToLower(strings.Split(line, " ")[0]) {
case "help":
log.Printf("get key1")
log.Printf("set key1 value value value...")
case "get":
get(client, line)
case "set":
set(client, line)
case "put":
set(client, line)
default:
log.Printf("unknown command %v", line)
}
}
func get(client *Client, line string) {
words := strings.Split(line, " ")
key := words[1]
v, err := client.Get(key)
log.Printf("get: %v: %s", err, v)
}
func set(client *Client, line string) {
words := strings.Split(line, " ")
key := words[1]
value := strings.Join(words[2:], " ")
err := client.Set(key, []byte(value))
log.Printf("set: %v", err)
}

195
client/client.go Normal file
View File

@@ -0,0 +1,195 @@
package main
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"time"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
)
type Client struct {
hash *consistent.Consistent
wConcern int
rConcern int
}
type hasher struct{}
func (h hasher) Sum64(data []byte) uint64 {
return xxhash.Sum64(data)
}
type netAddr string
func (na netAddr) String() string {
return strings.Split(string(na), ";")[0]
}
func (na netAddr) Addr() string {
ind := strings.Index(string(na), ";")
if ind < 0 {
ind = 0
} else {
ind += 1
}
return string(na)[ind:]
}
func (na netAddr) Network() string {
return "tcp"
}
type result struct {
b []byte
err error
}
func (r result) String() string {
return fmt.Sprintf("%v : %v", r.err, r.b)
}
func New(rConcern, wConcern int, addrs ...string) (*Client, error) {
if rConcern > len(addrs)-1 {
rConcern = len(addrs) - 1
}
if wConcern > len(addrs)-1 {
wConcern = len(addrs) - 1
}
if rConcern > wConcern {
rConcern = wConcern
}
if rConcern < 1 {
rConcern = 1
}
if wConcern < 1 {
wConcern = 1
}
cfg := consistent.Config{
PartitionCount: 71,
ReplicationFactor: 20,
Load: 1.25,
Hasher: hasher{},
}
hash := consistent.New(nil, cfg)
for _, addr := range addrs {
hash.Add(netAddr(addr))
}
for _, addr := range hash.GetMembers() {
conn, err := net.Dial("tcp", strings.TrimPrefix(strings.TrimPrefix(addr.(netAddr).Addr(), "http://"), "https://"))
if err != nil {
return nil, err
}
conn.Close()
}
return &Client{
hash: hash,
wConcern: wConcern,
rConcern: rConcern,
}, nil
}
func (c *Client) scatterGather(key string, forEach func(addr string, each chan result), try, need int) ([]byte, error) {
final := make(chan result)
each := make(chan result)
var members []consistent.Member
var err error
if try > 1 {
members, err = c.hash.GetClosestN([]byte(key), try)
} else {
members = append(members, c.hash.LocateKey([]byte(key)))
}
if err != nil {
return nil, err
}
for _, member := range members {
go forEach(member.(netAddr).Addr(), each)
}
go func() {
out := make(map[string]int)
done := false
for i := 0; i < len(members); i++ {
var one result
select {
case <-time.After(time.Second * 10):
continue
case one = <-each:
}
if done {
continue
}
if _, ok := out[one.String()]; !ok {
out[one.String()] = 0
}
out[one.String()] += 1
if out[one.String()] >= need {
final <- one
done = true
}
}
close(each)
if !done {
final <- result{err: errors.New("no consensus")}
}
close(final)
}()
select {
case <-time.After(time.Second * 10):
return nil, errors.New("timeout on consensus")
case consensus := <-final:
return consensus.b, consensus.err
}
}
func (c *Client) Get(key string) ([]byte, error) {
return c.scatterGather(key, func(addr string, each chan result) {
b, err := c.get(addr, key)
each <- result{b: b, err: err}
}, c.wConcern, c.rConcern)
}
func (c *Client) get(addr, key string) ([]byte, error) {
resp, err := http.Get(addr + "/" + key)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, errors.New("not found")
}
if resp.StatusCode != http.StatusOK {
return nil, errors.New("bad status on get")
}
b, err := ioutil.ReadAll(resp.Body)
return b, err
}
func (c *Client) Set(key string, value []byte) error {
_, err := c.scatterGather(key, func(addr string, each chan result) {
err := c.set(addr, key, value)
each <- result{err: err}
}, c.wConcern, c.rConcern)
return err
}
func (c *Client) set(addr, key string, value []byte) error {
r, err := http.NewRequest("PUT", addr+"/"+key, bytes.NewBuffer(value))
if err != nil {
return err
}
resp, err := (&http.Client{}).Do(r)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New("bad status on set")
}
return nil
}

61
client/client_test.go Normal file
View File

@@ -0,0 +1,61 @@
package main
import (
"fmt"
"local/dynamodb/server/config"
"local/dynamodb/server/serve"
"net/http/httptest"
"os"
"strings"
"testing"
)
func TestAll(t *testing.T) {
validKey := "key"
validValue := "value"
addresses := []string{}
for i := 0; i < 10; i++ {
s := newServer(t)
addresses = append(addresses, "http://localhost"+s.Port)
go s.Run()
}
client, err := New(1, 1, addresses...)
if err != nil {
t.Fatalf("cannot make client: %v", err)
}
for i := 0; i < 5; i++ {
key := fmt.Sprintf("%s-%d", validKey, i)
value := validValue + string([]byte{byte(i) + 'a'})
if err := client.Set(key, []byte(value)); err != nil {
t.Fatalf("cannot set with client: %v", err)
}
if v, err := client.Get(key); err != nil {
t.Fatalf("cannot get with client: %v", err)
} else if string(v) != value {
t.Fatalf("wrong get with client: got %q, want %q", v, value)
}
}
}
func newServer(t *testing.T) *serve.Server {
os.Setenv("DB", "MAP")
os.Setenv("PORT", getPort())
if err := config.New(); err != nil {
t.Fatal(err)
}
s := serve.New()
if err := s.Routes(); err != nil {
t.Fatal(err)
}
return s
}
func getPort() string {
s := httptest.NewServer(nil)
s.Close()
return s.URL[strings.LastIndex(s.URL, ":"):]
}

View File

@@ -0,0 +1,9 @@
a key goes to a consistent hash to determine a partition id. Look up the addr which holds that partition ID.
end:1 key -> client
client:1 key -> 1+ partitionID
client:1 partitionID -> serverAddr
client:partition,key,serverAddr -> server
x server:partition,key -> value
1+ server:value -> client
client:1+ value -> 1 end:value

View File

@@ -10,12 +10,13 @@ var config Config
var lock = &sync.RWMutex{} var lock = &sync.RWMutex{}
type Config struct { type Config struct {
db string db string
DB storage.DB DB storage.DB
Port string Port string
Addr string Addr string
Username string Username string
Password string Password string
DefaultNamespace string
} }
func Values() Config { func Values() Config {

View File

@@ -1,18 +1,31 @@
package config package config
import ( import (
"local/args"
"local/storage" "local/storage"
"os" "os"
) )
func New() error { func New() error {
config = Config{ as := args.NewArgSet()
db: orEnv(storage.MAP.String(), "DB", "DATABASE"), as.Append(args.STRING, "addr", "address/path to database/file", "")
Addr: orEnv("", "ADDR", "FILE"), as.Append(args.STRING, "user", "username to database", "")
Username: orEnv("", "USER", "USERNAME"), as.Append(args.STRING, "pass", "password to database", "")
Password: orEnv("", "PASS", "PASSWORD"), as.Append(args.STRING, "port", "port to listen on", "21412")
Port: orEnv("21412", "PORT", "LISTEN"), as.Append(args.STRING, "db", "database type code", storage.MAP.String())
as.Append(args.STRING, "ns", "namespace", storage.DefaultNamespace)
if err := as.Parse(); err != nil {
return err
} }
config = Config{
db: as.Get("db").GetString(),
Addr: as.Get("addr").GetString(),
Username: as.Get("user").GetString(),
Password: as.Get("pass").GetString(),
Port: as.Get("port").GetString(),
DefaultNamespace: as.Get("ns").GetString(),
}
storage.DefaultNamespace = config.DefaultNamespace
DB, err := storage.New(storage.TypeFromString(config.db), config.Addr, config.Username, config.Password) DB, err := storage.New(storage.TypeFromString(config.db), config.Addr, config.Username, config.Password)
config.DB = DB config.DB = DB
return err return err

View File

@@ -2,19 +2,19 @@ package serve
import ( import (
"local/dynamodb/server/config" "local/dynamodb/server/config"
"local/s2sa/s2sa/server/router" "local/router"
"strings" "strings"
) )
type Server struct { type Server struct {
port string Port string
router *router.Router router *router.Router
} }
func New() *Server { func New() *Server {
config := config.Values() config := config.Values()
return &Server{ return &Server{
port: ":" + strings.TrimPrefix(config.Port, ":"), Port: ":" + strings.TrimPrefix(config.Port, ":"),
router: router.New(), router: router.New(),
} }
} }

View File

@@ -3,20 +3,30 @@ package serve
import ( import (
"io/ioutil" "io/ioutil"
"local/dynamodb/server/config" "local/dynamodb/server/config"
"local/s2sa/s2sa/server/router" "local/router"
"local/storage" "local/storage"
"log"
"net/http" "net/http"
"path"
"strings" "strings"
) )
func (s *Server) Routes() error { func (s *Server) Routes() error {
if err := s.router.Add("/"+router.Wildcard, s.CatchAll); err != nil { if err := s.router.Add("/"+router.Wildcard, s.SimpleCatchAll); err != nil {
return err
}
if err := s.router.Add("/"+router.Wildcard+"/"+router.Wildcard, s.NSCatchAll); err != nil {
return err return err
} }
return nil return nil
} }
func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) { func (s *Server) SimpleCatchAll(w http.ResponseWriter, r *http.Request) {
r.URL.Path = path.Join("/"+storage.DefaultNamespace, r.URL.Path)
s.NSCatchAll(w, r)
}
func (s *Server) NSCatchAll(w http.ResponseWriter, r *http.Request) {
foo := http.NotFound foo := http.NotFound
switch strings.ToLower(r.Method) { switch strings.ToLower(r.Method) {
case "get": case "get":
@@ -30,22 +40,31 @@ func (s *Server) CatchAll(w http.ResponseWriter, r *http.Request) {
} }
func (s *Server) get(w http.ResponseWriter, r *http.Request) { func (s *Server) get(w http.ResponseWriter, r *http.Request) {
config := config.Values() db := config.Values().DB
key := strings.Split(r.URL.Path, "/")[1] ns := strings.Split(r.URL.Path, "/")[1]
if v, err := config.DB.Get(key); err == storage.ErrNotFound { var key string
if len(strings.Split(r.URL.Path, "/")) < 3 {
key = ns
ns = ""
} else {
key = strings.Split(r.URL.Path, "/")[2]
}
if value, err := db.Get(key, ns); err == storage.ErrNotFound {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} else if err != nil { } else if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} else { } else {
w.Write(v) w.Write(value)
} }
} }
func (s *Server) put(w http.ResponseWriter, r *http.Request) { func (s *Server) put(w http.ResponseWriter, r *http.Request) {
config := config.Values() db := config.Values().DB
key := strings.Split(r.URL.Path, "/")[1] ns := strings.Split(r.URL.Path, "/")[1]
key := strings.Split(r.URL.Path, "/")[2]
if r == nil || r.Body == nil { if r == nil || r.Body == nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
@@ -55,7 +74,8 @@ func (s *Server) put(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
if err := config.DB.Set(key, value); err != nil { if err := db.Set(key, value, ns); err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }

View File

@@ -10,6 +10,7 @@ import (
"testing" "testing"
) )
var validNS = "ns"
var validKey = "key" var validKey = "key"
var validValue = "value" var validValue = "value"
@@ -112,7 +113,7 @@ func TestPutGet(t *testing.T) {
} }
w := httptest.NewRecorder() w := httptest.NewRecorder()
r, err := http.NewRequest("PUT", "/"+validKey, strings.NewReader(validValue)) r, err := http.NewRequest("PUT", "/"+validNS+"/"+validKey, strings.NewReader(validValue))
if err != nil { if err != nil {
t.Fatalf("err making put request: %v", err) t.Fatalf("err making put request: %v", err)
} }
@@ -122,7 +123,17 @@ func TestPutGet(t *testing.T) {
} }
w = httptest.NewRecorder() w = httptest.NewRecorder()
r, err = http.NewRequest("GET", "/"+validKey, nil) r, err = http.NewRequest("GET", "/not_"+validNS+"/"+validKey, nil)
if err != nil {
t.Fatalf("err making get request: %v", err)
}
s.get(w, r)
if w.Code != http.StatusNotFound {
t.Fatalf("err status on bad get: %v", w.Code)
}
w = httptest.NewRecorder()
r, err = http.NewRequest("GET", "/"+validNS+"/"+validKey, nil)
if err != nil { if err != nil {
t.Fatalf("err making get request: %v", err) t.Fatalf("err making get request: %v", err)
} }

View File

@@ -5,7 +5,7 @@ import (
) )
func (s *Server) Run() error { func (s *Server) Run() error {
return http.ListenAndServe(s.port, s) return http.ListenAndServe(s.Port, s)
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {