package replicator import ( "context" "time" ) type Replicator struct { Src Driver Dest Driver Interval time.Duration } func NewReplicator(src, dest Driver) Replicator { return Replicator{ Src: src, Dest: dest, Interval: time.Second, } } func (r Replicator) Stream(ctx context.Context) error { last, err := r.getContinuation(ctx) if err != nil { return err } if last == nil { if err := r.fullStream(ctx); err != nil { return err } } return r.deltaStream(ctx) } func (r Replicator) getContinuation(ctx context.Context) (Version, error) { return nil, nil // TODO } func (r Replicator) setContinuation(ctx context.Context, continuation Version) error { return nil // TODO } func (r Replicator) deltaStream(ctx context.Context) error { continuation, err := r.getContinuation(ctx) if err != nil { return err } for ctx.Err() == nil { if err := r._deltaStream(ctx, continuation); err != nil { return err } select { case <-ctx.Done(): case <-time.After(r.Interval): } } return ctx.Err() } func (r Replicator) _deltaStream(ctx context.Context, continuation Version) error { last, err := continuation.AsTime() if err != nil { return err } keys, after := r.Src.KeysSince(ctx, last) for keyVersion := range keys { if t, err := keyVersion.Version.AsTime(); err != nil { return err } else if t.After(last) { last = t } valueVersion, err := r.Src.Get(ctx, keyVersion.Key) if err != nil { return err } if err := r.Dest.Set(ctx, keyVersion.Key, valueVersion.Value, valueVersion.Version); err != nil { return err } r.setContinuation(ctx, TimeAsVersion(last)) } return *after } func (r Replicator) fullStream(ctx context.Context) error { return r._deltaStream(ctx, nil) }