storage/leveldb.go

129 lines
3.0 KiB
Go
Executable File

package storage
import (
"gogs.inhome.blapointe.com/local/storage/resolve"
"path"
"strconv"
"strings"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type LevelDB struct {
db *leveldb.DB
}
func NewLevelDB(path string) (*LevelDB, error) {
db, err := leveldb.OpenFile(path, &opt.Options{
Filter: filter.NewBloomFilter(32),
OpenFilesCacheCapacity: 25,
})
if err != nil {
db, err = leveldb.RecoverFile(path, &opt.Options{
Filter: filter.NewBloomFilter(32),
OpenFilesCacheCapacity: 25,
})
if err == nil && db.Close() == nil {
db, err = leveldb.OpenFile(path, &opt.Options{
Filter: filter.NewBloomFilter(32),
OpenFilesCacheCapacity: 25,
})
}
}
return &LevelDB{
db: db,
}, err
}
func (ldb *LevelDB) Get(key string, ns ...string) ([]byte, error) {
namespace := resolve.Namespace(ns)
snapshot, err := ldb.db.GetSnapshot()
if err != nil {
return nil, err
}
defer snapshot.Release()
v, err := snapshot.Get([]byte(path.Join(namespace, key)), nil)
if err == leveldb.ErrNotFound {
err = ErrNotFound
} else if err != nil {
return nil, err
}
return v, err
}
func (ldb *LevelDB) Set(key string, value []byte, ns ...string) error {
namespace := resolve.Namespace(ns)
key = path.Join(namespace, key)
batch := &leveldb.Batch{}
if value != nil {
batch.Put([]byte(key), value)
} else {
batch.Delete([]byte(key))
}
return ldb.db.Write(batch, nil)
}
func (ldb *LevelDB) Close() error {
return ldb.db.Close()
}
func (ldb *LevelDB) List(ns []string, limits ...string) ([]string, error) {
namespace := path.Join(resolve.Namespace(ns))
limits = resolve.Limits(limits)
it, next := ldb.getIterator(namespace, limits)
defer it.Release()
keys := ldb.useIterator(it, next, namespace, limits)
err := it.Error()
return keys, err
}
func (ldb *LevelDB) getIterator(namespace string, limits []string) (iterator.Iterator, func() bool) {
limits[0] = path.Join(namespace, limits[0])
limits[1] = path.Join(namespace, limits[1])
asc := limits[3] == "true"
r := util.BytesPrefix([]byte(namespace))
it := ldb.db.NewIterator(r, nil)
next := it.Next
it.First()
if !asc {
next = it.Prev
it.Last()
}
return it, next
}
func (ldb *LevelDB) useIterator(it iterator.Iterator, next func() bool, namespace string, limits []string) []string {
limits = resolve.Limits(limits)
n, _ := strconv.Atoi(limits[2])
m := 0
for it.Error() == nil && !ldb.inRange(string(it.Key()), limits[0], limits[1]) && next() {
}
keys := []string{}
for it.Error() == nil && ldb.inRange(string(it.Key()), limits[0], limits[1]) {
k := string(it.Key())
k = strings.TrimPrefix(k, namespace+"/")
keys = append(keys, k)
m += 1
if n > 0 && m >= n {
break
}
if !next() {
break
}
}
return keys
}
func (ldb *LevelDB) inRange(k, start, stop string) bool {
return k != "" && k >= start && k <= stop
}