mock cache-service oplog truncation functionality
parent
49cd477db0
commit
1a1640b772
|
|
@ -8,6 +8,8 @@ import (
|
||||||
|
|
||||||
type Oplog struct {
|
type Oplog struct {
|
||||||
events []timestampedEvent
|
events []timestampedEvent
|
||||||
|
idx int // todo
|
||||||
|
lastTruncated time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type timestampedEvent struct {
|
type timestampedEvent struct {
|
||||||
|
|
@ -34,7 +36,11 @@ func (oplog *Oplog) push(t time.Time, event thestore.Event) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oplog *Oplog) Since(t time.Time) []thestore.Event {
|
func (oplog *Oplog) Since(t time.Time) ([]thestore.Event, bool) {
|
||||||
|
if !t.After(oplog.lastTruncated) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
result := make([]thestore.Event, 0, 5)
|
result := make([]thestore.Event, 0, 5)
|
||||||
for i := len(oplog.events) - 1; i >= 0; i-- {
|
for i := len(oplog.events) - 1; i >= 0; i-- {
|
||||||
if !oplog.events[i].t.After(t) {
|
if !oplog.events[i].t.After(t) {
|
||||||
|
|
@ -43,5 +49,5 @@ func (oplog *Oplog) Since(t time.Time) []thestore.Event {
|
||||||
result = append(result, oplog.events[i].e)
|
result = append(result, oplog.events[i].e)
|
||||||
}
|
}
|
||||||
slices.Reverse(result)
|
slices.Reverse(result)
|
||||||
return result
|
return result, len(result) > 0
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ func TestOplog(t *testing.T) {
|
||||||
oplog.push(time.Unix(3, 0), thestore.Event{ID: "3"})
|
oplog.push(time.Unix(3, 0), thestore.Event{ID: "3"})
|
||||||
oplog.push(time.Unix(4, 0), thestore.Event{ID: "4"})
|
oplog.push(time.Unix(4, 0), thestore.Event{ID: "4"})
|
||||||
|
|
||||||
got := oplog.Since(time.Unix(2, 0))
|
got, _ := oplog.Since(time.Unix(2, 0))
|
||||||
if len(got) != 2 {
|
if len(got) != 2 {
|
||||||
t.Error(got)
|
t.Error(got)
|
||||||
} else if got[0].ID != "3" {
|
} else if got[0].ID != "3" {
|
||||||
|
|
|
||||||
|
|
@ -56,10 +56,13 @@ func (server Server) serveGetEvents(w http.ResponseWriter, r *http.Request) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// todo emit helpful err message
|
// todo emit helpful err message
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
since := time.Unix(sinceInt, 0)
|
since := time.Unix(sinceInt, 0)
|
||||||
|
|
||||||
result := server.oplog.Since(since)
|
result, ok := server.oplog.Since(since)
|
||||||
|
if !ok {
|
||||||
|
result = server.store.Dump()
|
||||||
|
}
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,16 @@ func NewStore() *Store {
|
||||||
return &store
|
return &store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo pass context.context and failfast
|
||||||
|
func (store *Store) Dump() []Event {
|
||||||
|
result := make([]Event, 0)
|
||||||
|
(*sync.Map)(store).Range(func(_, v any) bool {
|
||||||
|
result = append(result, v.(Event))
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
func (store *Store) Push(op Event) {
|
func (store *Store) Push(op Event) {
|
||||||
k := op.ID
|
k := op.ID
|
||||||
event, ok := store.Get(k)
|
event, ok := store.Get(k)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue