Create dbstream interface

master
Bel LaPointe 2021-02-07 12:53:56 -06:00
parent c0d561aa50
commit dcf1594e17
4 changed files with 64 additions and 9 deletions

6
db.go
View File

@ -2,10 +2,16 @@ package storage
import ( import (
"fmt" "fmt"
"io"
"strconv" "strconv"
"strings" "strings"
) )
type DBStream interface {
GetStream(string, ...string) (io.Reader, error)
SetStream(string, io.Reader, ...string) error
}
type DB interface { type DB interface {
Get(string, ...string) ([]byte, error) Get(string, ...string) ([]byte, error)
Set(string, []byte, ...string) error Set(string, []byte, ...string) error

View File

@ -207,6 +207,22 @@ type = local
} else if keys[0] != validKey { } else if keys[0] != validKey {
t.Errorf("%T) List(prefix)[0] != %s: %s", db, validKey, keys[0]) t.Errorf("%T) List(prefix)[0] != %s: %s", db, validKey, keys[0])
} }
if dbstream, ok := db.(DBStream); ok {
log.Printf("trying %T as DBStream", dbstream)
raw := "raw"
if err := dbstream.SetStream("k", strings.NewReader(raw), "ns1", "ns2"); err != nil {
t.Errorf("%T) cannot setstream: %v", dbstream, err)
}
if r, err := dbstream.GetStream("k", "ns1", "ns2"); err != nil {
t.Errorf("%T) cannot getstream: %v", dbstream, err)
} else if b, err := ioutil.ReadAll(r); err != nil {
t.Errorf("%T) cannot readall getstream: %v", dbstream, err)
} else if string(b) != raw {
t.Errorf("%T) wrong getstream: %v", dbstream, string(b))
}
}
t.Logf(" %T: close", db) t.Logf(" %T: close", db)
if err := db.Close(); err != nil { if err := db.Close(); err != nil {
t.Errorf("cannot close %T: %v", db, err) t.Errorf("cannot close %T: %v", db, err)

View File

@ -1,6 +1,8 @@
package storage package storage
import ( import (
"bytes"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
@ -49,15 +51,31 @@ func (b *Files) List(ns []string, limits ...string) ([]string, error) {
} }
func (b *Files) Get(key string, ns ...string) ([]byte, error) { func (b *Files) Get(key string, ns ...string) ([]byte, error) {
r, err := b.GetStream(key, ns...)
if err != nil {
return nil, err
}
return ioutil.ReadAll(r)
}
func (b *Files) GetStream(key string, ns ...string) (io.Reader, error) {
namespace := resolveNamespace(ns) namespace := resolveNamespace(ns)
path := path.Join(b.root, namespace, key) path := path.Join(b.root, namespace, key)
return ioutil.ReadFile(path) return os.Open(path)
} }
func (b *Files) Set(key string, value []byte, ns ...string) error { func (b *Files) Set(key string, value []byte, ns ...string) error {
r := bytes.NewReader(value)
if value == nil {
r = nil
}
return b.SetStream(key, r, ns...)
}
func (b *Files) SetStream(key string, r io.Reader, ns ...string) error {
namespace := resolveNamespace(ns) namespace := resolveNamespace(ns)
dir := path.Join(b.root, namespace) dir := path.Join(b.root, namespace)
if value == nil { if r == nil {
err := os.Remove(path.Join(dir, key)) err := os.Remove(path.Join(dir, key))
if os.IsNotExist(err) { if os.IsNotExist(err) {
err = nil err = nil
@ -68,7 +86,13 @@ func (b *Files) Set(key string, value []byte, ns ...string) error {
return err return err
} }
path := path.Join(dir, key) path := path.Join(dir, key)
return ioutil.WriteFile(path, value, os.ModePerm) f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, r)
return err
} }
func (b *Files) Close() error { func (b *Files) Close() error {

View File

@ -2,6 +2,7 @@ package storage
import ( import (
"bytes" "bytes"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
@ -35,6 +36,14 @@ func NewRClone(rclone string, ns ...string) (*RClone, error) {
} }
func (rc *RClone) Get(key string, ns ...string) ([]byte, error) { func (rc *RClone) Get(key string, ns ...string) ([]byte, error) {
r, err := rc.GetStream(key, ns...)
if err != nil {
return nil, err
}
return ioutil.ReadAll(r)
}
func (rc *RClone) GetStream(key string, ns ...string) (io.Reader, error) {
namespace := rc.ns namespace := rc.ns
if len(ns) > 0 { if len(ns) > 0 {
namespace = path.Join(rc.ns, resolveNamespace(ns)) namespace = path.Join(rc.ns, resolveNamespace(ns))
@ -49,17 +58,17 @@ func (rc *RClone) Get(key string, ns ...string) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
r, err := obj.Open() return obj.Open()
if err != nil {
return nil, err
}
return ioutil.ReadAll(r)
} }
func (rc *RClone) Set(key string, value []byte, ns ...string) error { func (rc *RClone) Set(key string, value []byte, ns ...string) error {
if len(value) == 0 { if len(value) == 0 {
return rc.Del(key, ns...) return rc.Del(key, ns...)
} }
return rc.SetStream(key, bytes.NewReader(value), ns...)
}
func (rc *RClone) SetStream(key string, r io.Reader, ns ...string) error {
namespace := rc.ns namespace := rc.ns
if len(ns) > 0 { if len(ns) > 0 {
namespace = path.Join(rc.ns, resolveNamespace(ns)) namespace = path.Join(rc.ns, resolveNamespace(ns))
@ -71,7 +80,7 @@ func (rc *RClone) Set(key string, value []byte, ns ...string) error {
return err return err
} }
obj := object.NewStaticObjectInfo(path.Base(key), time.Now(), 0, true, nil, f) obj := object.NewStaticObjectInfo(path.Base(key), time.Now(), 0, true, nil, f)
_, err = f.Put(bytes.NewReader(value), obj) _, err = f.Put(r, obj)
return err return err
} }