From 8193bf73770678b09023e6ecd5cf8132ca506e29 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 13:08:21 -0600 Subject: [PATCH] f sql jeez --- ai.go => .ai.go | 0 ai_test.go => .ai_test.go | 0 config.go => .config.go | 25 +- config_test.go => .config_test.go | 0 .main.go | 413 ++++++++++++++++++++++++++++ main_test.go => .main_test.go | 0 message.go => .message.go | 0 message_test.go => .message_test.go | 0 report.go => .report.go | 0 report_test.go => .report_test.go | 0 .storage.go | 39 +++ storage_test.go => .storage_test.go | 0 driver.go | 303 +++----------------- driver_integration_test.go | 22 -- driver_test.go | 120 ++++---- go.mod | 16 +- go.sum | 42 ++- main.go | 412 +-------------------------- queue.go | 106 +++++-- queue_test.go | 42 +-- storage.go | 96 ------- 21 files changed, 684 insertions(+), 952 deletions(-) rename ai.go => .ai.go (100%) rename ai_test.go => .ai_test.go (100%) rename config.go => .config.go (85%) rename config_test.go => .config_test.go (100%) create mode 100644 .main.go rename main_test.go => .main_test.go (100%) rename message.go => .message.go (100%) rename message_test.go => .message_test.go (100%) rename report.go => .report.go (100%) rename report_test.go => .report_test.go (100%) create mode 100644 .storage.go rename storage_test.go => .storage_test.go (100%) delete mode 100644 driver_integration_test.go delete mode 100644 storage.go diff --git a/ai.go b/.ai.go similarity index 100% rename from ai.go rename to .ai.go diff --git a/ai_test.go b/.ai_test.go similarity index 100% rename from ai_test.go rename to .ai_test.go diff --git a/config.go b/.config.go similarity index 85% rename from config.go rename to .config.go index 1159a09..2df9049 100644 --- a/config.go +++ b/.config.go @@ -17,7 +17,7 @@ type Config struct { InitializeSlack bool SlackToken string SlackChannels []string - PostgresConn string + DriverConn string BasicAuthUser string BasicAuthPassword string FillWithTestdata bool @@ -104,21 +104,18 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, return Config{}, err } - result.driver = NewRAM() - if result.PostgresConn != "" { - ctx, can := context.WithTimeout(ctx, time.Second*10) - defer can() - pg, err := NewPostgres(ctx, result.PostgresConn) - if err != nil { - return Config{}, err - } - result.driver = pg + ctx, can := context.WithTimeout(ctx, time.Second*10) + defer can() + driver, err := NewDriver(ctx, result.DriverConn) + if err != nil { + return Config{}, err } - if result.FillWithTestdata { - if err := FillWithTestdata(ctx, result.driver, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil { - return Config{}, err - } + result.driver = driver + if !result.FillWithTestdata { + } else if err := result.driver.FillWithTestdata(ctx, result.AssetPattern, result.DatacenterPattern, result.EventNamePattern); err != nil { + return Config{}, err } + result.storage = NewStorage(result.driver) result.queue = NewQueue(result.driver) diff --git a/config_test.go b/.config_test.go similarity index 100% rename from config_test.go rename to .config_test.go diff --git a/.main.go b/.main.go new file mode 100644 index 0000000..a716c42 --- /dev/null +++ b/.main.go @@ -0,0 +1,413 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "os/signal" + "sort" + "strconv" + "strings" + "syscall" + "time" +) + +func main() { + ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) + defer can() + + cfg, err := newConfig(ctx) + if err != nil { + panic(err) + } + defer cfg.driver.Close() + + if err := run(ctx, cfg); err != nil && ctx.Err() == nil { + panic(err) + } +} + +func run(ctx context.Context, cfg Config) error { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-listenAndServe(ctx, cfg): + return err + } +} + +func listenAndServe(ctx context.Context, cfg Config) chan error { + s := http.Server{ + Addr: fmt.Sprintf(":%d", cfg.Port), + Handler: http.HandlerFunc(newHandler(cfg)), + BaseContext: func(net.Listener) context.Context { + return ctx + }, + } + + errc := make(chan error) + go func() { + defer close(errc) + log.Printf("listening on %s", s.Addr) + errc <- s.ListenAndServe() + }() + + return errc +} + +func newHandler(cfg Config) http.HandlerFunc { + mux := http.NewServeMux() + + mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) + mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) + mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) + mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) + mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) + mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) + mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) + + return func(w http.ResponseWriter, r *http.Request) { + if cfg.Debug { + b, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewReader(b)) + log.Printf("%s %s | %s", r.Method, r.URL, b) + } + + mux.ServeHTTP(w, r) + } +} + +func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + channel := r.Header.Get("slack-channel") + token := r.Header.Get("slack-oauth-token") + + req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer resp.Body.Close() + defer io.Copy(io.Discard, resp.Body) + + var page struct { + OK bool + Messages []json.RawMessage + } + if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } else if !page.OK { + http.Error(w, "slack page was !.ok", http.StatusBadGateway) + return + } + errs := []error{} + for _, messageJSON := range page.Messages { + m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) + if errors.Is(err, ErrIrrelevantMessage) { + } else if err != nil { + errs = append(errs, err) + } else if err := cfg.storage.Upsert(r.Context(), m); err != nil { + errs = append(errs, err) + } else { + log.Printf("re-ingested %v", m.ID) + } + } + + if len(errs) > 0 { + http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) + return + } + json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) + } +} + +func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"eventNames": eventNames}) + } +} + +func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + events, err := cfg.storage.EventsSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"events": events}) + } +} + +func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + messages, err := cfg.storage.MessagesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"messages": messages}) + } +} + +func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + threads, err := cfg.storage.ThreadsSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"threads": threads}) + } +} + +func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] + + messages, err := cfg.storage.Thread(r.Context(), thread) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"thread": messages}) + } +} + +func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { + if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { + http.Error(w, "shoo", http.StatusForbidden) + return false + } + return true +} + +func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { + if cfg.InitializeSlack { + return handlerPostAPIV1EventsSlackInitialize + } + return _newHandlerPostAPIV1EventsSlack(cfg) +} + +func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + var challenge struct { + Token string + Challenge string + Type string + } + if err := json.Unmarshal(b, &challenge); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) +} + +func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewReader(b)) + + var allowList struct { + Token string + Event struct { + Channel string + } + } + if err := json.Unmarshal(b, &allowList); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } else if allowList.Token != cfg.SlackToken { + http.Error(w, "invalid .token", http.StatusForbidden) + return + } else if !func() bool { + for _, slackChannel := range cfg.SlackChannels { + if slackChannel == allowList.Event.Channel { + return true + } + } + return false + }() { + return + } + + m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) + if errors.Is(err, ErrIrrelevantMessage) { + return + } else if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := cfg.storage.Upsert(r.Context(), m); err != nil { + log.Printf("failed to ingest %+v: %v", m, err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.Printf("ingested %v", m.ID) + } +} + +func parseSince(s string) (time.Time, error) { + if s == "" { + return time.Unix(0, 0), nil + } + + if n, err := strconv.ParseInt(s, 10, 64); err != nil { + } else { + return time.Unix(n, 0), nil + } + + if t, err := time.Parse(time.RFC3339, s); err != nil { + } else { + return t, nil + } + + if t, err := time.Parse(time.RFC3339Nano, s); err != nil { + } else { + return t, nil + } + + if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { + } else { + return t, nil + } + + return time.Time{}, fmt.Errorf("failed to parse since=%q", s) +} + +func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { + if strings.Contains(r.Header.Get("Accept"), "text/csv") { + return encodeCSVResponse(w, v) + } + if strings.Contains(r.Header.Get("Accept"), "text/tsv") { + return encodeTSVResponse(w, v) + } + return encodeJSONResponse(w, v) +} + +func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { + return json.NewEncoder(w).Encode(v) +} + +func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, "\t") +} + +func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, ",") +} + +func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + + var data map[string][]map[string]json.RawMessage + if err := json.Unmarshal(b, &data); err != nil { + return err + } + + var objects []map[string]json.RawMessage + for k := range data { + objects = data[k] + } + + fields := []string{} + for i := range objects { + for k := range objects[i] { + b, _ := json.Marshal(k) + fields = append(fields, string(b)) + } + break + } + sort.Strings(fields) + + w.Write([]byte(strings.Join(fields, delim))) + w.Write([]byte("\n")) + + for _, object := range objects { + for j, field := range fields { + json.Unmarshal([]byte(field), &field) + if j > 0 { + w.Write([]byte(delim)) + } + w.Write(object[field]) + } + w.Write([]byte("\n")) + } + + return nil +} diff --git a/main_test.go b/.main_test.go similarity index 100% rename from main_test.go rename to .main_test.go diff --git a/message.go b/.message.go similarity index 100% rename from message.go rename to .message.go diff --git a/message_test.go b/.message_test.go similarity index 100% rename from message_test.go rename to .message_test.go diff --git a/report.go b/.report.go similarity index 100% rename from report.go rename to .report.go diff --git a/report_test.go b/.report_test.go similarity index 100% rename from report_test.go rename to .report_test.go diff --git a/.storage.go b/.storage.go new file mode 100644 index 0000000..ca3089b --- /dev/null +++ b/.storage.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "errors" + "time" +) + +var ( + ErrNotFound = errors.New("not found") +) + +type Storage struct { + driver Driver +} + +func NewStorage(driver Driver) Storage { + return Storage{driver: driver} +} + +func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) { + return nil, errors.New("not impl") +} + +func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) { + return nil, errors.New("not impl") +} + +func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) { + return nil, errors.New("not impl") +} + +func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) { + return nil, errors.New("not impl") +} + +func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) { + return nil, errors.New("not impl") +} diff --git a/storage_test.go b/.storage_test.go similarity index 100% rename from storage_test.go rename to .storage_test.go diff --git a/driver.go b/driver.go index 9b35194..e65f69a 100644 --- a/driver.go +++ b/driver.go @@ -5,25 +5,44 @@ import ( "database/sql" "errors" "fmt" - "io/ioutil" - "os" - "path" - "sync" - "time" - - "go.etcd.io/bbolt" + "net/url" + _ "github.com/glebarez/go-sqlite" _ "github.com/lib/pq" ) -type Driver interface { - Close() error - ForEach(context.Context, string, func(string, []byte) error) error - Get(context.Context, string, string) ([]byte, error) - Set(context.Context, string, string, []byte) error +type Driver struct { + *sql.DB } -func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacenterPattern, eventNamePattern string) error { +func NewDriver(ctx context.Context, conn string) (Driver, error) { + engine := "sqlite" + if conn == "" { + conn = ":memory:" + } else { + if u, err := url.Parse(conn); err != nil { + return Driver{}, err + } else if u.Scheme != "" { + engine = u.Scheme + } + } + + db, err := sql.Open(engine, conn) + if err != nil { + return Driver{}, err + } + + driver := Driver{DB: db} + if err := driver.setup(ctx); err != nil { + driver.Close() + return Driver{}, fmt.Errorf("failed setup: %w", err) + } + + return driver, nil +} + +/* +func (driver Driver) FillWithTestdata(ctx context.Context, assetPattern, datacenterPattern, eventNamePattern string) error { d := "./testdata/slack_events" entries, err := os.ReadDir(d) if err != nil { @@ -49,61 +68,17 @@ func FillWithTestdata(ctx context.Context, driver Driver, assetPattern, datacent } return nil } +*/ -type Postgres struct { - db *sql.DB +func (driver Driver) setup(ctx context.Context) error { + _, err := driver.ExecContext(ctx, ` + DROP TABLE IF EXISTS spoc_bot_vr_q; + DROP TABLE IF EXISTS spoc_bot_vr_messages; + `) + return err } -func NewPostgres(ctx context.Context, conn string) (Postgres, error) { - db, err := sql.Open("postgres", conn) - if err != nil { - return Postgres{}, err - } - - pg := Postgres{db: db} - if err := pg.setup(ctx); err != nil { - pg.Close() - return Postgres{}, fmt.Errorf("failed setup: %w", err) - } - - return pg, nil -} - -func (pg Postgres) setup(ctx context.Context) error { - tableQ, err := pg.table("q") - if err != nil { - return err - } - tableM, err := pg.table("m") - if err != nil { - return err - } - if _, err := pg.db.ExecContext(ctx, fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id TEXT NOT NULL, - v JSONB NOT NULL - ); - CREATE TABLE IF NOT EXISTS %s ( - id TEXT NOT NULL, - v JSONB NOT NULL - ); - ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique; - ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id); - ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s_id_unique; - ALTER TABLE %s ADD CONSTRAINT %s_id_unique UNIQUE (id); - `, tableQ, - tableM, - tableQ, tableQ, - tableQ, tableQ, - tableM, tableM, - tableM, tableM, - )); err != nil { - return err - } - return nil -} - -func (pg Postgres) table(s string) (string, error) { +func (d Driver) table(s string) (string, error) { switch s { case "q": return "spoc_bot_vr_q", nil @@ -112,201 +87,3 @@ func (pg Postgres) table(s string) (string, error) { } return "", errors.New("invalid table " + s) } - -func (pg Postgres) Close() error { - return pg.db.Close() -} - -func (pg Postgres) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error { - table, err := pg.table(ns) - if err != nil { - return err - } - - rows, err := pg.db.QueryContext(ctx, fmt.Sprintf(`SELECT id, v FROM %s;`, table)) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var id string - var v []byte - if err := rows.Scan(&id, &v); err != nil { - return err - } else if err := cb(id, v); err != nil { - return err - } - } - - return ctx.Err() -} - -func (pg Postgres) Get(ctx context.Context, ns, id string) ([]byte, error) { - table, err := pg.table(ns) - if err != nil { - return nil, err - } - - row := pg.db.QueryRowContext(ctx, fmt.Sprintf(`SELECT v FROM %s WHERE id='%s';`, table, id)) - if err := row.Err(); err != nil { - return nil, err - } - - var v []byte - if err := row.Scan(&v); err != nil && !errors.Is(err, sql.ErrNoRows) { - return nil, err - } - - return v, nil -} - -func (pg Postgres) Set(ctx context.Context, ns, id string, v []byte) error { - table, err := pg.table(ns) - if err != nil { - return err - } - - if v == nil { - _, err = pg.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE id='%s';`, table, id)) - return err - - } - - _, err = pg.db.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s (id, v) VALUES ('%s', '%s') ON CONFLICT (id) DO UPDATE SET v = '%s'`, table, id, v, v)) - return err -} - -type RAM struct { - m map[string]map[string][]byte - lock *sync.RWMutex -} - -func NewRAM() RAM { - return RAM{ - m: make(map[string]map[string][]byte), - lock: &sync.RWMutex{}, - } -} - -func (ram RAM) Close() error { - return nil -} - -func (ram RAM) ForEach(ctx context.Context, ns string, cb func(string, []byte) error) error { - ram.lock.RLock() - defer ram.lock.RUnlock() - - for k, v := range ram.m[ns] { - if ctx.Err() != nil { - break - } - if err := cb(k, v); err != nil { - return err - } - } - return ctx.Err() -} - -func (ram RAM) Get(_ context.Context, ns, id string) ([]byte, error) { - ram.lock.RLock() - defer ram.lock.RUnlock() - - if _, ok := ram.m[ns]; !ok { - return nil, nil - } - - return ram.m[ns][id], nil -} - -func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error { - ram.lock.Lock() - defer ram.lock.Unlock() - - if _, ok := ram.m[ns]; !ok { - ram.m[ns] = map[string][]byte{} - } - ram.m[ns][id] = v - if v == nil { - delete(ram.m[ns], id) - } - - return nil -} - -type BBolt struct { - db *bbolt.DB -} - -func NewTestDBIn(d string) BBolt { - d, err := ioutil.TempDir(d, "test-db-*") - if err != nil { - panic(err) - } - db, err := NewDB(path.Join(d, "bb")) - if err != nil { - panic(err) - } - return db -} - -func NewDB(p string) (BBolt, error) { - db, err := bbolt.Open(p, 0600, &bbolt.Options{ - Timeout: time.Second, - }) - return BBolt{db: db}, err -} - -func (bb BBolt) Close() error { - return bb.db.Close() -} - -func (bb BBolt) ForEach(ctx context.Context, db string, cb func(string, []byte) error) error { - return bb.db.View(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(db)) - if bkt == nil { - return nil - } - - c := bkt.Cursor() - for k, v := c.First(); k != nil && ctx.Err() == nil; k, v = c.Next() { - if err := cb(string(k), v); err != nil { - return err - } - } - - return ctx.Err() - }) -} - -func (bb BBolt) Get(_ context.Context, db, id string) ([]byte, error) { - var b []byte - err := bb.db.View(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(db)) - if bkt == nil { - return nil - } - - b = bkt.Get([]byte(id)) - return nil - }) - return b, err -} - -func (bb BBolt) Set(_ context.Context, db, id string, value []byte) error { - return bb.db.Update(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(db)) - if bkt == nil { - var err error - bkt, err = tx.CreateBucket([]byte(db)) - if err != nil { - return err - } - } - - if value == nil { - return bkt.Delete([]byte(id)) - } - return bkt.Put([]byte(id), value) - }) -} diff --git a/driver_integration_test.go b/driver_integration_test.go deleted file mode 100644 index 84a0da0..0000000 --- a/driver_integration_test.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:build postgres - -package main - -import ( - "context" - "os" - "testing" - "time" -) - -func TestPostgres(t *testing.T) { - ctx, can := context.WithTimeout(context.Background(), time.Second*15) - defer can() - - conn := os.Getenv("INTEGRATION_POSTGRES_CONN") - pg, err := NewPostgres(ctx, conn) - if err != nil { - t.Fatal(err) - } - testDriver(t, pg) -} diff --git a/driver_test.go b/driver_test.go index 037c26d..c8f6e7d 100644 --- a/driver_test.go +++ b/driver_test.go @@ -2,91 +2,69 @@ package main import ( "context" - "errors" - "io" "testing" "time" ) -func TestDriverRAM(t *testing.T) { - testDriver(t, NewRAM()) -} - -func TestFillTestdata(t *testing.T) { +func TestDriver(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*15) defer can() - ram := NewRAM() - if err := FillWithTestdata(ctx, ram, renderAssetPattern, renderDatacenterPattern, renderEventNamePattern); err != nil { + d, err := NewDriver(ctx, "") + if err != nil { t.Fatal(err) } - n := 0 - if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error { - n += 1 - return nil - }); err != nil { - t.Fatal(err) - } - t.Log(n) -} - -func TestDriverBBolt(t *testing.T) { - testDriver(t, NewTestDBIn(t.TempDir())) -} - -func testDriver(t *testing.T, d Driver) { - ctx, can := context.WithTimeout(context.Background(), time.Second*15) - defer can() - defer d.Close() - if b, err := d.Get(ctx, "m", "id"); err != nil { - t.Error("cannot get from empty:", err) - } else if b != nil { - t.Error("got fake from empty") - } - - if err := d.ForEach(ctx, "m", func(string, []byte) error { - return errors.New("should have no hits") - }); err != nil { - t.Error("failed to forEach empty:", err) - } - - if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil { - t.Error("cannot set from empty:", err) - } - - if b, err := d.Get(ctx, "m", "id"); err != nil { - t.Error("cannot get from full:", err) - } else if string(b) != `"hello world"` { - t.Error("got fake from full") - } - - if err := d.ForEach(ctx, "m", func(id string, v []byte) error { - if id != "id" { - t.Error("for each id weird:", id) + /* + if b, err := d.Get(ctx, "m", "id"); err != nil { + t.Error("cannot get from empty:", err) + } else if b != nil { + t.Error("got fake from empty") } - if string(v) != `"hello world"` { - t.Error("for each value weird:", string(v)) + + if err := d.ForEach(ctx, "m", func(string, []byte) error { + return errors.New("should have no hits") + }); err != nil { + t.Error("failed to forEach empty:", err) } - return io.EOF - }); err != io.EOF { - t.Error("failed to forEach full:", err) - } - if err := d.Set(ctx, "m", "id", nil); err != nil { - t.Error("cannot set from full:", err) - } + if err := d.Set(ctx, "m", "id", []byte(`"hello world"`)); err != nil { + t.Error("cannot set from empty:", err) + } - if err := d.ForEach(ctx, "m", func(string, []byte) error { - return errors.New("should have no hits") - }); err != nil { - t.Error("failed to forEach empty:", err) - } + if b, err := d.Get(ctx, "m", "id"); err != nil { + t.Error("cannot get from full:", err) + } else if string(b) != `"hello world"` { + t.Error("got fake from full") + } - if b, err := d.Get(ctx, "m", "id"); err != nil { - t.Error("cannot get from deleted:", err) - } else if b != nil { - t.Error("got fake from deleted") - } + if err := d.ForEach(ctx, "m", func(id string, v []byte) error { + if id != "id" { + t.Error("for each id weird:", id) + } + if string(v) != `"hello world"` { + t.Error("for each value weird:", string(v)) + } + return io.EOF + }); err != io.EOF { + t.Error("failed to forEach full:", err) + } + + if err := d.Set(ctx, "m", "id", nil); err != nil { + t.Error("cannot set from full:", err) + } + + if err := d.ForEach(ctx, "m", func(string, []byte) error { + return errors.New("should have no hits") + }); err != nil { + t.Error("failed to forEach empty:", err) + } + + if b, err := d.Get(ctx, "m", "id"); err != nil { + t.Error("cannot get from deleted:", err) + } else if b != nil { + t.Error("got fake from deleted") + } + */ } diff --git a/go.mod b/go.mod index 9c3fb93..2e1654e 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,19 @@ module github.com/breel-render/spoc-bot-vr go 1.22.1 require ( + github.com/glebarez/go-sqlite v1.21.2 github.com/go-errors/errors v1.5.1 + github.com/google/uuid v1.6.0 github.com/lib/pq v1.10.9 - github.com/nikolaydubina/llama2.go v0.7.1 - github.com/tmc/langchaingo v0.1.8 - go.etcd.io/bbolt v1.3.9 ) require ( - github.com/dlclark/regexp2 v1.10.0 // indirect - github.com/gage-technologies/mistral-go v1.0.0 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/pkoukk/tiktoken-go v0.1.6 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/sys v0.16.0 // indirect + modernc.org/libc v1.22.5 // indirect + modernc.org/mathutil v1.5.0 // indirect + modernc.org/memory v1.5.0 // indirect + modernc.org/sqlite v1.23.1 // indirect ) diff --git a/go.sum b/go.sum index b238cea..215593c 100644 --- a/go.sum +++ b/go.sum @@ -1,30 +1,28 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= -github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/gage-technologies/mistral-go v1.0.0 h1:Hwk0uJO+Iq4kMX/EwbfGRUq9zkO36w7HZ/g53N4N73A= -github.com/gage-technologies/mistral-go v1.0.0/go.mod h1:tF++Xt7U975GcLlzhrjSQb8l/x+PrriO9QEdsgm9l28= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= +github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE= -github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM= -github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= -github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA= -github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ= -go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= -go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= +modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= +modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= +modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= +modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= +modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM= +modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk= diff --git a/main.go b/main.go index a716c42..38dd16d 100644 --- a/main.go +++ b/main.go @@ -1,413 +1,3 @@ package main -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "net" - "net/http" - "os/signal" - "sort" - "strconv" - "strings" - "syscall" - "time" -) - -func main() { - ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) - defer can() - - cfg, err := newConfig(ctx) - if err != nil { - panic(err) - } - defer cfg.driver.Close() - - if err := run(ctx, cfg); err != nil && ctx.Err() == nil { - panic(err) - } -} - -func run(ctx context.Context, cfg Config) error { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-listenAndServe(ctx, cfg): - return err - } -} - -func listenAndServe(ctx context.Context, cfg Config) chan error { - s := http.Server{ - Addr: fmt.Sprintf(":%d", cfg.Port), - Handler: http.HandlerFunc(newHandler(cfg)), - BaseContext: func(net.Listener) context.Context { - return ctx - }, - } - - errc := make(chan error) - go func() { - defer close(errc) - log.Printf("listening on %s", s.Addr) - errc <- s.ListenAndServe() - }() - - return errc -} - -func newHandler(cfg Config) http.HandlerFunc { - mux := http.NewServeMux() - - mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) - mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) - mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) - mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) - mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) - mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) - mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) - - return func(w http.ResponseWriter, r *http.Request) { - if cfg.Debug { - b, _ := io.ReadAll(r.Body) - r.Body = io.NopCloser(bytes.NewReader(b)) - log.Printf("%s %s | %s", r.Method, r.URL, b) - } - - mux.ServeHTTP(w, r) - } -} - -func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - channel := r.Header.Get("slack-channel") - token := r.Header.Get("slack-oauth-token") - - req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - req.Header.Set("Authorization", "Bearer "+token) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - defer resp.Body.Close() - defer io.Copy(io.Discard, resp.Body) - - var page struct { - OK bool - Messages []json.RawMessage - } - if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } else if !page.OK { - http.Error(w, "slack page was !.ok", http.StatusBadGateway) - return - } - errs := []error{} - for _, messageJSON := range page.Messages { - m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - } else if err != nil { - errs = append(errs, err) - } else if err := cfg.storage.Upsert(r.Context(), m); err != nil { - errs = append(errs, err) - } else { - log.Printf("re-ingested %v", m.ID) - } - } - - if len(errs) > 0 { - http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) - return - } - json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) - } -} - -func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"eventNames": eventNames}) - } -} - -func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - events, err := cfg.storage.EventsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"events": events}) - } -} - -func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - messages, err := cfg.storage.MessagesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"messages": messages}) - } -} - -func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - threads, err := cfg.storage.ThreadsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"threads": threads}) - } -} - -func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] - - messages, err := cfg.storage.Thread(r.Context(), thread) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"thread": messages}) - } -} - -func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { - if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { - http.Error(w, "shoo", http.StatusForbidden) - return false - } - return true -} - -func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { - if cfg.InitializeSlack { - return handlerPostAPIV1EventsSlackInitialize - } - return _newHandlerPostAPIV1EventsSlack(cfg) -} - -func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { - b, _ := io.ReadAll(r.Body) - var challenge struct { - Token string - Challenge string - Type string - } - if err := json.Unmarshal(b, &challenge); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) -} - -func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := io.ReadAll(r.Body) - r.Body = io.NopCloser(bytes.NewReader(b)) - - var allowList struct { - Token string - Event struct { - Channel string - } - } - if err := json.Unmarshal(b, &allowList); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } else if allowList.Token != cfg.SlackToken { - http.Error(w, "invalid .token", http.StatusForbidden) - return - } else if !func() bool { - for _, slackChannel := range cfg.SlackChannels { - if slackChannel == allowList.Event.Channel { - return true - } - } - return false - }() { - return - } - - m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - return - } else if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if err := cfg.storage.Upsert(r.Context(), m); err != nil { - log.Printf("failed to ingest %+v: %v", m, err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - log.Printf("ingested %v", m.ID) - } -} - -func parseSince(s string) (time.Time, error) { - if s == "" { - return time.Unix(0, 0), nil - } - - if n, err := strconv.ParseInt(s, 10, 64); err != nil { - } else { - return time.Unix(n, 0), nil - } - - if t, err := time.Parse(time.RFC3339, s); err != nil { - } else { - return t, nil - } - - if t, err := time.Parse(time.RFC3339Nano, s); err != nil { - } else { - return t, nil - } - - if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { - } else { - return t, nil - } - - return time.Time{}, fmt.Errorf("failed to parse since=%q", s) -} - -func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { - if strings.Contains(r.Header.Get("Accept"), "text/csv") { - return encodeCSVResponse(w, v) - } - if strings.Contains(r.Header.Get("Accept"), "text/tsv") { - return encodeTSVResponse(w, v) - } - return encodeJSONResponse(w, v) -} - -func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { - return json.NewEncoder(w).Encode(v) -} - -func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { - return encodeSVResponse(w, v, "\t") -} - -func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { - return encodeSVResponse(w, v, ",") -} - -func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { - b, err := json.Marshal(v) - if err != nil { - return err - } - - var data map[string][]map[string]json.RawMessage - if err := json.Unmarshal(b, &data); err != nil { - return err - } - - var objects []map[string]json.RawMessage - for k := range data { - objects = data[k] - } - - fields := []string{} - for i := range objects { - for k := range objects[i] { - b, _ := json.Marshal(k) - fields = append(fields, string(b)) - } - break - } - sort.Strings(fields) - - w.Write([]byte(strings.Join(fields, delim))) - w.Write([]byte("\n")) - - for _, object := range objects { - for j, field := range fields { - json.Unmarshal([]byte(field), &field) - if j > 0 { - w.Write([]byte(delim)) - } - w.Write(object[field]) - } - w.Write([]byte("\n")) - } - - return nil -} +func main() {} diff --git a/queue.go b/queue.go index 0001749..f766d32 100644 --- a/queue.go +++ b/queue.go @@ -2,57 +2,105 @@ package main import ( "context" + "fmt" "time" - "github.com/go-errors/errors" + "github.com/google/uuid" ) type Queue struct { driver Driver } -func NewQueue(driver Driver) Queue { - return Queue{driver: driver} +func NewQueue(ctx context.Context, driver Driver) (Queue, error) { + _, err := driver.ExecContext(ctx, ` + DROP TABLE IF EXISTS queue; + CREATE TABLE IF NOT EXISTS queue ( + id INTEGER PRIMARY KEY, + updated INTEGER NOT NULL, + reservation TEXT, + payload TEXT + ); + `) + return Queue{driver: driver}, err } -func (q Queue) Push(ctx context.Context, m Message) error { - return q.driver.Set(ctx, "q", m.ID, m.Serialize()) +func (q Queue) Enqueue(ctx context.Context, b []byte) error { + _, err := q.driver.ExecContext(ctx, ` + INSERT INTO queue (updated, payload) VALUES (?, ?) + `, + time.Now().Unix(), + b, + ) + return err } -func (q Queue) PeekFirst(ctx context.Context) (Message, error) { +func (q Queue) Dequeue(ctx context.Context) (string, []byte, error) { for { - m, err := q.peekFirst(ctx) - if err != nil { - return m, err - } - - if !m.Empty() { - return m, nil + reservation, m, err := q.dequeue(ctx) + if reservation != nil || err != nil { + return string(reservation), m, err } select { case <-ctx.Done(): - return Message{}, ctx.Err() + return "", nil, ctx.Err() case <-time.After(time.Second): } } } -func (q Queue) Ack(ctx context.Context, id string) error { - return q.driver.Set(ctx, "q", id, nil) +func (q Queue) dequeue(ctx context.Context) ([]byte, []byte, error) { + now := time.Now().Unix() + reservation := []byte(uuid.New().String()) + var payload []byte + if result, err := q.driver.ExecContext(ctx, ` + UPDATE queue + SET + updated = ?, reservation = ? + WHERE + id IN ( + SELECT id + FROM queue + WHERE + reservation IS NULL + OR ? - updated > 60 + LIMIT 1 + ) + `, now, reservation, now); err != nil { + return nil, nil, fmt.Errorf("failed to assign reservation: %w", err) + } else if n, err := result.RowsAffected(); err != nil { + return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err) + } else if n == 0 { + return nil, nil, fmt.Errorf("failed to assign reservation: zero updates") + } + + rows, err := q.driver.QueryContext(ctx, ` + SELECT payload + FROM queue + WHERE reservation==? + LIMIT 1 + `, reservation) + if err != nil { + return nil, nil, fmt.Errorf("failed to query reservation: %w", err) + } + defer rows.Close() + for rows.Next() { + if err := rows.Scan(&payload); err != nil { + return nil, nil, fmt.Errorf("failed to parse reservation: %w", err) + } + } + if err := rows.Err(); err != nil { + return nil, nil, fmt.Errorf("failed to page reservation: %w", err) + } + + return reservation, payload, nil } -func (q Queue) peekFirst(ctx context.Context) (Message, error) { - var m Message - subctx, subcan := context.WithCancel(ctx) - defer subcan() - err := q.driver.ForEach(subctx, "q", func(_ string, value []byte) error { - m = MustDeserialize(value) - subcan() - return nil - }) - if errors.Is(err, subctx.Err()) { - err = nil - } - return m, err +func (q Queue) Ack(ctx context.Context, reservation string) error { + _, err := q.driver.ExecContext(ctx, ` + DELETE FROM queue + WHERE reservation==? + `, reservation) + return err } diff --git a/queue_test.go b/queue_test.go index 1f8520f..924538b 100644 --- a/queue_test.go +++ b/queue_test.go @@ -11,24 +11,32 @@ func TestQueue(t *testing.T) { ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() - q := NewQueue(NewRAM()) - - for i := 0; i < 39; i++ { - if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil { - t.Fatal(i, err) - } + driver, _ := NewDriver(ctx, "/tmp/f.db") + q, err := NewQueue(ctx, driver) + if err != nil { + t.Fatal(err) } - found := map[uint64]struct{}{} - for i := 0; i < 39; i++ { - if m, err := q.PeekFirst(ctx); err != nil { - t.Fatal(i, err) - } else if _, ok := found[m.TS]; ok { - t.Error(i, m.TS) - } else if err := q.Ack(ctx, m.ID); err != nil { - t.Fatal(i, err) - } else { - found[m.TS] = struct{}{} + t.Run("enqueue", func(t *testing.T) { + for i := 0; i < 39; i++ { + if err := q.Enqueue(ctx, []byte(strconv.Itoa(i))); err != nil { + t.Fatal(i, err) + } } - } + }) + + t.Run("dequeue", func(t *testing.T) { + found := map[string]struct{}{} + for i := 0; i < 39; i++ { + if reservation, b, err := q.Dequeue(ctx); err != nil { + t.Fatal(i, "dequeue err", err) + } else if _, ok := found[string(b)]; ok { + t.Errorf("dequeued %q twice (%+v)", b, found) + } else if err := q.Ack(ctx, reservation); err != nil { + t.Fatal(i, "failed to ack", err) + } else { + found[string(b)] = struct{}{} + } + } + }) } diff --git a/storage.go b/storage.go deleted file mode 100644 index 19aa577..0000000 --- a/storage.go +++ /dev/null @@ -1,96 +0,0 @@ -package main - -import ( - "context" - "errors" - "sort" - "time" -) - -var ( - ErrNotFound = errors.New("not found") -) - -type Storage struct { - driver Driver -} - -func NewStorage(driver Driver) Storage { - return Storage{driver: driver} -} - -func (s Storage) MessagesSince(ctx context.Context, t time.Time) ([]Message, error) { - return s.messagesWhere(ctx, func(m Message) bool { - return !t.After(m.Time()) - }) -} - -func (s Storage) Threads(ctx context.Context) ([]string, error) { - return s.ThreadsSince(ctx, time.Unix(0, 0)) -} - -func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) { - return s.fieldsSince(ctx, t, func(m Message) string { return m.Thread }) -} - -func (s Storage) EventNamesSince(ctx context.Context, t time.Time) ([]string, error) { - return s.fieldsSince(ctx, t, func(m Message) string { return m.EventName }) -} - -func (s Storage) EventsSince(ctx context.Context, t time.Time) ([]string, error) { - return s.fieldsSince(ctx, t, func(m Message) string { return m.Event }) -} - -func (s Storage) fieldsSince(ctx context.Context, t time.Time, fielder func(Message) string) ([]string, error) { - messages, err := s.MessagesSince(ctx, t) - if err != nil { - return nil, err - } - values := map[string]struct{}{} - for _, m := range messages { - values[fielder(m)] = struct{}{} - } - result := make([]string, 0, len(values)) - for k := range values { - result = append(result, k) - } - sort.Strings(result) - return result, nil -} - -func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) { - return s.messagesWhere(ctx, func(m Message) bool { - return m.Thread == thread - }) -} - -func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) { - result := make([]Message, 0) - err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error { - m := MustDeserialize(v) - if !where(m) { - return nil - } - result = append(result, m) - return nil - }) - sort.Slice(result, func(i, j int) bool { - return result[i].TS < result[j].TS - }) - return result, err -} - -func (s Storage) Upsert(ctx context.Context, m Message) error { - return s.driver.Set(ctx, "m", m.ID, m.Serialize()) -} - -func (s Storage) Get(ctx context.Context, id string) (Message, error) { - b, err := s.driver.Get(ctx, "m", id) - if err != nil { - return Message{}, err - } - if b == nil { - return Message{}, ErrNotFound - } - return MustDeserialize(b), nil -}