package main import ( "context" "encoding/json" "fmt" "log" ) type ModelToPersistence struct { pipeline Pipeline } type ModelIDs struct { Event string Message string Thread string } func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) { reader, err := NewQueue(ctx, "new_models", cfg.driver) if err != nil { return Pipeline{}, err } writer, err := NewQueue(ctx, "new_persistence", cfg.driver) if err != nil { return Pipeline{}, err } writer = NewNoopQueue() return Pipeline{ writer: writer, reader: reader, process: newModelToPersistenceProcess(cfg, cfg.storage), }, nil } func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc { return func(ctx context.Context, models []byte) ([]byte, error) { var m Models if err := json.Unmarshal(models, &m); err != nil { return nil, fmt.Errorf("received non models payload: %w", err) } if m.Event.Empty() { } else if err := storage.UpsertEvent(ctx, m.Event); err != nil { return nil, fmt.Errorf("failed to persist event: %w", err) } if m.Thread.Empty() { } else if err := storage.UpsertThread(ctx, m.Thread); err != nil { return nil, fmt.Errorf("failed to persist thread: %w", err) } if m.Message.Empty() { } else if err := storage.UpsertMessage(ctx, m.Message); err != nil { return nil, fmt.Errorf("failed to persist message: %w", err) } if cfg.Debug { log.Printf("persisted models") } return json.Marshal(ModelIDs{ Event: m.Event.ID, Thread: m.Thread.ID, Message: m.Message.ID, }) } }