diff --git a/message.go b/.message.go similarity index 100% rename from message.go rename to .message.go diff --git a/message_test.go b/.message_test.go similarity index 100% rename from message_test.go rename to .message_test.go diff --git a/config.go b/config.go index 315014c..6f2bc7c 100644 --- a/config.go +++ b/config.go @@ -13,28 +13,27 @@ import ( ) type Config struct { - Port int - Debug bool - InitializeSlack bool - SlackToken string - SlackChannels []string - DriverConn string - BasicAuthUser string - BasicAuthPassword string - FillWithTestdata bool - OllamaURL string - OllamaModel string - LocalCheckpoint string - LocalTokenizer string - AssetPattern string - DatacenterPattern string - EventNamePattern string - driver Driver - storage Storage - ai AI - slackToMessagePipeline Pipeline - messageToPersistencePipeline Pipeline - persistenceToNormalizedPipeline Pipeline + Port int + Debug bool + InitializeSlack bool + SlackToken string + SlackChannels []string + DriverConn string + BasicAuthUser string + BasicAuthPassword string + FillWithTestdata bool + OllamaURL string + OllamaModel string + LocalCheckpoint string + LocalTokenizer string + AssetPattern string + DatacenterPattern string + EventNamePattern string + driver Driver + storage Storage + ai AI + slackToModelPipeline Pipeline + modelToPersistencePipeline Pipeline } var ( @@ -134,23 +133,17 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config, result.ai = NewAINoop() } - slackToMessagePipeline, err := NewSlackToMessagePipeline(ctx, result) + slackToModelPipeline, err := NewSlackToModelPipeline(ctx, result) if err != nil { return Config{}, err } - result.slackToMessagePipeline = slackToMessagePipeline + result.slackToModelPipeline = slackToModelPipeline - messageToPersistencePipeline, err := NewMessageToPersistencePipeline(ctx, result) + modelToPersistencePipeline, err := NewModelToPersistencePipeline(ctx, result) if err != nil { return Config{}, err } - result.messageToPersistencePipeline = messageToPersistencePipeline - - persistenceToNormalizedPipeline, err := NewPersistenceToNormalizedPipeline(ctx, result) - if err != nil { - return Config{}, err - } - result.persistenceToNormalizedPipeline = persistenceToNormalizedPipeline + result.modelToPersistencePipeline = modelToPersistencePipeline return result, nil } diff --git a/main.go b/main.go index ad7a27a..c1047e3 100644 --- a/main.go +++ b/main.go @@ -37,9 +37,8 @@ func run(ctx context.Context, cfg Config) error { case <-ctx.Done(): return ctx.Err() case err := <-processPipelines(ctx, - cfg.slackToMessagePipeline, - cfg.messageToPersistencePipeline, - cfg.persistenceToNormalizedPipeline, + cfg.slackToModelPipeline, + cfg.modelToPersistencePipeline, ): return err case err := <-listenAndServe(ctx, cfg): @@ -142,7 +141,7 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc { } errs := []error{} for _, messageJSON := range page.Messages { - if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { + if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), messageJSON); err != nil { errs = append(errs, err) } } @@ -266,7 +265,7 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc { return } - if err := cfg.slackToMessagePipeline.reader.Enqueue(r.Context(), b); err != nil { + if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil { log.Printf("failed to ingest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/main_test.go b/main_test.go index 14c1e49..779c74e 100644 --- a/main_test.go +++ b/main_test.go @@ -45,7 +45,6 @@ func TestRun(t *testing.T) { cfg.EventNamePattern = renderEventNamePattern cfg.Port = port cfg.driver = NewTestDriver(t) - cfg.slackToMessagePipeline, _ = NewSlackToMessagePipeline(ctx, cfg) cfg.SlackToken = "redacted" cfg.SlackChannels = []string{"C06U1DDBBU4"} @@ -96,7 +95,7 @@ func TestRun(t *testing.T) { t.Fatalf("(%d) %s", resp.StatusCode, b) } var result struct { - Messages []Message + Messages []any } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { t.Fatal(err) @@ -189,7 +188,7 @@ func TestRun(t *testing.T) { } var result struct { - Thread []Message + Thread []any } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { t.Fatal(err) diff --git a/normalize.go b/normalize.go deleted file mode 100644 index 1f4a474..0000000 --- a/normalize.go +++ /dev/null @@ -1,32 +0,0 @@ -package main - -import ( - "context" - "errors" -) - -type PersistenceToNormalized struct { - pipeline Pipeline -} - -func NewPersistenceToNormalizedPipeline(ctx context.Context, cfg Config) (Pipeline, error) { - reader, err := NewQueue(ctx, "new_message", cfg.driver) - if err != nil { - return Pipeline{}, err - } - writer, err := NewQueue(ctx, "new_persistence", cfg.driver) - if err != nil { - return Pipeline{}, err - } - return Pipeline{ - writer: writer, - reader: reader, - process: newPersistenceToNormalizedProcess(cfg.driver), - }, nil -} - -func newPersistenceToNormalizedProcess(driver Driver) processFunc { - return func(ctx context.Context, msg []byte) ([]byte, error) { - return nil, errors.New("not impl") - } -} diff --git a/normalize_test.go b/normalize_test.go deleted file mode 100644 index 4523d5a..0000000 --- a/normalize_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package main - -import ( - "context" - "testing" - "time" -) - -func TestPersistenceToNormalizedProcessor(t *testing.T) { - t.Parallel() - ctx, can := context.WithTimeout(context.Background(), time.Second*10) - defer can() - - d := NewTestDriver(t) - process := newPersistenceToNormalizedProcess(d) - - _, _ = ctx, process -} diff --git a/persistence.go b/persistence.go index 8fbc9f7..7ce003d 100644 --- a/persistence.go +++ b/persistence.go @@ -2,13 +2,14 @@ package main import ( "context" + "errors" ) -type MessageToPersistence struct { +type ModelToPersistence struct { pipeline Pipeline } -func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { +func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { reader, err := NewQueue(ctx, "new_message", cfg.driver) if err != nil { return Pipeline{}, err @@ -20,19 +21,12 @@ func NewMessageToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, return Pipeline{ writer: writer, reader: reader, - process: newMessageToPersistenceProcess(cfg.storage), + process: newModelToPersistenceProcess(cfg.driver), }, nil } -func newMessageToPersistenceProcess(storage Storage) processFunc { +func newModelToPersistenceProcess(driver Driver) processFunc { return func(ctx context.Context, msg []byte) ([]byte, error) { - m, err := Deserialize(msg) - if err != nil { - return nil, err - } - if err := storage.UpsertMessage(ctx, m); err != nil { - return nil, err - } - return msg, nil + return nil, errors.New("not impl") } } diff --git a/persistence_test.go b/persistence_test.go index d12adf3..4256c91 100644 --- a/persistence_test.go +++ b/persistence_test.go @@ -6,29 +6,13 @@ import ( "time" ) -func TestMessageToPersistenceProcessor(t *testing.T) { +func TestModelToPersistenceProcessor(t *testing.T) { t.Parallel() ctx, can := context.WithTimeout(context.Background(), time.Second*10) defer can() d := NewTestDriver(t) - process := newMessageToPersistenceProcess(d) + process := newModelToPersistenceProcess(d) - if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil { - t.Fatal(err) - } else if _, err := process(ctx, []byte(`{"ID":"x"}`)); err != nil { - t.Fatal("failed to upsert on redundant process", err) - } - - var id, v []byte - row := d.QueryRowContext(ctx, `SELECT * FROM messages WHERE id=?`, "x") - if err := row.Err(); err != nil { - t.Fatal(err) - } else if err := row.Scan(&id, &v); err != nil { - t.Fatal(err) - } else if string(id) != "x" { - t.Fatal(string(id)) - } else if string(v) != `{"ID":"x"}` { - t.Fatal(string(v)) - } + _, _ = ctx, process } diff --git a/slack.go b/slack.go index a4cd85e..e9cd912 100644 --- a/slack.go +++ b/slack.go @@ -2,14 +2,15 @@ package main import ( "context" + "errors" "fmt" ) -type SlackToMessage struct { +type SlackToModel struct { pipeline Pipeline } -func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error) { +func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error) { reader, err := NewQueue(ctx, "slack_event", cfg.driver) if err != nil { return Pipeline{}, err @@ -21,11 +22,11 @@ func NewSlackToMessagePipeline(ctx context.Context, cfg Config) (Pipeline, error return Pipeline{ writer: writer, reader: reader, - process: newSlackToMessageProcess(cfg), + process: newSlackToModelProcess(cfg), }, nil } -func newSlackToMessageProcess(cfg Config) processFunc { +func newSlackToModelProcess(cfg Config) processFunc { return func(ctx context.Context, slack []byte) ([]byte, error) { m, err := ParseSlack(slack, cfg.AssetPattern, cfg.DatacenterPattern, cfg.EventNamePattern) if err != nil { @@ -34,3 +35,7 @@ func newSlackToMessageProcess(cfg Config) processFunc { return m.Serialize(), nil } } + +func ParseSlack([]byte, string, string, string) (interface{ Serialize() []byte }, error) { + return nil, errors.New("not impl") +} diff --git a/slack_test.go b/slack_test.go index c4ea67a..eab83ed 100644 --- a/slack_test.go +++ b/slack_test.go @@ -2,19 +2,16 @@ package main import ( "context" - "os" "testing" "time" - - "gotest.tools/v3/assert" ) -func TestSlackToMessagePipeline(t *testing.T) { +func TestSlackToModelPipeline(t *testing.T) { t.Parallel() ctx, can := context.WithTimeout(context.Background(), time.Second*5) defer can() - pipeline, err := NewSlackToMessagePipeline(ctx, Config{driver: NewTestDriver(t)}) + pipeline, err := NewSlackToModelPipeline(ctx, Config{driver: NewTestDriver(t)}) if err != nil { t.Fatal(err) } @@ -24,27 +21,29 @@ func TestSlackToMessagePipeline(t *testing.T) { } }() - want := Message{ - ID: "1712927439.728409/1712927439", - TS: 1712927439, - Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409", - Channel: "C06U1DDBBU4", - Thread: "1712927439.728409", - EventName: "", - Event: "11071", - Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", - Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", - Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", - } + /* + want := Message{ + ID: "1712927439.728409/1712927439", + TS: 1712927439, + Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712927439728409", + Channel: "C06U1DDBBU4", + Thread: "1712927439.728409", + EventName: "", + Event: "11071", + Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", + Asset: "At least one alertconfig run has failed unexpectedly.\nDashboard: \nPanel: \nSource: ", + Datacenter: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", + } - b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json") - if err := pipeline.reader.Enqueue(ctx, b); err != nil { - t.Fatal("failed to enqueue", err) - } - if _, b2, err := pipeline.writer.Syn(ctx); err != nil { - t.Fatal("failed to syn", err) - } else if m := MustDeserialize(b2); false { - } else { - assert.DeepEqual(t, want, m) - } + b, _ := os.ReadFile("testdata/slack_events/opsgenie_alert.json") + if err := pipeline.reader.Enqueue(ctx, b); err != nil { + t.Fatal("failed to enqueue", err) + } + if _, b2, err := pipeline.writer.Syn(ctx); err != nil { + t.Fatal("failed to syn", err) + } else if m := MustDeserialize(b2); false { + } else { + assert.DeepEqual(t, want, m) + } + */ }