diff --git a/replicator/replicator_test.go b/replicator/replicator_test.go index 8cda71d..3b1b337 100644 --- a/replicator/replicator_test.go +++ b/replicator/replicator_test.go @@ -9,8 +9,11 @@ import ( ) func TestReplicatorStream(t *testing.T) { + t.Parallel() + key := Key{Namespace: "x/y", Key: "z"} value := Value("hello world") + oldVersion := TimeAsVersion(time.Now().Add(-1 * time.Hour)) version := TimeAsVersion(time.Now()) cases := map[string]struct { @@ -23,9 +26,6 @@ func TestReplicatorStream(t *testing.T) { before: func(t *testing.T, r Replicator) { r.Src.Set(nil, key, value, version) }, - during: func(t *testing.T, r Replicator) { - time.Sleep(time.Millisecond * 200) - }, after: func(t *testing.T, r Replicator) { if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) { t.Error(got) @@ -35,7 +35,19 @@ func TestReplicatorStream(t *testing.T) { "one during op moves": { during: func(t *testing.T, r Replicator) { r.Src.Set(nil, key, value, version) - time.Sleep(time.Millisecond * 200) + }, + after: func(t *testing.T, r Replicator) { + if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) { + t.Error(got) + } + }, + }, + "dont overwrite newer received later": { + before: func(t *testing.T, r Replicator) { + r.Dest.Set(nil, key, value, version) + }, + during: func(t *testing.T, r Replicator) { + r.Src.Set(nil, key, value, oldVersion) }, after: func(t *testing.T, r Replicator) { if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) { @@ -49,6 +61,7 @@ func TestReplicatorStream(t *testing.T) { c := d t.Run(name, func(t *testing.T) { replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap())) + replicator.Interval = time.Millisecond * 10 ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() @@ -64,9 +77,11 @@ func TestReplicatorStream(t *testing.T) { t.Error(err) } }(t) + time.Sleep(time.Millisecond * 150) if c.during != nil { c.during(t, replicator) } + time.Sleep(time.Millisecond * 150) can() wg.Wait()