From f8861a73b5ffb4c68ed05d04e35865041d3fc2bc Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Thu, 18 Apr 2024 14:56:33 -0600 Subject: [PATCH] async slack scrape goes up to ?since --- config.go | 7 +++ go.mod | 5 +- go.sum | 6 +- main.go | 90 +++++---------------------- main_test.go | 1 + persistence.go | 8 ++- persistence_test.go | 2 +- queue.go | 82 +++++++++++++----------- slack.go | 4 +- slackscrape.go | 148 ++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 229 insertions(+), 124 deletions(-) create mode 100644 slackscrape.go diff --git a/config.go b/config.go index 0bcaa16..b0aa83d 100644 --- a/config.go +++ b/config.go @@ -32,6 +32,7 @@ type Config struct { storage Storage ai AI slackToModelPipeline Pipeline + slackScrapePipeline Pipeline modelToPersistencePipeline Pipeline } @@ -146,5 +147,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, } result.modelToPersistencePipeline = modelToPersistencePipeline + slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result) + if err != nil { + return Config{}, err + } + result.slackScrapePipeline = slackScrapePipeline + return result, nil } diff --git a/go.mod b/go.mod index 6178823..ded1a0b 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/glebarez/go-sqlite v1.21.2 github.com/google/uuid v1.6.0 github.com/lib/pq v1.10.9 - github.com/nikolaydubina/llama2.go v0.7.1 github.com/tmc/langchaingo v0.1.8 - gotest.tools/v3 v3.5.1 + golang.org/x/time v0.5.0 + gotest.tools v2.2.0+incompatible ) require ( @@ -20,7 +20,6 @@ require ( github.com/pkoukk/tiktoken-go v0.1.6 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/sys v0.16.0 // indirect - gotest.tools v2.2.0+incompatible // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect diff --git a/go.sum b/go.sum index 49774e6..285aba0 
100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE= -github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= @@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= -gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= -gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= modernc.org/mathutil v1.5.0 
h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= diff --git a/main.go b/main.go index 0576fea..d8855d8 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,7 @@ func run(ctx context.Context, cfg Config) error { case err := <-processPipelines(ctx, cfg.slackToModelPipeline, cfg.modelToPersistencePipeline, + cfg.slackScrapePipeline, ): return err case err := <-listenAndServe(ctx, cfg): @@ -111,85 +112,24 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { if !basicAuth(cfg, w, r) { return } - channel := r.Header.Get("slack-channel") - token := r.Header.Get("slack-oauth-token") - urls := []string{"https://slack.com/api/conversations.history?channel=" + channel} - - httpc := http.Client{Timeout: time.Second} - get := func(url string) ([]byte, error) { - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return nil, err - } - req.Header.Set("Authorization", "Bearer "+token) - req = req.WithContext(r.Context()) - - resp, err := httpc.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - defer io.Copy(io.Discard, resp.Body) - - if resp.StatusCode != http.StatusOK { - b, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b) - } - return io.ReadAll(resp.Body) + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - n := 0 - - for len(urls) > 0 { - url := urls[0] - urls = urls[1:] - select { - case <-r.Context().Done(): - case <-time.After(time.Second): - } - body, err := get(url) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - - var page struct { - Messages []json.RawMessage - } - if err := json.Unmarshal(body, &page); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - errs := []error{} - for _, messageJSON := range page.Messages { - if cfg.Debug { - log.Printf("rpc/scrapeslack => %s", messageJSON) - } - b, _ := 
json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON}) - if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil { - errs = append(errs, err) - } else { - n += 1 - } - if !strings.Contains(url, "ts=") { - var peek struct { - ThreadTS string `json:"thread_ts"` - } - json.Unmarshal(messageJSON, &peek) - if peek.ThreadTS != "" { - urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS)) - } - } - } - - if len(errs) > 0 { - http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) - return - } + job, _ := json.Marshal(SlackScrape{ + Latest: time.Now().Unix(), + Oldest: since.Unix(), + ThreadTS: "", + Channel: r.Header.Get("slack-channel"), + Token: r.Header.Get("slack-oauth-token"), + }) + if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } - - json.NewEncoder(w).Encode(map[string]any{"scraped": n}) } } diff --git a/main_test.go b/main_test.go index efdeecf..5d9abcc 100644 --- a/main_test.go +++ b/main_test.go @@ -47,6 +47,7 @@ func TestRun(t *testing.T) { cfg.SlackToken = "redacted" cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg) + cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg) cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg) go func() { diff --git a/persistence.go b/persistence.go index b786665..9985a78 100644 --- a/persistence.go +++ b/persistence.go @@ -30,11 +30,11 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e return Pipeline{ writer: writer, reader: reader, - process: newModelToPersistenceProcess(cfg.storage), + process: newModelToPersistenceProcess(cfg, cfg.storage), }, nil } -func newModelToPersistenceProcess(storage Storage) processFunc { +func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc { return 
func(ctx context.Context, models []byte) ([]byte, error) { var m Models if err := json.Unmarshal(models, &m); err != nil { @@ -56,7 +56,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc { return nil, fmt.Errorf("failed to persist message: %w", err) } - log.Printf("persisted models") + if cfg.Debug { + log.Printf("persisted models") + } return json.Marshal(ModelIDs{ Event: m.Event.ID, Thread: m.Thread.ID, diff --git a/persistence_test.go b/persistence_test.go index 8c15803..4ec7163 100644 --- a/persistence_test.go +++ b/persistence_test.go @@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) { d := NewTestDriver(t) s, _ := NewStorage(ctx, d) - process := newModelToPersistenceProcess(s) + process := newModelToPersistenceProcess(Config{}, s) _, _ = ctx, process diff --git a/queue.go b/queue.go index 52480b3..d3db804 100644 --- a/queue.go +++ b/queue.go @@ -20,14 +20,14 @@ func NewNoopQueue() Queue { func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) { if _, err := driver.ExecContext(ctx, ` - CREATE TABLE IF NOT EXISTS queue ( - id TEXT PRIMARY KEY, - topic TEXT NOT NULL, - updated INTEGER NOT NULL, - reservation TEXT, - payload TEXT - ); - `); err != nil { + CREATE TABLE IF NOT EXISTS queue ( + id TEXT PRIMARY KEY, + topic TEXT NOT NULL, + updated INTEGER NOT NULL, + reservation TEXT, + payload TEXT + ); + `); err != nil { return Queue{}, fmt.Errorf("failed to create table: %w", err) } return Queue{topic: topic, driver: driver}, nil @@ -37,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error { if q.driver.DB == nil { return nil } - _, err := q.driver.ExecContext(ctx, ` - INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3) - `, + result, err := q.driver.ExecContext(ctx, ` + INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4) + `, + uuid.New().String(), q.topic, time.Now().Unix(), b, - uuid.New().String(), ) - return err + if err != nil { + return 
err + } + if n, err := result.RowsAffected(); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("insert into queue %s affected %v rows", b, n) + } + return nil } func (q Queue) Syn(ctx context.Context) (string, []byte, error) { @@ -71,22 +79,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { reservation := []byte(uuid.New().String()) var payload []byte if result, err := q.driver.ExecContext(ctx, ` - UPDATE queue - SET - updated = $1, reservation = $2 - WHERE - id IN ( - SELECT id - FROM queue - WHERE - topic = $3 - AND ( - reservation IS NULL - OR $4 - updated > 60 - ) - LIMIT 1 - ) - `, now, reservation, q.topic, now); err != nil { + UPDATE queue + SET + updated = $1, reservation = $2 + WHERE + id IN ( + SELECT id + FROM queue + WHERE + topic = $3 + AND ( + reservation IS NULL + OR $4 - updated > 60 + ) + LIMIT 1 + ) + `, now, reservation, q.topic, now); err != nil { return nil, nil, fmt.Errorf("failed to assign reservation: %w", err) } else if n, err := result.RowsAffected(); err != nil { return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err) @@ -95,11 +103,11 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { } row := q.driver.QueryRowContext(ctx, ` - SELECT payload - FROM queue - WHERE reservation=$1 - LIMIT 1 - `, reservation) + SELECT payload + FROM queue + WHERE reservation=$1 + LIMIT 1 + `, reservation) if err := row.Err(); err != nil { return nil, nil, fmt.Errorf("failed to query reservation: %w", err) } else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") { @@ -118,9 +126,9 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error { return nil } result, err := q.driver.ExecContext(ctx, ` - DELETE FROM queue - WHERE reservation=$1 - `, reservation) + DELETE FROM queue + WHERE reservation=$1 + `, reservation) if err != nil { return err } diff --git a/slack.go b/slack.go index f9c22c8..23d4ab6 100644 --- a/slack.go +++ 
b/slack.go
@@ -77,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
 			thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
 		}
 
-		log.Printf("parsed slack message into models")
+		if cfg.Debug {
+			log.Printf("parsed slack message into models")
+		}
 		return json.Marshal(Models{
 			Event:   event,
 			Message: message,
diff --git a/slackscrape.go b/slackscrape.go
new file mode 100644
index 0000000..059f632
--- /dev/null
+++ b/slackscrape.go
@@ -0,0 +1,182 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
+
+	"golang.org/x/time/rate"
+)
+
+// SlackScrape is one queued unit of scrape work: a single page of a channel's
+// history or, when ThreadTS is set, a page of that thread's replies.
+type SlackScrape struct {
+	Latest   int64  // sent as the "latest" query parameter
+	Oldest   int64  // sent as "oldest"; omitted when 0
+	ThreadTS string // when set, conversations.replies is called for this thread
+	Channel  string // sent as the "channel" query parameter
+	Token    string // bearer token for the Authorization header
+}
+
+// NewSlackScrapePipeline builds the pipeline that reads SlackScrape jobs from
+// the "slack_channels_to_scrape" queue. Its writer is a no-op queue; the
+// process enqueues its results onto other queues itself.
+func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
+	reader, err := NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
+	if err != nil {
+		return Pipeline{}, err
+	}
+	// cfg is a by-value copy; storing the reader in it lets the process
+	// enqueue follow-up jobs onto this same queue.
+	cfg.slackScrapePipeline.reader = reader
+	writer := NewNoopQueue()
+	return Pipeline{
+		writer:  writer,
+		reader:  reader,
+		process: newSlackScrapeProcess(cfg),
+	}, nil
+}
+
+// newSlackScrapeProcess returns the processFunc that executes one SlackScrape
+// job: it fetches one page from Slack, enqueues every message onto the
+// slack-to-model queue, and enqueues follow-up jobs for threads and for older
+// pages.
+func newSlackScrapeProcess(cfg Config) processFunc {
+	// At most one request every two seconds, shared by every job.
+	limiter := rate.NewLimiter(0.5, 1)
+	// One client for all jobs instead of a new one per request.
+	httpc := &http.Client{Timeout: time.Second}
+	return func(ctx context.Context, jobb []byte) ([]byte, error) {
+		var job SlackScrape
+		if err := json.Unmarshal(jobb, &job); err != nil {
+			return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
+		}
+		// Log the decoded fields, never the raw payload: it holds the token.
+		log.Printf("newSlackScrapeProcess(channel=%s thread=%s oldest=%d latest=%d)",
+			job.Channel, job.ThreadTS, job.Oldest, job.Latest)
+		if err := limiter.Wait(ctx); err != nil {
+			return nil, err
+		}
+
+		u := url.URL{
+			Scheme: "https",
+			Host:   "slack.com",
+			Path:   "/api/conversations.history",
+		}
+		q := url.Values{}
+		q.Set("channel", job.Channel)
+		q.Set("latest", strconv.FormatInt(job.Latest, 10))
+		q.Set("limit", "999")
+		q.Set("inclusive", "true")
+		if job.ThreadTS != "" {
+			u.Path = "/api/conversations.replies"
+			q.Set("ts", job.ThreadTS)
+		}
+		if job.Oldest != 0 {
+			q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
+		}
+		u.RawQuery = q.Encode()
+		endpoint := u.String()
+
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
+		if err != nil {
+			return nil, err
+		}
+		req.Header.Set("Authorization", "Bearer "+job.Token)
+
+		resp, err := httpc.Do(req)
+		if err != nil {
+			return nil, err
+		}
+		defer resp.Body.Close()
+		// Deferred calls run in reverse order: drain first, then close.
+		defer io.Copy(io.Discard, resp.Body)
+
+		if resp.StatusCode != http.StatusOK {
+			b, _ := io.ReadAll(resp.Body)
+			return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
+		}
+
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return nil, err
+		}
+
+		var page struct {
+			OK       *bool  `json:"ok"`
+			Error    string `json:"error"`
+			Messages []json.RawMessage
+		}
+		if err := json.Unmarshal(body, &page); err != nil {
+			return nil, err
+		}
+		// Slack reports API failures (bad token, unknown channel, ...) as
+		// HTTP 200 with "ok": false; unchecked, such a reply would be
+		// treated as a successful empty page.
+		if page.OK != nil && !*page.OK {
+			return nil, fmt.Errorf("slack %s failed: %s", u.Path, page.Error)
+		}
+
+		newLatest := float64(job.Latest)
+		for _, messageJSON := range page.Messages {
+			if cfg.Debug {
+				log.Printf("slackScrapePipeline %s => %s", endpoint, messageJSON)
+			}
+			b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
+			if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
+				return nil, err
+			}
+			// Track the oldest timestamp seen; the next page resumes there.
+			var peekTS struct {
+				TS float64 `json:"ts,string"`
+			}
+			if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
+				newLatest = peekTS.TS
+			}
+			if job.ThreadTS == "" {
+				// Decode errors are ignored: no thread_ts, no thread job.
+				var peek struct {
+					ThreadTS string `json:"thread_ts"`
+				}
+				json.Unmarshal(messageJSON, &peek)
+				if peek.ThreadTS != "" {
+					clone := job
+					clone.ThreadTS = peek.ThreadTS
+					clone.Oldest = 0
+					b, _ := json.Marshal(clone)
+					if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
+						return nil, err
+					}
+					log.Printf("enqueue thread scrape for %s/%s", job.Channel, peek.ThreadTS)
+				}
+			}
+		}
+		// A full page (limit is 999) means older messages may remain.
+		if len(page.Messages) == 999 {
+			clone := job
+			clone.Latest = int64(newLatest)
+			// Latest only has whole-second resolution. Truncating the oldest
+			// timestamp would skip messages from the same second that are
+			// older than it, so round up whenever that still moves the
+			// window back. The request is inclusive, so messages in that
+			// second may be enqueued twice.
+			if float64(clone.Latest) < newLatest && clone.Latest+1 < job.Latest {
+				clone.Latest++
+			}
+			b, _ := json.Marshal(clone)
+			if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
+				return nil, err
+			}
+			log.Printf("enqueue page scrape for %s up to %v", job.Channel, clone.Latest)
+		}
+
+		log.Printf("scraped %v from %s", len(page.Messages), endpoint)
+		return nil, nil
+	}
+}