versioned interface

main
bel 2023-11-05 13:47:14 -07:00
parent ceb28081d6
commit 25b7bd21e8
5 changed files with 130 additions and 37 deletions

View File

@ -5,14 +5,15 @@ import (
"fmt"
"net/url"
"path"
"time"
)
type (
Driver interface {
Keys(context.Context) (chan Key, *error)
Get(context.Context, Key) (Value, error)
Set(context.Context, Key, Value) error
Del(context.Context, Key) error
KeysSince(context.Context, time.Time) (chan KeyVersion, *error)
Get(context.Context, Key) (ValueVersion, error)
Set(context.Context, Key, Value, Version) error
Del(context.Context, Key, Version) error
}
Key struct {
@ -20,9 +21,18 @@ type (
Key string
}
Value struct {
Value []byte
Version []byte
KeyVersion struct {
Key Key
Version []byte
}
ValueVersion struct {
Value []byte
Version Version
}
)
@ -34,6 +44,8 @@ func NewDriver(ctx context.Context, driver url.URL) (Driver, error) {
p = path.Join(driver.Host, p)
}
return NewFileTree(p)
case "map":
return NewMap(), nil
default:
return nil, fmt.Errorf("unknown driver spec %s", driver.String())
}

View File

@ -9,6 +9,7 @@ import (
func TestDriverInterface(t *testing.T) {
var _ Driver = FileTree("")
var _ Driver = Map{}
}
func testDriver(t *testing.T, d Driver) {
@ -16,7 +17,7 @@ func testDriver(t *testing.T, d Driver) {
defer can()
key := Key{Namespace: "x/y", Key: "z"}
value := Value{Value: []byte(t.Name()), Version: []byte("1")}
value := ValueVersion{Value: []byte(t.Name()), Version: []byte("1")}
t.Run("get does not exist", func(t *testing.T) {
v, err := d.Get(ctx, key)
@ -29,7 +30,7 @@ func testDriver(t *testing.T, d Driver) {
})
t.Run("404 set get", func(t *testing.T) {
if err := d.Set(ctx, key, value); err != nil {
if err := d.Set(ctx, key, value.Value, value.Version); err != nil {
t.Fatal(err)
}
@ -44,13 +45,13 @@ func testDriver(t *testing.T, d Driver) {
t.Errorf("version didnt match set-get: want %q, got %q", value.Version, v.Version)
}
if err := d.Del(ctx, key); err != nil {
if err := d.Del(ctx, key, nil); err != nil {
t.Errorf("failed to clean up: %v", err)
}
})
t.Run("keys of nothing", func(t *testing.T) {
ch, err := d.Keys(ctx)
ch, err := d.KeysSince(ctx, time.Time{})
for got := range ch {
t.Error("expected nothing but got", got)
}
@ -60,16 +61,18 @@ func testDriver(t *testing.T, d Driver) {
})
t.Run("keys of one key", func(t *testing.T) {
if err := d.Set(ctx, key, value); err != nil {
if err := d.Set(ctx, key, value.Value, value.Version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key)
defer d.Del(ctx, key, nil)
ch, err := d.Keys(ctx)
ch, err := d.KeysSince(ctx, time.Time{})
n := 0
for got := range ch {
n += 1
if got != key {
if got.Key != key {
t.Error(got)
} else if !bytes.Equal(got.Version, value.Version) {
t.Error(got)
}
}

View File

@ -20,8 +20,8 @@ func NewFileTree(root string) (FileTree, error) {
return FileTree(root), nil
}
func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) {
result := make(chan Key)
func (tree FileTree) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
result := make(chan KeyVersion)
var final error
go func() {
defer close(result)
@ -33,8 +33,12 @@ func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) {
return ctx.Err()
}
key := tree.toKey(p)
version, err := tree.getVersion(key)
if err != nil {
return err
}
select {
case result <- key:
case result <- KeyVersion{Key: key, Version: version}:
case <-ctx.Done():
}
return ctx.Err()
@ -45,40 +49,46 @@ func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) {
return result, &final
}
func (tree FileTree) Get(ctx context.Context, key Key) (Value, error) {
func (tree FileTree) Get(ctx context.Context, key Key) (ValueVersion, error) {
version, err := tree.getVersion(key)
if version == nil {
return Value{}, err
return ValueVersion{}, err
}
f, err := os.Open(tree.realpath(key))
if err != nil {
return Value{}, err
return ValueVersion{}, err
}
defer f.Close()
b, _ := io.ReadAll(f)
return Value{
return ValueVersion{
Value: b,
Version: version,
}, nil
}
func (tree FileTree) Set(ctx context.Context, key Key, value Value) error {
version, err := tree.fromVersion(value.Version)
func (tree FileTree) Set(ctx context.Context, key Key, value Value, version Version) error {
v, err := tree.fromVersion(version)
if err != nil {
return err
}
if oldv, err := tree._getVersion(key); err != nil {
return err
} else if v.Before(oldv) {
return nil // conflict
}
f2, err := ioutil.TempFile(os.TempDir(), "*.*")
if err != nil {
return err
}
defer os.Remove(f2.Name())
if n, err := f2.Write(value.Value); err != nil {
if n, err := f2.Write(value); err != nil {
return err
} else if n != len(value.Value) {
} else if n != len(value) {
return fmt.Errorf("truncated write")
}
@ -86,7 +96,7 @@ func (tree FileTree) Set(ctx context.Context, key Key, value Value) error {
return err
}
if err := os.Chtimes(f2.Name(), time.Time{}, version); err != nil {
if err := os.Chtimes(f2.Name(), time.Time{}, v); err != nil {
return err
}
@ -94,12 +104,20 @@ func (tree FileTree) Set(ctx context.Context, key Key, value Value) error {
return os.Rename(f2.Name(), tree.realpath(key))
}
func (tree FileTree) Del(ctx context.Context, key Key) error {
if _, err := os.Stat(tree.realpath(key)); os.IsNotExist(err) {
return nil
} else if err != nil {
func (tree FileTree) Del(ctx context.Context, key Key, version Version) error {
if version != nil {
v, err := tree.fromVersion(version)
if err != nil {
return err
}
if oldv, err := tree._getVersion(key); err != nil {
return err
} else if v.Before(oldv) {
return nil // conflict
}
}
return os.Remove(tree.realpath(key))
}
@ -115,14 +133,22 @@ func (tree FileTree) realpath(key Key) string {
}
func (tree FileTree) getVersion(key Key) ([]byte, error) {
info, err := os.Stat(tree.realpath(key))
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
v, err := tree._getVersion(key)
if v == (time.Time{}) {
return nil, err
}
return []byte(strconv.FormatInt(info.ModTime().UnixNano(), 10)), nil
return []byte(strconv.FormatInt(v.UnixNano(), 10)), err
}
func (tree FileTree) _getVersion(key Key) (time.Time, error) {
info, err := os.Stat(tree.realpath(key))
if os.IsNotExist(err) {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, err
}
return info.ModTime(), nil
}
func (tree FileTree) fromVersion(v []byte) (time.Time, error) {

43
replicator/map.go Normal file
View File

@ -0,0 +1,43 @@
package replicator
import (
"context"
"io"
"time"
)
type Map map[Key]ValueVersion
func NewMap() Map {
return make(Map)
}
func (m Map) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
result := make(chan KeyVersion)
var err error
go func() {
defer close(result)
for k := range m {
select {
case result <- KeyVersion{Key: k, Version: m[k].Version}:
case <-ctx.Done():
err = ctx.Err()
return
}
}
}()
return result, &err
}
func (m Map) Get(_ context.Context, k Key) (ValueVersion, error) {
return ValueVersion{}, io.EOF
}
func (m Map) Set(_ context.Context, key Key, value Value, version Version) error {
m[key] = ValueVersion{Value: value, Version: version}
return nil
}
func (m Map) Del(_ context.Context, k Key, v Version) error {
return io.EOF
}

9
replicator/map_test.go Normal file
View File

@ -0,0 +1,9 @@
package replicator
import (
"testing"
)
func TestMap(t *testing.T) {
testDriver(t, NewMap())
}