Compare commits

...

7 Commits

Author SHA1 Message Date
Bel LaPointe
4da502676a readme 2023-11-06 08:09:51 -07:00
Bel LaPointe
d16056bcc2 mvp 2023-11-06 06:28:39 -07:00
Bel LaPointe
11f1c9cefd Duration for JSON/YAML marshalling 2023-11-06 06:17:13 -07:00
Bel LaPointe
7a88fc6f5e more test and test replicator parallel 2023-11-06 06:00:22 -07:00
Bel LaPointe
38a3df7bda it is a naiive thing 2023-11-06 05:49:34 -07:00
Bel LaPointe
071935f0f3 it is a naiive thing 2023-11-06 05:49:26 -07:00
Bel LaPointe
1a586de656 it is a naiive thing 2023-11-06 05:49:08 -07:00
9 changed files with 240 additions and 26 deletions

View File

@@ -2,5 +2,5 @@
Given source DB,
1. fully sync to local db
1. continuously sync to local db
1. fully sync from src to dest db
1. continuously sync from src to dest db

2
go.mod
View File

@@ -1,3 +1,5 @@
module replicator
go 1.21.1
require gopkg.in/yaml.v3 v3.0.1 // indirect

3
go.sum Normal file
View File

@@ -0,0 +1,3 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

53
main.go
View File

@@ -2,16 +2,67 @@ package main
import (
"context"
"flag"
"net/url"
"os"
"os/signal"
"replicator/replicator"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
if err := replicator.Main(ctx); err != nil && ctx.Err() == nil {
config, err := getConfig()
if err != nil {
panic(err)
}
if err := replicator.Main(ctx, config); err != nil && ctx.Err() == nil {
panic(err)
}
}
func getConfig() (replicator.Config, error) {
var config replicator.Config
if p := os.Getenv("CONFIG"); p != "" {
f, err := os.Open(p)
if err != nil {
return config, err
}
defer f.Close()
if err := yaml.NewDecoder(f).Decode(&config); err != nil {
return config, err
}
}
var srcDriver, destDriver, interval string
flag.StringVar(&srcDriver, "src", "map://", "source db")
flag.StringVar(&destDriver, "dest", "map://", "dest db")
flag.StringVar(&interval, "interval", "1s", "interval")
flag.Parse()
if srcDriver, err := url.Parse(srcDriver); err != nil {
return config, err
} else {
config.Src.Driver = *srcDriver
}
if destDriver, err := url.Parse(destDriver); err != nil {
return config, err
} else {
config.Dest.Driver = *destDriver
}
if interval, err := time.ParseDuration(interval); err != nil {
return config, err
} else {
config.Interval = replicator.Duration(interval)
}
return config, nil
}

View File

@@ -1,20 +1,53 @@
package replicator
import "net/url"
import (
"encoding/json"
"net/url"
"time"
"gopkg.in/yaml.v3"
)
type (
Config struct {
Local Storage
Remote Storage
Src Storage
Dest Storage
Interval Duration
}
Storage struct {
Driver url.URL
Full Stream
Incremental Stream
}
Stream struct {
Disabled bool
}
Duration time.Duration
)
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
d2, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(d2)
return nil
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
var s string
if err := value.Decode(&s); err != nil {
return err
}
b, _ := json.Marshal(s)
return d.UnmarshalJSON(b)
}
func (d Duration) MarshalYAML() (interface{}, error) {
return time.Duration(d).String(), nil
}

30
replicator/config_test.go Normal file
View File

@@ -0,0 +1,30 @@
package replicator
import (
"encoding/json"
"testing"
"time"
"gopkg.in/yaml.v3"
)
func TestDuration(t *testing.T) {
d := Duration(3*time.Second + time.Minute)
var d2 Duration
if b, err := json.Marshal(d); err != nil {
t.Fatal(err)
} else if err := json.Unmarshal(b, &d2); err != nil {
t.Fatal(err)
} else if d != d2 {
t.Fatal(d, d2)
}
var d3 Duration
if b, err := yaml.Marshal(d); err != nil {
t.Fatal(err)
} else if err := yaml.Unmarshal(b, &d3); err != nil {
t.Fatal(err)
} else if d != d3 {
t.Fatal(d, d3)
}
}

View File

@@ -2,9 +2,19 @@ package replicator
import (
"context"
"errors"
)
func Main(ctx context.Context) error {
return errors.New("not impl")
func Main(ctx context.Context, config Config) error {
src, err := NewDriver(ctx, config.Src.Driver)
if err != nil {
return err
}
dest, err := NewDriver(ctx, config.Dest.Driver)
if err != nil {
return err
}
replicator := NewReplicator(dest, src)
return replicator.Stream(ctx)
}

View File

@@ -2,21 +2,88 @@ package replicator
import (
"context"
"io"
"time"
)
type Replicator struct {
Src Driver
Dest Driver
Interval time.Duration
}
func NewReplicator(src, dest Driver) Replicator {
func NewReplicator(dest, src Driver) Replicator {
return Replicator{
Src: src,
Dest: dest,
Interval: time.Second,
}
}
func (r Replicator) Stream(ctx context.Context) error {
return io.EOF
last, err := r.getContinuation(ctx)
if err != nil {
return err
}
if last == nil {
if err := r.fullStream(ctx); err != nil {
return err
}
}
return r.deltaStream(ctx)
}
func (r Replicator) getContinuation(ctx context.Context) (Version, error) {
return nil, nil // TODO
}
func (r Replicator) setContinuation(ctx context.Context, continuation Version) error {
return nil // TODO
}
func (r Replicator) deltaStream(ctx context.Context) error {
continuation, err := r.getContinuation(ctx)
if err != nil {
return err
}
for ctx.Err() == nil {
if err := r._deltaStream(ctx, continuation); err != nil {
return err
}
select {
case <-ctx.Done():
case <-time.After(r.Interval):
}
}
return ctx.Err()
}
func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error {
last, err := continuation.AsTime()
if err != nil {
return err
}
keys, after := r.Src.KeysSince(ctx, last)
for keyVersion := range keys {
if t, err := keyVersion.Version.AsTime(); err != nil {
return err
} else if t.After(last) {
last = t
}
valueVersion, err := r.Src.Get(ctx, keyVersion.Key)
if err != nil {
return err
}
if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil {
return err
}
r.setContinuation(ctx, TimeAsVersion(last))
}
return *after
}
func (r Replicator) fullStream(ctx context.Context) error {
return r._deltaStream(ctx, nil)
}

View File

@@ -9,8 +9,11 @@ import (
)
func TestReplicatorStream(t *testing.T) {
t.Parallel()
key := Key{Namespace: "x/y", Key: "z"}
value := Value("hello world")
oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Hour))
version := TimeAsVersion(time.Now())
cases := map[string]struct {
@@ -32,7 +35,19 @@ func TestReplicatorStream(t *testing.T) {
"one during op moves": {
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
time.Sleep(time.Millisecond * 200)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"dont overwrite newer received later": {
before: func(t *testing.T, r Replicator) {
r.Dest.Set(nil, key, value, version)
},
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, oldVersion)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
@@ -46,6 +61,7 @@ func TestReplicatorStream(t *testing.T) {
c := d
t.Run(name, func(t *testing.T) {
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
replicator.Interval = time.Millisecond * 10
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
@@ -55,15 +71,17 @@ func TestReplicatorStream(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go func(t *testing.T) {
defer wg.Done()
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
t.Fatal(err)
t.Error(err)
}
}()
}(t)
time.Sleep(time.Millisecond * 150)
if c.during != nil {
c.during(t, replicator)
}
time.Sleep(time.Millisecond * 150)
can()
wg.Wait()