From 1dcffdd956e70cc104e914e5645f35efec8dba6a Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:04:12 -0600 Subject: [PATCH] ew compile errs --- .main.go | 413 -------------------------------- main.go | 432 +++++++++++++++++++++++++++++++++- .main_test.go => main_test.go | 0 3 files changed, 431 insertions(+), 414 deletions(-) delete mode 100644 .main.go rename .main_test.go => main_test.go (100%) diff --git a/.main.go b/.main.go deleted file mode 100644 index a716c42..0000000 --- a/.main.go +++ /dev/null @@ -1,413 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "net" - "net/http" - "os/signal" - "sort" - "strconv" - "strings" - "syscall" - "time" -) - -func main() { - ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) - defer can() - - cfg, err := newConfig(ctx) - if err != nil { - panic(err) - } - defer cfg.driver.Close() - - if err := run(ctx, cfg); err != nil && ctx.Err() == nil { - panic(err) - } -} - -func run(ctx context.Context, cfg Config) error { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-listenAndServe(ctx, cfg): - return err - } -} - -func listenAndServe(ctx context.Context, cfg Config) chan error { - s := http.Server{ - Addr: fmt.Sprintf(":%d", cfg.Port), - Handler: http.HandlerFunc(newHandler(cfg)), - BaseContext: func(net.Listener) context.Context { - return ctx - }, - } - - errc := make(chan error) - go func() { - defer close(errc) - log.Printf("listening on %s", s.Addr) - errc <- s.ListenAndServe() - }() - - return errc -} - -func newHandler(cfg Config) http.HandlerFunc { - mux := http.NewServeMux() - - mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) - mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) - mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) - mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) - mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) - mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) - mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) - - return func(w http.ResponseWriter, r *http.Request) { - if cfg.Debug { - b, _ := io.ReadAll(r.Body) - r.Body = io.NopCloser(bytes.NewReader(b)) - log.Printf("%s %s | %s", r.Method, r.URL, b) - } - - mux.ServeHTTP(w, r) - } -} - -func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - channel := r.Header.Get("slack-channel") - token := r.Header.Get("slack-oauth-token") - - req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - req.Header.Set("Authorization", "Bearer "+token) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - defer resp.Body.Close() - defer io.Copy(io.Discard, resp.Body) - - var page struct { - OK bool - Messages []json.RawMessage - } - if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } else if !page.OK { - http.Error(w, "slack page was !.ok", http.StatusBadGateway) - return - } - errs := []error{} - for _, messageJSON := range page.Messages { - m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - } else if err != nil { - errs = append(errs, err) - } else if err := cfg.storage.Upsert(r.Context(), m); err != nil { - errs = append(errs, err) - } else { - log.Printf("re-ingested %v", m.ID) - } - } - - if len(errs) > 0 { - http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) - return - } - json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) - } -} - -func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"eventNames": eventNames}) - } -} - -func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - events, err := cfg.storage.EventsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"events": events}) - } -} - -func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - messages, err := cfg.storage.MessagesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"messages": messages}) - } -} - -func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - threads, err := cfg.storage.ThreadsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"threads": threads}) - } -} - -func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if !basicAuth(cfg, w, r) { - return - } - - thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] - - messages, err := cfg.storage.Thread(r.Context(), thread) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"thread": messages}) - } -} - -func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { - if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { - http.Error(w, "shoo", http.StatusForbidden) - return false - } - return true -} - -func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { - if cfg.InitializeSlack { - return handlerPostAPIV1EventsSlackInitialize - } - return _newHandlerPostAPIV1EventsSlack(cfg) -} - -func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { - b, _ := io.ReadAll(r.Body) - var challenge struct { - Token string - Challenge string - Type string - } - if err := json.Unmarshal(b, &challenge); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) -} - -func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := io.ReadAll(r.Body) - r.Body = io.NopCloser(bytes.NewReader(b)) - - var allowList struct { - Token string - Event struct { - Channel string - } - } - if err := json.Unmarshal(b, &allowList); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } else if allowList.Token != cfg.SlackToken { - http.Error(w, "invalid .token", http.StatusForbidden) - return - } else if !func() bool { - for _, slackChannel := range cfg.SlackChannels { - if slackChannel == allowList.Event.Channel { - return true - } - } - return false - }() { - return - } - - m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - return - } else if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if err := cfg.storage.Upsert(r.Context(), m); err != nil { - log.Printf("failed to ingest %+v: %v", m, err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - log.Printf("ingested %v", m.ID) - } -} - -func parseSince(s string) (time.Time, error) { - if s == "" { - return time.Unix(0, 0), nil - } - - if n, err := strconv.ParseInt(s, 10, 64); err != nil { - } else { - return time.Unix(n, 0), nil - } - - if t, err := time.Parse(time.RFC3339, s); err != nil { - } else { - return t, nil - } - - if t, err := time.Parse(time.RFC3339Nano, s); err != nil { - } else { - return t, nil - } - - if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { - } else { - return t, nil - } - - return time.Time{}, fmt.Errorf("failed to parse since=%q", s) -} - -func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { - if strings.Contains(r.Header.Get("Accept"), "text/csv") { - return encodeCSVResponse(w, v) - } - if strings.Contains(r.Header.Get("Accept"), "text/tsv") { - return encodeTSVResponse(w, v) - } - return encodeJSONResponse(w, v) -} - -func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { - return json.NewEncoder(w).Encode(v) -} - -func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { - return encodeSVResponse(w, v, "\t") -} - -func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { - return encodeSVResponse(w, v, ",") -} - -func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { - b, err := json.Marshal(v) - if err != nil { - return err - } - - var data map[string][]map[string]json.RawMessage - if err := json.Unmarshal(b, &data); err != nil { - return err - } - - var objects []map[string]json.RawMessage - for k := range data { - objects = data[k] - } - - fields := []string{} - for i := range objects { - for k := range objects[i] { - b, _ := json.Marshal(k) - fields = append(fields, string(b)) - } - break - } - sort.Strings(fields) - - w.Write([]byte(strings.Join(fields, delim))) - w.Write([]byte("\n")) - - for _, object := range objects { - for j, field := range fields { - json.Unmarshal([]byte(field), &field) - if j > 0 { - w.Write([]byte(delim)) - } - w.Write(object[field]) - } - w.Write([]byte("\n")) - } - - return nil -} diff --git a/main.go b/main.go index 38dd16d..317dd4b 100644 --- a/main.go +++ b/main.go @@ -1,3 +1,433 @@ package main -func main() {} +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "os/signal" + "sort" + "strconv" + "strings" + "syscall" + "time" +) + +func main() { + ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT) + defer can() + + cfg, err := newConfig(ctx) + if err != nil { + panic(err) + } + defer cfg.driver.Close() + + if err := run(ctx, cfg); err != nil && ctx.Err() == nil { + panic(err) + } +} + +func run(ctx context.Context, cfg Config) error { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-processSlackToMessagePipeline(ctx, cfg): + return err + case err := <-listenAndServe(ctx, cfg): + return err + } +} + +func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error { + errs := make(chan error) + go func() { + defer close(errs) + select { + case errs <- func() error { + slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, cfg) + if err != nil { + return err + } + return slackToMessagePipeline.Process(ctx) + }(): + case <-ctx.Done(): + } + }() + return errs +} + +func listenAndServe(ctx context.Context, cfg Config) chan error { + s := http.Server{ + Addr: fmt.Sprintf(":%d", cfg.Port), + Handler: http.HandlerFunc(newHandler(cfg)), + BaseContext: func(net.Listener) context.Context { + return ctx + }, + } + + errc := make(chan error) + go func() { + defer close(errc) + log.Printf("listening on %s", s.Addr) + errc <- s.ListenAndServe() + }() + + return errc +} + +func newHandler(cfg Config) http.HandlerFunc { + mux := http.NewServeMux() + + mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg))) + mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg))) + mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg))) + mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg))) + mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg))) + mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg))) + mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg))) + + return func(w http.ResponseWriter, r *http.Request) { + if cfg.Debug { + b, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewReader(b)) + log.Printf("%s %s | %s", r.Method, r.URL, b) + } + + mux.ServeHTTP(w, r) + } +} + +func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + channel := r.Header.Get("slack-channel") + token := r.Header.Get("slack-oauth-token") + + req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer resp.Body.Close() + defer io.Copy(io.Discard, resp.Body) + + var page struct { + OK bool + Messages []json.RawMessage + } + if err := json.NewDecoder(resp.Body).Decode(&page); err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } else if !page.OK { + http.Error(w, "slack page was !.ok", http.StatusBadGateway) + return + } + errs := []error{} + for _, messageJSON := range page.Messages { + m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) + if errors.Is(err, ErrIrrelevantMessage) { + } else if err != nil { + errs = append(errs, err) + } else if err := cfg.storage.Upsert(r.Context(), m); err != nil { + errs = append(errs, err) + } else { + log.Printf("re-ingested %v", m.ID) + } + } + + if len(errs) > 0 { + http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError) + return + } + json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)}) + } +} + +func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"eventNames": eventNames}) + } +} + +func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + events, err := cfg.storage.EventsSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"events": events}) + } +} + +func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + messages, err := cfg.storage.MessagesSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"messages": messages}) + } +} + +func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + since, err := parseSince(r.URL.Query().Get("since")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + threads, err := cfg.storage.ThreadsSince(r.Context(), since) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"threads": threads}) + } +} + +func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !basicAuth(cfg, w, r) { + return + } + + thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] + + messages, err := cfg.storage.Thread(r.Context(), thread) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + encodeResponse(w, r, map[string]any{"thread": messages}) + } +} + +func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool { + if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword { + http.Error(w, "shoo", http.StatusForbidden) + return false + } + return true +} + +func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { + if cfg.InitializeSlack { + return handlerPostAPIV1EventsSlackInitialize + } + return _newHandlerPostAPIV1EventsSlack(cfg) +} + +func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + var challenge struct { + Token string + Challenge string + Type string + } + if err := json.Unmarshal(b, &challenge); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge}) +} + +func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewReader(b)) + + var allowList struct { + Token string + Event struct { + Channel string + } + } + if err := json.Unmarshal(b, &allowList); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } else if allowList.Token != cfg.SlackToken { + http.Error(w, "invalid .token", http.StatusForbidden) + return + } else if !func() bool { + for _, slackChannel := range cfg.SlackChannels { + if slackChannel == allowList.Event.Channel { + return true + } + } + return false + }() { + return + } + + m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) + if errors.Is(err, ErrIrrelevantMessage) { + return + } else if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := cfg.storage.Upsert(r.Context(), m); err != nil { + log.Printf("failed to ingest %+v: %v", m, err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.Printf("ingested %v", m.ID) + } +} + +func parseSince(s string) (time.Time, error) { + if s == "" { + return time.Unix(0, 0), nil + } + + if n, err := strconv.ParseInt(s, 10, 64); err != nil { + } else { + return time.Unix(n, 0), nil + } + + if t, err := time.Parse(time.RFC3339, s); err != nil { + } else { + return t, nil + } + + if t, err := time.Parse(time.RFC3339Nano, s); err != nil { + } else { + return t, nil + } + + if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil { + } else { + return t, nil + } + + return time.Time{}, fmt.Errorf("failed to parse since=%q", s) +} + +func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error { + if strings.Contains(r.Header.Get("Accept"), "text/csv") { + return encodeCSVResponse(w, v) + } + if strings.Contains(r.Header.Get("Accept"), "text/tsv") { + return encodeTSVResponse(w, v) + } + return encodeJSONResponse(w, v) +} + +func encodeJSONResponse(w http.ResponseWriter, v interface{}) error { + return json.NewEncoder(w).Encode(v) +} + +func encodeTSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, "\t") +} + +func encodeCSVResponse(w http.ResponseWriter, v interface{}) error { + return encodeSVResponse(w, v, ",") +} + +func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error { + b, err := json.Marshal(v) + if err != nil { + return err + } + + var data map[string][]map[string]json.RawMessage + if err := json.Unmarshal(b, &data); err != nil { + return err + } + + var objects []map[string]json.RawMessage + for k := range data { + objects = data[k] + } + + fields := []string{} + for i := range objects { + for k := range objects[i] { + b, _ := json.Marshal(k) + fields = append(fields, string(b)) + } + break + } + sort.Strings(fields) + + w.Write([]byte(strings.Join(fields, delim))) + w.Write([]byte("\n")) + + for _, object := range objects { + for j, field := range fields { + json.Unmarshal([]byte(field), &field) + if j > 0 { + w.Write([]byte(delim)) + } + w.Write(object[field]) + } + w.Write([]byte("\n")) + } + + return nil +} diff --git a/.main_test.go b/main_test.go similarity index 100% rename from .main_test.go rename to main_test.go