spoc-bot-vr/slackscrape.go

150 lines
3.6 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"time"
"golang.org/x/time/rate"
)
// SlackScrape is the JSON job payload consumed by the slack scrape pipeline.
// Each job describes a single Slack Web API request for one channel, or for
// one thread inside that channel.
type SlackScrape struct {
	// Latest is always sent as the `latest` query parameter; Oldest is sent
	// as `oldest` only when it is non-zero.
	// NOTE(review): presumably Unix seconds, since they are compared against
	// message `ts` values — confirm with the producer of these jobs.
	Latest, Oldest int64

	// ThreadTS, when non-empty, switches the request from
	// conversations.history to conversations.replies and is sent as `ts`.
	// Channel is sent as the `channel` query parameter.
	// Token is sent as the Bearer credential in the Authorization header.
	ThreadTS, Channel, Token string
}
// NewSlackScrapePipeline wires up the pipeline that scrapes Slack channels.
//
// It opens the "new_persistence" queue as the pipeline's writer and the
// "slack_channels_to_scrape" queue as its reader, both on cfg.driver. Any
// queue error is returned together with a zero Pipeline.
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
	out, err := NewQueue(ctx, "new_persistence", cfg.driver)
	if err != nil {
		return Pipeline{}, err
	}

	// The reader is stored on cfg before the process func is built because
	// that func enqueues follow-up jobs through cfg.slackScrapePipeline.reader.
	if cfg.slackScrapePipeline.reader, err = NewQueue(ctx, "slack_channels_to_scrape", cfg.driver); err != nil {
		return Pipeline{}, err
	}

	pipeline := Pipeline{
		writer: out,
		reader: cfg.slackScrapePipeline.reader,
	}
	pipeline.process = newSlackScrapeProcess(cfg)
	return pipeline, nil
}
// slackScrapePageLimit is the `limit` requested from Slack for every page.
const slackScrapePageLimit = 999

// newSlackScrapeProcess returns the processFunc that executes one SlackScrape
// job per call.
//
// For each job it:
//   - waits on a shared rate limiter (0.5 requests per second, burst of 1),
//   - calls conversations.history, or conversations.replies when the job
//     carries a ThreadTS,
//   - enqueues every returned message, wrapped in a ChannelWrapper, on
//     cfg.slackToModelPipeline.reader,
//   - when scraping a channel, enqueues one thread job per message that
//     carries a thread_ts on cfg.slackScrapePipeline.reader,
//   - enqueues a follow-up job with a lowered Latest when more pages remain.
//
// The returned func always yields a nil payload; all output goes through the
// queues above. It returns an error for an undecodable job, a transport
// failure, a non-200 status, a Slack API error reported in the body, or a
// failed enqueue.
func newSlackScrapeProcess(cfg Config) processFunc {
	limiter := rate.NewLimiter(0.5, 1)
	// One client is shared by every job instead of building one per request.
	httpc := &http.Client{Timeout: time.Second}

	return func(ctx context.Context, jobb []byte) ([]byte, error) {
		if err := limiter.Wait(ctx); err != nil {
			return nil, err
		}

		var job SlackScrape
		if err := json.Unmarshal(jobb, &job); err != nil {
			return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
		}

		endpoint := slackScrapeURL(job)
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+job.Token)

		resp, err := httpc.Do(req)
		if err != nil {
			return nil, err
		}
		// Deferred calls run last-in-first-out: drain what is left of the
		// body, then close it, so the connection can be reused.
		defer resp.Body.Close()
		defer io.Copy(io.Discard, resp.Body)

		if resp.StatusCode != http.StatusOK {
			// Best effort: the body is only used to enrich the error text.
			b, _ := io.ReadAll(resp.Body)
			return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
		}
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, err
		}

		var page struct {
			OK       *bool             `json:"ok"`
			Error    string            `json:"error"`
			HasMore  bool              `json:"has_more"`
			Messages []json.RawMessage `json:"messages"`
		}
		if err := json.Unmarshal(body, &page); err != nil {
			return nil, err
		}
		// Slack reports API-level failures (bad token, unknown channel, ...)
		// with HTTP 200 and ok=false, so the status code alone is not enough.
		if page.Error != "" || (page.OK != nil && !*page.OK) {
			return nil, fmt.Errorf("slack api error from %s: %q", endpoint, page.Error)
		}

		// Smallest message ts seen on this page; starts at the job's own
		// upper bound so only strictly older messages move it.
		oldestSeen := float64(job.Latest)
		for _, messageJSON := range page.Messages {
			if cfg.Debug {
				log.Printf("slackScrapePipeline %s => %s", endpoint, messageJSON)
			}

			wrapped, err := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
			if err != nil {
				return nil, fmt.Errorf("wrapping message from %s: %w", job.Channel, err)
			}
			if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, wrapped); err != nil {
				return nil, err
			}

			var peekTS struct {
				TS float64 `json:"ts,string"`
			}
			if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < oldestSeen {
				oldestSeen = peekTS.TS
			}

			// Threads are only fanned out from channel scrapes; a thread
			// scrape never spawns further thread jobs.
			if job.ThreadTS != "" {
				continue
			}
			var peek struct {
				ThreadTS string `json:"thread_ts"`
			}
			// Best effort: a message whose thread_ts cannot be decoded is
			// treated as not belonging to a thread.
			_ = json.Unmarshal(messageJSON, &peek)
			if peek.ThreadTS == "" {
				continue
			}
			threadJob := job
			threadJob.ThreadTS = peek.ThreadTS
			threadJob.Oldest = 0
			encoded, err := json.Marshal(threadJob)
			if err != nil {
				return nil, fmt.Errorf("encoding thread scrape for %s/%s: %w", job.Channel, peek.ThreadTS, err)
			}
			if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, encoded); err != nil {
				return nil, err
			}
			log.Printf("fanout thread scrape for %s/%s", job.Channel, peek.ThreadTS)
		}

		// Slack may return fewer items than requested even when more exist,
		// so has_more is honoured in addition to the full-page check.
		if page.HasMore || len(page.Messages) == slackScrapePageLimit {
			next, progressed := nextScrapeLatest(job.Latest, oldestSeen)
			if !progressed {
				// Re-enqueueing an identical job would loop forever.
				log.Printf("stopping page scrape for %s: cursor stuck at %v", job.Channel, job.Latest)
			} else {
				clone := job
				clone.Latest = next
				encoded, err := json.Marshal(clone)
				if err != nil {
					return nil, fmt.Errorf("encoding page scrape for %s: %w", job.Channel, err)
				}
				if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, encoded); err != nil {
					return nil, err
				}
				log.Printf("fanout page scrape for %s up to %v", job.Channel, clone.Latest)
			}
		}

		log.Printf("scraped %v from %s", len(page.Messages), endpoint)
		return nil, nil
	}
}

// slackScrapeURL builds the Slack Web API URL for one scrape job:
// conversations.replies when the job has a ThreadTS, conversations.history
// otherwise.
func slackScrapeURL(job SlackScrape) string {
	q := url.Values{}
	q.Set("channel", job.Channel)
	q.Set("latest", strconv.FormatInt(job.Latest, 10))
	q.Set("limit", strconv.Itoa(slackScrapePageLimit))
	q.Set("inclusive", "true")
	if job.Oldest != 0 {
		q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
	}

	path := "/api/conversations.history"
	if job.ThreadTS != "" {
		path = "/api/conversations.replies"
		q.Set("ts", job.ThreadTS)
	}

	u := url.URL{
		Scheme:   "https",
		Host:     "slack.com",
		Path:     path,
		RawQuery: q.Encode(),
	}
	return u.String()
}

// nextScrapeLatest picks the whole-number `latest` cursor for the follow-up
// page, given the current cursor and the smallest message ts seen so far.
//
// The ts is rounded up so that messages sharing the boundary second, but
// older than the oldest message seen, are still covered by the next page.
// Truncating instead would skip them. The cost is that messages from that
// second are delivered again.
// NOTE(review): confirm downstream consumers tolerate duplicate messages.
//
// If rounding up would not lower the cursor, the truncated value is used so
// a page packed into a single second still advances. The second result is
// false when the cursor cannot be lowered at all.
func nextScrapeLatest(current int64, oldestSeen float64) (int64, bool) {
	floor := int64(oldestSeen)
	next := floor
	if float64(floor) < oldestSeen {
		next = floor + 1
	}
	if next >= current {
		next = floor
	}
	return next, next < current
}