diff --git a/persistence.go b/persistence.go index 933fd10..b786665 100644 --- a/persistence.go +++ b/persistence.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" ) type ModelToPersistence struct { @@ -25,6 +26,7 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e if err != nil { return Pipeline{}, err } + writer = NewNoopQueue() return Pipeline{ writer: writer, reader: reader, @@ -54,6 +56,7 @@ func newModelToPersistenceProcess(storage Storage) processFunc { return nil, fmt.Errorf("failed to persist message: %w", err) } + log.Printf("persisted models") return json.Marshal(ModelIDs{ Event: m.Event.ID, Thread: m.Thread.ID, diff --git a/queue.go b/queue.go index 6593aa0..8ae6878 100644 --- a/queue.go +++ b/queue.go @@ -108,12 +108,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) { } func (q Queue) Ack(ctx context.Context, reservation string) error { + return q.ack(ctx, []byte(reservation)) +} + +func (q Queue) ack(ctx context.Context, reservation []byte) error { if q.driver.DB == nil { return nil } - _, err := q.driver.ExecContext(ctx, ` + result, err := q.driver.ExecContext(ctx, ` DELETE FROM queue WHERE reservation==? `, reservation) + if err != nil { + return err + } + if n, _ := result.RowsAffected(); n != 1 { + return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n) + } return err } diff --git a/slack.go b/slack.go index 45a282c..d020070 100644 --- a/slack.go +++ b/slack.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "regexp" "strconv" "strings" @@ -79,6 +80,8 @@ func newSlackToModelProcess(cfg Config) processFunc { if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" { thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event) } + + log.Printf("parsed slack message into models") return json.Marshal(Models{ Event: event, Message: message,