Impl storage mongo driver and config
parent
2c13814177
commit
1b051ee1d5
|
|
@ -0,0 +1,24 @@
|
|||
package config
|
||||
|
||||
import "local/args"
|
||||
|
||||
type Config struct {
|
||||
Port int
|
||||
DBURI string
|
||||
}
|
||||
|
||||
func New() Config {
|
||||
as := args.NewArgSet()
|
||||
|
||||
as.Append(args.INT, "p", "port to listen on", 18114)
|
||||
as.Append(args.STRING, "dburi", "database uri", "mongodb://localhost:27017")
|
||||
|
||||
if err := as.Parse(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return Config{
|
||||
Port: as.GetInt("p"),
|
||||
DBURI: as.GetString("dburi"),
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
#! /bin/bash
|
||||
|
||||
port=57017
|
||||
|
||||
if ! curl -sS localhost:$port > /dev/null; then
|
||||
prefix=/tmp/whodunit.db
|
||||
mkdir -p $prefix/data
|
||||
mongod \
|
||||
--dbpath $prefix/data \
|
||||
--logpath $prefix/log \
|
||||
--pidfilepath $prefix/pid \
|
||||
--port $port \
|
||||
--fork
|
||||
fi
|
||||
|
||||
mshell() {
|
||||
mongo \
|
||||
--quiet \
|
||||
--port $port \
|
||||
--eval "$*"
|
||||
}
|
||||
|
||||
export DBURI=${DB_URI:-"mongodb://localhost:$port"}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"local/whodunit/config"
|
||||
"log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
c := config.New()
|
||||
log.Println(c)
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
type FilterIn struct {
|
||||
Key string
|
||||
Values []interface{}
|
||||
}
|
||||
|
||||
func newFilterIn(key string, values interface{}) FilterIn {
|
||||
fi := FilterIn{Key: key}
|
||||
switch values.(type) {
|
||||
case []interface{}:
|
||||
fi.Values = values.([]interface{})
|
||||
if len(fi.Values) == 0 {
|
||||
return newFilterIn(key, nil)
|
||||
}
|
||||
case []string:
|
||||
value := values.([]string)
|
||||
fi.Values = make([]interface{}, len(value))
|
||||
for i := range value {
|
||||
fi.Values[i] = value[i]
|
||||
}
|
||||
if len(fi.Values) == 0 {
|
||||
return newFilterIn(key, nil)
|
||||
}
|
||||
case []int:
|
||||
value := values.([]int)
|
||||
fi.Values = make([]interface{}, len(value))
|
||||
for i := range value {
|
||||
fi.Values[i] = value[i]
|
||||
}
|
||||
if len(fi.Values) == 0 {
|
||||
return newFilterIn(key, nil)
|
||||
}
|
||||
case nil:
|
||||
fi.Key = ""
|
||||
default:
|
||||
panic(fmt.Sprintf("cannot convert values to filter in: %T", values))
|
||||
}
|
||||
return fi
|
||||
}
|
||||
|
||||
func (fi FilterIn) MarshalBSON() ([]byte, error) {
|
||||
if len(fi.Key) == 0 {
|
||||
return bson.Marshal(map[string]interface{}{})
|
||||
}
|
||||
return bson.Marshal(map[string]map[string][]interface{}{
|
||||
fi.Key: map[string][]interface{}{
|
||||
"$in": fi.Values,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
type Graph struct {
|
||||
mongo Mongo
|
||||
}
|
||||
|
||||
func NewGraph() Graph {
|
||||
mongo := NewMongo()
|
||||
return Graph{
|
||||
mongo: mongo,
|
||||
}
|
||||
}
|
||||
|
||||
func (g Graph) List(ctx context.Context, from ...string) ([]One, error) {
|
||||
filter := newFilterIn("_id", from)
|
||||
ch, err := g.mongo.Find(ctx, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ones []One
|
||||
for one := range ch {
|
||||
var o One
|
||||
if err := bson.Unmarshal(one, &o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ones = append(ones, o)
|
||||
}
|
||||
return ones, nil
|
||||
}
|
||||
|
||||
func (g Graph) Insert(ctx context.Context, one One) error {
|
||||
return g.mongo.Insert(ctx, one)
|
||||
}
|
||||
|
||||
func (g Graph) Update(ctx context.Context, one One, modify interface{}) error {
|
||||
return g.mongo.Update(ctx, one, modify)
|
||||
}
|
||||
|
|
@ -0,0 +1,128 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIntegration(t *testing.T) {
|
||||
if len(os.Getenv("INTEGRATION")) > 0 {
|
||||
t.Logf("skipping because $INTEGRATION unset")
|
||||
return
|
||||
}
|
||||
|
||||
os.Args = os.Args[:1]
|
||||
graph := NewGraph()
|
||||
graph.mongo.db = "test-db"
|
||||
graph.mongo.col = "test-col"
|
||||
ctx, can := context.WithCancel(context.TODO())
|
||||
defer can()
|
||||
clean := func() {
|
||||
graph.mongo.client.Database(graph.mongo.db).Collection(graph.mongo.col).DeleteMany(ctx, map[string]interface{}{})
|
||||
}
|
||||
clean()
|
||||
defer clean()
|
||||
|
||||
ones := []One{
|
||||
One{ID: "A", Relation: ":)"},
|
||||
One{ID: "B", Relation: ":("},
|
||||
One{ID: "C", Relation: ":/"},
|
||||
}
|
||||
ones[0].Know = []One{ones[len(ones)-1]}
|
||||
ones[0].Know[0].Relation = ":D"
|
||||
|
||||
t.Run("graph.Insert(...)", func(t *testing.T) {
|
||||
for _, one := range ones {
|
||||
err := graph.Insert(ctx, one)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("graph.List", func(t *testing.T) {
|
||||
all, err := graph.List(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("\nall = %+v", all)
|
||||
if len(all) != 3 {
|
||||
t.Fatalf("%+v: %+v", len(all), all)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("graph.List(foo => *)", func(t *testing.T) {
|
||||
some, err := graph.List(ctx, ones[0].Knows()...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("\nsom = %+v", some)
|
||||
if len(some) != 1 {
|
||||
t.Fatalf("%+v: %+v", len(some), some)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("graph.Update(foo, --bar)", func(t *testing.T) {
|
||||
err := graph.Update(ctx, ones[0].Min(), Set{"know", []interface{}{}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
some, err := graph.List(ctx, ones[0].ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("\nsm' = %+v", some)
|
||||
|
||||
if len(some) != 1 {
|
||||
t.Fatal(len(some))
|
||||
}
|
||||
if some[0].ID != ones[0].ID {
|
||||
t.Fatal(some[0].ID)
|
||||
}
|
||||
if len(some[0].Knows()) > 0 {
|
||||
t.Fatal(some[0].Knows())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("graph.Update(foo, ++...); graph.Update(foo, --if :()", func(t *testing.T) {
|
||||
err := graph.Update(ctx, ones[0].Min(), Set{"know", ones})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
some1, err := graph.List(ctx, ones[0].ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("sm1 = %+v", some1[0])
|
||||
if len(some1) != 1 {
|
||||
t.Fatal(len(some1))
|
||||
}
|
||||
if len(some1[0].Knows()) != len(ones) {
|
||||
t.Fatal(some1[0].Knows())
|
||||
}
|
||||
|
||||
err = graph.Update(ctx, ones[0].Min(), PopIf{"know", map[string]string{"relation": ":("}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
some2, err := graph.List(ctx, ones[0].ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("sm2 = %+v", some2[0])
|
||||
|
||||
if len(some1) != len(some2) {
|
||||
t.Fatal(len(some2))
|
||||
}
|
||||
if len(some1[0].Knows()) == len(some2[0].Knows()) {
|
||||
t.Fatal(len(some2[0].Knows()))
|
||||
}
|
||||
if len(some2[0].Knows()) == len(ones) {
|
||||
t.Fatal(len(some2[0].Knows()))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
type Unset string
|
||||
|
||||
func (u Unset) MarshalBSON() ([]byte, error) {
|
||||
return opMarshal("$unset", string(u), "")
|
||||
}
|
||||
|
||||
type PopIf struct {
|
||||
Key string
|
||||
Filter interface{}
|
||||
}
|
||||
|
||||
func (pi PopIf) MarshalBSON() ([]byte, error) {
|
||||
return opMarshal("$pull", pi.Key, pi.Filter)
|
||||
}
|
||||
|
||||
type Set struct {
|
||||
Key string
|
||||
Value interface{}
|
||||
}
|
||||
|
||||
func (s Set) MarshalBSON() ([]byte, error) {
|
||||
return opMarshal("$set", s.Key, s.Value)
|
||||
}
|
||||
|
||||
type Push struct {
|
||||
Key string
|
||||
Value interface{}
|
||||
}
|
||||
|
||||
func (p Push) MarshalBSON() ([]byte, error) {
|
||||
return opMarshal("$push", p.Key, p.Value)
|
||||
}
|
||||
|
||||
func opMarshal(op, key string, value interface{}) ([]byte, error) {
|
||||
if len(key) == 0 {
|
||||
return nil, fmt.Errorf("no key to %s", op)
|
||||
}
|
||||
return bson.Marshal(map[string]interface{}{
|
||||
op: map[string]interface{}{
|
||||
key: value,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"local/whodunit/config"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
type Mongo struct {
|
||||
client *mongo.Client
|
||||
db string
|
||||
col string
|
||||
}
|
||||
|
||||
func NewMongo() Mongo {
|
||||
opts := options.Client().ApplyURI(config.New().DBURI)
|
||||
c, err := mongo.NewClient(opts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := c.Connect(context.TODO()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
||||
defer can()
|
||||
if _, err := c.ListDatabaseNames(ctx, map[string]interface{}{}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return Mongo{
|
||||
client: c,
|
||||
db: "db",
|
||||
col: "col",
|
||||
}
|
||||
}
|
||||
|
||||
func (m Mongo) Find(ctx context.Context, filter interface{}) (chan bson.Raw, error) {
|
||||
c := m.client.Database(m.db).Collection(m.col)
|
||||
cursor, err := c.Find(ctx, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m.page(ctx, cursor), nil
|
||||
}
|
||||
|
||||
func (m Mongo) page(ctx context.Context, cursor *mongo.Cursor) chan bson.Raw {
|
||||
ch := make(chan bson.Raw)
|
||||
go func(chan<- bson.Raw) {
|
||||
defer close(ch)
|
||||
defer cursor.Close(ctx)
|
||||
for cursor.Next(ctx) {
|
||||
ch <- cursor.Current
|
||||
}
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
func (m Mongo) Update(ctx context.Context, filter, apply interface{}) error {
|
||||
c := m.client.Database(m.db).Collection(m.col)
|
||||
_, err := c.UpdateOne(ctx, filter, apply)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m Mongo) Insert(ctx context.Context, apply interface{}) error {
|
||||
b, err := bson.Marshal(apply)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mapp map[string]interface{}
|
||||
if err := bson.Unmarshal(b, &mapp); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := mapp["_id"]; !ok {
|
||||
return errors.New("no _id in new object")
|
||||
} else if _, ok := mapp["_id"].(string); !ok {
|
||||
return errors.New("non-string _id in new object")
|
||||
}
|
||||
c := m.client.Database(m.db).Collection(m.col)
|
||||
_, err = c.InsertOne(ctx, apply)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m Mongo) Delete(ctx context.Context, filter interface{}) error {
|
||||
c := m.client.Database(m.db).Collection(m.col)
|
||||
_, err := c.DeleteOne(ctx, filter)
|
||||
return err
|
||||
}
|
||||
|
||||
func peekBSON(v interface{}) {
|
||||
b, err := bson.Marshal(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var m map[string]interface{}
|
||||
if err := bson.Unmarshal(b, &m); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
log.Printf("PEEK: %+v", m)
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package storage
|
||||
|
||||
type One struct {
|
||||
ID string `bson:"_id,omitempty"`
|
||||
Relation string `bson:"relation,omitempty"`
|
||||
Meta map[string]interface{} `bson:"meta,omitempty"`
|
||||
Know []One `bson:"know,omitempty"`
|
||||
}
|
||||
|
||||
func (o One) Knows() []string {
|
||||
knows := make([]string, len(o.Know))
|
||||
for i := range o.Know {
|
||||
knows[i] = o.Know[i].ID
|
||||
}
|
||||
return knows
|
||||
}
|
||||
|
||||
func (o One) Min() One {
|
||||
return One{ID: o.ID}
|
||||
}
|
||||
Loading…
Reference in New Issue