From dcf1594e17745202333aa794102e8af93ec68aec Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Sun, 7 Feb 2021 12:53:56 -0600 Subject: [PATCH] Create dbstream interface --- db.go | 6 ++++++ db_test.go | 16 ++++++++++++++++ files.go | 30 +++++++++++++++++++++++++++--- rclone.go | 21 +++++++++++++++------ 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/db.go b/db.go index 44927cf..3ceeec1 100755 --- a/db.go +++ b/db.go @@ -2,10 +2,16 @@ package storage import ( "fmt" + "io" "strconv" "strings" ) +type DBStream interface { + GetStream(string, ...string) (io.Reader, error) + SetStream(string, io.Reader, ...string) error +} + type DB interface { Get(string, ...string) ([]byte, error) Set(string, []byte, ...string) error diff --git a/db_test.go b/db_test.go index 2aa25ef..b86c3ed 100755 --- a/db_test.go +++ b/db_test.go @@ -207,6 +207,22 @@ type = local } else if keys[0] != validKey { t.Errorf("%T) List(prefix)[0] != %s: %s", db, validKey, keys[0]) } + + if dbstream, ok := db.(DBStream); ok { + log.Printf("trying %T as DBStream", dbstream) + raw := "raw" + if err := dbstream.SetStream("k", strings.NewReader(raw), "ns1", "ns2"); err != nil { + t.Errorf("%T) cannot setstream: %v", dbstream, err) + } + if r, err := dbstream.GetStream("k", "ns1", "ns2"); err != nil { + t.Errorf("%T) cannot getstream: %v", dbstream, err) + } else if b, err := ioutil.ReadAll(r); err != nil { + t.Errorf("%T) cannot readall getstream: %v", dbstream, err) + } else if string(b) != raw { + t.Errorf("%T) wrong getstream: %v", dbstream, string(b)) + } + } + t.Logf(" %T: close", db) if err := db.Close(); err != nil { t.Errorf("cannot close %T: %v", db, err) diff --git a/files.go b/files.go index dcaaecb..c71abf0 100755 --- a/files.go +++ b/files.go @@ -1,6 +1,8 @@ package storage import ( + "bytes" + "io" "io/ioutil" "os" "path" @@ -49,15 +51,31 @@ func (b *Files) List(ns []string, limits ...string) ([]string, error) { } func (b *Files) Get(key string, ns ...string) ([]byte, error) { + r, err := b.GetStream(key, ns...) + if err != nil { + return nil, err + } + return ioutil.ReadAll(r) +} + +func (b *Files) GetStream(key string, ns ...string) (io.Reader, error) { namespace := resolveNamespace(ns) path := path.Join(b.root, namespace, key) - return ioutil.ReadFile(path) + return os.Open(path) } func (b *Files) Set(key string, value []byte, ns ...string) error { + r := bytes.NewReader(value) + if value == nil { + r = nil + } + return b.SetStream(key, r, ns...) +} + +func (b *Files) SetStream(key string, r io.Reader, ns ...string) error { namespace := resolveNamespace(ns) dir := path.Join(b.root, namespace) - if value == nil { + if r == nil { err := os.Remove(path.Join(dir, key)) if os.IsNotExist(err) { err = nil @@ -68,7 +86,13 @@ func (b *Files) Set(key string, value []byte, ns ...string) error { return err } path := path.Join(dir, key) - return ioutil.WriteFile(path, value, os.ModePerm) + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + _, err = io.Copy(f, r) + return err } func (b *Files) Close() error { diff --git a/rclone.go b/rclone.go index 19d73a8..4bf6c67 100755 --- a/rclone.go +++ b/rclone.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "io" "io/ioutil" "os" "path" @@ -35,6 +36,14 @@ func NewRClone(rclone string, ns ...string) (*RClone, error) { } func (rc *RClone) Get(key string, ns ...string) ([]byte, error) { + r, err := rc.GetStream(key, ns...) + if err != nil { + return nil, err + } + return ioutil.ReadAll(r) +} + +func (rc *RClone) GetStream(key string, ns ...string) (io.Reader, error) { namespace := rc.ns if len(ns) > 0 { namespace = path.Join(rc.ns, resolveNamespace(ns)) @@ -49,17 +58,17 @@ func (rc *RClone) Get(key string, ns ...string) ([]byte, error) { if err != nil { return nil, err } - r, err := obj.Open() - if err != nil { - return nil, err - } - return ioutil.ReadAll(r) + return obj.Open() } func (rc *RClone) Set(key string, value []byte, ns ...string) error { if len(value) == 0 { return rc.Del(key, ns...) } + return rc.SetStream(key, bytes.NewReader(value), ns...) +} + +func (rc *RClone) SetStream(key string, r io.Reader, ns ...string) error { namespace := rc.ns if len(ns) > 0 { namespace = path.Join(rc.ns, resolveNamespace(ns)) @@ -71,7 +80,7 @@ func (rc *RClone) Set(key string, value []byte, ns ...string) error { return err } obj := object.NewStaticObjectInfo(path.Base(key), time.Now(), 0, true, nil, f) - _, err = f.Put(bytes.NewReader(value), obj) + _, err = f.Put(r, obj) return err }