model to persist pipeline tests OK
parent
709f2ac254
commit
5fa21d0cd9
|
|
@ -2,15 +2,22 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ModelToPersistence struct {
|
type ModelToPersistence struct {
|
||||||
pipeline Pipeline
|
pipeline Pipeline
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ModelIDs struct {
|
||||||
|
Event string
|
||||||
|
Message string
|
||||||
|
Thread string
|
||||||
|
}
|
||||||
|
|
||||||
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
|
||||||
reader, err := NewQueue(ctx, "new_message", cfg.driver)
|
reader, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
|
|
@ -21,12 +28,36 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
|
||||||
return Pipeline{
|
return Pipeline{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
process: newModelToPersistenceProcess(cfg.driver),
|
process: newModelToPersistenceProcess(cfg.storage),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newModelToPersistenceProcess(driver Driver) processFunc {
|
func newModelToPersistenceProcess(storage Storage) processFunc {
|
||||||
return func(ctx context.Context, msg []byte) ([]byte, error) {
|
return func(ctx context.Context, models []byte) ([]byte, error) {
|
||||||
return nil, errors.New("not impl")
|
var m Models
|
||||||
|
if err := json.Unmarshal(models, &m); err != nil {
|
||||||
|
return nil, fmt.Errorf("received non models payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Event.Empty() {
|
||||||
|
} else if err := storage.UpsertEvent(ctx, m.Event); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Thread.Empty() {
|
||||||
|
} else if err := storage.UpsertThread(ctx, m.Thread); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist thread: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.Empty() {
|
||||||
|
} else if err := storage.UpsertMessage(ctx, m.Message); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to persist message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(ModelIDs{
|
||||||
|
Event: m.Event.ID,
|
||||||
|
Thread: m.Thread.ID,
|
||||||
|
Message: m.Message.ID,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,11 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/breel-render/spoc-bot-vr/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestModelToPersistenceProcessor(t *testing.T) {
|
func TestModelToPersistenceProcessor(t *testing.T) {
|
||||||
|
|
@ -12,7 +15,49 @@ func TestModelToPersistenceProcessor(t *testing.T) {
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
d := NewTestDriver(t)
|
d := NewTestDriver(t)
|
||||||
process := newModelToPersistenceProcess(d)
|
s, _ := NewStorage(ctx, d)
|
||||||
|
process := newModelToPersistenceProcess(s)
|
||||||
|
|
||||||
_, _ = ctx, process
|
_, _ = ctx, process
|
||||||
|
|
||||||
|
inputModels := Models{
|
||||||
|
Event: model.Event{ID: "event", Asset: "event-asset"},
|
||||||
|
//Thread: {ID: "thread", Channel: "thread-channel"},
|
||||||
|
Message: model.Message{ID: "message", Plaintext: "message-plaintext"},
|
||||||
|
}
|
||||||
|
input, _ := json.Marshal(inputModels)
|
||||||
|
|
||||||
|
var outputModelIDs ModelIDs
|
||||||
|
var n int
|
||||||
|
if output, err := process(ctx, input); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := json.Unmarshal(output, &outputModelIDs); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if outputModelIDs != (ModelIDs{Event: "event", Message: "message"}) {
|
||||||
|
t.Error(outputModelIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`); row.Err() != nil {
|
||||||
|
t.Error("cant count events:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count events:", err)
|
||||||
|
} else if n != 1 {
|
||||||
|
t.Error("bad event count:", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM threads`); row.Err() != nil {
|
||||||
|
t.Error("cant count threads:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count threads:", err)
|
||||||
|
} else if n != 0 {
|
||||||
|
t.Error("bad thread count:", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if row := d.QueryRowContext(ctx, `SELECT COUNT(*) FROM messages`); row.Err() != nil {
|
||||||
|
t.Error("cant count messages:", row.Err())
|
||||||
|
} else if err := row.Scan(&n); err != nil {
|
||||||
|
t.Error("cant count messages:", err)
|
||||||
|
} else if n != 1 {
|
||||||
|
t.Error("bad message count:", n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
2
slack.go
2
slack.go
|
|
@ -32,7 +32,7 @@ func NewSlackToModelPipeline(ctx context.Context, cfg Config) (Pipeline, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
writer, err := NewQueue(ctx, "new_message", cfg.driver)
|
writer, err := NewQueue(ctx, "new_models", cfg.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Pipeline{}, err
|
return Pipeline{}, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue