test fails appropriately
parent
aa8f348b11
commit
5e7381c04b
|
|
@ -1,6 +1,7 @@
|
||||||
package replicator
|
package replicator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
@ -28,11 +29,11 @@ type (
|
||||||
|
|
||||||
KeyVersion struct {
|
KeyVersion struct {
|
||||||
Key Key
|
Key Key
|
||||||
Version []byte
|
Version Version
|
||||||
}
|
}
|
||||||
|
|
||||||
ValueVersion struct {
|
ValueVersion struct {
|
||||||
Value []byte
|
Value Value
|
||||||
Version Version
|
Version Version
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
@ -64,3 +65,15 @@ func TimeAsVersion(t time.Time) Version {
|
||||||
n := t.UnixNano()
|
n := t.UnixNano()
|
||||||
return []byte(strconv.FormatInt(n, 10))
|
return []byte(strconv.FormatInt(n, 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (version Version) Equal(other Version) bool {
|
||||||
|
return bytes.Equal(version, other)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (key Key) Equal(other Key) bool {
|
||||||
|
return key == other
|
||||||
|
}
|
||||||
|
|
||||||
|
func (value Value) Equal(other Value) bool {
|
||||||
|
return bytes.Equal(value, other)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package replicator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -13,16 +14,19 @@ func TestReplicatorStream(t *testing.T) {
|
||||||
version := TimeAsVersion(time.Now())
|
version := TimeAsVersion(time.Now())
|
||||||
|
|
||||||
cases := map[string]struct {
|
cases := map[string]struct {
|
||||||
before func(Replicator)
|
before func(*testing.T, Replicator)
|
||||||
during func(Replicator)
|
during func(*testing.T, Replicator)
|
||||||
after func(Replicator)
|
after func(*testing.T, Replicator)
|
||||||
}{
|
}{
|
||||||
"noop": {},
|
"noop": {},
|
||||||
"one prior op moves": {
|
"one prior op moves": {
|
||||||
before: func(r Replicator) {
|
before: func(t *testing.T, r Replicator) {
|
||||||
r.Src.Set(nil, key, value, version)
|
r.Src.Set(nil, key, value, version)
|
||||||
},
|
},
|
||||||
after: func(r Replicator) {
|
after: func(t *testing.T, r Replicator) {
|
||||||
|
if got, _ := r.Dest.Get(nil, key); !got.Version.Equal(version) || !got.Value.Equal(value) {
|
||||||
|
t.Error(got)
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -30,30 +34,30 @@ func TestReplicatorStream(t *testing.T) {
|
||||||
for name, d := range cases {
|
for name, d := range cases {
|
||||||
c := d
|
c := d
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
replicator := NewReplicator(NewMap(), NewMap())
|
replicator := NewReplicator(NewMust(NewMap()), NewMust(NewMap()))
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
if c.before != nil {
|
if c.before != nil {
|
||||||
c.before(replicator)
|
c.before(t, replicator)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := replicator.Stream(ctx); err != nil && ctx.Err() == nil {
|
if err := replicator.Stream(ctx); err != nil && !errors.Is(err, ctx.Err()) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if c.during != nil {
|
if c.during != nil {
|
||||||
c.during(replicator)
|
c.during(t, replicator)
|
||||||
}
|
}
|
||||||
|
|
||||||
can()
|
can()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if c.after != nil {
|
if c.after != nil {
|
||||||
c.after(replicator)
|
c.after(t, replicator)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue