diff --git a/main.go b/main.go index 4670d23..adda675 100644 --- a/main.go +++ b/main.go @@ -107,44 +107,81 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { channel := r.Header.Get("slack-channel") token := r.Header.Get("slack-oauth-token") - req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - req.Header.Set("Authorization", "Bearer "+token) + urls := []string{"https://slack.com/api/conversations.history?channel=" + channel} - resp, err := http.DefaultClient.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - defer resp.Body.Close() - defer io.Copy(io.Discard, resp.Body) + httpc := http.Client{Timeout: time.Second} + get := func(url string) ([]byte, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+token) + req = req.WithContext(r.Context()) - var page struct { - OK bool - Messages []json.RawMessage + resp, err := httpc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + defer io.Copy(io.Discard, resp.Body) + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b) + } + return io.ReadAll(resp.Body) } - if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } else if !page.OK { - http.Error(w, "slack page was !.ok", http.StatusBadGateway) - return - } - errs := []error{} - for _, messageJSON := range page.Messages { - if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { - errs = append(errs, err) + + n := 0 + + for len(urls) > 0 { + url := urls[0] + urls = urls[1:] + select { + case <-r.Context().Done(): + case <-time.After(time.Second): + } + body, err := get(url) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + + var page struct { + Messages []json.RawMessage + } + if err := json.Unmarshal(body, &page); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + errs := []error{} + for _, messageJSON := range page.Messages { + if cfg.Debug { + log.Printf("rpc/scrapeslack => %s", messageJSON) + } + if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { + errs = append(errs, err) + } else { + n += 1 + } + if !strings.Contains(url, "ts=") { + var peek struct { + ThreadTS string `json:"thread_ts"` + } + json.Unmarshal(messageJSON, &peek) + if peek.ThreadTS != "" { + urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS)) + } + } + } + + if len(errs) > 0 { + http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) + return } } - if len(errs) > 0 { - http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) - return - } - json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) + json.NewEncoder(w).Encode(map[string]any{"scraped": n}) } } diff --git a/slack.go b/slack.go index d020070..043dc73 100644 --- a/slack.go +++ b/slack.go @@ -47,6 +47,9 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) func newSlackToModelProcess(cfg Config) processFunc { return func(ctx context.Context, slack []byte) ([]byte, error) { s, err := parseSlack(slack) + if cfg.Debug { + log.Printf("%v: %s => %+v", err, slack, s) + } if errors.Is(err, ErrIrrelevantMessage) { return nil, nil } else if err != nil {