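// This file wires together the service's HTTP front end: it starts the
// processing pipelines (Slack → model → persistence → recap), serves the
// Slack events webhook, and exposes a few RPC-style maintenance endpoints.
// Types such as Config, Pipeline, ModelIDs, and SlackScrape are defined
// elsewhere in the package.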
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os/signal"
	"sort"
	"strconv"
	"strings"
	"syscall"
	"time"
)
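
// main installs SIGINT handling, loads the configuration, and runs the
// pipelines and HTTP server until the first error or interrupt.
//
// Config and newConfig live elsewhere in the package; judging only from the
// fields used in this file, Config looks roughly like the sketch below
// (field types are assumptions, not the authoritative definition):
//
//	type Config struct {
//		Port              int
//		Debug             bool
//		BasicAuthUser     string
//		BasicAuthPassword string
//		SlackToken        string
//		SlackChannels     []string
//		InitializeSlack   bool
//
//		driver *sql.DB // anything with ExecContext and Close
//
//		slackToModelPipeline       Pipeline
//		modelToPersistencePipeline Pipeline
//		slackScrapePipeline        Pipeline
//		persistenceToRecapPipeline Pipeline
//	}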
func main() {
	ctx, can := signal.NotifyContext(context.Background(), syscall.SIGINT)
	defer can()

	cfg, err := newConfig(ctx)
	if err != nil {
		panic(err)
	}
	defer cfg.driver.Close()

	if err := run(ctx, cfg); err != nil && ctx.Err() == nil {
		panic(err)
	}
}
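
// run blocks until the parent context is canceled, a pipeline stops, or the
// HTTP server returns, and reports whichever error arrives first.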
func run(ctx context.Context, cfg Config) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-processPipelines(ctx,
		cfg.slackToModelPipeline,
		cfg.modelToPersistencePipeline,
		cfg.slackScrapePipeline,
		cfg.persistenceToRecapPipeline,
	):
		return err
	case err := <-listenAndServe(ctx, cfg):
		return err
	}
}
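
// processPipelines runs every pipeline concurrently and returns a channel
// that yields the first result; when any pipeline finishes, the shared
// context is canceled so the others wind down too.
//
// Pipeline is defined elsewhere in the package. From its use here and in the
// handlers below it is assumed to expose roughly this surface (a sketch, not
// the authoritative definition):
//
//	type Pipeline struct {
//		reader interface {
//			Enqueue(ctx context.Context, job []byte) error
//		}
//	}
//
//	func (p Pipeline) Process(ctx context.Context) error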
func processPipelines(ctx context.Context, first Pipeline, pipelines ...Pipeline) chan error {
	ctx, can := context.WithCancel(ctx)

	pipelines = append(pipelines, first)
	errs := make(chan error)
	for i := range pipelines {
		go func(i int) {
			// The first pipeline to return cancels the shared context so the
			// remaining goroutines can exit via the ctx.Done case below.
			defer can()
			select {
			case errs <- pipelines[i].Process(ctx):
			case <-ctx.Done():
			}
		}(i)
	}
	return errs
}
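
// listenAndServe starts the HTTP API in a goroutine and returns a channel
// carrying the server's exit error. Note that the server is not shut down
// when ctx is canceled; run simply returns and the process exits.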
func listenAndServe(ctx context.Context, cfg Config) chan error {
	s := http.Server{
		Addr:    fmt.Sprintf(":%d", cfg.Port),
		Handler: http.HandlerFunc(newHandler(cfg)),
		BaseContext: func(net.Listener) context.Context {
			return ctx
		},
	}

	errc := make(chan error)
	go func() {
		defer close(errc)
		log.Printf("listening on %s", s.Addr)
		errc <- s.ListenAndServe()
	}()

	return errc
}
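
// newHandler builds the route table and wraps it in a debug middleware that,
// when cfg.Debug is set, logs the method, URL, and body of every request.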
func newHandler(cfg Config) http.HandlerFunc {
	mux := http.NewServeMux()

	mux.Handle("GET /api/v1/version", http.HandlerFunc(newHandlerGetAPIV1Version))
	mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
	mux.Handle("PUT /api/v1/rpc/scrapeslack", http.HandlerFunc(newHandlerPutAPIV1RPCScrapeSlack(cfg)))
	mux.Handle("GET /api/v1/rpc/recapevent", http.HandlerFunc(newHandlerGetAPIV1RPCRecapEvent(cfg)))

	return func(w http.ResponseWriter, r *http.Request) {
		if cfg.Debug {
			// Read the body for logging, then restore it for the real handler.
			b, _ := io.ReadAll(r.Body)
			r.Body = io.NopCloser(bytes.NewReader(b))
			log.Printf("%s %s | %s", r.Method, r.URL, b)
		}

		mux.ServeHTTP(w, r)
	}
}
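
// Version reports the build version; it is expected to be overridden at
// build time (e.g. via -ldflags "-X main.Version=...") and defaults to
// "undef" otherwise.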
var Version = "undef"

func newHandlerGetAPIV1Version(w http.ResponseWriter, _ *http.Request) {
	json.NewEncoder(w).Encode(map[string]any{"version": Version})
}
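
// newHandlerGetAPIV1RPCRecapEvent returns a handler that, after basic auth,
// enqueues the event id from the query string onto the persistence→recap
// pipeline and echoes the id back as JSON. ModelIDs is defined elsewhere in
// the package.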
func newHandlerGetAPIV1RPCRecapEvent(cfg Config) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !basicAuth(cfg, w, r) {
			return
		}

		event := r.URL.Query().Get("id")
		b, _ := json.Marshal(ModelIDs{Event: event})
		if err := cfg.persistenceToRecapPipeline.reader.Enqueue(r.Context(), b); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		json.NewEncoder(w).Encode(map[string]any{"event": event})
	}
}
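
// newHandlerPutAPIV1RPCScrapeSlack returns a handler that, after basic auth,
// enqueues a scrape job covering the window [since, now] for the channel and
// OAuth token passed in the slack-channel and slack-oauth-token headers.
// SlackScrape is defined elsewhere in the package; a successful enqueue
// responds with 200 and an empty body.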
func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !basicAuth(cfg, w, r) {
			return
		}

		since, err := parseSince(r.URL.Query().Get("since"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		job, _ := json.Marshal(SlackScrape{
			Latest:   time.Now().Unix(),
			Oldest:   since.Unix(),
			ThreadTS: "",
			Channel:  r.Header.Get("slack-channel"),
			Token:    r.Header.Get("slack-oauth-token"),
		})
		if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}
}
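
// basicAuth checks the request's Basic Auth credentials against the
// configured user and password; on mismatch it writes a 403 and returns
// false so the caller can bail out.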
func basicAuth(cfg Config, w http.ResponseWriter, r *http.Request) bool {
	if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
		http.Error(w, "shoo", http.StatusForbidden)
		return false
	}
	return true
}
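
// newHandlerPostAPIV1EventsSlack picks the events handler at startup: the
// one-off Slack URL-verification flow when cfg.InitializeSlack is set,
// otherwise the normal ingestion handler.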
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
	if cfg.InitializeSlack {
		return handlerPostAPIV1EventsSlackInitialize(cfg)
	}
	return _newHandlerPostAPIV1EventsSlack(cfg)
}
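
// handlerPostAPIV1EventsSlackInitialize answers Slack's URL-verification
// handshake: it stores the verification token in the initialization table
// and echoes the challenge back in the response body.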
func handlerPostAPIV1EventsSlackInitialize(cfg Config) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		b, _ := io.ReadAll(r.Body)
		var challenge struct {
			Token     string
			Challenge string
			Type      string
		}
		if err := json.Unmarshal(b, &challenge); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Best-effort table creation; if it fails, the INSERT below surfaces
		// the error.
		cfg.driver.ExecContext(r.Context(), `
			CREATE TABLE
			IF NOT EXISTS
			initialization (
				label TEXT,
				token TEXT,
				updated TIMESTAMP
			)
		`)
		if _, err := cfg.driver.ExecContext(r.Context(), `
			INSERT
			INTO initialization (label, token, updated)
			VALUES ('slack_events_webhook_token', $1, $2)
		`, challenge.Token, time.Now().UTC()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		log.Println("stashed new slack initialization token", challenge.Token)
		encodeResponse(w, r, map[string]any{"challenge": challenge.Challenge})
	}
}
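
// _newHandlerPostAPIV1EventsSlack ingests Slack event callbacks: it verifies
// the webhook token, drops events for channels outside the configured allow
// list (returning 200 with an empty body), and enqueues the raw request body
// onto the slack→model pipeline.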
func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		r.Body = io.NopCloser(bytes.NewReader(body))

		// Decode only the fields needed for the token and channel checks; the
		// untouched raw body is what gets enqueued.
		var allowList struct {
			Token string
			Event struct {
				Channel string
			}
		}
		if err := json.Unmarshal(body, &allowList); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if allowList.Token != cfg.SlackToken {
			http.Error(w, "invalid .token", http.StatusForbidden)
			return
		}

		allowed := false
		for _, slackChannel := range cfg.SlackChannels {
			if slackChannel == allowList.Event.Channel {
				allowed = true
				break
			}
		}
		if !allowed {
			// Not a channel we care about: acknowledge and drop it.
			return
		}

		if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), body); err != nil {
			log.Printf("failed to ingest: %v", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		log.Printf("ingested")
	}
}
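
// parseSince interprets the since query parameter as a Unix timestamp,
// RFC 3339(-Nano) time, or YYYY-MM-DD date in the local time zone; an empty
// value means the Unix epoch.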
func parseSince(s string) (time.Time, error) {
	if s == "" {
		return time.Unix(0, 0), nil
	}

	if n, err := strconv.ParseInt(s, 10, 64); err == nil {
		return time.Unix(n, 0), nil
	}

	if t, err := time.Parse(time.RFC3339, s); err == nil {
		return t, nil
	}

	if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
		return t, nil
	}

	if t, err := time.ParseInLocation(time.DateOnly, s, time.Local); err == nil {
		return t, nil
	}

	return time.Time{}, fmt.Errorf("failed to parse since=%q", s)
}
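
// encodeResponse writes v using whichever encoding the Accept header asks
// for: text/csv, text/tsv, or (by default) JSON.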
func encodeResponse(w http.ResponseWriter, r *http.Request, v interface{}) error {
	if strings.Contains(r.Header.Get("Accept"), "text/csv") {
		return encodeCSVResponse(w, v)
	}
	if strings.Contains(r.Header.Get("Accept"), "text/tsv") {
		return encodeTSVResponse(w, v)
	}
	return encodeJSONResponse(w, v)
}

func encodeJSONResponse(w http.ResponseWriter, v interface{}) error {
	return json.NewEncoder(w).Encode(v)
}

func encodeTSVResponse(w http.ResponseWriter, v interface{}) error {
	return encodeSVResponse(w, v, "\t")
}

func encodeCSVResponse(w http.ResponseWriter, v interface{}) error {
	return encodeSVResponse(w, v, ",")
}
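
// encodeSVResponse renders v as delimiter-separated rows. v is expected to
// marshal to a JSON object whose single value is an array of flat objects;
// the first object's keys (JSON-quoted, sorted) become the header row and
// each cell is the raw JSON value for that key. Values containing the
// delimiter or newlines are not escaped.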
func encodeSVResponse(w http.ResponseWriter, v interface{}, delim string) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}

	// Round-trip through JSON so any input shaped like {"name": [ {...} ]}
	// can be handled generically.
	var data map[string][]map[string]json.RawMessage
	if err := json.Unmarshal(b, &data); err != nil {
		return err
	}

	// Only a single top-level key is expected; with more than one, whichever
	// the map iteration visits last wins.
	var objects []map[string]json.RawMessage
	for k := range data {
		objects = data[k]
	}

	// Header fields come from the first object only, JSON-encoded so they are
	// quoted in the output, then sorted for a stable column order.
	fields := []string{}
	for i := range objects {
		for k := range objects[i] {
			b, _ := json.Marshal(k)
			fields = append(fields, string(b))
		}
		break
	}
	sort.Strings(fields)

	w.Write([]byte(strings.Join(fields, delim)))
	w.Write([]byte("\n"))

	for _, object := range objects {
		for j, field := range fields {
			// Decode the quoted header back into the original key for lookup.
			json.Unmarshal([]byte(field), &field)
			if j > 0 {
				w.Write([]byte(delim))
			}
			w.Write(object[field])
		}
		w.Write([]byte("\n"))
	}

	return nil
}