From 9d7a175c62ec27809b64e332774472e96ab5abf4 Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:12:41 -0600 Subject: [PATCH] at least main_test runs --- config.go | 43 +++++++++++++---------- main.go | 96 ++++++---------------------------------------------- main_test.go | 5 ++- 3 files changed, 38 insertions(+), 106 deletions(-) diff --git a/config.go b/config.go index b91f859..f3bfcae 100644 --- a/config.go +++ b/config.go @@ -13,24 +13,25 @@ import ( ) type Config struct { - Port int - Debug bool - InitializeSlack bool - SlackToken string - SlackChannels []string - DriverConn string - BasicAuthUser string - BasicAuthPassword string - FillWithTestdata bool - OllamaURL string - OllamaModel string - LocalCheckpoint string - LocalTokenizer string - AssetPattern string - DatacenterPattern string - EventNamePattern string - driver Driver - ai AI + Port int + Debug bool + InitializeSlack bool + SlackToken string + SlackChannels []string + DriverConn string + BasicAuthUser string + BasicAuthPassword string + FillWithTestdata bool + OllamaURL string + OllamaModel string + LocalCheckpoint string + LocalTokenizer string + AssetPattern string + DatacenterPattern string + EventNamePattern string + driver Driver + ai AI + slackToMessagePipeline Pipeline } var ( @@ -124,5 +125,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, result.ai = NewAINoop() } + slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result) + if err != nil { + return Config{}, err + } + result.slackToMessagePipeline = slackToMessagePipeline + return result, nil } diff --git a/main.go b/main.go index 317dd4b..5cacef1 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "log" @@ -49,13 +48,7 @@ func processSlackToMessagePipeline(ctx context.Context, cfg Config) chan error { go func() { defer close(errs) select { - case errs <- func() error { - slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, cfg) - if err != nil { - return err - } - return slackToMessagePipeline.Process(ctx) - }(): + case errs <- cfg.slackToMessagePipeline.Process(ctx): case <-ctx.Done(): } }() @@ -139,14 +132,8 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { } errs := []error{} for _, messageJSON := range page.Messages { - m, err := ParseSlack(messageJSON, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - } else if err != nil { + if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { errs = append(errs, err) - } else if err := cfg.storage.Upsert(r.Context(), m); err != nil { - errs = append(errs, err) - } else { - log.Printf("re-ingested %v", m.ID) } } @@ -164,19 +151,7 @@ func newHandlerGetAPIV1EventNames(cfg Config) http.HandlerFunc { return } - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - eventNames, err := cfg.storage.EventNamesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"eventNames": eventNames}) + http.Error(w, "not impl", http.StatusNotImplemented) } } @@ -186,19 +161,7 @@ func newHandlerGetAPIV1Events(cfg Config) http.HandlerFunc { return } - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - events, err := cfg.storage.EventsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"events": events}) + http.Error(w, "not impl", http.StatusNotImplemented) } } @@ -208,19 +171,7 @@ func newHandlerGetAPIV1Messages(cfg Config) http.HandlerFunc { return } - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - messages, err := cfg.storage.MessagesSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"messages": messages}) + http.Error(w, "not impl", http.StatusNotImplemented) } } @@ -230,19 +181,7 @@ func newHandlerGetAPIV1Threads(cfg Config) http.HandlerFunc { return } - since, err := parseSince(r.URL.Query().Get("since")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - threads, err := cfg.storage.ThreadsSince(r.Context(), since) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"threads": threads}) + http.Error(w, "not impl", http.StatusNotImplemented) } } @@ -253,14 +192,9 @@ func newHandlerGetAPIV1ThreadsThread(cfg Config) http.HandlerFunc { } thread := strings.Split(strings.Split(r.URL.Path, "/threads/")[1], "/")[0] + _ = thread - messages, err := cfg.storage.Thread(r.Context(), thread) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - encodeResponse(w, r, map[string]any{"thread": messages}) + http.Error(w, "not impl", http.StatusNotImplemented) } } @@ -322,20 +256,12 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { return } - m, err := ParseSlack(b, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) - if errors.Is(err, ErrIrrelevantMessage) { - return - } else if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if err := cfg.storage.Upsert(r.Context(), m); err != nil { - log.Printf("failed to ingest %+v: %v", m, err) + if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil { + log.Printf("failed to ingest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - log.Printf("ingested %v", m.ID) + log.Printf("ingested") } } diff --git a/main_test.go b/main_test.go index 72e926c..5abcef9 100644 --- a/main_test.go +++ b/main_test.go @@ -43,9 +43,8 @@ func TestRun(t *testing.T) { cfg.AssetPattern = renderAssetPattern cfg.EventNamePattern = renderEventNamePattern cfg.Port = port - cfg.driver = NewRAM() - cfg.storage = NewStorage(cfg.driver) - cfg.queue = NewQueue(cfg.driver) + cfg.driver, _ = NewDriver(ctx, "") + cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg) cfg.SlackToken = "redacted" cfg.SlackChannels = []string{"C06U1DDBBU4"}