package main import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net" "net/http" "os/signal" "sort" "strconv" "strings" "syscall" "time" ) func main() { ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) defer can() cfg, err := newConfig(ctx) if err != nil { panic(err) } defer cfg.driver.Close() if err := run(ctx, cfg); err != nil && ctx.Err() == nil { panic(err) } } func run(ctx context.Context, cfg Config) error { select { case <-ctx.Done(): return ctx.Err() case err := <-processSlackToMessagePipeline(ctx, cfg): return err case err := <-listenAndServe(ctx, cfg): return err } } func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error { errs := make(chan error) go func() { defer close(errs) select { case errs <- cfg.slackToMessagePipeline.Process(ctx): case <-ctx.Done(): } }() return errs } func listenAndServe(ctx context.Context, cfg Config) chan error { s := http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), Handler: http.HandlerFunc(newHandler(cfg)), BaseContext: func(net.Listener) context.Context { return ctx }, } errc := make(chan error) go func() { defer close(errc) log.Printf("listening on %s", s.Addr) errc <- s.ListenAndServe() }() return errc } func newHandler(cfg Config) http.HandlerFunc { mux := http.NewServeMux() mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) return func(w http.ResponseWriter, r *http.Request) { if cfg.Debug { b, _ := io.ReadAll(r.Body) r.Body = io.NopCloser(bytes.NewReader(b)) log.Printf("%s %s | %s", r.Method, r.URL, b) } mux.ServeHTTP(w, r) } } func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } channel := r.Header.Get("slack-channel") token := r.Header.Get("slack-oauth-token") req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } req.Header.Set("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { http.Error(w, err.Error(), http.StatusBadGateway) return } defer resp.Body.Close() defer io.Copy(io.Discard, resp.Body) var page struct { OK bool Messages []json.RawMessage } if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { http.Error(w, err.Error(), http.StatusBadGateway) return } else if !page.OK { http.Error(w, "slack page was !.ok", http.StatusBadGateway) return } errs := []error{} for _, messageJSON := range page.Messages { if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { errs = append(errs, err) } } if len(errs) > 0 { http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) return } json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) } } func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } http.Error(w, "not impl", http.StatusNotImplemented) } } func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } http.Error(w, "not impl", http.StatusNotImplemented) } } func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } http.Error(w, "not impl", http.StatusNotImplemented) } } func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } http.Error(w, "not impl", http.StatusNotImplemented) } } func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] _ = thread http.Error(w, "not impl", http.StatusNotImplemented) } } func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { http.Error(w, "shoo", http.StatusForbidden) return false } return true } func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { if cfg.InitializeSlack { return handlerPostAPIV1EventsSlackInitialize } return _newHandlerPostAPIV1EventsSlack(cfg) } func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { b, _ := io.ReadAll(r.Body) var challenge struct { Token string Challenge string Type string } if err := json.Unmarshal(b, &challenge); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) } func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { b, _ := io.ReadAll(r.Body) r.Body = io.NopCloser(bytes.NewReader(b)) var allowList struct { Token string Event struct { Channel string } } if err := json.Unmarshal(b, &allowList); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } else if allowList.Token != cfg.SlackToken { http.Error(w, "invalid .token", http.StatusForbidden) return } else if !func() bool { for _, slackChannel := range cfg.SlackChannels { if slackChannel == allowList.Event.Channel { return true } } return false }() { return } if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil { log.Printf("failed to ingest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } log.Printf("ingested") } } func parseSince(s string) (time.Time, error) { if s == "" { return time.Unix(0, 0), nil } if n, err := strconv.ParseInt(s, 10, 64); err != nil { } else { return time.Unix(n, 0), nil } if t, err := time.Parse(time.RFC3339, s); err != nil { } else { return t, nil } if t, err := time.Parse(time.RFC3339Nano, s); err != nil { } else { return t, nil } if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { } else { return t, nil } return time.Time{}, fmt.Errorf("failed to parse since=%q", s) } func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { if strings.Contains(r.Header.Get("Accept"), "text/csv") { return encodeCSVResponse(w, v) } if strings.Contains(r.Header.Get("Accept"), "text/tsv") { return encodeTSVResponse(w, v) } return encodeJSONResponse(w, v) } func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { return json.NewEncoder(w).Encode(v) } func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { return encodeSVResponse(w, v, "\t") } func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { return encodeSVResponse(w, v, ",") } func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { b, err := json.Marshal(v) if err != nil { return err } var data map[string][]map[string]json.RawMessage if err := json.Unmarshal(b, &data); err != nil { return err } var objects []map[string]json.RawMessage for k := range data { objects = data[k] } fields := []string{} for i := range objects { for k := range objects[i] { b, _ := json.Marshal(k) fields = append(fields, string(b)) } break } sort.Strings(fields) w.Write([]byte(strings.Join(fields, delim))) w.Write([]byte("\n")) for _, object := range objects { for j, field := range fields { json.Unmarshal([]byte(field), &field) if j > 0 { w.Write([]byte(delim)) } w.Write(object[field]) } w.Write([]byte("\n")) } return nil }