From 1a586de656f63d1379e5abb2c1a7f07b2550e7c5 Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Mon, 6 Nov 2023 05:49:08 -0700 Subject: [PATCH] it is a naiive thing --- replicator/replicator.go | 79 ++++++++++++++++++++++++++++++++--- replicator/replicator_test.go | 3 ++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/replicator/replicator.go b/replicator/replicator.go index dfde17f..3004255 100644 --- a/replicator/replicator.go +++ b/replicator/replicator.go @@ -2,21 +2,88 @@ package replicator import ( "context" - "io" + "time" ) type Replicator struct { - Src Driver - Dest Driver + Src Driver + Dest Driver + Interval time.Duration } func NewReplicator(src, dest Driver) Replicator { return Replicator{ - Src: src, - Dest: dest, + Src: src, + Dest: dest, + Interval: time.Second, } } func (r Replicator) Stream(ctx context.Context) error { - return io.EOF + last, err := r.getContinuation(ctx) + if err != nil { + return err + } + if last == nil { + if err := r.fullStream(ctx); err != nil { + return err + } + } + return r.deltaStream(ctx) +} + +func (r Replicator) getContinuation(ctx context.Context) (Version, error) { + return nil, nil // TODO +} + +func (r Replicator) setContinuation(ctx context.Context, continuation Version) error { + return nil // TODO +} + +func (r Replicator) deltaStream(ctx context.Context) error { + continuation, err := r.getContinuation(ctx) + if err != nil { + return err + } + for ctx.Err() == nil { + if err := r._deltaStream(ctx, continuation); err != nil { + return err + } + select { + case <-ctx.Done(): + case <-time.After(r.Interval): + } + } + return ctx.Err() +} + +func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error { + last, err := continuation.AsTime() + if err != nil { + return err + } + keys, after := r.Src.KeysSince(ctx, last) + for keyVersion := range keys { + if t, err := keyVersion.Version.AsTime(); err != nil { + return err + } else if t.After(last) { + last = t + } + + valueVersion, err := r.Src.Get(ctx, keyVersion.Key) + if err != nil { + return err + } + + if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil { + return err + } + + r.setContinuation(ctx, TimeAsVersion(last)) + } + return *after +} + +func (r Replicator) fullStream(ctx context.Context) error { + return r._deltaStream(ctx, nil) } diff --git a/replicator/replicator_test.go b/replicator/replicator_test.go index 33b18b7..83c3219 100644 --- a/replicator/replicator_test.go +++ b/replicator/replicator_test.go @@ -23,6 +23,9 @@ func TestReplicatorStream(t *testing.T) { before: func(t *testing.T, r Replicator) { r.Src.Set(nil, key, value, version) }, + during: func(t *testing.T, r Replicator) { + time.Sleep(time.Millisecond * 200) + }, after: func(t *testing.T, r Replicator) { if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) { t.Error(got)