Compare commits

...

23 Commits

Author SHA1 Message Date
Bel LaPointe
4da502676a readme 2023-11-06 08:09:51 -07:00
Bel LaPointe
d16056bcc2 mvp 2023-11-06 06:28:39 -07:00
Bel LaPointe
11f1c9cefd Duration for JSON/YAML marshalling 2023-11-06 06:17:13 -07:00
Bel LaPointe
7a88fc6f5e more test and test replicator parallel 2023-11-06 06:00:22 -07:00
Bel LaPointe
38a3df7bda it is a naiive thing 2023-11-06 05:49:34 -07:00
Bel LaPointe
071935f0f3 it is a naiive thing 2023-11-06 05:49:26 -07:00
Bel LaPointe
1a586de656 it is a naiive thing 2023-11-06 05:49:08 -07:00
bel
6d6df1da80 tdd is fun sometimes 2023-11-05 21:40:01 -07:00
bel
5e7381c04b test fails appropriately 2023-11-05 21:35:28 -07:00
bel
aa8f348b11 add Must driver 2023-11-05 21:21:30 -07:00
bel
9aa56d2b9a tests pass 2023-11-05 20:50:47 -07:00
bel
2d30ee39af new tests show one case fails 2023-11-05 20:42:32 -07:00
bel
6856882ba8 stub more tests 2023-11-05 20:04:56 -07:00
bel
e227483614 todos 2023-11-05 20:00:22 -07:00
bel
70ad21156f todo 2023-11-05 13:53:31 -07:00
bel
be7fa572aa map passes driver test 2023-11-05 13:53:18 -07:00
bel
b215a81c59 map to thread safe 2023-11-05 13:48:48 -07:00
bel
25b7bd21e8 versioned interface 2023-11-05 13:47:14 -07:00
bel
ceb28081d6 change NewFileTree arg from url to path 2023-11-05 09:33:28 -07:00
bel
a2b66a03db impl filetree driver 2023-11-05 09:32:00 -07:00
bel
fcc7567a7d stub 2023-11-05 08:01:43 -07:00
bel
a51995b9bd stubby 2023-11-05 07:19:41 -07:00
bel
b8b4e1289b stub 2023-11-05 07:19:03 -07:00
16 changed files with 1084 additions and 2 deletions

View File

@@ -2,5 +2,5 @@
Given source DB,
1. fully sync to local db
1. continuously sync to local db
1. fully sync from src to dest db
1. continuously sync from src to dest db

5
go.mod Normal file
View File

@@ -0,0 +1,5 @@
module replicator
go 1.21.1
require gopkg.in/yaml.v3 v3.0.1 // indirect

3
go.sum Normal file
View File

@@ -0,0 +1,3 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

68
main.go Normal file
View File

@@ -0,0 +1,68 @@
package main
import (
"context"
"flag"
"net/url"
"os"
"os/signal"
"replicator/replicator"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
config, err := getConfig()
if err != nil {
panic(err)
}
if err := replicator.Main(ctx, config); err != nil && ctx.Err() == nil {
panic(err)
}
}
func getConfig() (replicator.Config, error) {
var config replicator.Config
if p := os.Getenv("CONFIG"); p != "" {
f, err := os.Open(p)
if err != nil {
return config, err
}
defer f.Close()
if err := yaml.NewDecoder(f).Decode(&config); err != nil {
return config, err
}
}
var srcDriver, destDriver, interval string
flag.StringVar(&srcDriver, "src", "map://", "source db")
flag.StringVar(&destDriver, "dest", "map://", "dest db")
flag.StringVar(&interval, "interval", "1s", "interval")
flag.Parse()
if srcDriver, err := url.Parse(srcDriver); err != nil {
return config, err
} else {
config.Src.Driver = *srcDriver
}
if destDriver, err := url.Parse(destDriver); err != nil {
return config, err
} else {
config.Dest.Driver = *destDriver
}
if interval, err := time.ParseDuration(interval); err != nil {
return config, err
} else {
config.Interval = replicator.Duration(interval)
}
return config, nil
}

53
replicator/config.go Normal file
View File

@@ -0,0 +1,53 @@
package replicator
import (
"encoding/json"
"net/url"
"time"
"gopkg.in/yaml.v3"
)
type (
Config struct {
Src Storage
Dest Storage
Interval Duration
}
Storage struct {
Driver url.URL
}
Duration time.Duration
)
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
d2, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(d2)
return nil
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
b, _ := json.Marshal(s)
return d.UnmarshalJSON(b)
}
func (d Duration) MarshalYAML() (interface{}, error) {
return time.Duration(d).String(), nil
}

30
replicator/config_test.go Normal file
View File

@@ -0,0 +1,30 @@
package replicator
import (
"encoding/json"
"testing"
"time"
"gopkg.in/yaml.v3"
)
func TestDuration(t *testing.T) {
d := Duration(3*time.Second + time.Minute)
var d2 Duration
if b, err := json.Marshal(d); err != nil {
t.Fatal(err)
} else if err := json.Unmarshal(b, &d2); err != nil {
t.Fatal(err)
} else if d != d2 {
t.Fatal(d, d2)
}
var d3 Duration
if b, err := yaml.Marshal(d); err != nil {
t.Fatal(err)
} else if err := yaml.Unmarshal(b, &d3); err != nil {
t.Fatal(err)
} else if d != d3 {
t.Fatal(d, d3)
}
}

79
replicator/driver.go Normal file
View File

@@ -0,0 +1,79 @@
package replicator
import (
"bytes"
"context"
"fmt"
"net/url"
"path"
"strconv"
"time"
)
type (
Driver interface {
KeysSince(context.Context, time.Time) (chan KeyVersion, *error)
Get(context.Context, Key) (ValueVersion, error)
Set(context.Context, Key, Value, Version) error
Del(context.Context, Key, Version) error
}
Key struct {
Namespace string
Key string
}
Value []byte
Version []byte
KeyVersion struct {
Key Key
Version Version
}
ValueVersion struct {
Value Value
Version Version
}
)
func NewDriver(ctx context.Context, driver url.URL) (Driver, error) {
switch driver.Scheme {
case "filetree":
p := driver.Path
if driver.Host != "" {
p = path.Join(driver.Host, p)
}
return NewFileTree(p)
case "map":
return NewMap(), nil
default:
return nil, fmt.Errorf("unknown driver spec %s", driver.String())
}
}
func (version Version) AsTime() (time.Time, error) {
if len(version) == 0 {
return time.Time{}, nil
}
n, err := strconv.ParseInt(string(version), 10, 64)
return time.Unix(0, n), err
}
func TimeAsVersion(t time.Time) Version {
n := t.UnixNano()
return []byte(strconv.FormatInt(n, 10))
}
func (version Version) Equal(other Version) bool {
return bytes.Equal(version, other)
}
func (key Key) Equal(other Key) bool {
return key == other
}
func (value Value) Equal(other Value) bool {
return bytes.Equal(value, other)
}

262
replicator/driver_test.go Normal file
View File

@@ -0,0 +1,262 @@
package replicator
import (
"bytes"
"context"
"testing"
"time"
)
func TestDriverInterface(t *testing.T) {
var _ Driver = FileTree("")
var _ Driver = Map{}
var _ Driver = Must{}
}
func testDriver(t *testing.T, d Driver) {
ctx, can := context.WithTimeout(context.Background(), time.Second*2)
defer can()
key := Key{Namespace: "x/y", Key: "z"}
version := TimeAsVersion(time.Now())
value := Value([]byte(t.Name()))
t.Run("get does not exist", func(t *testing.T) {
v, err := d.Get(ctx, key)
if err != nil {
t.Errorf("getting 404 returned an err: %v", err)
}
if v.Value != nil || v.Version != nil {
t.Errorf("expected nil for 404 but got %q/%s", v.Value, v.Version)
}
})
t.Run("404 set get", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
v, err := d.Get(ctx, key)
if err != nil {
t.Errorf("getting key returned an err: %v", err)
}
if !bytes.Equal(v.Value, value) {
t.Errorf("value didnt match set-get: want %q, got %q", value, v.Value)
}
if !bytes.Equal(v.Version, version) {
t.Errorf("version didnt match set-get: want %q, got %q", version, v.Version)
}
if err := d.Del(ctx, key, nil); err != nil {
t.Errorf("failed to clean up: %v", err)
}
})
t.Run("keys of nothing", func(t *testing.T) {
ch, err := d.KeysSince(ctx, time.Time{})
for got := range ch {
t.Error("expected nothing but got", got)
}
if *err != nil {
t.Error(*err)
}
})
t.Run("keys of one key", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
ch, err := d.KeysSince(ctx, time.Time{})
n := 0
for got := range ch {
n += 1
if got.Key != key {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
}
if n == 0 {
t.Error("expected to find at least 1 key")
}
if *err != nil {
t.Error(*err)
}
})
t.Run("conditional set vs nothing", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional set vs older", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, oldValue) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional set vs equal", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, oldValue) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
if err := d.Set(ctx, key, value, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
})
t.Run("conditional set vs newer", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional del vs nothing", func(t *testing.T) {
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
})
t.Run("conditional del vs older", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if got.Value != nil {
t.Error(got)
} else if got.Version != nil {
t.Error(got)
}
})
t.Run("conditional del vs equal", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if got.Value != nil {
t.Error(got)
} else if got.Version != nil {
t.Error(got)
}
})
t.Run("conditional del vs newer", func(t *testing.T) {
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
if err := d.Del(ctx, key, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
}

190
replicator/filetree.go Normal file
View File

@@ -0,0 +1,190 @@
package replicator
import (
"context"
"fmt"
"io"
"io/fs"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
)
type FileTree string
func NewFileTree(root string) (FileTree, error) {
return FileTree(root), nil
}
func (tree FileTree) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
result := make(chan KeyVersion)
var final error
go func() {
defer close(result)
if err := filepath.Walk(path.Dir(tree.realpath(Key{})), func(p string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return ctx.Err()
}
key := tree.toKey(p)
version, err := tree.getVersion(key)
if err != nil {
return err
}
select {
case result <- KeyVersion{Key: key, Version: version}:
case <-ctx.Done():
}
return ctx.Err()
}); err != nil && !os.IsNotExist(err) {
final = err
}
}()
return result, &final
}
func (tree FileTree) Get(ctx context.Context, key Key) (ValueVersion, error) {
version, err := tree.getVersion(key)
if version == nil {
return ValueVersion{}, err
}
f, err := os.Open(tree.realpath(key))
if err != nil {
return ValueVersion{}, err
}
defer f.Close()
b, _ := io.ReadAll(f)
return ValueVersion{
Value: b,
Version: version,
}, nil
}
func (tree FileTree) Set(ctx context.Context, key Key, value Value, version Version) error {
v, err := tree.fromVersion(version)
if err != nil {
return err
}
if oldv, err := tree._getVersion(key); err != nil {
return err
} else if v.Before(oldv) {
return nil // conflict
}
f2, err := ioutil.TempFile(os.TempDir(), "*.*")
if err != nil {
return err
}
defer os.Remove(f2.Name())
if n, err := f2.Write(value); err != nil {
return err
} else if n != len(value) {
return fmt.Errorf("truncated write")
}
if err := f2.Close(); err != nil {
return err
}
if err := os.Chtimes(f2.Name(), time.Time{}, v); err != nil {
return err
}
os.MkdirAll(path.Dir(tree.realpath(key)), os.ModePerm)
return os.Rename(f2.Name(), tree.realpath(key))
}
func (tree FileTree) Del(ctx context.Context, key Key, version Version) error {
if version != nil {
v, err := tree.fromVersion(version)
if err != nil {
return err
}
if oldv, err := tree._getVersion(key); err != nil {
return err
} else if v.Before(oldv) {
return nil // conflict
}
}
if err := os.Remove(tree.realpath(key)); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
return nil
}
func (tree FileTree) realpath(key Key) string {
namespace := ""
tmp := path.Clean("/" + key.Namespace)
for path.Dir(tmp) != tmp {
_, base := path.Split(tmp)
namespace = path.Join(base+".d", namespace)
tmp = path.Dir(tmp)
}
return path.Join(string(tree), namespace, key.Key+".bin")
}
func (tree FileTree) getVersion(key Key) (Version, error) {
v, err := tree._getVersion(key)
if v == (time.Time{}) {
return nil, err
}
return TimeAsVersion(v), err
}
func (tree FileTree) _getVersion(key Key) (time.Time, error) {
info, err := os.Stat(tree.realpath(key))
if os.IsNotExist(err) {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, err
}
return info.ModTime(), nil
}
func (tree FileTree) fromVersion(v Version) (time.Time, error) {
if len(v) == 0 {
return time.Time{}, nil
}
n, err := strconv.ParseInt(string(v), 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, n), nil
}
func (tree FileTree) toKey(p string) Key {
p = path.Clean(p)
p = strings.TrimPrefix(p, path.Clean(string(tree)))
p = strings.TrimPrefix(p, "/")
p = strings.TrimSuffix(p, ".bin")
p = strings.ReplaceAll(p, ".d/", "/")
namespace := path.Dir(p)
if namespace == "/" || namespace == "." {
namespace = ""
}
key := path.Base(p)
if key == "" {
key = namespace
namespace = ""
}
return Key{
Namespace: namespace,
Key: key,
}
}

View File

@@ -0,0 +1,57 @@
package replicator
import (
"testing"
)
func TestFileTree(t *testing.T) {
d := t.TempDir()
tree, err := NewFileTree(d)
if err != nil {
t.Fatal(err)
}
testDriver(t, tree)
}
func TestFileTreeRealpath(t *testing.T) {
root := "/root"
base := "base"
cases := map[string]string{
"a/b": "/root/a.d/b.d/base.bin",
"a": "/root/a.d/base.bin",
"": "/root/base.bin",
}
for given, wantd := range cases {
want := wantd
t.Run(given, func(t *testing.T) {
key := Key{Key: base, Namespace: given}
got := FileTree(root).realpath(key)
if got != want {
t.Error(got)
}
})
}
}
func TestFileTreeToKey(t *testing.T) {
root := "/root"
cases := map[string]Key{
"/root/a.d/b.bin": {Namespace: "a", Key: "b"},
"//root/a.d/b.bin": {Namespace: "a", Key: "b"},
"/root/b.bin": {Namespace: "", Key: "b"},
"/root/c.d/a.d/b.bin": {Namespace: "c/a", Key: "b"},
"/root/c/a.d/b.bin": {Namespace: "c/a", Key: "b"},
}
for given, wantd := range cases {
want := wantd
t.Run(given, func(t *testing.T) {
got := FileTree(root).toKey(given)
if got != want {
t.Error(got)
}
})
}
}

20
replicator/main.go Normal file
View File

@@ -0,0 +1,20 @@
package replicator
import (
"context"
)
func Main(ctx context.Context, config Config) error {
src, err := NewDriver(ctx, config.Src.Driver)
if err != nil {
return err
}
dest, err := NewDriver(ctx, config.Dest.Driver)
if err != nil {
return err
}
replicator := NewReplicator(dest, src)
return replicator.Stream(ctx)
}

84
replicator/map.go Normal file
View File

@@ -0,0 +1,84 @@
package replicator
import (
"context"
"sync"
"time"
)
type Map struct {
m map[Key]ValueVersion
lock *sync.RWMutex
}
func NewMap() Map {
return Map{
m: make(map[Key]ValueVersion),
lock: &sync.RWMutex{},
}
}
func (m Map) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
result := make(chan KeyVersion)
var err error
go func() {
defer close(result)
m.lock.RLock()
defer m.lock.RUnlock()
for k := range m.m {
select {
case result <- KeyVersion{Key: k, Version: m.m[k].Version}:
case <-ctx.Done():
err = ctx.Err()
return
}
}
}()
return result, &err
}
func (m Map) Get(_ context.Context, k Key) (ValueVersion, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.m[k], nil
}
func (m Map) Set(_ context.Context, key Key, value Value, version Version) error {
m.lock.Lock()
defer m.lock.Unlock()
if version != nil {
if was, ok := m.m[key]; !ok {
} else if wasVersion, err := was.Version.AsTime(); err != nil {
return err
} else if wantVersion, err := version.AsTime(); err != nil {
return err
} else if wantVersion.Before(wasVersion) {
return nil // conflict
}
}
m.m[key] = ValueVersion{Value: value, Version: version}
return nil
}
func (m Map) Del(_ context.Context, k Key, v Version) error {
m.lock.Lock()
defer m.lock.Unlock()
if v != nil {
if was, ok := m.m[k]; !ok {
return nil
} else if wasVersion, err := was.Version.AsTime(); err != nil {
return err
} else if wantVersion, err := v.AsTime(); err != nil {
return err
} else if wantVersion.Before(wasVersion) {
return nil // conflict
}
}
delete(m.m, k)
return nil
}

9
replicator/map_test.go Normal file
View File

@@ -0,0 +1,9 @@
package replicator
import (
"testing"
)
func TestMap(t *testing.T) {
testDriver(t, NewMap())
}

40
replicator/must.go Normal file
View File

@@ -0,0 +1,40 @@
package replicator
import (
"context"
"time"
)
type Must struct {
driver Driver
}
func NewMust(driver Driver) Must {
return Must{driver: driver}
}
func (must Must) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
return must.driver.KeysSince(ctx, t)
}
func (must Must) Get(ctx context.Context, k Key) (ValueVersion, error) {
got, err := must.driver.Get(ctx, k)
if err != nil {
panic(err)
}
return got, nil
}
func (must Must) Set(ctx context.Context, k Key, v Value, ver Version) error {
if err := must.driver.Set(ctx, k, v, ver); err != nil {
panic(err)
}
return nil
}
func (must Must) Del(ctx context.Context, k Key, ver Version) error {
if err := must.driver.Del(ctx, k, ver); err != nil {
panic(err)
}
return nil
}

89
replicator/replicator.go Normal file
View File

@@ -0,0 +1,89 @@
package replicator
import (
"context"
"time"
)
type Replicator struct {
Src Driver
Dest Driver
Interval time.Duration
}
func NewReplicator(dest, src Driver) Replicator {
return Replicator{
Src: src,
Dest: dest,
Interval: time.Second,
}
}
func (r Replicator) Stream(ctx context.Context) error {
last, err := r.getContinuation(ctx)
if err != nil {
return err
}
if last == nil {
if err := r.fullStream(ctx); err != nil {
return err
}
}
return r.deltaStream(ctx)
}
func (r Replicator) getContinuation(ctx context.Context) (Version, error) {
return nil, nil // TODO
}
func (r Replicator) setContinuation(ctx context.Context, continuation Version) error {
return nil // TODO
}
func (r Replicator) deltaStream(ctx context.Context) error {
continuation, err := r.getContinuation(ctx)
if err != nil {
return err
}
for ctx.Err() == nil {
if err := r._deltaStream(ctx, continuation); err != nil {
return err
}
select {
case <-ctx.Done():
case <-time.After(r.Interval):
}
}
return ctx.Err()
}
func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error {
last, err := continuation.AsTime()
if err != nil {
return err
}
keys, after := r.Src.KeysSince(ctx, last)
for keyVersion := range keys {
if t, err := keyVersion.Version.AsTime(); err != nil {
return err
} else if t.After(last) {
last = t
}
valueVersion, err := r.Src.Get(ctx, keyVersion.Key)
if err != nil {
return err
}
if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil {
return err
}
r.setContinuation(ctx, TimeAsVersion(last))
}
return *after
}
func (r Replicator) fullStream(ctx context.Context) error {
return r._deltaStream(ctx, nil)
}

View File

@@ -0,0 +1,93 @@
package replicator
import (
"context"
"errors"
"sync"
"testing"
"time"
)
func TestReplicatorStream(t *testing.T) {
t.Parallel()
key := Key{Namespace: "x/y", Key: "z"}
value := Value("hello world")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Hour))
version := TimeAsVersion(time.Now())
cases := map[string]struct {
before func(*testing.T, Replicator)
during func(*testing.T, Replicator)
after func(*testing.T, Replicator)
}{
"noop": {},
"one prior op moves": {
before: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"one during op moves": {
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"dont overwrite newer received later": {
before: func(t *testing.T, r Replicator) {
r.Dest.Set(nil, key, value, version)
},
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, oldVersion)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
}
for name, d := range cases {
c := d
t.Run(name, func(t *testing.T) {
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
replicator.Interval = time.Millisecond * 10
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
if c.before != nil {
c.before(t, replicator)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
t.Error(err)
}
}(t)
time.Sleep(time.Millisecond * 150)
if c.during != nil {
c.during(t, replicator)
}
time.Sleep(time.Millisecond * 150)
can()
wg.Wait()
if c.after != nil {
c.after(t, replicator)
}
})
}
}