package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

// SlackScrape is one unit of scraping work: a window of a Slack channel's
// history or, when ThreadTS is set, the replies of a single thread.
type SlackScrape struct {
	Latest   int64  // upper bound (unix seconds) of the history window; sent as "latest"
	Oldest   int64  // lower bound; 0 means "no lower bound" and the param is omitted
	ThreadTS string // when non-empty, scrape conversations.replies for this thread instead
	Channel  string // Slack channel ID
	Token    string // Slack bearer token used for the Authorization header
}

// NewSlackScrapePipeline wires the Slack scrape stage: jobs are read from
// the "slack_channels_to_scrape" queue and scraped messages are handed to
// downstream stages (raw persistence plus the slack-to-model queue).
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}
	return Pipeline{
		writer:  writer,
		reader:  cfg.slackScrapePipeline.reader,
		process: newSlackScrapeProcess(cfg),
	}, nil
}

// newSlackScrapeProcess returns the processFunc that executes one SlackScrape
// job: it fetches a page of channel history (or thread replies) from the
// Slack Web API, forwards each message to the slack-to-model queue, fans out
// a new job per discovered thread, and re-enqueues itself for the next page
// when the current page is full.
func newSlackScrapeProcess(cfg Config) processFunc {
	// Global pacing across all jobs: at most one Slack API call every two
	// seconds (Slack Web API tier limits are per-method, roughly this order).
	limiter := rate.NewLimiter(0.5, 1)

	// One shared client so the transport can reuse TCP/TLS connections
	// across jobs. The previous per-job client with a 1s timeout was both
	// wasteful and too tight for a 999-message page; 30s is generous but
	// still bounded (ctx cancellation also applies per request).
	httpc := &http.Client{Timeout: 30 * time.Second}

	return func(ctx context.Context, jobb []byte) ([]byte, error) {
		if err := limiter.Wait(ctx); err != nil {
			return nil, err
		}

		var job SlackScrape
		if err := json.Unmarshal(jobb, &job); err != nil {
			return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
		}

		// Build the request URL. History by default; replies when the job
		// targets a specific thread.
		u := url.URL{
			Scheme: "https",
			Host:   "slack.com",
			Path:   "/api/conversations.history",
		}
		q := url.Values{}
		q.Set("channel", job.Channel)
		q.Set("latest", strconv.FormatInt(job.Latest, 10))
		q.Set("limit", "999") // Slack's documented maximum page size
		q.Set("inclusive", "true")
		if job.ThreadTS != "" {
			u.Path = "/api/conversations.replies"
			q.Set("ts", job.ThreadTS)
		}
		if job.Oldest != 0 {
			q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
		}
		u.RawQuery = q.Encode()
		// Named reqURL (not "url") to avoid shadowing the net/url package.
		reqURL := u.String()

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+job.Token)

		resp, err := httpc.Do(req)
		if err != nil {
			return nil, err
		}
		// Drain then close (LIFO: the Copy runs before Close) so the
		// transport can reuse the connection.
		defer resp.Body.Close()
		defer io.Copy(io.Discard, resp.Body)

		if resp.StatusCode != http.StatusOK {
			b, _ := io.ReadAll(resp.Body) // best effort; include body in the error
			return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, err
		}

		// Decode lazily: each message stays a RawMessage and is only peeked
		// at for the fields we need (ts, thread_ts).
		var page struct {
			Messages []json.RawMessage
		}
		if err := json.Unmarshal(body, &page); err != nil {
			return nil, err
		}

		// Track the oldest timestamp seen on this page; it becomes the
		// "latest" bound of the next page when we paginate.
		newLatest := float64(job.Latest)
		for _, messageJSON := range page.Messages {
			if cfg.Debug {
				log.Printf("slackScrapePipeline %s => %s", reqURL, messageJSON)
			}
			b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
			if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
				return nil, err
			}

			// Slack encodes ts as a decimal string, e.g. "1622505600.000200".
			var peekTS struct {
				TS float64 `json:"ts,string"`
			}
			if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
				newLatest = peekTS.TS
			}

			// Fan out a thread-scrape job for each thread parent we see,
			// but only while scraping channel history (replies pages would
			// otherwise re-enqueue their own thread forever).
			if job.ThreadTS == "" {
				var peek struct {
					ThreadTS string `json:"thread_ts"`
				}
				// Best effort: a message without thread_ts simply isn't a thread.
				json.Unmarshal(messageJSON, &peek)
				if peek.ThreadTS != "" {
					clone := job
					clone.ThreadTS = peek.ThreadTS
					clone.Oldest = 0
					b, _ := json.Marshal(clone)
					if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
						return nil, err
					}
					log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
				}
			}
		}

		// A full page means there is (probably) more history before it.
		if len(page.Messages) == 999 {
			clone := job
			clone.Latest = int64(newLatest)
			// Guard against an infinite loop: Latest is whole seconds, so
			// when every message on the page shares job.Latest's second the
			// truncated bound would not move and (with inclusive=true) we
			// would re-fetch the identical page forever.
			if clone.Latest >= job.Latest {
				log.Printf("pagination stalled for %s at %v; stopping", job.Channel, job.Latest)
			} else {
				b, _ := json.Marshal(clone)
				if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
					return nil, err
				}
				log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
			}
		}

		log.Printf("scraped %v from %s", len(page.Messages), reqURL)
		return nil, nil
	}
}