synchronous fanout from channel to threads for scrape

main
Bel LaPointe 2024-04-16 08:33:21 -06:00
parent d88a8bb23a
commit a8e8fdc451
2 changed files with 72 additions and 32 deletions

101
main.go
View File

@ -107,44 +107,81 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Authorization", "Bearer "+token)
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel}
resp, err := http.DefaultClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
httpc := http.Client{Timeout: time.Second}
get := func(url string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token)
req = req.WithContext(r.Context())
var page struct {
OK bool
Messages []json.RawMessage
resp, err := httpc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
return io.ReadAll(resp.Body)
}
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
} else if !page.OK {
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
n := 0
for len(urls) > 0 {
url := urls[0]
urls = urls[1:]
select {
case <-r.Context().Done():
case <-time.After(time.Second):
}
body, err := get(url)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
var page struct {
Messages []json.RawMessage
}
if err := json.Unmarshal(body, &page); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON)
}
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil {
errs = append(errs, err)
} else {
n += 1
}
if !strings.Contains(url, "ts=") {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
}
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
}
}

View File

@ -47,6 +47,9 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
func newSlackToModelProcess(cfg Config) processFunc {
return func(ctx context.Context, slack []byte) ([]byte, error) {
s, err := parseSlack(slack)
if cfg.Debug {
log.Printf("%v: %s => %+v", err, slack, s)
}
if errors.Is(err, ErrIrrelevantMessage) {
return nil, nil
} else if err != nil {