oof string != byte arr got it
parent
39c0056190
commit
d88a8bb23a
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ModelToPersistence struct {
|
type ModelToPersistence struct {
|
||||||
|
|
@ -25,6 +26,7 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
|
writer = NewNoopQueue()
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
|
|
@ -54,6 +56,7 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||||
return nil, fmt.Errorf("failed to persist message: %w", err)
|
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("persisted models")
|
||||||
return json.Marshal(ModelIDs{
|
return json.Marshal(ModelIDs{
|
||||||
Event: m.Event.ID,
|
Event: m.Event.ID,
|
||||||
Thread: m.Thread.ID,
|
Thread: m.Thread.ID,
|
||||||
|
|
|
||||||
12
queue.go
12
queue.go
|
|
@ -108,12 +108,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
func (q Queue) Ack(ctx context.Context, reservation string) error {
|
||||||
|
return q.ack(ctx, []byte(reservation))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q Queue) ack(ctx context.Context, reservation []byte) error {
|
||||||
if q.driver.DB == nil {
|
if q.driver.DB == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := q.driver.ExecContext(ctx, `
|
result, err := q.driver.ExecContext(ctx, `
|
||||||
DELETE FROM queue
|
DELETE FROM queue
|
||||||
WHERE reservation==?
|
WHERE reservation==?
|
||||||
`, reservation)
|
`, reservation)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if n, _ := result.RowsAffected(); n != 1 {
|
||||||
|
return fmt.Errorf("failed to ack %s: %v rows affected", reservation, n)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
3
slack.go
3
slack.go
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -79,6 +80,8 @@ func newSlackToModelProcess(cfg Config) processFunc {
|
||||||
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
if s.Thread != "" && s.Source != "" && s.TS > 0 && s.Event != "" {
|
||||||
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("parsed slack message into models")
|
||||||
return json.Marshal(Models{
|
return json.Marshal(Models{
|
||||||
Event: event,
|
Event: event,
|
||||||
Message: message,
|
Message: message,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue