interview/render-2023-10-11/cmd/cache-service/oplog.go

78 lines
1.5 KiB
Go

package main
import (
"fmt"
"render231011/internal/thestore"
"slices"
"sync"
"time"
)
type Oplog struct {
mutex *sync.RWMutex
events []timestampedEvent
idx int // todo
lastTruncated time.Time
}
type timestampedEvent struct {
t time.Time
e thestore.Event
}
func (timestampedEvent timestampedEvent) String() string {
return fmt.Sprintf("@%v: %+v", timestampedEvent.t.Unix(), timestampedEvent.e)
}
// gotcha duration? bigger N? whatever
func NewOplog() *Oplog {
return &Oplog{
events: make([]timestampedEvent, 2),
mutex: &sync.RWMutex{},
}
}
// todo circular queue if we got time
func (oplog *Oplog) Push(event thestore.Event) {
oplog.push(time.Now(), event)
}
func (oplog *Oplog) push(t time.Time, event thestore.Event) {
oplog.mutex.Lock()
defer oplog.mutex.Unlock()
oplog.lastTruncated = oplog.events[oplog.idx].t
oplog.events[oplog.idx] = timestampedEvent{
t: t,
e: event,
}
oplog.idx++
if oplog.idx >= len(oplog.events) {
oplog.idx = 0
}
}
func (oplog *Oplog) Since(t time.Time) ([]thestore.Event, bool) {
oplog.mutex.RLock()
defer oplog.mutex.RUnlock()
if !t.After(oplog.lastTruncated) {
return nil, false
}
result := make([]thestore.Event, 0, 5)
i := (oplog.idx + len(oplog.events) - 1) % len(oplog.events)
for i != oplog.idx {
if !oplog.events[i].t.After(t) {
break
}
result = append(result, oplog.events[i].e)
i--
if i < 0 {
i = len(oplog.events)
}
}
slices.Reverse(result)
return result, len(result) > 0
}