Compare commits

...

10 Commits

Author SHA1 Message Date
Bel LaPointe
4da502676a readme 2023-11-06 08:09:51 -07:00
Bel LaPointe
d16056bcc2 mvp 2023-11-06 06:28:39 -07:00
Bel LaPointe
11f1c9cefd Duration for JSON/YAML marshalling 2023-11-06 06:17:13 -07:00
Bel LaPointe
7a88fc6f5e more test and test replicator parallel 2023-11-06 06:00:22 -07:00
Bel LaPointe
38a3df7bda it is a naiive thing 2023-11-06 05:49:34 -07:00
Bel LaPointe
071935f0f3 it is a naiive thing 2023-11-06 05:49:26 -07:00
Bel LaPointe
1a586de656 it is a naiive thing 2023-11-06 05:49:08 -07:00
bel
6d6df1da80 tdd is fun sometimes 2023-11-05 21:40:01 -07:00
bel
5e7381c04b test fails appropriately 2023-11-05 21:35:28 -07:00
bel
aa8f348b11 add Must driver 2023-11-05 21:21:30 -07:00
12 changed files with 367 additions and 24 deletions

View File

@@ -2,5 +2,5 @@
Given source DB, Given source DB,
1. fully sync to local db 1. fully sync from src to dest db
1. continuously sync to local db 1. continuously sync from src to dest db

2
go.mod
View File

@@ -1,3 +1,5 @@
module replicator module replicator
go 1.21.1 go 1.21.1
require gopkg.in/yaml.v3 v3.0.1 // indirect

3
go.sum Normal file
View File

@@ -0,0 +1,3 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

53
main.go
View File

@@ -2,16 +2,67 @@ package main
import ( import (
"context" "context"
"flag"
"net/url"
"os"
"os/signal" "os/signal"
"replicator/replicator" "replicator/replicator"
"syscall" "syscall"
"time"
"gopkg.in/yaml.v3"
) )
func main() { func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can() defer can()
if err := replicator.Main(ctx); err != nil && ctx.Err() == nil { config, err := getConfig()
if err != nil {
panic(err)
}
if err := replicator.Main(ctx, config); err != nil && ctx.Err() == nil {
panic(err) panic(err)
} }
} }
func getConfig() (replicator.Config, error) {
var config replicator.Config
if p := os.Getenv("CONFIG"); p != "" {
f, err := os.Open(p)
if err != nil {
return config, err
}
defer f.Close()
if err := yaml.NewDecoder(f).Decode(&config); err != nil {
return config, err
}
}
var srcDriver, destDriver, interval string
flag.StringVar(&srcDriver, "src", "map://", "source db")
flag.StringVar(&destDriver, "dest", "map://", "dest db")
flag.StringVar(&interval, "interval", "1s", "interval")
flag.Parse()
if srcDriver, err := url.Parse(srcDriver); err != nil {
return config, err
} else {
config.Src.Driver = *srcDriver
}
if destDriver, err := url.Parse(destDriver); err != nil {
return config, err
} else {
config.Dest.Driver = *destDriver
}
if interval, err := time.ParseDuration(interval); err != nil {
return config, err
} else {
config.Interval = replicator.Duration(interval)
}
return config, nil
}

View File

@@ -1,20 +1,53 @@
package replicator package replicator
import "net/url" import (
"encoding/json"
"net/url"
"time"
"gopkg.in/yaml.v3"
)
type ( type (
Config struct { Config struct {
Local Storage Src Storage
Remote Storage Dest Storage
Interval Duration
} }
Storage struct { Storage struct {
Driver url.URL Driver url.URL
Full Stream
Incremental Stream
} }
Stream struct { Duration time.Duration
Disabled bool
}
) )
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
d2, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(d2)
return nil
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
b, _ := json.Marshal(s)
return d.UnmarshalJSON(b)
}
func (d Duration) MarshalYAML() (interface{}, error) {
return time.Duration(d).String(), nil
}

30
replicator/config_test.go Normal file
View File

@@ -0,0 +1,30 @@
package replicator
import (
"encoding/json"
"testing"
"time"
"gopkg.in/yaml.v3"
)
func TestDuration(t *testing.T) {
d := Duration(3*time.Second + time.Minute)
var d2 Duration
if b, err := json.Marshal(d); err != nil {
t.Fatal(err)
} else if err := json.Unmarshal(b, &d2); err != nil {
t.Fatal(err)
} else if d != d2 {
t.Fatal(d, d2)
}
var d3 Duration
if b, err := yaml.Marshal(d); err != nil {
t.Fatal(err)
} else if err := yaml.Unmarshal(b, &d3); err != nil {
t.Fatal(err)
} else if d != d3 {
t.Fatal(d, d3)
}
}

View File

@@ -1,6 +1,7 @@
package replicator package replicator
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"net/url" "net/url"
@@ -28,11 +29,11 @@ type (
KeyVersion struct { KeyVersion struct {
Key Key Key Key
Version []byte Version Version
} }
ValueVersion struct { ValueVersion struct {
Value []byte Value Value
Version Version Version Version
} }
) )
@@ -64,3 +65,15 @@ func TimeAsVersion(t time.Time) Version {
n := t.UnixNano() n := t.UnixNano()
return []byte(strconv.FormatInt(n, 10)) return []byte(strconv.FormatInt(n, 10))
} }
func (version Version) Equal(other Version) bool {
return bytes.Equal(version, other)
}
func (key Key) Equal(other Key) bool {
return key == other
}
func (value Value) Equal(other Value) bool {
return bytes.Equal(value, other)
}

View File

@@ -10,6 +10,7 @@ import (
func TestDriverInterface(t *testing.T) { func TestDriverInterface(t *testing.T) {
var _ Driver = FileTree("") var _ Driver = FileTree("")
var _ Driver = Map{} var _ Driver = Map{}
var _ Driver = Must{}
} }
func testDriver(t *testing.T, d Driver) { func testDriver(t *testing.T, d Driver) {

View File

@@ -2,9 +2,19 @@ package replicator
import ( import (
"context" "context"
"errors"
) )
func Main(ctx context.Context) error { func Main(ctx context.Context, config Config) error {
return errors.New("not impl") src, err := NewDriver(ctx, config.Src.Driver)
if err != nil {
return err
}
dest, err := NewDriver(ctx, config.Dest.Driver)
if err != nil {
return err
}
replicator := NewReplicator(dest, src)
return replicator.Stream(ctx)
} }

40
replicator/must.go Normal file
View File

@@ -0,0 +1,40 @@
package replicator
import (
"context"
"time"
)
type Must struct {
driver Driver
}
func NewMust(driver Driver) Must {
return Must{driver: driver}
}
func (must Must) KeysSince(ctx context.Context, t time.Time) (chan KeyVersion, *error) {
return must.driver.KeysSince(ctx, t)
}
func (must Must) Get(ctx context.Context, k Key) (ValueVersion, error) {
got, err := must.driver.Get(ctx, k)
if err != nil {
panic(err)
}
return got, nil
}
func (must Must) Set(ctx context.Context, k Key, v Value, ver Version) error {
if err := must.driver.Set(ctx, k, v, ver); err != nil {
panic(err)
}
return nil
}
func (must Must) Del(ctx context.Context, k Key, ver Version) error {
if err := must.driver.Del(ctx, k, ver); err != nil {
panic(err)
}
return nil
}

View File

@@ -2,21 +2,88 @@ package replicator
import ( import (
"context" "context"
"io" "time"
) )
type Replicator struct { type Replicator struct {
Src Driver Src Driver
Dest Driver Dest Driver
Interval time.Duration
} }
func NewReplicator(src, dest Driver) Replicator { func NewReplicator(dest, src Driver) Replicator {
return Replicator{ return Replicator{
Src: src, Src: src,
Dest: dest, Dest: dest,
Interval: time.Second,
} }
} }
func (r Replicator) Stream(ctx context.Context) error { func (r Replicator) Stream(ctx context.Context) error {
return io.EOF last, err := r.getContinuation(ctx)
if err != nil {
return err
}
if last == nil {
if err := r.fullStream(ctx); err != nil {
return err
}
}
return r.deltaStream(ctx)
}
func (r Replicator) getContinuation(ctx context.Context) (Version, error) {
return nil, nil // TODO
}
func (r Replicator) setContinuation(ctx context.Context, continuation Version) error {
return nil // TODO
}
func (r Replicator) deltaStream(ctx context.Context) error {
continuation, err := r.getContinuation(ctx)
if err != nil {
return err
}
for ctx.Err() == nil {
if err := r._deltaStream(ctx, continuation); err != nil {
return err
}
select {
case <-ctx.Done():
case <-time.After(r.Interval):
}
}
return ctx.Err()
}
func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error {
last, err := continuation.AsTime()
if err != nil {
return err
}
keys, after := r.Src.KeysSince(ctx, last)
for keyVersion := range keys {
if t, err := keyVersion.Version.AsTime(); err != nil {
return err
} else if t.After(last) {
last = t
}
valueVersion, err := r.Src.Get(ctx, keyVersion.Key)
if err != nil {
return err
}
if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil {
return err
}
r.setContinuation(ctx, TimeAsVersion(last))
}
return *after
}
func (r Replicator) fullStream(ctx context.Context) error {
return r._deltaStream(ctx, nil)
} }

View File

@@ -0,0 +1,93 @@
package replicator
import (
"context"
"errors"
"sync"
"testing"
"time"
)
func TestReplicatorStream(t *testing.T) {
t.Parallel()
key := Key{Namespace: "x/y", Key: "z"}
value := Value("hello world")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Hour))
version := TimeAsVersion(time.Now())
cases := map[string]struct {
before func(*testing.T, Replicator)
during func(*testing.T, Replicator)
after func(*testing.T, Replicator)
}{
"noop": {},
"one prior op moves": {
before: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"one during op moves": {
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"dont overwrite newer received later": {
before: func(t *testing.T, r Replicator) {
r.Dest.Set(nil, key, value, version)
},
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, oldVersion)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
}
for name, d := range cases {
c := d
t.Run(name, func(t *testing.T) {
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
replicator.Interval = time.Millisecond * 10
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
if c.before != nil {
c.before(t, replicator)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
t.Error(err)
}
}(t)
time.Sleep(time.Millisecond * 150)
if c.during != nil {
c.during(t, replicator)
}
time.Sleep(time.Millisecond * 150)
can()
wg.Wait()
if c.after != nil {
c.after(t, replicator)
}
})
}
}