109 lines
2.2 KiB
Go
109 lines
2.2 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"os/exec"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
type Mongo struct {
|
|
client *mongo.Client
|
|
ns string
|
|
page int
|
|
}
|
|
|
|
// init launches a background goroutine that supervises a local mongod for the
// lifetime of the process: it walks up to the cheqbooq directory and runs
// testdata/start_mdb.sh, then waits for the mongod process found via ps to
// exit, and repeats forever.
//
// NOTE(review): the goroutine is never stopped and the commands assume bash,
// ps, awk and a tail that supports --pid are available. This looks like a
// test/dev convenience; confirm before relying on it elsewhere.
func init() {
	// retryDelay throttles the loop when neither command could do any work,
	// so a missing script or binary does not spawn bash processes back to
	// back in a tight loop.
	const retryDelay = time.Second

	// run executes script under bash, logs the combined output when the
	// command fails, and returns the command's error.
	run := func(script string) error {
		out, err := exec.Command("bash", "-c", script).CombinedOutput()
		if err != nil {
			log.Printf("%s", out)
		}
		return err
	}

	go func() {
		for {
			// Start mongod from the project root.
			kickErr := run("true; until [ $(basename $PWD) == cheqbooq ]; do cd ..; done; NOFORK=1 bash ./testdata/start_mdb.sh")
			if kickErr != nil {
				log.Println(kickErr)
			}

			// Block until the running mongod process exits.
			blockErr := run("true; tail --pid=$(ps aux | grep mongod | grep -v grep | awk '{print $2}') -f /dev/null")
			if blockErr != nil {
				log.Println(blockErr)
			}

			// Both steps failing means nothing was started and nothing was
			// waited on; pause before retrying instead of spinning.
			if kickErr != nil && blockErr != nil {
				time.Sleep(retryDelay)
			}
		}
	}()
}
|
|
|
|
func NewMongo(page int, ns, addr string) (*Mongo, error) {
|
|
ctx, can := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer can()
|
|
|
|
opt := options.Client()
|
|
opt.ApplyURI(addr)
|
|
|
|
client, err := mongo.Connect(ctx, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Mongo{
|
|
client: client,
|
|
ns: ns,
|
|
}, client.Ping(ctx, nil)
|
|
}
|
|
|
|
func (m *Mongo) Close() error {
|
|
return m.client.Disconnect(context.TODO())
|
|
}
|
|
|
|
func (m *Mongo) Account() *mongo.Collection {
|
|
return m.client.Database(m.ns).Collection("account")
|
|
}
|
|
|
|
func (m *Mongo) Balance() *mongo.Collection {
|
|
return m.client.Database(m.ns).Collection("balance")
|
|
}
|
|
|
|
func (m *Mongo) Transaction() *mongo.Collection {
|
|
return m.client.Database(m.ns).Collection("transaction")
|
|
}
|
|
|
|
func (m *Mongo) Find(c *mongo.Collection, where interface{}, next func() interface{}) error {
|
|
ctx, can := context.WithCancel(context.TODO())
|
|
defer can()
|
|
|
|
cur, err := c.Find(ctx, where, options.Find().SetLimit(int64(m.page)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer cur.Close(ctx)
|
|
|
|
for cur.Next(ctx) {
|
|
ptr := next()
|
|
if err := cur.Decode(ptr); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return cur.Err()
|
|
}
|
|
|
|
func (m *Mongo) Upsert(c *mongo.Collection, where, op interface{}) error {
|
|
ctx, can := context.WithCancel(context.TODO())
|
|
defer can()
|
|
|
|
_, err := c.UpdateMany(ctx, where, op, options.Update().SetUpsert(true))
|
|
return err
|
|
}
|