replicator/replicator/replicator_test.go

79 lines
1.7 KiB
Go

package replicator
import (
"context"
"errors"
"sync"
"testing"
"time"
)
func TestReplicatorStream(t *testing.T) {
key := Key{Namespace: "x/y", Key: "z"}
value := Value("hello world")
version := TimeAsVersion(time.Now())
cases := map[string]struct {
before func(*testing.T, Replicator)
during func(*testing.T, Replicator)
after func(*testing.T, Replicator)
}{
"noop": {},
"one prior op moves": {
before: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
},
during: func(t *testing.T, r Replicator) {
time.Sleep(time.Millisecond * 200)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
"one during op moves": {
during: func(t *testing.T, r Replicator) {
r.Src.Set(nil, key, value, version)
time.Sleep(time.Millisecond * 200)
},
after: func(t *testing.T, r Replicator) {
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
t.Error(got)
}
},
},
}
for name, d := range cases {
c := d
t.Run(name, func(t *testing.T) {
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
if c.before != nil {
c.before(t, replicator)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
t.Fatal(err)
}
}()
if c.during != nil {
c.during(t, replicator)
}
can()
wg.Wait()
if c.after != nil {
c.after(t, replicator)
}
})
}
}