Compare commits

...

14 Commits

Author SHA1 Message Date
Bel LaPointe
4da502676a readme 2023-11-06 08:09:51 -07:00
Bel LaPointe
d16056bcc2 mvp 2023-11-06 06:28:39 -07:00
Bel LaPointe
11f1c9cefd Duration for JSON/YAML marshalling 2023-11-06 06:17:13 -07:00
Bel LaPointe
7a88fc6f5e more test and test replicator parallel 2023-11-06 06:00:22 -07:00
Bel LaPointe
38a3df7bda it is a naiive thing 2023-11-06 05:49:34 -07:00
Bel LaPointe
071935f0f3 it is a naiive thing 2023-11-06 05:49:26 -07:00
Bel LaPointe
1a586de656 it is a naiive thing 2023-11-06 05:49:08 -07:00
bel
6d6df1da80 tdd is fun sometimes 2023-11-05 21:40:01 -07:00
bel
5e7381c04b test fails appropriately 2023-11-05 21:35:28 -07:00
bel
aa8f348b11 add Must driver 2023-11-05 21:21:30 -07:00
bel
9aa56d2b9a tests pass 2023-11-05 20:50:47 -07:00
bel
2d30ee39af new tests show one case fails 2023-11-05 20:42:32 -07:00
bel
6856882ba8 stub more tests 2023-11-05 20:04:56 -07:00
bel
e227483614 todos 2023-11-05 20:00:22 -07:00
14 changed files with 589 additions and 37 deletions

View File

@@ -2,5 +2,5 @@
Given source DB,
1. fully sync to local db
1. continuously sync to local db
1. fully sync from src to dest db
1. continuously sync from src to dest db

2
go.mod
View File

@@ -1,3 +1,5 @@
module replicator
go 1.21.1
require gopkg.in/yaml.v3 v3.0.1 // indirect

3
go.sum Normal file
View File

@@ -0,0 +1,3 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

53
main.go
View File

@@ -2,16 +2,67 @@ package main
import (
"context"
"flag"
"net/url"
"os"
"os/signal"
"replicator/replicator"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := replicator.Main(ctx); err != nil && ctx.Err() == nil {
config, err := getConfig()
if err != nil {
panic(err)
}
if err := replicator.Main(ctx, config); err != nil && ctx.Err() == nil {
panic(err)
}
}
func getConfig() (replicator.Config, error) {
var config replicator.Config
if p := os.Getenv("CONFIG"); p != "" {
f, err := os.Open(p)
if err != nil {
return config, err
}
defer f.Close()
if err := yaml.NewDecoder(f).Decode(&config); err != nil {
return config, err
}
}
var srcDriver, destDriver, interval string
flag.StringVar(&srcDriver, "src", "map://", "source db")
flag.StringVar(&destDriver, "dest", "map://", "dest db")
flag.StringVar(&interval, "interval", "1s", "interval")
flag.Parse()
if srcDriver, err := url.Parse(srcDriver); err != nil {
return config, err
} else {
config.Src.Driver = *srcDriver
}
if destDriver, err := url.Parse(destDriver); err != nil {
return config, err
} else {
config.Dest.Driver = *destDriver
}
if interval, err := time.ParseDuration(interval); err != nil {
return config, err
} else {
config.Interval = replicator.Duration(interval)
}
return config, nil
}

View File

@@ -1,20 +1,53 @@
package replicator
import "net/url"
import (
"encoding/json"
"net/url"
"time"
"gopkg.in/yaml.v3"
)
type (
Config struct {
Local Storage
Remote Storage
Src Storage
Dest Storage
Interval Duration
}
Storage struct {
Driver url.URL
Full Stream
Incremental Stream
Driver url.URL
}
Stream struct {
Disabled bool
}
Duration time.Duration
)
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
d2, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(d2)
return nil
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
b, _ := json.Marshal(s)
return d.UnmarshalJSON(b)
}
func (d Duration) MarshalYAML() (interface{}, error) {
return time.Duration(d).String(), nil
}

30
replicator/config_test.go Normal file
View File

@@ -0,0 +1,30 @@
package replicator
import (
"encoding/json"
"testing"
"time"
"gopkg.in/yaml.v3"
)
func TestDuration(t *testing.T) {
d := Duration(3*time.Second + time.Minute)
var d2 Duration
if b, err := json.Marshal(d); err != nil {
t.Fatal(err)
} else if err := json.Unmarshal(b, &d2); err != nil {
t.Fatal(err)
} else if d != d2 {
t.Fatal(d, d2)
}
var d3 Duration
if b, err := yaml.Marshal(d); err != nil {
t.Fatal(err)
} else if err := yaml.Unmarshal(b, &d3); err != nil {
t.Fatal(err)
} else if d != d3 {
t.Fatal(d, d3)
}
}

View File

@@ -1,10 +1,12 @@
package replicator
import (
"bytes"
"context"
"fmt"
"net/url"
"path"
"strconv"
"time"
)
@@ -27,11 +29,11 @@ type (
KeyVersion struct {
Key Key
Version []byte
Version Version
}
ValueVersion struct {
Value []byte
Value Value
Version Version
}
)
@@ -51,10 +53,27 @@ func NewDriver(ctx context.Context, driver url.URL) (Driver, error) {
}
}
func (version Version) ToTime() (time.Time, error) {
panic(nil) // TODO
func (version Version) AsTime() (time.Time, error) {
if len(version) == 0 {
return time.Time{}, nil
}
n, err := strconv.ParseInt(string(version), 10, 64)
return time.Unix(0, n), err
}
func VersionFromTime(t time.Time) Version {
panic(nil) // TODO
func TimeAsVersion(t time.Time) Version {
n := t.UnixNano()
return []byte(strconv.FormatInt(n, 10))
}
func (version Version) Equal(other Version) bool {
return bytes.Equal(version, other)
}
func (key Key) Equal(other Key) bool {
return key == other
}
func (value Value) Equal(other Value) bool {
return bytes.Equal(value, other)
}

View File

@@ -10,6 +10,7 @@ import (
func TestDriverInterface(t *testing.T) {
var _ Driver = FileTree("")
var _ Driver = Map{}
var _ Driver = Must{}
}
func testDriver(t *testing.T, d Driver) {
@@ -17,7 +18,8 @@ func testDriver(t *testing.T, d Driver) {
defer can()
key := Key{Namespace: "x/y", Key: "z"}
value := ValueVersion{Value: []byte(t.Name()), Version: []byte("1")}
version := TimeAsVersion(time.Now())
value := Value([]byte(t.Name()))
t.Run("get does not exist", func(t *testing.T) {
v, err := d.Get(ctx, key)
@@ -30,7 +32,7 @@ func testDriver(t *testing.T, d Driver) {
})
t.Run("404 set get", func(t *testing.T) {
if err := d.Set(ctx, key, value.Value, value.Version); err != nil {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
@@ -38,11 +40,11 @@ func testDriver(t *testing.T, d Driver) {
if err != nil {
t.Errorf("getting key returned an err: %v", err)
}
if !bytes.Equal(v.Value, value.Value) {
t.Errorf("value didnt match set-get: want %q, got %q", value.Value, v.Value)
if !bytes.Equal(v.Value, value) {
t.Errorf("value didnt match set-get: want %q, got %q", value, v.Value)
}
if !bytes.Equal(v.Version, value.Version) {
t.Errorf("version didnt match set-get: want %q, got %q", value.Version, v.Version)
if !bytes.Equal(v.Version, version) {
t.Errorf("version didnt match set-get: want %q, got %q", version, v.Version)
}
if err := d.Del(ctx, key, nil); err != nil {
@@ -61,7 +63,7 @@ func testDriver(t *testing.T, d Driver) {
})
t.Run("keys of one key", func(t *testing.T) {
if err := d.Set(ctx, key, value.Value, value.Version); err != nil {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
@@ -72,7 +74,7 @@ func testDriver(t *testing.T, d Driver) {
n += 1
if got.Key != key {
t.Error(got)
} else if !bytes.Equal(got.Version, value.Version) {
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
}
@@ -83,4 +85,178 @@ func testDriver(t *testing.T, d Driver) {
t.Error(*err)
}
})
t.Run("conditional set vs nothing", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional set vs older", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, oldValue) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional set vs equal", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, oldValue) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
if err := d.Set(ctx, key, value, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, oldVersion) {
t.Error(got)
}
})
t.Run("conditional set vs newer", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
t.Run("conditional del vs nothing", func(t *testing.T) {
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
})
t.Run("conditional del vs older", func(t *testing.T) {
oldValue := []byte("teehee")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, oldValue, oldVersion); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if got.Value != nil {
t.Error(got)
} else if got.Version != nil {
t.Error(got)
}
})
t.Run("conditional del vs equal", func(t *testing.T) {
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if err := d.Del(ctx, key, version); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if got.Value != nil {
t.Error(got)
} else if got.Version != nil {
t.Error(got)
}
})
t.Run("conditional del vs newer", func(t *testing.T) {
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Minute))
if err := d.Set(ctx, key, value, version); err != nil {
t.Fatal(err)
}
defer d.Del(ctx, key, nil)
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
if err := d.Del(ctx, key, oldVersion); err != nil {
t.Fatal(err)
}
if got, err := d.Get(ctx, key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(got.Value, value) {
t.Error(got)
} else if !bytes.Equal(got.Version, version) {
t.Error(got)
}
})
}

View File

@@ -118,7 +118,13 @@ func (tree FileTree) Del(ctx context.Context, key Key, version Version) error {
}
}
return os.Remove(tree.realpath(key))
if err := os.Remove(tree.realpath(key)); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
return nil
}
func (tree FileTree) realpath(key Key) string {
@@ -132,12 +138,12 @@ func (tree FileTree) realpath(key Key) string {
return path.Join(string(tree), namespace, key.Key+".bin")
}
func (tree FileTree) getVersion(key Key) ([]byte, error) {
func (tree FileTree) getVersion(key Key) (Version, error) {
v, err := tree._getVersion(key)
if v == (time.Time{}) {
return nil, err
}
return []byte(strconv.FormatInt(v.UnixNano(), 10)), err
return TimeAsVersion(v), err
}
func (tree FileTree) _getVersion(key Key) (time.Time, error) {
@@ -151,7 +157,7 @@ func (tree FileTree) _getVersion(key Key) (time.Time, error) {
return info.ModTime(), nil
}
func (tree FileTree) fromVersion(v []byte) (time.Time, error) {
func (tree FileTree) fromVersion(v Version) (time.Time, error) {
if len(v) == 0 {
return time.Time{}, nil
}

20
replicator/main.go Normal file
View File

@@ -0,0 +1,20 @@
package replicator
import (
"context"
)
func Main(ctx context.Context, config Config) error {
src, err := NewDriver(ctx, config.Src.Driver)
if err != nil {
return err
}
dest, err := NewDriver(ctx, config.Dest.Driver)
if err != nil {
return err
}
replicator := NewReplicator(dest, src)
return replicator.Stream(ctx)
}

View File

@@ -49,9 +49,9 @@ func (m Map) Set(_ context.Context, key Key, value Value, version Version) error
if version != nil {
if was, ok := m.m[key]; !ok {
} else if wasVersion, err := was.Version.ToTime(); err != nil {
} else if wasVersion, err := was.Version.AsTime(); err != nil {
return err
} else if wantVersion, err := version.ToTime(); err != nil {
} else if wantVersion, err := version.AsTime(); err != nil {
return err
} else if wantVersion.Before(wasVersion) {
return nil // conflict
@@ -69,9 +69,9 @@ func (m Map) Del(_ context.Context, k Key, v Version) error {
if v != nil {
if was, ok := m.m[k]; !ok {
return nil
} else if wasVersion, err := was.Version.ToTime(); err != nil {
} else if wasVersion, err := was.Version.AsTime(); err != nil {
return err
} else if wantVersion, err := v.ToTime(); err != nil {
} else if wantVersion, err := v.AsTime(); err != nil {
return err
} else if wantVersion.Before(wasVersion) {
return nil // conflict

40
replicator/must.go Normal file
View File

@@ -0,0 +1,40 @@
package replicator
import (
"context"
"time"
)
type Must struct {
driver Driver
}
func NewMust(driver Driver) Must {
return Must{driver: driver}
}
func (must Must) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
return must.driver.KeysSince(ctx, t)
}
func (must Must) Get(ctx context.Context, k Key) (ValueVersion, error) {
got, err := must.driver.Get(ctx, k)
if err != nil {
panic(err)
}
return got, nil
}
func (must Must) Set(ctx context.Context, k Key, v Value, ver Version) error {
if err := must.driver.Set(ctx, k, v, ver); err != nil {
panic(err)
}
return nil
}
func (must Must) Del(ctx context.Context, k Key, ver Version) error {
if err := must.driver.Del(ctx, k, ver); err != nil {
panic(err)
}
return nil
}

View File

@@ -2,9 +2,88 @@ package replicator
import (
"context"
"errors"
"time"
)
func Main(ctx context.Context) error {
return errors.New("not impl")
type Replicator struct {
Src Driver
Dest Driver
Interval time.Duration
}
func NewReplicator(dest, src Driver) Replicator {
return Replicator{
Src: src,
Dest: dest,
Interval: time.Second,
}
}
func (r Replicator) Stream(ctx context.Context) error {
last, err := r.getContinuation(ctx)
if err != nil {
return err
}
if last == nil {
if err := r.fullStream(ctx); err != nil {
return err
}
}
return r.deltaStream(ctx)
}
func (r Replicator) getContinuation(ctx context.Context) (Version, error) {
return nil, nil // TODO
}
func (r Replicator) setContinuation(ctx context.Context, continuation Version) error {
return nil // TODO
}
func (r Replicator) deltaStream(ctx context.Context) error {
continuation, err := r.getContinuation(ctx)
if err != nil {
return err
}
for ctx.Err() == nil {
if err := r._deltaStream(ctx, continuation); err != nil {
return err
}
select {
case <-ctx.Done():
case <-time.After(r.Interval):
}
}
return ctx.Err()
}
func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error {
last, err := continuation.AsTime()
if err != nil {
return err
}
keys, after := r.Src.KeysSince(ctx, last)
for keyVersion := range keys {
if t, err := keyVersion.Version.AsTime(); err != nil {
return err
} else if t.After(last) {
last = t
}
valueVersion, err := r.Src.Get(ctx, keyVersion.Key)
if err != nil {
return err
}
if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil {
return err
}
r.setContinuation(ctx, TimeAsVersion(last))
}
return *after
}
func (r Replicator) fullStream(ctx context.Context) error {
return r._deltaStream(ctx, nil)
}

View File

@@ -0,0 +1,93 @@
package replicator
import (
"context"
"errors"
"sync"
"testing"
"time"
)
func TestReplicatorStream(t *testing.T) {
t.Parallel()
key := Key{Namespace: "x/y", Key: "z"}
value := Value("hello world")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Hour))
version := TimeAsVersion(time.Now())
cases := map[string]struct {
before func(*testing.T, Replicator)
during func(*testing.T, Replicator)
after func(*testing.T, Replicator)
}{
"noop": {},
"one prior op moves": {
before: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"one during op moves": {
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"dont overwrite newer received later": {
before: func(t *testing.T, r Replicator) {
r.Dest.Set(nil, key, value, version)
},
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, oldVersion)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
}
for name, d := range cases {
c := d
t.Run(name, func(t *testing.T) {
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
replicator.Interval = time.Millisecond * 10
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
if c.before != nil {
c.before(t, replicator)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
t.Error(err)
}
}(t)
time.Sleep(time.Millisecond * 150)
if c.during != nil {
c.during(t, replicator)
}
time.Sleep(time.Millisecond * 150)
can()
wg.Wait()
if c.after != nil {
c.after(t, replicator)
}
})
}
}