This commit is contained in:
bel
2023-10-15 10:35:36 -06:00
commit 664a5f195e
15 changed files with 1356 additions and 0 deletions

22
cmd/cache-service/main.go Normal file
View File

@@ -0,0 +1,22 @@
package main
import (
"flag"
"fmt"
"log"
"net/http"
)
func main() {
port := flag.Int("p", 8080, "port to listen on")
flag.Parse()
server := NewServer()
addr := fmt.Sprintf(":%d", *port) // check port >0 // accept IP for mvp security
// add signal catching for clean shutdown via context and httpServer or the like
log.Printf("listening on %s", addr)
if err := http.ListenAndServe(addr, server); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,77 @@
package main
import (
"fmt"
"render231011/internal/thestore"
"slices"
"sync"
"time"
)
type Oplog struct {
mutex *sync.RWMutex
events []timestampedEvent
idx int // todo
lastTruncated time.Time
}
type timestampedEvent struct {
t time.Time
e thestore.Event
}
func (timestampedEvent timestampedEvent) String() string {
return fmt.Sprintf("@%v: %+v", timestampedEvent.t.Unix(), timestampedEvent.e)
}
// gotcha duration? bigger N? whatever
func NewOplog() *Oplog {
return &Oplog{
events: make([]timestampedEvent, 2),
mutex: &sync.RWMutex{},
}
}
// todo circular queue if we got time
func (oplog *Oplog) Push(event thestore.Event) {
oplog.push(time.Now(), event)
}
func (oplog *Oplog) push(t time.Time, event thestore.Event) {
oplog.mutex.Lock()
defer oplog.mutex.Unlock()
oplog.lastTruncated = oplog.events[oplog.idx].t
oplog.events[oplog.idx] = timestampedEvent{
t: t,
e: event,
}
oplog.idx++
if oplog.idx >= len(oplog.events) {
oplog.idx = 0
}
}
func (oplog *Oplog) Since(t time.Time) ([]thestore.Event, bool) {
oplog.mutex.RLock()
defer oplog.mutex.RUnlock()
if !t.After(oplog.lastTruncated) {
return nil, false
}
result := make([]thestore.Event, 0, 5)
i := (oplog.idx + len(oplog.events) - 1) % len(oplog.events)
for i != oplog.idx {
if !oplog.events[i].t.After(t) {
break
}
result = append(result, oplog.events[i].e)
i--
if i < 0 {
i = len(oplog.events)
}
}
slices.Reverse(result)
return result, len(result) > 0
}

View File

@@ -0,0 +1,27 @@
package main
import (
"render231011/internal/thestore"
"testing"
"time"
)
func TestOplog(t *testing.T) {
oplog := NewOplog()
oplog.push(time.Unix(1, 0), thestore.Event{ID: "1"})
oplog.push(time.Unix(2, 0), thestore.Event{ID: "2"})
oplog.push(time.Unix(3, 0), thestore.Event{ID: "3"})
oplog.push(time.Unix(4, 0), thestore.Event{ID: "4"})
t.Logf("%+v, %s", oplog, oplog.lastTruncated.UTC())
got, _ := oplog.Since(time.Unix(2, 0))
if len(got) != 2 {
t.Error(got)
} else if got[0].ID != "3" {
t.Error(got[0])
} else if got[1].ID != "4" {
t.Error(got[1])
}
t.Logf("%+v", got)
}

View File

@@ -0,0 +1,68 @@
package main
import (
"encoding/json"
"log"
"net/http"
"render231011/internal/thestore"
"strconv"
"strings"
"time"
)
type Server struct {
store *thestore.Store
oplog *Oplog
}
func NewServer() Server {
return Server{
store: thestore.NewStore(),
oplog: NewOplog(),
}
}
func (server Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// todo middleware: rate limit
// todo middleware: timeout requests
// todo: replace with an actual router
if strings.HasPrefix(r.URL.Path, "/event") && r.Method == http.MethodPost {
server.servePostEvent(w, r)
} else if r.URL.Path == "/events" && r.Method == http.MethodGet {
server.serveGetEvents(w, r)
}
}
func (server Server) servePostEvent(w http.ResponseWriter, r *http.Request) {
var newEvent thestore.Event
if err := json.NewDecoder(r.Body).Decode(&newEvent); err != nil {
w.WriteHeader(http.StatusBadRequest)
// todo error msg pls
return
}
// todo check if action==updated service-id already exists
server.store.Push(newEvent)
log.Printf("ingested event %+v", newEvent)
// todo store the op log
server.oplog.Push(newEvent)
}
func (server Server) serveGetEvents(w http.ResponseWriter, r *http.Request) {
// todo my circular queue magic
sinceString := r.URL.Query().Get("since")
sinceInt, err := strconv.ParseInt(sinceString, 10, 64)
if err != nil {
// todo emit helpful err message
w.WriteHeader(http.StatusBadRequest)
return
}
since := time.Unix(sinceInt, 0)
result, ok := server.oplog.Since(since)
if !ok {
result = server.store.Dump()
}
json.NewEncoder(w).Encode(result)
}

View File

@@ -0,0 +1,22 @@
package main
import (
"flag"
"fmt"
"log"
"net/http"
)
func main() {
port := flag.Int("p", 8080, "port to listen on")
flag.Parse()
server := NewServer()
addr := fmt.Sprintf(":%d", *port) // check port >0 // accept IP for mvp security
// add signal catching for clean shutdown via context and httpServer or the like
log.Printf("listening on %s", addr)
if err := http.ListenAndServe(addr, server); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,119 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/signal"
"path"
"render231011/internal/thestore"
"strings"
"syscall"
"time"
)
type Server struct {
store *thestore.Store
}
func NewServer() Server {
s := Server{
store: thestore.NewStore(),
}
// todo timeout via context
if err := s.pollCacheServiceForUpdates(context.Background()); err != nil {
panic(err)
}
go s.PollCacheServiceForUpdates()
return s
}
// todo add a context to spin this goroutine-method down
func (server Server) PollCacheServiceForUpdates() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
for ctx.Err() == nil {
if err := server.pollCacheServiceForUpdates(ctx); err != nil {
log.Println("failed to PollCacheServiceForUpdates:", err)
}
// poll cache service
select {
case <-ctx.Done():
case <-time.After(time.Second): // make configurable
}
}
}
var lastEventsPoll time.Time
func (server Server) pollCacheServiceForUpdates(ctx context.Context) error {
c := http.Client{Timeout: time.Minute}
req, err := http.NewRequest(
http.MethodGet,
// todo configurable cache-service addr
fmt.Sprintf("http://localhost:8080/events?since=%v", lastEventsPoll.Unix()), // todo not a global
nil,
)
if err != nil {
panic(err)
}
log.Printf("+%v", req)
resp, err := c.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
// todo check status code
var pollResult []thestore.Event
if err := json.NewDecoder(resp.Body).Decode(&pollResult); err != nil {
return fmt.Errorf("failed to read events from cache-service: %v", err)
}
for i := range pollResult {
server.store.Push(pollResult[i])
}
// !!!!!!!todo cache-service needs to emit lastEventsPoll value
lastEventsPoll = time.Now()
return nil
}
func (server Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// todo middleware: rate limit
// todo middleware: timeout requests
// todo: replace with an actual router
if strings.HasPrefix(r.URL.Path, "/lookup") && r.Method == http.MethodGet {
server.serveGetLookup(w, r)
}
}
// GET /lookup/xyz
func (server Server) serveGetLookup(w http.ResponseWriter, r *http.Request) {
k := path.Base(r.URL.Path)
if k == "" {
// log err here
w.WriteHeader(http.StatusBadRequest)
return
}
state, ok := server.store.Get(k)
if !ok {
// todo something clearer
w.WriteHeader(http.StatusNoContent)
return
}
if err := json.NewEncoder(w).Encode(state); err != nil {
w.WriteHeader(499)
log.Println("failed to responsd", err)
}
}