ew compile errs
parent
580068d98b
commit
1dcffdd956
413
.main.go
413
.main.go
|
|
@ -1,413 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
|
||||
defer can()
|
||||
|
||||
cfg, err := newConfig(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer cfg.driver.Close()
|
||||
|
||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context, cfg Config) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-listenAndServe(ctx, cfg):
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func listenAndServe(ctx context.Context, cfg Config) chan error {
|
||||
s := http.Server{
|
||||
Addr: fmt.Sprintf(":%d", cfg.Port),
|
||||
Handler: http.HandlerFunc(newHandler(cfg)),
|
||||
BaseContext: func(net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
go func() {
|
||||
defer close(errc)
|
||||
log.Printf("listening on %s", s.Addr)
|
||||
errc <- s.ListenAndServe()
|
||||
}()
|
||||
|
||||
return errc
|
||||
}
|
||||
|
||||
func newHandler(cfg Config) http.HandlerFunc {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
||||
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
||||
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
||||
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if cfg.Debug {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
log.Printf("%s %s | %s", r.Method, r.URL, b)
|
||||
}
|
||||
|
||||
mux.ServeHTTP(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
channel := r.Header.Get("slack-channel")
|
||||
token := r.Header.Get("slack-oauth-token")
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
|
||||
var page struct {
|
||||
OK bool
|
||||
Messages []json.RawMessage
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
} else if !page.OK {
|
||||
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
} else if err != nil {
|
||||
errs = append(errs, err)
|
||||
} else if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
log.Printf("re-ingested %v", m.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
events, err := cfg.storage.EventsSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"events": events})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
messages, err := cfg.storage.MessagesSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"messages": messages})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"threads": threads})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
||||
|
||||
messages, err := cfg.storage.Thread(r.Context(), thread)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"thread": messages})
|
||||
}
|
||||
}
|
||||
|
||||
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
|
||||
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
|
||||
http.Error(w, "shoo", http.StatusForbidden)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
if cfg.InitializeSlack {
|
||||
return handlerPostAPIV1EventsSlackInitialize
|
||||
}
|
||||
return _newHandlerPostAPIV1EventsSlack(cfg)
|
||||
}
|
||||
|
||||
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
var challenge struct {
|
||||
Token string
|
||||
Challenge string
|
||||
Type string
|
||||
}
|
||||
if err := json.Unmarshal(b, &challenge); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
||||
}
|
||||
|
||||
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
|
||||
var allowList struct {
|
||||
Token string
|
||||
Event struct {
|
||||
Channel string
|
||||
}
|
||||
}
|
||||
if err := json.Unmarshal(b, &allowList); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
} else if allowList.Token != cfg.SlackToken {
|
||||
http.Error(w, "invalid .token", http.StatusForbidden)
|
||||
return
|
||||
} else if !func() bool {
|
||||
for _, slackChannel := range cfg.SlackChannels {
|
||||
if slackChannel == allowList.Event.Channel {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}() {
|
||||
return
|
||||
}
|
||||
|
||||
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
return
|
||||
} else if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||
log.Printf("failed to ingest %+v: %v", m, err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
log.Printf("ingested %v", m.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func parseSince(s string) (time.Time, error) {
|
||||
if s == "" {
|
||||
return time.Unix(0, 0), nil
|
||||
}
|
||||
|
||||
if n, err := strconv.ParseInt(s, 10, 64); err != nil {
|
||||
} else {
|
||||
return time.Unix(n, 0), nil
|
||||
}
|
||||
|
||||
if t, err := time.Parse(time.RFC3339, s); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
if t, err := time.Parse(time.RFC3339Nano, s); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
|
||||
}
|
||||
|
||||
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
|
||||
if strings.Contains(r.Header.Get("Accept"), "text/csv") {
|
||||
return encodeCSVResponse(w, v)
|
||||
}
|
||||
if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
|
||||
return encodeTSVResponse(w, v)
|
||||
}
|
||||
return encodeJSONResponse(w, v)
|
||||
}
|
||||
|
||||
func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
|
||||
func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return encodeSVResponse(w, v, "\t")
|
||||
}
|
||||
|
||||
func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return encodeSVResponse(w, v, ",")
|
||||
}
|
||||
|
||||
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var data map[string][]map[string]json.RawMessage
|
||||
if err := json.Unmarshal(b, &data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var objects []map[string]json.RawMessage
|
||||
for k := range data {
|
||||
objects = data[k]
|
||||
}
|
||||
|
||||
fields := []string{}
|
||||
for i := range objects {
|
||||
for k := range objects[i] {
|
||||
b, _ := json.Marshal(k)
|
||||
fields = append(fields, string(b))
|
||||
}
|
||||
break
|
||||
}
|
||||
sort.Strings(fields)
|
||||
|
||||
w.Write([]byte(strings.Join(fields, delim)))
|
||||
w.Write([]byte("\n"))
|
||||
|
||||
for _, object := range objects {
|
||||
for j, field := range fields {
|
||||
json.Unmarshal([]byte(field), &field)
|
||||
if j > 0 {
|
||||
w.Write([]byte(delim))
|
||||
}
|
||||
w.Write(object[field])
|
||||
}
|
||||
w.Write([]byte("\n"))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
432
main.go
432
main.go
|
|
@ -1,3 +1,433 @@
|
|||
package main
|
||||
|
||||
func main() {}
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
|
||||
defer can()
|
||||
|
||||
cfg, err := newConfig(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer cfg.driver.Close()
|
||||
|
||||
if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context, cfg Config) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-processSlackToMessagePipeline(ctx, cfg):
|
||||
return err
|
||||
case err := <-listenAndServe(ctx, cfg):
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error {
|
||||
errs := make(chan error)
|
||||
go func() {
|
||||
defer close(errs)
|
||||
select {
|
||||
case errs <- func() error {
|
||||
slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return slackToMessagePipeline.Process(ctx)
|
||||
}():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
return errs
|
||||
}
|
||||
|
||||
func listenAndServe(ctx context.Context, cfg Config) chan error {
|
||||
s := http.Server{
|
||||
Addr: fmt.Sprintf(":%d", cfg.Port),
|
||||
Handler: http.HandlerFunc(newHandler(cfg)),
|
||||
BaseContext: func(net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
go func() {
|
||||
defer close(errc)
|
||||
log.Printf("listening on %s", s.Addr)
|
||||
errc <- s.ListenAndServe()
|
||||
}()
|
||||
|
||||
return errc
|
||||
}
|
||||
|
||||
func newHandler(cfg Config) http.HandlerFunc {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.Handle("GET /api/v1/eventnames", http.HandlerFunc(newHandlerGetAPIV1EventNames(cfg)))
|
||||
mux.Handle("GET /api/v1/events", http.HandlerFunc(newHandlerGetAPIV1Events(cfg)))
|
||||
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Messages(cfg)))
|
||||
mux.Handle("GET /api/v1/threads", http.HandlerFunc(newHandlerGetAPIV1Threads(cfg)))
|
||||
mux.Handle("GET /api/v1/threads/{thread}", http.HandlerFunc(newHandlerGetAPIV1ThreadsThread(cfg)))
|
||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||
mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if cfg.Debug {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
log.Printf("%s %s | %s", r.Method, r.URL, b)
|
||||
}
|
||||
|
||||
mux.ServeHTTP(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
channel := r.Header.Get("slack-channel")
|
||||
token := r.Header.Get("slack-oauth-token")
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, "https://slack.com/api/conversations.history?channel="+channel, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer io.Copy(io.Discard, resp.Body)
|
||||
|
||||
var page struct {
|
||||
OK bool
|
||||
Messages []json.RawMessage
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
} else if !page.OK {
|
||||
http.Error(w, "slack page was !.ok", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
errs := []error{}
|
||||
for _, messageJSON := range page.Messages {
|
||||
m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
} else if err != nil {
|
||||
errs = append(errs, err)
|
||||
} else if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
log.Printf("re-ingested %v", m.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"scraped": len(page.Messages)})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
eventNames, err := cfg.storage.EventNamesSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"eventNames": eventNames})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
events, err := cfg.storage.EventsSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"events": events})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
messages, err := cfg.storage.MessagesSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"messages": messages})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
since, err := parseSince(r.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
threads, err := cfg.storage.ThreadsSince(r.Context(), since)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"threads": threads})
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if !basicAuth(cfg, w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0]
|
||||
|
||||
messages, err := cfg.storage.Thread(r.Context(), thread)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"thread": messages})
|
||||
}
|
||||
}
|
||||
|
||||
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
|
||||
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
|
||||
http.Error(w, "shoo", http.StatusForbidden)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
if cfg.InitializeSlack {
|
||||
return handlerPostAPIV1EventsSlackInitialize
|
||||
}
|
||||
return _newHandlerPostAPIV1EventsSlack(cfg)
|
||||
}
|
||||
|
||||
func handlerPostAPIV1EventsSlackInitialize(w http.ResponseWriter, r *http.Request) {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
var challenge struct {
|
||||
Token string
|
||||
Challenge string
|
||||
Type string
|
||||
}
|
||||
if err := json.Unmarshal(b, &challenge); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
|
||||
}
|
||||
|
||||
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
|
||||
var allowList struct {
|
||||
Token string
|
||||
Event struct {
|
||||
Channel string
|
||||
}
|
||||
}
|
||||
if err := json.Unmarshal(b, &allowList); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
} else if allowList.Token != cfg.SlackToken {
|
||||
http.Error(w, "invalid .token", http.StatusForbidden)
|
||||
return
|
||||
} else if !func() bool {
|
||||
for _, slackChannel := range cfg.SlackChannels {
|
||||
if slackChannel == allowList.Event.Channel {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}() {
|
||||
return
|
||||
}
|
||||
|
||||
m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern)
|
||||
if errors.Is(err, ErrIrrelevantMessage) {
|
||||
return
|
||||
} else if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||
log.Printf("failed to ingest %+v: %v", m, err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
log.Printf("ingested %v", m.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func parseSince(s string) (time.Time, error) {
|
||||
if s == "" {
|
||||
return time.Unix(0, 0), nil
|
||||
}
|
||||
|
||||
if n, err := strconv.ParseInt(s, 10, 64); err != nil {
|
||||
} else {
|
||||
return time.Unix(n, 0), nil
|
||||
}
|
||||
|
||||
if t, err := time.Parse(time.RFC3339, s); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
if t, err := time.Parse(time.RFC3339Nano, s); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err != nil {
|
||||
} else {
|
||||
return t, nil
|
||||
}
|
||||
|
||||
return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
|
||||
}
|
||||
|
||||
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
|
||||
if strings.Contains(r.Header.Get("Accept"), "text/csv") {
|
||||
return encodeCSVResponse(w, v)
|
||||
}
|
||||
if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
|
||||
return encodeTSVResponse(w, v)
|
||||
}
|
||||
return encodeJSONResponse(w, v)
|
||||
}
|
||||
|
||||
func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
|
||||
func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return encodeSVResponse(w, v, "\t")
|
||||
}
|
||||
|
||||
func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
|
||||
return encodeSVResponse(w, v, ",")
|
||||
}
|
||||
|
||||
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var data map[string][]map[string]json.RawMessage
|
||||
if err := json.Unmarshal(b, &data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var objects []map[string]json.RawMessage
|
||||
for k := range data {
|
||||
objects = data[k]
|
||||
}
|
||||
|
||||
fields := []string{}
|
||||
for i := range objects {
|
||||
for k := range objects[i] {
|
||||
b, _ := json.Marshal(k)
|
||||
fields = append(fields, string(b))
|
||||
}
|
||||
break
|
||||
}
|
||||
sort.Strings(fields)
|
||||
|
||||
w.Write([]byte(strings.Join(fields, delim)))
|
||||
w.Write([]byte("\n"))
|
||||
|
||||
for _, object := range objects {
|
||||
for j, field := range fields {
|
||||
json.Unmarshal([]byte(field), &field)
|
||||
if j > 0 {
|
||||
w.Write([]byte(delim))
|
||||
}
|
||||
w.Write(object[field])
|
||||
}
|
||||
w.Write([]byte("\n"))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue