f sql jeez

main
Bel LaPointe 2024-04-15 13:08:21 -06:00
parent 2f3739b24f
commit 8193bf7377
21 changed files with 684 additions and 952 deletions

View File

View File

@ -17,7 +17,7 @@ type Config struct {
InitializeSlack bool
SlackToken string
SlackChannels []string
PostgresConn string
DriverConn string
BasicAuthUser string
BasicAuthPassword string
FillWithTestdata bool
@ -104,21 +104,18 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
return Config{}, err
}
result.driver = NewRAM()
if result.PostgresConn != "" {
ctx, can := context.WithTimeout(ctx, time.Second*10)
defer can()
pg, err := NewPostgres(ctx, result.PostgresConn)
if err != nil {
return Config{}, err
}
result.driver = pg
ctx, can := context.WithTimeout(ctx, time.Second*10)
defer can()
driver, err := NewDriver(ctx, result.DriverConn)
if err != nil {
return Config{}, err
}
if result.FillWithTestdata {
if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
return Config{}, err
}
result.driver = driver
if !result.FillWithTestdata {
} else if err := result.driver.FillWithTestdata(ctx, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil {
return Config{}, err
}
result.storage = NewStorage(result.driver)
result.queue = NewQueue(result.driver)

413
.main.go Normal file
View File

@ -0,0 +1,413 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
"time"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
cfg, err := newConfig(ctx)
if err != nil {
panic(err)
}
defer cfg.driver.Close()
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
panic(err)
}
}
func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-listenAndServe(ctx, cfg):
return err
}
}
func listenAndServe(ctx context.Context, cfg Config) chan error {
s := http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: http.HandlerFunc(newHandler(cfg)),
BaseContext: func(net.Listener) context.Context {
return ctx
},
}
errc := make(chan error)
go func() {
defer close(errc)
log.Printf("listening on %s", s.Addr)
errc <- s.ListenAndServe()
}()
return errc
}
func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
b, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b))
log.Printf("%s %s | %s", r.Method, r.URL, b)
}
mux.ServeHTTP(w, r)
}
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
var page struct {
OK bool
Messages []json.RawMessage
}
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
} else if !page.OK {
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if errors.Is(err, ErrIrrelevantMessage) {
} else if err != nil {
errs = append(errs, err)
} else if err := cfg.storage.Upsert(r.Context(), m); err != nil {
errs = append(errs, err)
} else {
log.Printf("re-ingested %v", m.ID)
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
}
}
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := cfg.storage.EventsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"events": events})
}
}
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
messages, err := cfg.storage.MessagesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"messages": messages})
}
}
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"threads": threads})
}
}
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
messages, err := cfg.storage.Thread(r.Context(), thread)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"thread": messages})
}
}
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)
return false
}
return true
}
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize
}
return _newHandlerPostAPIV1EventsSlack(cfg)
}
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
var challenge struct {
Token string
Challenge string
Type string
}
if err := json.Unmarshal(b, &challenge); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
}
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b))
var allowList struct {
Token string
Event struct {
Channel string
}
}
if err := json.Unmarshal(b, &allowList); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
} else if allowList.Token != cfg.SlackToken {
http.Error(w, "invalid .token", http.StatusForbidden)
return
} else if !func() bool {
for _, slackChannel := range cfg.SlackChannels {
if slackChannel == allowList.Event.Channel {
return true
}
}
return false
}() {
return
}
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("ingested %v", m.ID)
}
}
func parseSince(s string) (time.Time, error) {
if s == "" {
return time.Unix(0, 0), nil
}
if n, err := strconv.ParseInt(s, 10, 64); err != nil {
} else {
return time.Unix(n, 0), nil
}
if t, err := time.Parse(time.RFC3339, s); err != nil {
} else {
return t, nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err != nil {
} else {
return t, nil
}
if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil {
} else {
return t, nil
}
return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
}
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
if strings.Contains(r.Header.Get("Accept"), "text/csv") {
return encodeCSVResponse(w, v)
}
if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
return encodeTSVResponse(w, v)
}
return encodeJSONResponse(w, v)
}
func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
return json.NewEncoder(w).Encode(v)
}
func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, "\t")
}
func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, ",")
}
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
var data map[string][]map[string]json.RawMessage
if err := json.Unmarshal(b, &data); err != nil {
return err
}
var objects []map[string]json.RawMessage
for k := range data {
objects = data[k]
}
fields := []string{}
for i := range objects {
for k := range objects[i] {
b, _ := json.Marshal(k)
fields = append(fields, string(b))
}
break
}
sort.Strings(fields)
w.Write([]byte(strings.Join(fields, delim)))
w.Write([]byte("\n"))
for _, object := range objects {
for j, field := range fields {
json.Unmarshal([]byte(field), &field)
if j > 0 {
w.Write([]byte(delim))
}
w.Write(object[field])
}
w.Write([]byte("\n"))
}
return nil
}

39
.storage.go Normal file
View File

@ -0,0 +1,39 @@
package main
import (
"context"
"errors"
"time"
)
var (
ErrNotFound = errors.New("not found")
)
type Storage struct {
driver Driver
}
func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
return nil, errors.New("not impl")
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
return nil, errors.New("not impl")
}
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
return nil, errors.New("not impl")
}
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
return nil, errors.New("not impl")
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return nil, errors.New("not impl")
}

303
driver.go
View File

@ -5,25 +5,44 @@ import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"
"go.etcd.io/bbolt"
"net/url"
_ "github.com/glebarez/go-sqlite"
_ "github.com/lib/pq"
)
type Driver interface {
Close() error
ForEach(context.Context, string, func(string, []byte) error) error
Get(context.Context, string, string) ([]byte, error)
Set(context.Context, string, string, []byte) error
type Driver struct {
*sql.DB
}
func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern, eventNamePattern string) error {
func NewDriver(ctx context.Context, conn string) (Driver, error) {
engine := "sqlite"
if conn == "" {
conn = ":memory:"
} else {
if u, err := url.Parse(conn); err != nil {
return Driver{}, err
} else if u.Scheme != "" {
engine = u.Scheme
}
}
db, err := sql.Open(engine, conn)
if err != nil {
return Driver{}, err
}
driver := Driver{DB: db}
if err := driver.setup(ctx); err != nil {
driver.Close()
return Driver{}, fmt.Errorf("failed setup: %w", err)
}
return driver, nil
}
/*
func (driver Driver) FillWithTestdata(ctx context.Context, assetPattern, datacenterPattern, eventNamePattern string) error {
d := "./testdata/slack_events"
entries, err := os.ReadDir(d)
if err != nil {
@ -49,61 +68,17 @@ func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacent
}
return nil
}
*/
type Postgres struct {
db *sql.DB
func (driver Driver) setup(ctx context.Context) error {
_, err := driver.ExecContext(ctx, `
DROP TABLE IF EXISTS spoc_bot_vr_q;
DROP TABLE IF EXISTS spoc_bot_vr_messages;
`)
return err
}
func NewPostgres(ctx context.Context, conn string) (Postgres, error) {
db, err := sql.Open("postgres", conn)
if err != nil {
return Postgres{}, err
}
pg := Postgres{db: db}
if err := pg.setup(ctx); err != nil {
pg.Close()
return Postgres{}, fmt.Errorf("failed setup: %w", err)
}
return pg, nil
}
func (pg Postgres) setup(ctx context.Context) error {
tableQ, err := pg.table("q")
if err != nil {
return err
}
tableM, err := pg.table("m")
if err != nil {
return err
}
if _, err := pg.db.ExecContext(ctx, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS %s (
id TEXT NOT NULL,
v JSONB NOT NULL
);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique;
ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id);
`, tableQ,
tableM,
tableQ, tableQ,
tableQ, tableQ,
tableM, tableM,
tableM, tableM,
)); err != nil {
return err
}
return nil
}
func (pg Postgres) table(s string) (string, error) {
func (d Driver) table(s string) (string, error) {
switch s {
case "q":
return "spoc_bot_vr_q", nil
@ -112,201 +87,3 @@ func (pg Postgres) table(s string) (string, error) {
}
return "", errors.New("invalid table " + s)
}
func (pg Postgres) Close() error {
return pg.db.Close()
}
func (pg Postgres) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
table, err := pg.table(ns)
if err != nil {
return err
}
rows, err := pg.db.QueryContext(ctx, fmt.Sprintf(`SELECT id, v FROM %s;`, table))
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id string
var v []byte
if err := rows.Scan(&id, &v); err != nil {
return err
} else if err := cb(id, v); err != nil {
return err
}
}
return ctx.Err()
}
func (pg Postgres) Get(ctx context.Context, ns, id string) ([]byte, error) {
table, err := pg.table(ns)
if err != nil {
return nil, err
}
row := pg.db.QueryRowContext(ctx, fmt.Sprintf(`SELECT v FROM %s WHERE id='%s';`, table, id))
if err := row.Err(); err != nil {
return nil, err
}
var v []byte
if err := row.Scan(&v); err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, err
}
return v, nil
}
func (pg Postgres) Set(ctx context.Context, ns, id string, v []byte) error {
table, err := pg.table(ns)
if err != nil {
return err
}
if v == nil {
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE id='%s';`, table, id))
return err
}
_, err = pg.db.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s (id, v) VALUES ('%s', '%s') ON CONFLICT (id) DO UPDATE SET v = '%s'`, table, id, v, v))
return err
}
type RAM struct {
m map[string]map[string][]byte
lock *sync.RWMutex
}
func NewRAM() RAM {
return RAM{
m: make(map[string]map[string][]byte),
lock: &sync.RWMutex{},
}
}
func (ram RAM) Close() error {
return nil
}
func (ram RAM) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error {
ram.lock.RLock()
defer ram.lock.RUnlock()
for k, v := range ram.m[ns] {
if ctx.Err() != nil {
break
}
if err := cb(k, v); err != nil {
return err
}
}
return ctx.Err()
}
func (ram RAM) Get(_ context.Context, ns, id string) ([]byte, error) {
ram.lock.RLock()
defer ram.lock.RUnlock()
if _, ok := ram.m[ns]; !ok {
return nil, nil
}
return ram.m[ns][id], nil
}
func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
ram.lock.Lock()
defer ram.lock.Unlock()
if _, ok := ram.m[ns]; !ok {
ram.m[ns] = map[string][]byte{}
}
ram.m[ns][id] = v
if v == nil {
delete(ram.m[ns], id)
}
return nil
}
type BBolt struct {
db *bbolt.DB
}
func NewTestDBIn(d string) BBolt {
d, err := ioutil.TempDir(d, "test-db-*")
if err != nil {
panic(err)
}
db, err := NewDB(path.Join(d, "bb"))
if err != nil {
panic(err)
}
return db
}
func NewDB(p string) (BBolt, error) {
db, err := bbolt.Open(p, 0600, &bbolt.Options{
Timeout: time.Second,
})
return BBolt{db: db}, err
}
func (bb BBolt) Close() error {
return bb.db.Close()
}
func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error {
return bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
c := bkt.Cursor()
for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() {
if err := cb(string(k), v); err != nil {
return err
}
}
return ctx.Err()
})
}
func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) {
var b []byte
err := bb.db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
return nil
}
b = bkt.Get([]byte(id))
return nil
})
return b, err
}
func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error {
return bb.db.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(db))
if bkt == nil {
var err error
bkt, err = tx.CreateBucket([]byte(db))
if err != nil {
return err
}
}
if value == nil {
return bkt.Delete([]byte(id))
}
return bkt.Put([]byte(id), value)
})
}

View File

@ -1,22 +0,0 @@
//go:build postgres
package main
import (
"context"
"os"
"testing"
"time"
)
func TestPostgres(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
conn := os.Getenv("INTEGRATION_POSTGRES_CONN")
pg, err := NewPostgres(ctx, conn)
if err != nil {
t.Fatal(err)
}
testDriver(t, pg)
}

View File

@ -2,91 +2,69 @@ package main
import (
"context"
"errors"
"io"
"testing"
"time"
)
func TestDriverRAM(t *testing.T) {
testDriver(t, NewRAM())
}
func TestFillTestdata(t *testing.T) {
func TestDriver(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
ram := NewRAM()
if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern); err != nil {
d, err := NewDriver(ctx, "")
if err != nil {
t.Fatal(err)
}
n := 0
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
n += 1
return nil
}); err != nil {
t.Fatal(err)
}
t.Log(n)
}
func TestDriverBBolt(t *testing.T) {
testDriver(t, NewTestDBIn(t.TempDir()))
}
func testDriver(t *testing.T, d Driver) {
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
defer can()
defer d.Close()
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from empty:", err)
} else if b != nil {
t.Error("got fake from empty")
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil {
t.Error("cannot set from empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from full:", err)
} else if string(b) != `"hello world"` {
t.Error("got fake from full")
}
if err := d.ForEach(ctx, "m", func(id string, v []byte) error {
if id != "id" {
t.Error("for each id weird:", id)
/*
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from empty:", err)
} else if b != nil {
t.Error("got fake from empty")
}
if string(v) != `"hello world"` {
t.Error("for each value weird:", string(v))
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
return io.EOF
}); err != io.EOF {
t.Error("failed to forEach full:", err)
}
if err := d.Set(ctx, "m", "id", nil); err != nil {
t.Error("cannot set from full:", err)
}
if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil {
t.Error("cannot set from empty:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from full:", err)
} else if string(b) != `"hello world"` {
t.Error("got fake from full")
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {
t.Error("got fake from deleted")
}
if err := d.ForEach(ctx, "m", func(id string, v []byte) error {
if id != "id" {
t.Error("for each id weird:", id)
}
if string(v) != `"hello world"` {
t.Error("for each value weird:", string(v))
}
return io.EOF
}); err != io.EOF {
t.Error("failed to forEach full:", err)
}
if err := d.Set(ctx, "m", "id", nil); err != nil {
t.Error("cannot set from full:", err)
}
if err := d.ForEach(ctx, "m", func(string, []byte) error {
return errors.New("should have no hits")
}); err != nil {
t.Error("failed to forEach empty:", err)
}
if b, err := d.Get(ctx, "m", "id"); err != nil {
t.Error("cannot get from deleted:", err)
} else if b != nil {
t.Error("got fake from deleted")
}
*/
}

16
go.mod
View File

@ -3,17 +3,19 @@ module github.com/breel-render/spoc-bot-vr
go 1.22.1
require (
github.com/glebarez/go-sqlite v1.21.2
github.com/go-errors/errors v1.5.1
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8
go.etcd.io/bbolt v1.3.9
)
require (
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/gage-technologies/mistral-go v1.0.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.23.1 // indirect
)

42
go.sum
View File

@ -1,30 +1,28 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A=
github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=

412
main.go
View File

@ -1,413 +1,3 @@
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
"time"
)
func main() {
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer can()
cfg, err := newConfig(ctx)
if err != nil {
panic(err)
}
defer cfg.driver.Close()
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
panic(err)
}
}
func run(ctx context.Context, cfg Config) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-listenAndServe(ctx, cfg):
return err
}
}
func listenAndServe(ctx context.Context, cfg Config) chan error {
s := http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: http.HandlerFunc(newHandler(cfg)),
BaseContext: func(net.Listener) context.Context {
return ctx
},
}
errc := make(chan error)
go func() {
defer close(errc)
log.Printf("listening on %s", s.Addr)
errc <- s.ListenAndServe()
}()
return errc
}
func newHandler(cfg Config) http.HandlerFunc {
mux := http.NewServeMux()
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
return func(w http.ResponseWriter, r *http.Request) {
if cfg.Debug {
b, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b))
log.Printf("%s %s | %s", r.Method, r.URL, b)
}
mux.ServeHTTP(w, r)
}
}
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
var page struct {
OK bool
Messages []json.RawMessage
}
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
} else if !page.OK {
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if errors.Is(err, ErrIrrelevantMessage) {
} else if err != nil {
errs = append(errs, err)
} else if err := cfg.storage.Upsert(r.Context(), m); err != nil {
errs = append(errs, err)
} else {
log.Printf("re-ingested %v", m.ID)
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
}
}
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
}
}
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := cfg.storage.EventsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"events": events})
}
}
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
messages, err := cfg.storage.MessagesSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"messages": messages})
}
}
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"threads": threads})
}
}
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !basicAuth(cfg, w, r) {
return
}
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
messages, err := cfg.storage.Thread(r.Context(), thread)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
encodeResponse(w, r, map[string]any{"thread": messages})
}
}
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
http.Error(w, "shoo", http.StatusForbidden)
return false
}
return true
}
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
if cfg.InitializeSlack {
return handlerPostAPIV1EventsSlackInitialize
}
return _newHandlerPostAPIV1EventsSlack(cfg)
}
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
var challenge struct {
Token string
Challenge string
Type string
}
if err := json.Unmarshal(b, &challenge); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
}
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(b))
var allowList struct {
Token string
Event struct {
Channel string
}
}
if err := json.Unmarshal(b, &allowList); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
} else if allowList.Token != cfg.SlackToken {
http.Error(w, "invalid .token", http.StatusForbidden)
return
} else if !func() bool {
for _, slackChannel := range cfg.SlackChannels {
if slackChannel == allowList.Event.Channel {
return true
}
}
return false
}() {
return
}
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
if errors.Is(err, ErrIrrelevantMessage) {
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
log.Printf("failed to ingest %+v: %v", m, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("ingested %v", m.ID)
}
}
func parseSince(s string) (time.Time, error) {
if s == "" {
return time.Unix(0, 0), nil
}
if n, err := strconv.ParseInt(s, 10, 64); err != nil {
} else {
return time.Unix(n, 0), nil
}
if t, err := time.Parse(time.RFC3339, s); err != nil {
} else {
return t, nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err != nil {
} else {
return t, nil
}
if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil {
} else {
return t, nil
}
return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
}
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
if strings.Contains(r.Header.Get("Accept"), "text/csv") {
return encodeCSVResponse(w, v)
}
if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
return encodeTSVResponse(w, v)
}
return encodeJSONResponse(w, v)
}
func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
return json.NewEncoder(w).Encode(v)
}
func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, "\t")
}
func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
return encodeSVResponse(w, v, ",")
}
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
var data map[string][]map[string]json.RawMessage
if err := json.Unmarshal(b, &data); err != nil {
return err
}
var objects []map[string]json.RawMessage
for k := range data {
objects = data[k]
}
fields := []string{}
for i := range objects {
for k := range objects[i] {
b, _ := json.Marshal(k)
fields = append(fields, string(b))
}
break
}
sort.Strings(fields)
w.Write([]byte(strings.Join(fields, delim)))
w.Write([]byte("\n"))
for _, object := range objects {
for j, field := range fields {
json.Unmarshal([]byte(field), &field)
if j > 0 {
w.Write([]byte(delim))
}
w.Write(object[field])
}
w.Write([]byte("\n"))
}
return nil
}
func main() {}

106
queue.go
View File

@ -2,57 +2,105 @@ package main
import (
"context"
"fmt"
"time"
"github.com/go-errors/errors"
"github.com/google/uuid"
)
type Queue struct {
driver Driver
}
func NewQueue(driver Driver) Queue {
return Queue{driver: driver}
func NewQueue(ctx context.Context, driver Driver) (Queue, error) {
_, err := driver.ExecContext(ctx, `
DROP TABLE IF EXISTS queue;
CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY,
updated INTEGER NOT NULL,
reservation TEXT,
payload TEXT
);
`)
return Queue{driver: driver}, err
}
func (q Queue) Push(ctx context.Context, m Message) error {
return q.driver.Set(ctx, "q", m.ID, m.Serialize())
func (q Queue) Enqueue(ctx context.Context, b []byte) error {
_, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (updated, payload) VALUES (?, ?)
`,
time.Now().Unix(),
b,
)
return err
}
func (q Queue) PeekFirst(ctx context.Context) (Message, error) {
func (q Queue) Dequeue(ctx context.Context) (string, []byte, error) {
for {
m, err := q.peekFirst(ctx)
if err != nil {
return m, err
}
if !m.Empty() {
return m, nil
reservation, m, err := q.dequeue(ctx)
if reservation != nil || err != nil {
return string(reservation), m, err
}
select {
case <-ctx.Done():
return Message{}, ctx.Err()
return "", nil, ctx.Err()
case <-time.After(time.Second):
}
}
}
func (q Queue) Ack(ctx context.Context, id string) error {
return q.driver.Set(ctx, "q", id, nil)
func (q Queue) dequeue(ctx context.Context) ([]byte, []byte, error) {
now := time.Now().Unix()
reservation := []byte(uuid.New().String())
var payload []byte
if result, err := q.driver.ExecContext(ctx, `
UPDATE queue
SET
updated = ?, reservation = ?
WHERE
id IN (
SELECT id
FROM queue
WHERE
reservation IS NULL
OR ? - updated > 60
LIMIT 1
)
`, now, reservation, now); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
} else if n, err := result.RowsAffected(); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
} else if n == 0 {
return nil, nil, fmt.Errorf("failed to assign reservation: zero updates")
}
rows, err := q.driver.QueryContext(ctx, `
SELECT payload
FROM queue
WHERE reservation==?
LIMIT 1
`, reservation)
if err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
}
defer rows.Close()
for rows.Next() {
if err := rows.Scan(&payload); err != nil {
return nil, nil, fmt.Errorf("failed to parse reservation: %w", err)
}
}
if err := rows.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to page reservation: %w", err)
}
return reservation, payload, nil
}
func (q Queue) peekFirst(ctx context.Context) (Message, error) {
var m Message
subctx, subcan := context.WithCancel(ctx)
defer subcan()
err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error {
m = MustDeserialize(value)
subcan()
return nil
})
if errors.Is(err, subctx.Err()) {
err = nil
}
return m, err
func (q Queue) Ack(ctx context.Context, reservation string) error {
_, err := q.driver.ExecContext(ctx, `
DELETE FROM queue
WHERE reservation==?
`, reservation)
return err
}

View File

@ -11,24 +11,32 @@ func TestQueue(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
defer can()
q := NewQueue(NewRAM())
for i := 0; i < 39; i++ {
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
t.Fatal(i, err)
}
driver, _ := NewDriver(ctx, "/tmp/f.db")
q, err := NewQueue(ctx, driver)
if err != nil {
t.Fatal(err)
}
found := map[uint64]struct{}{}
for i := 0; i < 39; i++ {
if m, err := q.PeekFirst(ctx); err != nil {
t.Fatal(i, err)
} else if _, ok := found[m.TS]; ok {
t.Error(i, m.TS)
} else if err := q.Ack(ctx, m.ID); err != nil {
t.Fatal(i, err)
} else {
found[m.TS] = struct{}{}
t.Run("enqueue", func(t *testing.T) {
for i := 0; i < 39; i++ {
if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil {
t.Fatal(i, err)
}
}
}
})
t.Run("dequeue", func(t *testing.T) {
found := map[string]struct{}{}
for i := 0; i < 39; i++ {
if reservation, b, err := q.Dequeue(ctx); err != nil {
t.Fatal(i, "dequeue err", err)
} else if _, ok := found[string(b)]; ok {
t.Errorf("dequeued %q twice (%+v)", b, found)
} else if err := q.Ack(ctx, reservation); err != nil {
t.Fatal(i, "failed to ack", err)
} else {
found[string(b)] = struct{}{}
}
}
})
}

View File

@ -1,96 +0,0 @@
package main
import (
"context"
"errors"
"sort"
"time"
)
var (
ErrNotFound = errors.New("not found")
)
type Storage struct {
driver Driver
}
func NewStorage(driver Driver) Storage {
return Storage{driver: driver}
}
func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return !t.After(m.Time())
})
}
func (s Storage) Threads(ctx context.Context) ([]string, error) {
return s.ThreadsSince(ctx, time.Unix(0, 0))
}
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread })
}
func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName })
}
func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) {
return s.fieldsSince(ctx, t, func(m Message) string { return m.Event })
}
func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) {
messages, err := s.MessagesSince(ctx, t)
if err != nil {
return nil, err
}
values := map[string]struct{}{}
for _, m := range messages {
values[fielder(m)] = struct{}{}
}
result := make([]string, 0, len(values))
for k := range values {
result = append(result, k)
}
sort.Strings(result)
return result, nil
}
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
return s.messagesWhere(ctx, func(m Message) bool {
return m.Thread == thread
})
}
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
result := make([]Message, 0)
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
m := MustDeserialize(v)
if !where(m) {
return nil
}
result = append(result, m)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i].TS < result[j].TS
})
return result, err
}
func (s Storage) Upsert(ctx context.Context, m Message) error {
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
}
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
b, err := s.driver.Get(ctx, "m", id)
if err != nil {
return Message{}, err
}
if b == nil {
return Message{}, ErrNotFound
}
return MustDeserialize(b), nil
}