package main import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net" "net/http" "os/signal" "sort" "strconv" "strings" "syscall" "time" ) func main() { ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) defer can() cfg, err := newConfig(ctx) if err != nil { panic(err) } defer cfg.driver.Close() if err := run(ctx, cfg); err != nil && ctx.Err() == nil { panic(err) } } func run(ctx context.Context, cfg Config) error { select { case <-ctx.Done(): return ctx.Err() case err := <-processPipelines(ctx, cfg.slackToModelPipeline, cfg.modelToPersistencePipeline, cfg.slackScrapePipeline, cfg.persistenceToRecapPipeline, ): return err case err := <-listenAndServe(ctx, cfg): return err } } func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error { ctx, can := context.WithCancel(ctx) pipelines = append(pipelines, first) errs := make(chan error) for i := range pipelines { go func(i int) { defer can() select { case errs <- pipelines[i].Process(ctx): case <-ctx.Done(): } }(i) } return errs } func listenAndServe(ctx context.Context, cfg Config) chan error { s := http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), Handler: http.HandlerFunc(newHandler(cfg)), BaseContext: func(net.Listener) context.Context { return ctx }, } errc := make(chan error) go func() { defer close(errc) log.Printf("listening on %s", s.Addr) errc <- s.ListenAndServe() }() return errc } func newHandler(cfg Config) http.HandlerFunc { mux := http.NewServeMux() mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version)) mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg))) return func(w http.ResponseWriter, r *http.Request) { if cfg.Debug { b, _ := io.ReadAll(r.Body) r.Body = io.NopCloser(bytes.NewReader(b)) log.Printf("%s %s | %s", r.Method, r.URL, b) } mux.ServeHTTP(w, r) } } var Version = "undef" func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) { json.NewEncoder(w).Encode(map[string]any{"version": Version}) } func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } event := r.URL.Query().Get("id") b, _ := json.Marshal(ModelIDs{Event: event}) if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } json.NewEncoder(w).Encode(map[string]any{"event": event}) } } func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !basicAuth(cfg, w, r) { return } since, err := parseSince(r.URL.Query().Get("since")) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } job, _ := json.Marshal(SlackScrape{ Latest: time.Now().Unix(), Oldest: since.Unix(), ThreadTS: "", Channel: r.Header.Get("slack-channel"), Token: r.Header.Get("slack-oauth-token"), }) if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { http.Error(w, "shoo", http.StatusForbidden) return false } return true } func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { if cfg.InitializeSlack { return handlerPostAPIV1EventsSlackInitialize(cfg) } return _newHandlerPostAPIV1EventsSlack(cfg) } func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { b, _ := io.ReadAll(r.Body) var challenge struct { Token string Challenge string Type string } if err := json.Unmarshal(b, &challenge); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } cfg.driver.ExecContext(r.Context(), ` CREATE TABLE IF NOT EXISTS initialization ( label TEXT, token TEXT, updated TIMESTAMP ) `) if _, err := cfg.driver.ExecContext(r.Context(), ` INSERT INTO initialization (label, token, updated) VALUES ('slack_events_webhook_token', $1, $2) `, challenge.Token, time.Now().UTC()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } log.Println("stashed new slack initialization token", challenge.Token) encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) } } func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) r.Body = io.NopCloser(bytes.NewReader(body)) var allowList struct { Token string Event struct { Channel string } } if err := json.Unmarshal(body, &allowList); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } else if allowList.Token != cfg.SlackToken { http.Error(w, "invalid .token", http.StatusForbidden) return } else if !func() bool { for _, slackChannel := range cfg.SlackChannels { if slackChannel == allowList.Event.Channel { return true } } return false }() { return } if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil { log.Printf("failed to ingest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } log.Printf("ingested") } } func parseSince(s string) (time.Time, error) { if s == "" { return time.Unix(0, 0), nil } if n, err := strconv.ParseInt(s, 10, 64); err != nil { } else { return time.Unix(n, 0), nil } if t, err := time.Parse(time.RFC3339, s); err != nil { } else { return t, nil } if t, err := time.Parse(time.RFC3339Nano, s); err != nil { } else { return t, nil } if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { } else { return t, nil } return time.Time{}, fmt.Errorf("failed to parse since=%q", s) } func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { if strings.Contains(r.Header.Get("Accept"), "text/csv") { return encodeCSVResponse(w, v) } if strings.Contains(r.Header.Get("Accept"), "text/tsv") { return encodeTSVResponse(w, v) } return encodeJSONResponse(w, v) } func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { return json.NewEncoder(w).Encode(v) } func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { return encodeSVResponse(w, v, "\t") } func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { return encodeSVResponse(w, v, ",") } func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { b, err := json.Marshal(v) if err != nil { return err } var data map[string][]map[string]json.RawMessage if err := json.Unmarshal(b, &data); err != nil { return err } var objects []map[string]json.RawMessage for k := range data { objects = data[k] } fields := []string{} for i := range objects { for k := range objects[i] { b, _ := json.Marshal(k) fields = append(fields, string(b)) } break } sort.Strings(fields) w.Write([]byte(strings.Join(fields, delim))) w.Write([]byte("\n")) for _, object := range objects { for j, field := range fields { json.Unmarshal([]byte(field), &field) if j > 0 { w.Write([]byte(delim)) } w.Write(object[field]) } w.Write([]byte("\n")) } return nil }