Implement namespace optional arg
parent
b79cb97ba6
commit
b7a231feaf
10
bolt.go
10
bolt.go
|
|
@ -13,10 +13,11 @@ func NewBolt(path string) (*Bolt, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Bolt) Get(key string) ([]byte, error) {
|
func (b *Bolt) Get(key string, ns ...string) ([]byte, error) {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
var result []byte
|
var result []byte
|
||||||
err := b.db.View(func(tx *bolt.Tx) error {
|
err := b.db.View(func(tx *bolt.Tx) error {
|
||||||
bkt := tx.Bucket([]byte(DefaultNamespace))
|
bkt := tx.Bucket([]byte(namespace))
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
@ -29,9 +30,10 @@ func (b *Bolt) Get(key string) ([]byte, error) {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Bolt) Set(key string, value []byte) error {
|
func (b *Bolt) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
return b.db.Update(func(tx *bolt.Tx) error {
|
return b.db.Update(func(tx *bolt.Tx) error {
|
||||||
bkt, err := tx.CreateBucketIfNotExists([]byte(DefaultNamespace))
|
bkt, err := tx.CreateBucketIfNotExists([]byte(namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
11
cache.go
11
cache.go
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cache "github.com/patrickmn/go-cache"
|
cache "github.com/patrickmn/go-cache"
|
||||||
|
|
@ -24,8 +25,9 @@ func NewCache(path ...string) (*Cache, error) {
|
||||||
return c, err
|
return c, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cache) Get(key string) ([]byte, error) {
|
func (c *Cache) Get(key string, ns ...string) ([]byte, error) {
|
||||||
v, ok := c.db.Get(key)
|
namespace := resolveNamespace(ns)
|
||||||
|
v, ok := c.db.Get(path.Join(namespace, key))
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
@ -36,8 +38,9 @@ func (c *Cache) Get(key string) ([]byte, error) {
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cache) Set(key string, value []byte) error {
|
func (c *Cache) Set(key string, value []byte, ns ...string) error {
|
||||||
c.db.Set(key, value, 0)
|
namespace := resolveNamespace(ns)
|
||||||
|
c.db.Set(path.Join(namespace, key), value, 0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
12
db.go
12
db.go
|
|
@ -5,8 +5,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type DB interface {
|
type DB interface {
|
||||||
Get(string) ([]byte, error)
|
Get(string, ...string) ([]byte, error)
|
||||||
Set(string, []byte) error
|
Set(string, []byte, ...string) error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -38,3 +38,11 @@ func New(key Type, params ...string) (db DB, err error) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func resolveNamespace(ns []string) string {
|
||||||
|
namespace := DefaultNamespace
|
||||||
|
if len(ns) > 0 {
|
||||||
|
namespace = ns[0]
|
||||||
|
}
|
||||||
|
return namespace
|
||||||
|
}
|
||||||
|
|
|
||||||
16
db_test.go
16
db_test.go
|
|
@ -15,16 +15,18 @@ type mock struct {
|
||||||
m map[string][]byte
|
m map[string][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mock *mock) Get(key string) ([]byte, error) {
|
func (mock *mock) Get(key string, ns ...string) ([]byte, error) {
|
||||||
v, ok := mock.m[key]
|
namespace := resolveNamespace(ns)
|
||||||
|
v, ok := mock.m[path.Join(namespace, key)]
|
||||||
if ok {
|
if ok {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mock *mock) Set(key string, value []byte) error {
|
func (mock *mock) Set(key string, value []byte, ns ...string) error {
|
||||||
mock.m[key] = value
|
namespace := resolveNamespace(ns)
|
||||||
|
mock.m[path.Join(namespace, key)] = value
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,6 +120,12 @@ func TestImplementations(t *testing.T) {
|
||||||
cases = append(cases, memcacheCluster)
|
cases = append(cases, memcacheCluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if minio, err := NewMinio("localhost:9000", "accesskey", "secretkey"); err != nil {
|
||||||
|
t.Errorf("cannot make minio: %v", err)
|
||||||
|
} else {
|
||||||
|
cases = append(cases, minio)
|
||||||
|
}
|
||||||
|
|
||||||
validKey := "key"
|
validKey := "key"
|
||||||
validValue := []byte("value")
|
validValue := []byte("value")
|
||||||
|
|
||||||
|
|
|
||||||
12
leveldb.go
12
leveldb.go
|
|
@ -1,6 +1,8 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||||
|
|
@ -19,14 +21,15 @@ func NewLevelDB(path string) (*LevelDB, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldb *LevelDB) Get(key string) ([]byte, error) {
|
func (ldb *LevelDB) Get(key string, ns ...string) ([]byte, error) {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
snapshot, err := ldb.db.GetSnapshot()
|
snapshot, err := ldb.db.GetSnapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer snapshot.Release()
|
defer snapshot.Release()
|
||||||
|
|
||||||
v, err := snapshot.Get([]byte(key), nil)
|
v, err := snapshot.Get([]byte(path.Join(namespace, key)), nil)
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
err = ErrNotFound
|
err = ErrNotFound
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
|
@ -36,9 +39,10 @@ func (ldb *LevelDB) Get(key string) ([]byte, error) {
|
||||||
return v, err
|
return v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ldb *LevelDB) Set(key string, value []byte) error {
|
func (ldb *LevelDB) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
batch := &leveldb.Batch{}
|
batch := &leveldb.Batch{}
|
||||||
batch.Put([]byte(key), value)
|
batch.Put([]byte(path.Join(namespace, key)), value)
|
||||||
return ldb.db.Write(batch, nil)
|
return ldb.db.Write(batch, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
30
map.go
30
map.go
|
|
@ -4,10 +4,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Map map[string][]byte
|
type Map map[string]map[string][]byte
|
||||||
|
|
||||||
func NewMap() *Map {
|
func NewMap() *Map {
|
||||||
m := make(map[string][]byte)
|
m := make(map[string]map[string][]byte)
|
||||||
n := Map(m)
|
n := Map(m)
|
||||||
return &n
|
return &n
|
||||||
}
|
}
|
||||||
|
|
@ -28,20 +28,30 @@ func (m *Map) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Map) Get(key string) ([]byte, error) {
|
func (m *Map) Get(key string, ns ...string) ([]byte, error) {
|
||||||
if _, ok := (*m)[key]; !ok {
|
namespace := resolveNamespace(ns)
|
||||||
|
if _, ok := (*m)[namespace]; !ok {
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
return (*m)[key], nil
|
if _, ok := (*m)[namespace][key]; !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
return (*m)[namespace][key], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Map) Set(key string, value []byte) error {
|
func (m *Map) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
if value == nil {
|
if value == nil {
|
||||||
if _, ok := (*m)[key]; ok {
|
if _, ok := (*m)[namespace]; !ok {
|
||||||
delete(*m, key)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
|
} else if _, ok := (*m)[namespace][key]; ok {
|
||||||
|
delete((*m)[namespace], key)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if _, ok := (*m)[namespace]; !ok {
|
||||||
|
(*m)[namespace] = make(map[string][]byte)
|
||||||
|
}
|
||||||
|
(*m)[namespace][key] = value
|
||||||
}
|
}
|
||||||
(*m)[key] = value
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
24
memcache.go
24
memcache.go
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/bradfitz/gomemcache/memcache"
|
"github.com/bradfitz/gomemcache/memcache"
|
||||||
)
|
)
|
||||||
|
|
@ -24,8 +25,13 @@ func (a *netAddr) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMemcache(addr string, addrs ...string) (*Memcache, error) {
|
func NewMemcache(addr string, addrs ...string) (*Memcache, error) {
|
||||||
|
for i := len(addrs) - 1; i >= 0; i-- {
|
||||||
|
if len(addrs[i]) == 0 {
|
||||||
|
addrs = append(addrs[:i], addrs[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
ss := &memcache.ServerList{}
|
ss := &memcache.ServerList{}
|
||||||
if err := ss.SetServers(append(addrs, addr)...); err != nil {
|
if err := ss.SetServers(append([]string{addr}, addrs...)...); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := ss.Each(func(addr net.Addr) error {
|
if err := ss.Each(func(addr net.Addr) error {
|
||||||
|
|
@ -41,14 +47,22 @@ func NewMemcache(addr string, addrs ...string) (*Memcache, error) {
|
||||||
return &Memcache{db: db}, nil
|
return &Memcache{db: db}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *Memcache) Get(key string) ([]byte, error) {
|
func (mc *Memcache) Get(key string, ns ...string) ([]byte, error) {
|
||||||
v, err := mc.db.Get(key)
|
namespace := resolveNamespace(ns)
|
||||||
|
v, err := mc.db.Get(path.Join(namespace, key))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if v == nil {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
return v.Value, err
|
return v.Value, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *Memcache) Set(key string, value []byte) error {
|
func (mc *Memcache) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
return mc.db.Set(&memcache.Item{
|
return mc.db.Set(&memcache.Item{
|
||||||
Key: key,
|
Key: path.Join(namespace, key),
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/bradfitz/gomemcache/memcache"
|
"github.com/bradfitz/gomemcache/memcache"
|
||||||
"github.com/buraksezer/consistent"
|
"github.com/buraksezer/consistent"
|
||||||
|
|
@ -68,14 +69,16 @@ func NewMemcacheCluster(addr string, addrs ...string) (*MemcacheCluster, error)
|
||||||
return &MemcacheCluster{db: db}, nil
|
return &MemcacheCluster{db: db}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MemcacheCluster) Get(key string) ([]byte, error) {
|
func (mc *MemcacheCluster) Get(key string, ns ...string) ([]byte, error) {
|
||||||
v, err := mc.db.Get(key)
|
namespace := resolveNamespace(ns)
|
||||||
|
v, err := mc.db.Get(path.Join(namespace, key))
|
||||||
return v.Value, err
|
return v.Value, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MemcacheCluster) Set(key string, value []byte) error {
|
func (mc *MemcacheCluster) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
return mc.db.Set(&memcache.Item{
|
return mc.db.Set(&memcache.Item{
|
||||||
Key: key,
|
Key: path.Join(namespace, key),
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
10
mongo.go
10
mongo.go
|
|
@ -53,10 +53,11 @@ func NewMongo(addr string, auth ...string) (*Mongo, error) {
|
||||||
return &Mongo{db: db}, nil
|
return &Mongo{db: db}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mg *Mongo) Get(key string) ([]byte, error) {
|
func (mg *Mongo) Get(key string, ns ...string) ([]byte, error) {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
collection := mg.db.Database(DefaultNamespace).Collection(DefaultNamespace)
|
collection := mg.db.Database(DefaultNamespace).Collection(namespace)
|
||||||
filter := bson.M{"_id": key}
|
filter := bson.M{"_id": key}
|
||||||
cursor, err := collection.Find(ctx, filter)
|
cursor, err := collection.Find(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -82,10 +83,11 @@ func (mg *Mongo) Get(key string) ([]byte, error) {
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mg *Mongo) Set(key string, value []byte) error {
|
func (mg *Mongo) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
collection := mg.db.Database(DefaultNamespace).Collection(DefaultNamespace)
|
collection := mg.db.Database(DefaultNamespace).Collection(namespace)
|
||||||
filter := bson.M{"_id": key}
|
filter := bson.M{"_id": key}
|
||||||
document := bson.M{"value": value}
|
document := bson.M{"value": value}
|
||||||
_, err := collection.ReplaceOne(ctx, filter, document, options.Replace().SetUpsert(true))
|
_, err := collection.ReplaceOne(ctx, filter, document, options.Replace().SetUpsert(true))
|
||||||
|
|
|
||||||
10
riak.go
10
riak.go
|
|
@ -28,11 +28,12 @@ func NewRiak(addr string, addrs ...string) (*Riak, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Riak) Get(key string) ([]byte, error) {
|
func (r *Riak) Get(key string, ns ...string) ([]byte, error) {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
obj := &riak.Object{}
|
obj := &riak.Object{}
|
||||||
|
|
||||||
cmd, err := riak.NewFetchValueCommandBuilder().
|
cmd, err := riak.NewFetchValueCommandBuilder().
|
||||||
WithBucket(DefaultNamespace).
|
WithBucket(namespace).
|
||||||
WithKey(key).
|
WithKey(key).
|
||||||
Build()
|
Build()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -43,13 +44,14 @@ func (r *Riak) Get(key string) ([]byte, error) {
|
||||||
return obj.Value, err
|
return obj.Value, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Riak) Set(key string, value []byte) error {
|
func (r *Riak) Set(key string, value []byte, ns ...string) error {
|
||||||
|
namespace := resolveNamespace(ns)
|
||||||
obj := &riak.Object{
|
obj := &riak.Object{
|
||||||
Value: value,
|
Value: value,
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd, err := riak.NewStoreValueCommandBuilder().
|
cmd, err := riak.NewStoreValueCommandBuilder().
|
||||||
WithBucket(DefaultNamespace).
|
WithBucket(namespace).
|
||||||
WithContent(obj).
|
WithContent(obj).
|
||||||
Build()
|
Build()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue