diff --git a/replicator/driver.go b/replicator/driver.go index fbb3ebe..7f8f03d 100644 --- a/replicator/driver.go +++ b/replicator/driver.go @@ -5,14 +5,15 @@ import ( "fmt" "net/url" "path" + "time" ) type ( Driver interface { - Keys(context.Context) (chan Key, *error) - Get(context.Context, Key) (Value, error) - Set(context.Context, Key, Value) error - Del(context.Context, Key) error + KeysSince(context.Context, time.Time) (chan KeyVersion, *error) + Get(context.Context, Key) (ValueVersion, error) + Set(context.Context, Key, Value, Version) error + Del(context.Context, Key, Version) error } Key struct { @@ -20,10 +21,19 @@ type ( Key string } - Value struct { - Value []byte + Value []byte + + Version []byte + + KeyVersion struct { + Key Key Version []byte } + + ValueVersion struct { + Value []byte + Version Version + } ) func NewDriver(ctx context.Context, driver url.URL) (Driver, error) { @@ -34,6 +44,8 @@ func NewDriver(ctx context.Context, driver url.URL) (Driver, error) { p = path.Join(driver.Host, p) } return NewFileTree(p) + case "map": + return NewMap(), nil default: return nil, fmt.Errorf("unknown driver spec %s", driver.String()) } diff --git a/replicator/driver_test.go b/replicator/driver_test.go index b91dada..f5a05b7 100644 --- a/replicator/driver_test.go +++ b/replicator/driver_test.go @@ -9,6 +9,7 @@ import ( func TestDriverInterface(t *testing.T) { var _ Driver = FileTree("") + var _ Driver = Map{} } func testDriver(t *testing.T, d Driver) { @@ -16,7 +17,7 @@ func testDriver(t *testing.T, d Driver) { defer can() key := Key{Namespace: "x/y", Key: "z"} - value := Value{Value: []byte(t.Name()), Version: []byte("1")} + value := ValueVersion{Value: []byte(t.Name()), Version: []byte("1")} t.Run("get does not exist", func(t *testing.T) { v, err := d.Get(ctx, key) @@ -29,7 +30,7 @@ func testDriver(t *testing.T, d Driver) { }) t.Run("404 set get", func(t *testing.T) { - if err := d.Set(ctx, key, value); err != nil { + if err := d.Set(ctx, key, value.Value, value.Version); err != nil { t.Fatal(err) } @@ -44,13 +45,13 @@ func testDriver(t *testing.T, d Driver) { t.Errorf("version didnt match set-get: want %q, got %q", value.Version, v.Version) } - if err := d.Del(ctx, key); err != nil { + if err := d.Del(ctx, key, nil); err != nil { t.Errorf("failed to clean up: %v", err) } }) t.Run("keys of nothing", func(t *testing.T) { - ch, err := d.Keys(ctx) + ch, err := d.KeysSince(ctx, time.Time{}) for got := range ch { t.Error("expected nothing but got", got) } @@ -60,16 +61,18 @@ func testDriver(t *testing.T, d Driver) { }) t.Run("keys of one key", func(t *testing.T) { - if err := d.Set(ctx, key, value); err != nil { + if err := d.Set(ctx, key, value.Value, value.Version); err != nil { t.Fatal(err) } - defer d.Del(ctx, key) + defer d.Del(ctx, key, nil) - ch, err := d.Keys(ctx) + ch, err := d.KeysSince(ctx, time.Time{}) n := 0 for got := range ch { n += 1 - if got != key { + if got.Key != key { + t.Error(got) + } else if !bytes.Equal(got.Version, value.Version) { t.Error(got) } } diff --git a/replicator/filetree.go b/replicator/filetree.go index 91230b2..f3fb93d 100644 --- a/replicator/filetree.go +++ b/replicator/filetree.go @@ -20,8 +20,8 @@ func NewFileTree(root string) (FileTree, error) { return FileTree(root), nil } -func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) { - result := make(chan Key) +func (tree FileTree) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) { + result := make(chan KeyVersion) var final error go func() { defer close(result) @@ -33,8 +33,12 @@ func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) { return ctx.Err() } key := tree.toKey(p) + version, err := tree.getVersion(key) + if err != nil { + return err + } select { - case result <- key: + case result <- KeyVersion{Key: key, Version: version}: case <-ctx.Done(): } return ctx.Err() @@ -45,40 +49,46 @@ func (tree FileTree) Keys(ctx context.Context) (chan Key, *error) { return result, &final } -func (tree FileTree) Get(ctx context.Context, key Key) (Value, error) { +func (tree FileTree) Get(ctx context.Context, key Key) (ValueVersion, error) { version, err := tree.getVersion(key) if version == nil { - return Value{}, err + return ValueVersion{}, err } f, err := os.Open(tree.realpath(key)) if err != nil { - return Value{}, err + return ValueVersion{}, err } defer f.Close() b, _ := io.ReadAll(f) - return Value{ + return ValueVersion{ Value: b, Version: version, }, nil } -func (tree FileTree) Set(ctx context.Context, key Key, value Value) error { - version, err := tree.fromVersion(value.Version) +func (tree FileTree) Set(ctx context.Context, key Key, value Value, version Version) error { + v, err := tree.fromVersion(version) if err != nil { return err } + if oldv, err := tree._getVersion(key); err != nil { + return err + } else if v.Before(oldv) { + return nil // conflict + } + f2, err := ioutil.TempFile(os.TempDir(), "*.*") if err != nil { return err } defer os.Remove(f2.Name()) - if n, err := f2.Write(value.Value); err != nil { + if n, err := f2.Write(value); err != nil { return err - } else if n != len(value.Value) { + } else if n != len(value) { return fmt.Errorf("truncated write") } @@ -86,7 +96,7 @@ func (tree FileTree) Set(ctx context.Context, key Key, value Value) error { return err } - if err := os.Chtimes(f2.Name(), time.Time{}, version); err != nil { + if err := os.Chtimes(f2.Name(), time.Time{}, v); err != nil { return err } @@ -94,12 +104,20 @@ func (tree FileTree) Set(ctx context.Context, key Key, value Value) error { return os.Rename(f2.Name(), tree.realpath(key)) } -func (tree FileTree) Del(ctx context.Context, key Key) error { - if _, err := os.Stat(tree.realpath(key)); os.IsNotExist(err) { - return nil - } else if err != nil { - return err +func (tree FileTree) Del(ctx context.Context, key Key, version Version) error { + if version != nil { + v, err := tree.fromVersion(version) + if err != nil { + return err + } + + if oldv, err := tree._getVersion(key); err != nil { + return err + } else if v.Before(oldv) { + return nil // conflict + } } + return os.Remove(tree.realpath(key)) } @@ -115,14 +133,22 @@ func (tree FileTree) realpath(key Key) string { } func (tree FileTree) getVersion(key Key) ([]byte, error) { - info, err := os.Stat(tree.realpath(key)) - if os.IsNotExist(err) { - return nil, nil - } - if err != nil { + v, err := tree._getVersion(key) + if v == (time.Time{}) { return nil, err } - return []byte(strconv.FormatInt(info.ModTime().UnixNano(), 10)), nil + return []byte(strconv.FormatInt(v.UnixNano(), 10)), err +} + +func (tree FileTree) _getVersion(key Key) (time.Time, error) { + info, err := os.Stat(tree.realpath(key)) + if os.IsNotExist(err) { + return time.Time{}, nil + } + if err != nil { + return time.Time{}, err + } + return info.ModTime(), nil } func (tree FileTree) fromVersion(v []byte) (time.Time, error) { diff --git a/replicator/map.go b/replicator/map.go new file mode 100644 index 0000000..1654301 --- /dev/null +++ b/replicator/map.go @@ -0,0 +1,43 @@ +package replicator + +import ( + "context" + "io" + "time" +) + +type Map map[Key]ValueVersion + +func NewMap() Map { + return make(Map) +} + +func (m Map) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) { + result := make(chan KeyVersion) + var err error + go func() { + defer close(result) + for k := range m { + select { + case result <- KeyVersion{Key: k, Version: m[k].Version}: + case <-ctx.Done(): + err = ctx.Err() + return + } + } + }() + return result, &err +} + +func (m Map) Get(_ context.Context, k Key) (ValueVersion, error) { + return ValueVersion{}, io.EOF +} + +func (m Map) Set(_ context.Context, key Key, value Value, version Version) error { + m[key] = ValueVersion{Value: value, Version: version} + return nil +} + +func (m Map) Del(_ context.Context, k Key, v Version) error { + return io.EOF +} diff --git a/replicator/map_test.go b/replicator/map_test.go new file mode 100644 index 0000000..e9ba044 --- /dev/null +++ b/replicator/map_test.go @@ -0,0 +1,9 @@ +package replicator + +import ( + "testing" +) + +func TestMap(t *testing.T) { + testDriver(t, NewMap()) +}