97 lines
2.3 KiB
Go
97 lines
2.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
ErrNotFound = errors.New("not found")
|
|
)
|
|
|
|
type Storage struct {
|
|
driver Driver
|
|
}
|
|
|
|
func NewStorage(driver Driver) Storage {
|
|
return Storage{driver: driver}
|
|
}
|
|
|
|
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
|
|
return s.messagesWhere(ctx, func(m Message) bool {
|
|
return !t.After(m.Time())
|
|
})
|
|
}
|
|
|
|
func (s Storage) Threads(ctx context.Context) ([]string, error) {
|
|
return s.ThreadsSince(ctx, time.Unix(0, 0))
|
|
}
|
|
|
|
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
|
|
}
|
|
|
|
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
|
|
}
|
|
|
|
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
|
|
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
|
|
}
|
|
|
|
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
|
|
messages, err := s.MessagesSince(ctx, t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
values := map[string]struct{}{}
|
|
for _, m := range messages {
|
|
values[fielder(m)] = struct{}{}
|
|
}
|
|
result := make([]string, 0, len(values))
|
|
for k := range values {
|
|
result = append(result, k)
|
|
}
|
|
sort.Strings(result)
|
|
return result, nil
|
|
}
|
|
|
|
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
|
|
return s.messagesWhere(ctx, func(m Message) bool {
|
|
return m.Thread == thread
|
|
})
|
|
}
|
|
|
|
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
|
|
result := make([]Message, 0)
|
|
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
|
|
m := MustDeserialize(v)
|
|
if !where(m) {
|
|
return nil
|
|
}
|
|
result = append(result, m)
|
|
return nil
|
|
})
|
|
sort.Slice(result, func(i, j int) bool {
|
|
return result[i].TS < result[j].TS
|
|
})
|
|
return result, err
|
|
}
|
|
|
|
func (s Storage) Upsert(ctx context.Context, m Message) error {
|
|
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
|
|
}
|
|
|
|
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
|
|
b, err := s.driver.Get(ctx, "m", id)
|
|
if err != nil {
|
|
return Message{}, err
|
|
}
|
|
if b == nil {
|
|
return Message{}, ErrNotFound
|
|
}
|
|
return MustDeserialize(b), nil
|
|
}
|