package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)
type SlackScrape struct {
|
|
Latest int64
|
|
Oldest int64
|
|
ThreadTS string
|
|
Channel string
|
|
Token string
|
|
}
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
|
writer, err := NewQueue(ctx, "new_persistence", cfg.driver)
|
|
if err != nil {
|
|
return Pipeline{}, err
|
|
}
|
|
cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
|
|
if err != nil {
|
|
return Pipeline{}, err
|
|
}
|
|
return Pipeline{
|
|
writer: writer,
|
|
reader: cfg.slackScrapePipeline.reader,
|
|
process: newSlackScrapeProcess(cfg),
|
|
}, nil
|
|
}
func newSlackScrapeProcess(cfg Config) processFunc {
|
|
limiter := rate.NewLimiter(0.5, 1)
|
|
return func(ctx context.Context, jobb []byte) ([]byte, error) {
|
|
if err := limiter.Wait(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var job SlackScrape
|
|
if err := json.Unmarshal(jobb, &job); err != nil {
|
|
return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
|
|
}
|
|
|
|
u := url.URL{
|
|
Scheme: "https",
|
|
Host: "slack.com",
|
|
Path: "/api/conversations.history",
|
|
}
|
|
q := url.Values{}
|
|
q.Set("channel", job.Channel)
|
|
q.Set("latest", strconv.FormatInt(job.Latest, 10))
|
|
q.Set("limit", "999")
|
|
q.Set("inclusive", "true")
|
|
if job.ThreadTS != "" {
|
|
u.Path = "/api/conversations.replies"
|
|
q.Set("ts", job.ThreadTS)
|
|
}
|
|
if job.Oldest != 0 {
|
|
q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
|
|
}
|
|
u.RawQuery = q.Encode()
|
|
url := u.String()
|
|
|
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+job.Token)
|
|
req = req.WithContext(ctx)
|
|
|
|
httpc := http.Client{Timeout: time.Second}
|
|
resp, err := httpc.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
defer io.Copy(io.Discard, resp.Body)
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
b, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var page struct {
|
|
Messages []json.RawMessage
|
|
}
|
|
if err := json.Unmarshal(body, &page); err != nil {
|
|
return nil, err
|
|
}
|
|
newLatest := float64(job.Latest)
|
|
for _, messageJSON := range page.Messages {
|
|
if cfg.Debug {
|
|
log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
|
|
}
|
|
b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
|
|
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
|
|
return nil, err
|
|
}
|
|
var peekTS struct {
|
|
TS float64 `json:"ts,string"`
|
|
}
|
|
if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
|
|
newLatest = peekTS.TS
|
|
}
|
|
if job.ThreadTS == "" {
|
|
var peek struct {
|
|
ThreadTS string `json:"thread_ts"`
|
|
}
|
|
json.Unmarshal(messageJSON, &peek)
|
|
if peek.ThreadTS != "" {
|
|
clone := job
|
|
clone.ThreadTS = peek.ThreadTS
|
|
clone.Oldest = 0
|
|
b, _ := json.Marshal(clone)
|
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
|
return nil, err
|
|
}
|
|
log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
|
|
}
|
|
}
|
|
}
|
|
if len(page.Messages) == 999 {
|
|
clone := job
|
|
clone.Latest = int64(newLatest)
|
|
b, _ := json.Marshal(clone)
|
|
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
|
|
return nil, err
|
|
}
|
|
log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
|
|
}
|
|
|
|
log.Printf("scraped %v from %s", len(page.Messages), url)
|
|
return nil, nil
|
|
}
|
|
}