VENDOR intensifies

This commit is contained in:
bel
2020-03-13 03:41:54 +00:00
parent 0d6be1e9d8
commit a1cea7d1cb
1427 changed files with 527540 additions and 1 deletions

View File

@@ -0,0 +1,11 @@
MongoDB Go Driver Low-Level Driver Library
==========================================
The packages within this library allow users to build applications using a low-level driver
interface. Knowledge of the internals of a MongoDB driver are assumed, so this library contains
advanced features. The aim of this library is to provide an easy to use, high performance
implementation of a low-level driver.
This Library's API is experimental and subject to change. Packages may be changed or removed without
notice. These APIs are not stable and do not guarantee backward compatibility.
**THIS LIBRARY IS EXPERIMENTAL AND SUBJECT TO CHANGE.**

View File

@@ -0,0 +1,74 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// AbortTransaction handles the full cycle dispatch and execution of abortting a transaction
// against the provided topology.
func AbortTransaction(
ctx context.Context,
cmd command.AbortTransaction,
topo *topology.Topology,
selector description.ServerSelector,
) (result.TransactionResult, error) {
res, err := abortTransaction(ctx, cmd, topo, selector, nil)
if cerr, ok := err.(command.Error); ok && err != nil {
// Retry if appropriate
if cerr.Retryable() {
res, err = abortTransaction(ctx, cmd, topo, selector, cerr)
}
}
return res, err
}
func abortTransaction(
ctx context.Context,
cmd command.AbortTransaction,
topo *topology.Topology,
selector description.ServerSelector,
oldErr error,
) (result.TransactionResult, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
// If retrying server selection, return the original error if it fails
if oldErr != nil {
return result.TransactionResult{}, oldErr
}
return result.TransactionResult{}, err
}
desc := ss.Description()
if oldErr != nil && (!topo.SupportsSessions() || !description.SessionsSupported(desc.WireVersion)) {
// Assuming we are retrying (oldErr != nil),
// if server doesn't support retryable writes, return the original error
// Conditions for retry write support are the same as that of sessions
return result.TransactionResult{}, oldErr
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.TransactionResult{}, oldErr
}
return result.TransactionResult{}, err
}
defer conn.Close()
return cmd.RoundTrip(ctx, desc, conn)
}

View File

@@ -0,0 +1,204 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// Aggregate handles the full cycle dispatch and execution of an aggregate command against the provided
// topology.
func Aggregate(
ctx context.Context,
cmd command.Aggregate,
topo *topology.Topology,
readSelector, writeSelector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
registry *bsoncodec.Registry,
opts ...*options.AggregateOptions,
) (*BatchCursor, error) {
dollarOut := cmd.HasDollarOut()
var ss *topology.SelectedServer
var err error
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
writeSelector = cmd.Session.PinnedServer
readSelector = cmd.Session.PinnedServer
}
switch dollarOut {
case true:
ss, err = topo.SelectServerLegacy(ctx, writeSelector)
if err != nil {
return nil, err
}
case false:
ss, err = topo.SelectServerLegacy(ctx, readSelector)
if err != nil {
return nil, err
}
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return nil, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
}
aggOpts := options.MergeAggregateOptions(opts...)
if aggOpts.AllowDiskUse != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"allowDiskUse", bsonx.Boolean(*aggOpts.AllowDiskUse)})
}
var batchSize int32
if aggOpts.BatchSize != nil {
elem := bsonx.Elem{"batchSize", bsonx.Int32(*aggOpts.BatchSize)}
cmd.Opts = append(cmd.Opts, elem)
cmd.CursorOpts = append(cmd.CursorOpts, elem)
batchSize = *aggOpts.BatchSize
}
if aggOpts.BypassDocumentValidation != nil && desc.WireVersion.Includes(4) {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*aggOpts.BypassDocumentValidation)})
}
if aggOpts.Collation != nil {
if desc.WireVersion.Max < 5 {
return nil, ErrCollation
}
collDoc, err := bsonx.ReadDoc(aggOpts.Collation.ToDocument())
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if aggOpts.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*aggOpts.MaxTime / time.Millisecond))})
}
if aggOpts.MaxAwaitTime != nil {
// specified as maxTimeMS on getMore commands
cmd.CursorOpts = append(cmd.CursorOpts, bsonx.Elem{
"maxTimeMS", bsonx.Int64(int64(*aggOpts.MaxAwaitTime / time.Millisecond)),
})
}
if aggOpts.Comment != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"comment", bsonx.String(*aggOpts.Comment)})
}
if aggOpts.Hint != nil {
hintElem, err := interfaceToElement("hint", aggOpts.Hint, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, hintElem)
}
res, err := cmd.RoundTrip(ctx, desc, conn)
if err != nil {
if wce, ok := err.(result.WriteConcernError); ok {
ss.ProcessWriteConcernError(&wce)
}
closeImplicitSession(cmd.Session)
return nil, err
}
if desc.WireVersion.Max < 4 {
return buildLegacyCommandBatchCursor(res, batchSize, ss.Server)
}
return NewBatchCursor(bsoncore.Document(res), cmd.Session, cmd.Clock, ss.Server, cmd.CursorOpts...)
}
func buildLegacyCommandBatchCursor(rdr bson.Raw, batchSize int32, server *topology.Server) (*BatchCursor, error) {
firstBatchDocs, ns, cursorID, err := getCursorValues(rdr)
if err != nil {
return nil, err
}
return NewLegacyBatchCursor(ns, cursorID, firstBatchDocs, 0, batchSize, server)
}
// get the firstBatch, cursor ID, and namespace from a bson.Raw
//
// TODO(GODRIVER-617): Change the documents return value into []bsoncore.Document.
func getCursorValues(result bson.Raw) (*bsoncore.DocumentSequence, command.Namespace, int64, error) {
cur, err := result.LookupErr("cursor")
if err != nil {
return nil, command.Namespace{}, 0, err
}
if cur.Type != bson.TypeEmbeddedDocument {
return nil, command.Namespace{}, 0, fmt.Errorf("cursor should be an embedded document but it is a BSON %s", cur.Type)
}
elems, err := cur.Document().Elements()
if err != nil {
return nil, command.Namespace{}, 0, err
}
var ok bool
var namespace command.Namespace
var cursorID int64
batch := new(bsoncore.DocumentSequence)
for _, elem := range elems {
switch elem.Key() {
case "firstBatch":
arr, ok := elem.Value().ArrayOK()
if !ok {
return nil, command.Namespace{}, 0, fmt.Errorf("firstBatch should be an array but it is a BSON %s", elem.Value().Type)
}
batch.Style = bsoncore.ArrayStyle
batch.Data = arr
case "ns":
if elem.Value().Type != bson.TypeString {
return nil, command.Namespace{}, 0, fmt.Errorf("namespace should be a string but it is a BSON %s", elem.Value().Type)
}
namespace = command.ParseNamespace(elem.Value().StringValue())
err = namespace.Validate()
if err != nil {
return nil, command.Namespace{}, 0, err
}
case "id":
cursorID, ok = elem.Value().Int64OK()
if !ok {
return nil, command.Namespace{}, 0, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
}
}
}
return batch, namespace, cursorID, nil
}

View File

@@ -0,0 +1,454 @@
package driverlegacy
import (
"context"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/wiremessage"
)
// BatchCursor is a batch implementation of a cursor. It returns documents in entire batches instead
// of one at a time. An individual document cursor can be built on top of this batch cursor.
type BatchCursor struct {
clientSession *session.Client
clock *session.ClusterClock
namespace command.Namespace
id int64
err error
server *topology.Server
opts []bsonx.Elem
currentBatch *bsoncore.DocumentSequence
firstBatch bool
batchNumber int
postBatchResumeToken bsoncore.Document
// legacy server (< 3.2) fields
batchSize int32
limit int32
numReturned int32 // number of docs returned by server
}
// NewBatchCursor creates a new BatchCursor from the provided parameters.
func NewBatchCursor(result bsoncore.Document, clientSession *session.Client, clock *session.ClusterClock, server *topology.Server, opts ...bsonx.Elem) (*BatchCursor, error) {
cur, err := result.LookupErr("cursor")
if err != nil {
return nil, err
}
if cur.Type != bson.TypeEmbeddedDocument {
return nil, fmt.Errorf("cursor should be an embedded document but it is a BSON %s", cur.Type)
}
elems, err := cur.Document().Elements()
if err != nil {
return nil, err
}
bc := &BatchCursor{
clientSession: clientSession,
clock: clock,
server: server,
opts: opts,
firstBatch: true,
currentBatch: new(bsoncore.DocumentSequence),
}
var ok bool
for _, elem := range elems {
switch elem.Key() {
case "firstBatch":
arr, ok := elem.Value().ArrayOK()
if !ok {
return nil, fmt.Errorf("firstBatch should be an array but it is a BSON %s", elem.Value().Type)
}
bc.currentBatch.Style = bsoncore.ArrayStyle
bc.currentBatch.Data = arr
case "ns":
if elem.Value().Type != bson.TypeString {
return nil, fmt.Errorf("namespace should be a string but it is a BSON %s", elem.Value().Type)
}
namespace := command.ParseNamespace(elem.Value().StringValue())
err = namespace.Validate()
if err != nil {
return nil, err
}
bc.namespace = namespace
case "id":
bc.id, ok = elem.Value().Int64OK()
if !ok {
return nil, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
}
case "postBatchResumeToken":
pbrt, ok := elem.Value().DocumentOK()
if !ok {
return nil, fmt.Errorf("post batch resume token should be a document but it is a BSON %s", elem.Value().Type)
}
bc.postBatchResumeToken = pbrt
}
}
// close session if everything fits in first batch
if bc.id == 0 {
bc.closeImplicitSession()
}
return bc, nil
}
// NewEmptyBatchCursor returns a batch cursor that is empty.
func NewEmptyBatchCursor() *BatchCursor {
return &BatchCursor{currentBatch: new(bsoncore.DocumentSequence)}
}
// NewLegacyBatchCursor creates a new BatchCursor for server versions 3.0 and below from the
// provided parameters.
//
// TODO(GODRIVER-617): The batch parameter here should be []bsoncore.Document. Change it to this
// once we have the new wiremessage package that uses bsoncore instead of bson.
func NewLegacyBatchCursor(ns command.Namespace, cursorID int64, ds *bsoncore.DocumentSequence, limit int32, batchSize int32, server *topology.Server) (*BatchCursor, error) {
dsCount := ds.DocumentCount()
bc := &BatchCursor{
id: cursorID,
server: server,
namespace: ns,
limit: limit,
batchSize: batchSize,
numReturned: int32(dsCount),
firstBatch: true,
}
// take as many documents from the batch as needed
if limit != 0 && limit < int32(dsCount) {
for i := int32(0); i < limit; i++ {
_, err := ds.Next()
if err != nil {
return nil, err
}
}
ds.Data = ds.Data[:ds.Pos]
ds.ResetIterator()
}
bc.currentBatch = ds
return bc, nil
}
// ID returns the cursor ID for this batch cursor.
func (bc *BatchCursor) ID() int64 {
return bc.id
}
// Next indicates if there is another batch available. Returning false does not necessarily indicate
// that the cursor is closed. This method will return false when an empty batch is returned.
//
// If Next returns true, there is a valid batch of documents available. If Next returns false, there
// is not a valid batch of documents available.
func (bc *BatchCursor) Next(ctx context.Context) bool {
if ctx == nil {
ctx = context.Background()
}
if bc.firstBatch {
bc.firstBatch = false
return true
}
if bc.id == 0 || bc.server == nil {
return false
}
if bc.legacy() {
bc.legacyGetMore(ctx)
} else {
bc.getMore(ctx)
}
switch bc.currentBatch.Style {
case bsoncore.SequenceStyle:
return len(bc.currentBatch.Data) > 0
case bsoncore.ArrayStyle:
return len(bc.currentBatch.Data) > 5
default:
return false
}
}
// Batch will return a DocumentSequence for the current batch of documents. The returned
// DocumentSequence is only valid until the next call to Next or Close.
func (bc *BatchCursor) Batch() *bsoncore.DocumentSequence { return bc.currentBatch }
// Server returns a pointer to the cursor's server.
func (bc *BatchCursor) Server() driver.Server { return bc.server }
// Err returns the latest error encountered.
func (bc *BatchCursor) Err() error { return bc.err }
// Close closes this batch cursor.
func (bc *BatchCursor) Close(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
if bc.server == nil {
return nil
}
if bc.legacy() {
return bc.legacyKillCursor(ctx)
}
defer bc.closeImplicitSession()
conn, err := bc.server.ConnectionLegacy(ctx)
if err != nil {
return err
}
_, err = (&command.KillCursors{
Clock: bc.clock,
NS: bc.namespace,
IDs: []int64{bc.id},
}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
if err != nil {
_ = conn.Close() // The command response error is more important here
return err
}
bc.id = 0
bc.currentBatch.Data = nil
bc.currentBatch.Style = 0
bc.currentBatch.ResetIterator()
return conn.Close()
}
// PostBatchResumeToken returns the latest seen post batch resume token.
func (bc *BatchCursor) PostBatchResumeToken() bsoncore.Document {
return bc.postBatchResumeToken
}
func (bc *BatchCursor) closeImplicitSession() {
if bc.clientSession != nil && bc.clientSession.SessionType == session.Implicit {
bc.clientSession.EndSession()
}
}
func (bc *BatchCursor) clearBatch() {
bc.currentBatch.Data = bc.currentBatch.Data[:0]
}
func (bc *BatchCursor) getMore(ctx context.Context) {
bc.clearBatch()
if bc.id == 0 {
return
}
conn, err := bc.server.ConnectionLegacy(ctx)
if err != nil {
bc.err = err
return
}
response, err := (&command.GetMore{
Clock: bc.clock,
ID: bc.id,
NS: bc.namespace,
Opts: bc.opts,
Session: bc.clientSession,
}).RoundTrip(ctx, bc.server.SelectedDescription(), conn)
if err != nil {
_ = conn.Close() // The command response error is more important here
bc.err = err
return
}
err = conn.Close()
if err != nil {
bc.err = err
return
}
id, err := response.LookupErr("cursor", "id")
if err != nil {
bc.err = err
return
}
var ok bool
bc.id, ok = id.Int64OK()
if !ok {
bc.err = fmt.Errorf("BSON Type %s is not %s", id.Type, bson.TypeInt64)
return
}
// if this is the last getMore, close the session
if bc.id == 0 {
bc.closeImplicitSession()
}
batch, err := response.LookupErr("cursor", "nextBatch")
if err != nil {
bc.err = err
return
}
var arr bson.Raw
arr, ok = batch.ArrayOK()
if !ok {
bc.err = fmt.Errorf("BSON Type %s is not %s", batch.Type, bson.TypeArray)
return
}
bc.currentBatch.Style = bsoncore.ArrayStyle
bc.currentBatch.Data = arr
bc.currentBatch.ResetIterator()
pbrt, err := response.LookupErr("cursor", "postBatchResumeToken")
if err != nil {
// don't set bc.err because post batch resume token is only returned on server versions >= 4.0.7
return
}
pbrtDoc, ok := pbrt.DocumentOK()
if !ok {
bc.err = fmt.Errorf("expected BSON type for post batch resume token to be EmbeddedDocument but got %s", pbrt.Type)
return
}
bc.postBatchResumeToken = bsoncore.Document(pbrtDoc)
}
func (bc *BatchCursor) legacy() bool {
return bc.server.Description().WireVersion == nil || bc.server.Description().WireVersion.Max < 4
}
func (bc *BatchCursor) legacyKillCursor(ctx context.Context) error {
conn, err := bc.server.ConnectionLegacy(ctx)
if err != nil {
return err
}
kc := wiremessage.KillCursors{
NumberOfCursorIDs: 1,
CursorIDs: []int64{bc.id},
CollectionName: bc.namespace.Collection,
DatabaseName: bc.namespace.DB,
}
err = conn.WriteWireMessage(ctx, kc)
if err != nil {
_ = conn.Close()
return err
}
err = conn.Close() // no reply from OP_KILL_CURSORS
if err != nil {
return err
}
bc.id = 0
bc.clearBatch()
return nil
}
func (bc *BatchCursor) legacyGetMore(ctx context.Context) {
bc.clearBatch()
if bc.id == 0 {
return
}
conn, err := bc.server.ConnectionLegacy(ctx)
if err != nil {
bc.err = err
return
}
numToReturn := bc.batchSize
if bc.limit != 0 && bc.numReturned+bc.batchSize > bc.limit {
numToReturn = bc.limit - bc.numReturned
if numToReturn <= 0 {
err = bc.Close(ctx)
if err != nil {
bc.err = err
}
return
}
}
gm := wiremessage.GetMore{
FullCollectionName: bc.namespace.DB + "." + bc.namespace.Collection,
CursorID: bc.id,
NumberToReturn: numToReturn,
}
err = conn.WriteWireMessage(ctx, gm)
if err != nil {
_ = conn.Close()
bc.err = err
return
}
response, err := conn.ReadWireMessage(ctx)
if err != nil {
_ = conn.Close()
bc.err = err
return
}
err = conn.Close()
if err != nil {
bc.err = err
return
}
reply, ok := response.(wiremessage.Reply)
if !ok {
bc.err = errors.New("did not receive OP_REPLY response")
return
}
err = validateGetMoreReply(reply)
if err != nil {
bc.err = err
return
}
bc.id = reply.CursorID
bc.numReturned += reply.NumberReturned
if bc.limit != 0 && bc.numReturned >= bc.limit {
err = bc.Close(ctx)
if err != nil {
bc.err = err
return
}
}
// TODO(GODRIVER-617): When the wiremessage package is updated, we should ensure we can get all
// of the documents as a single slice instead of having to reslice.
bc.currentBatch.Style = bsoncore.SequenceStyle
for _, doc := range reply.Documents {
bc.currentBatch.Data = append(bc.currentBatch.Data, doc...)
}
bc.currentBatch.ResetIterator()
}
func validateGetMoreReply(reply wiremessage.Reply) error {
if int(reply.NumberReturned) != len(reply.Documents) {
return command.NewCommandResponseError("malformed OP_REPLY: NumberReturned does not match number of returned documents", nil)
}
if reply.ResponseFlags&wiremessage.CursorNotFound == wiremessage.CursorNotFound {
return command.QueryFailureError{
Message: "query failure - cursor not found",
}
}
if reply.ResponseFlags&wiremessage.QueryFailure == wiremessage.QueryFailure {
return command.QueryFailureError{
Message: "query failure",
Response: reply.Documents[0],
}
}
return nil
}

View File

@@ -0,0 +1,669 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// BulkWriteError is an error from one operation in a bulk write.
type BulkWriteError struct {
result.WriteError
Model WriteModel
}
// BulkWriteException is a collection of errors returned by a bulk write operation.
type BulkWriteException struct {
WriteConcernError *result.WriteConcernError
WriteErrors []BulkWriteError
}
func (BulkWriteException) Error() string {
return ""
}
type bulkWriteBatch struct {
models []WriteModel
canRetry bool
}
// BulkWrite handles the full dispatch cycle for a bulk write operation.
func BulkWrite(
ctx context.Context,
ns command.Namespace,
models []WriteModel,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
sess *session.Client,
writeConcern *writeconcern.WriteConcern,
clock *session.ClusterClock,
registry *bsoncodec.Registry,
opts ...*options.BulkWriteOptions,
) (result.BulkWrite, error) {
if sess != nil && sess.PinnedServer != nil {
selector = sess.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.BulkWrite{}, err
}
err = verifyOptions(models, ss)
if err != nil {
return result.BulkWrite{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if sess == nil && topo.SupportsSessions() {
sess, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.BulkWrite{}, err
}
defer sess.EndSession()
}
bwOpts := options.MergeBulkWriteOptions(opts...)
ordered := *bwOpts.Ordered
batches := createBatches(models, ordered)
bwRes := result.BulkWrite{
UpsertedIDs: make(map[int64]interface{}),
}
bwErr := BulkWriteException{
WriteErrors: make([]BulkWriteError, 0),
}
var lastErr error
var opIndex int64 // the operation index for the upsertedIDs map
continueOnError := !ordered
for _, batch := range batches {
if len(batch.models) == 0 {
continue
}
bypassDocValidation := bwOpts.BypassDocumentValidation
if bypassDocValidation != nil && !*bypassDocValidation {
bypassDocValidation = nil
}
batchRes, batchErr, err := runBatch(ctx, ns, topo, selector, ss, sess, clock, writeConcern, retryWrite,
bypassDocValidation, continueOnError, batch, registry)
mergeResults(&bwRes, batchRes, opIndex)
bwErr.WriteConcernError = batchErr.WriteConcernError
for i := range batchErr.WriteErrors {
batchErr.WriteErrors[i].Index = batchErr.WriteErrors[i].Index + int(opIndex)
}
bwErr.WriteErrors = append(bwErr.WriteErrors, batchErr.WriteErrors...)
if !continueOnError && (err != nil || len(batchErr.WriteErrors) > 0 || batchErr.WriteConcernError != nil) {
if err != nil {
return bwRes, err
}
return bwRes, bwErr
}
if err != nil {
lastErr = err
}
opIndex += int64(len(batch.models))
}
bwRes.MatchedCount -= bwRes.UpsertedCount
if lastErr != nil {
return bwRes, lastErr
}
if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
return bwRes, bwErr
}
return bwRes, nil
}
func runBatch(
ctx context.Context,
ns command.Namespace,
topo *topology.Topology,
selector description.ServerSelector,
ss *topology.SelectedServer,
sess *session.Client,
clock *session.ClusterClock,
wc *writeconcern.WriteConcern,
retryWrite bool,
bypassDocValidation *bool,
continueOnError bool,
batch bulkWriteBatch,
registry *bsoncodec.Registry,
) (result.BulkWrite, BulkWriteException, error) {
batchRes := result.BulkWrite{
UpsertedIDs: make(map[int64]interface{}),
}
batchErr := BulkWriteException{}
var writeErrors []result.WriteError
switch batch.models[0].(type) {
case InsertOneModel:
res, err := runInsert(ctx, ns, topo, selector, ss, sess, clock, wc, retryWrite, batch, bypassDocValidation,
continueOnError, registry)
if err != nil {
return result.BulkWrite{}, BulkWriteException{}, err
}
batchRes.InsertedCount = int64(res.N)
writeErrors = res.WriteErrors
batchErr.WriteConcernError = res.WriteConcernError
case DeleteOneModel, DeleteManyModel:
res, err := runDelete(ctx, ns, topo, selector, ss, sess, clock, wc, retryWrite, batch, continueOnError, registry)
if err != nil {
return result.BulkWrite{}, BulkWriteException{}, err
}
batchRes.DeletedCount = int64(res.N)
writeErrors = res.WriteErrors
batchErr.WriteConcernError = res.WriteConcernError
case ReplaceOneModel, UpdateOneModel, UpdateManyModel:
res, err := runUpdate(ctx, ns, topo, selector, ss, sess, clock, wc, retryWrite, batch, bypassDocValidation,
continueOnError, registry)
if err != nil {
return result.BulkWrite{}, BulkWriteException{}, err
}
batchRes.MatchedCount = res.MatchedCount
batchRes.ModifiedCount = res.ModifiedCount
batchRes.UpsertedCount = int64(len(res.Upserted))
writeErrors = res.WriteErrors
batchErr.WriteConcernError = res.WriteConcernError
for _, upsert := range res.Upserted {
batchRes.UpsertedIDs[upsert.Index] = upsert.ID
}
}
batchErr.WriteErrors = make([]BulkWriteError, 0, len(writeErrors))
for _, we := range writeErrors {
batchErr.WriteErrors = append(batchErr.WriteErrors, BulkWriteError{
WriteError: we,
Model: batch.models[0],
})
}
return batchRes, batchErr, nil
}
func runInsert(
ctx context.Context,
ns command.Namespace,
topo *topology.Topology,
selector description.ServerSelector,
ss *topology.SelectedServer,
sess *session.Client,
clock *session.ClusterClock,
wc *writeconcern.WriteConcern,
retryWrite bool,
batch bulkWriteBatch,
bypassDocValidation *bool,
continueOnError bool,
registry *bsoncodec.Registry,
) (result.Insert, error) {
docs := make([]bsonx.Doc, len(batch.models))
var i int
for _, model := range batch.models {
converted := model.(InsertOneModel)
doc, err := interfaceToDocument(converted.Document, registry)
if err != nil {
return result.Insert{}, err
}
docs[i] = doc
i++
}
cmd := command.Insert{
ContinueOnError: continueOnError,
NS: ns,
Docs: docs,
Session: sess,
Clock: clock,
WriteConcern: wc,
}
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
if bypassDocValidation != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)})
}
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite || !batch.canRetry {
if cmd.Session != nil {
cmd.Session.RetryWrite = false
}
return insert(ctx, &cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, origErr := insert(ctx, &cmd, ss, nil)
if shouldRetry(origErr, res.WriteConcernError) {
newServer, err := topo.SelectServerLegacy(ctx, selector)
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, origErr
}
return insert(ctx, &cmd, newServer, origErr)
}
return res, origErr
}
func runDelete(
ctx context.Context,
ns command.Namespace,
topo *topology.Topology,
selector description.ServerSelector,
ss *topology.SelectedServer,
sess *session.Client,
clock *session.ClusterClock,
wc *writeconcern.WriteConcern,
retryWrite bool,
batch bulkWriteBatch,
continueOnError bool,
registry *bsoncodec.Registry,
) (result.Delete, error) {
docs := make([]bsonx.Doc, len(batch.models))
var i int
for _, model := range batch.models {
var doc bsonx.Doc
var err error
if dom, ok := model.(DeleteOneModel); ok {
doc, err = createDeleteDoc(dom.Filter, dom.Collation, false, registry)
} else if dmm, ok := model.(DeleteManyModel); ok {
doc, err = createDeleteDoc(dmm.Filter, dmm.Collation, true, registry)
}
if err != nil {
return result.Delete{}, err
}
docs[i] = doc
i++
}
cmd := command.Delete{
ContinueOnError: continueOnError,
NS: ns,
Deletes: docs,
Session: sess,
Clock: clock,
WriteConcern: wc,
}
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite || !batch.canRetry {
if cmd.Session != nil {
cmd.Session.RetryWrite = false
}
return delete(ctx, &cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, origErr := delete(ctx, &cmd, ss, nil)
if shouldRetry(origErr, res.WriteConcernError) {
newServer, err := topo.SelectServerLegacy(ctx, selector)
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, origErr
}
return delete(ctx, &cmd, newServer, origErr)
}
return res, origErr
}
func runUpdate(
ctx context.Context,
ns command.Namespace,
topo *topology.Topology,
selector description.ServerSelector,
ss *topology.SelectedServer,
sess *session.Client,
clock *session.ClusterClock,
wc *writeconcern.WriteConcern,
retryWrite bool,
batch bulkWriteBatch,
bypassDocValidation *bool,
continueOnError bool,
registry *bsoncodec.Registry,
) (result.Update, error) {
docs := make([]bsonx.Doc, len(batch.models))
for i, model := range batch.models {
var doc bsonx.Doc
var err error
if rom, ok := model.(ReplaceOneModel); ok {
doc, err = createUpdateDoc(rom.Filter, rom.Replacement, options.ArrayFilters{}, false, rom.UpdateModel, false,
registry)
} else if uom, ok := model.(UpdateOneModel); ok {
doc, err = createUpdateDoc(uom.Filter, uom.Update, uom.ArrayFilters, uom.ArrayFiltersSet, uom.UpdateModel, false,
registry)
} else if umm, ok := model.(UpdateManyModel); ok {
doc, err = createUpdateDoc(umm.Filter, umm.Update, umm.ArrayFilters, umm.ArrayFiltersSet, umm.UpdateModel, true,
registry)
}
if err != nil {
return result.Update{}, err
}
docs[i] = doc
}
cmd := command.Update{
ContinueOnError: continueOnError,
NS: ns,
Docs: docs,
Session: sess,
Clock: clock,
WriteConcern: wc,
}
cmd.Opts = []bsonx.Elem{{"ordered", bsonx.Boolean(!continueOnError)}}
if bypassDocValidation != nil {
// TODO this is temporary!
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*bypassDocValidation)})
//cmd.Opts = []option.UpdateOptioner{option.OptBypassDocumentValidation(bypassDocValidation)}
}
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite || !batch.canRetry {
if cmd.Session != nil {
cmd.Session.RetryWrite = false
}
return update(ctx, &cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, origErr := update(ctx, &cmd, ss, nil)
if shouldRetry(origErr, res.WriteConcernError) {
newServer, err := topo.SelectServerLegacy(ctx, selector)
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, origErr
}
return update(ctx, &cmd, newServer, origErr)
}
return res, origErr
}
func verifyOptions(models []WriteModel, ss *topology.SelectedServer) error {
maxVersion := ss.Description().WireVersion.Max
// 3.4 is wire version 5
// 3.6 is wire version 6
for _, model := range models {
var collationSet bool
var afSet bool // arrayFilters
switch converted := model.(type) {
case DeleteOneModel:
collationSet = converted.Collation != nil
case DeleteManyModel:
collationSet = converted.Collation != nil
case ReplaceOneModel:
collationSet = converted.Collation != nil
case UpdateOneModel:
afSet = converted.ArrayFiltersSet
collationSet = converted.Collation != nil
case UpdateManyModel:
afSet = converted.ArrayFiltersSet
collationSet = converted.Collation != nil
}
if afSet && maxVersion < 6 {
return ErrArrayFilters
}
if collationSet && maxVersion < 5 {
return ErrCollation
}
}
return nil
}
func createBatches(models []WriteModel, ordered bool) []bulkWriteBatch {
if ordered {
return createOrderedBatches(models)
}
batches := make([]bulkWriteBatch, 3)
var i int
for i = 0; i < 3; i++ {
batches[i].canRetry = true
}
var numBatches int // number of batches used. can't use len(batches) because it's set to 3
insertInd := -1
updateInd := -1
deleteInd := -1
for _, model := range models {
switch converted := model.(type) {
case InsertOneModel:
if insertInd == -1 {
// this is the first InsertOneModel
insertInd = numBatches
numBatches++
}
batches[insertInd].models = append(batches[insertInd].models, model)
case DeleteOneModel, DeleteManyModel:
if deleteInd == -1 {
deleteInd = numBatches
numBatches++
}
batches[deleteInd].models = append(batches[deleteInd].models, model)
if _, ok := converted.(DeleteManyModel); ok {
batches[deleteInd].canRetry = false
}
case ReplaceOneModel, UpdateOneModel, UpdateManyModel:
if updateInd == -1 {
updateInd = numBatches
numBatches++
}
batches[updateInd].models = append(batches[updateInd].models, model)
if _, ok := converted.(UpdateManyModel); ok {
batches[updateInd].canRetry = false
}
}
}
return batches
}
func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
var batches []bulkWriteBatch
var prevKind command.WriteCommandKind = -1
i := -1 // batch index
for _, model := range models {
var createNewBatch bool
var canRetry bool
var newKind command.WriteCommandKind
switch model.(type) {
case InsertOneModel:
createNewBatch = prevKind != command.InsertCommand
canRetry = true
newKind = command.InsertCommand
case DeleteOneModel:
createNewBatch = prevKind != command.DeleteCommand
canRetry = true
newKind = command.DeleteCommand
case DeleteManyModel:
createNewBatch = prevKind != command.DeleteCommand
newKind = command.DeleteCommand
case ReplaceOneModel, UpdateOneModel:
createNewBatch = prevKind != command.UpdateCommand
canRetry = true
newKind = command.UpdateCommand
case UpdateManyModel:
createNewBatch = prevKind != command.UpdateCommand
newKind = command.UpdateCommand
}
if createNewBatch {
batches = append(batches, bulkWriteBatch{
models: []WriteModel{model},
canRetry: canRetry,
})
i++
} else {
batches[i].models = append(batches[i].models, model)
if !canRetry {
batches[i].canRetry = false // don't make it true if it was already false
}
}
prevKind = newKind
}
return batches
}
func shouldRetry(cmdErr error, wcErr *result.WriteConcernError) bool {
if cerr, ok := cmdErr.(command.Error); ok && cerr.Retryable() ||
wcErr != nil && command.IsWriteConcernErrorRetryable(wcErr) {
return true
}
return false
}
func createUpdateDoc(
filter interface{},
update interface{},
arrayFilters options.ArrayFilters,
arrayFiltersSet bool,
updateModel UpdateModel,
multi bool,
registry *bsoncodec.Registry,
) (bsonx.Doc, error) {
f, err := interfaceToDocument(filter, registry)
if err != nil {
return nil, err
}
u, err := interfaceToDocument(update, registry)
if err != nil {
return nil, err
}
doc := bsonx.Doc{
{"q", bsonx.Document(f)},
{"u", bsonx.Document(u)},
{"multi", bsonx.Boolean(multi)},
}
if arrayFiltersSet {
filters, err := arrayFilters.ToArray()
if err != nil {
return nil, err
}
arr := make(bsonx.Arr, 0, len(filters))
for _, filter := range filters {
doc, err := bsonx.ReadDoc(filter)
if err != nil {
return nil, err
}
arr = append(arr, bsonx.Document(doc))
}
doc = append(doc, bsonx.Elem{"arrayFilters", bsonx.Array(arr)})
}
if updateModel.Collation != nil {
collDoc, err := bsonx.ReadDoc(updateModel.Collation.ToDocument())
if err != nil {
return nil, err
}
doc = append(doc, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if updateModel.UpsertSet {
doc = append(doc, bsonx.Elem{"upsert", bsonx.Boolean(updateModel.Upsert)})
}
return doc, nil
}
func createDeleteDoc(
filter interface{},
collation *options.Collation,
many bool,
registry *bsoncodec.Registry,
) (bsonx.Doc, error) {
f, err := interfaceToDocument(filter, registry)
if err != nil {
return nil, err
}
var limit int32 = 1
if many {
limit = 0
}
doc := bsonx.Doc{
{"q", bsonx.Document(f)},
{"limit", bsonx.Int32(limit)},
}
if collation != nil {
collDoc, err := bsonx.ReadDoc(collation.ToDocument())
if err != nil {
return nil, err
}
doc = append(doc, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
return doc, nil
}
func mergeResults(aggResult *result.BulkWrite, newResult result.BulkWrite, opIndex int64) {
aggResult.InsertedCount += newResult.InsertedCount
aggResult.MatchedCount += newResult.MatchedCount
aggResult.ModifiedCount += newResult.ModifiedCount
aggResult.DeletedCount += newResult.DeletedCount
aggResult.UpsertedCount += newResult.UpsertedCount
for index, upsertID := range newResult.UpsertedIDs {
aggResult.UpsertedIDs[index+opIndex] = upsertID
}
}

View File

@@ -0,0 +1,120 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// CommitTransaction handles the full cycle dispatch and execution of committing a transaction
// against the provided topology.
func CommitTransaction(
ctx context.Context,
cmd command.CommitTransaction,
topo *topology.Topology,
selector description.ServerSelector,
) (result.TransactionResult, error) {
res, err := commitTransaction(ctx, cmd, topo, selector, nil)
// Apply majority write concern for retries
currWC := cmd.Session.CurrentWc
newTimeout := 10 * time.Second
if currWC != nil && currWC.GetWTimeout() != 0 {
newTimeout = currWC.GetWTimeout()
}
cmd.Session.CurrentWc = currWC.WithOptions(writeconcern.WMajority(), writeconcern.WTimeout(newTimeout))
if cerr, ok := err.(command.Error); ok && err != nil {
// Retry if appropriate
if cerr.Retryable() {
res, err = commitTransaction(ctx, cmd, topo, selector, cerr)
if cerr2, ok := err.(command.Error); ok && err != nil {
// Retry failures also get label
cerr2.Labels = append(cerr2.Labels, command.UnknownTransactionCommitResult)
} else if err != nil {
err = command.Error{Message: err.Error(), Labels: []string{command.UnknownTransactionCommitResult}}
}
}
}
return res, err
}
func commitTransaction(
ctx context.Context,
cmd command.CommitTransaction,
topo *topology.Topology,
selector description.ServerSelector,
oldErr error,
) (result.TransactionResult, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
// If retrying server selection, return the original error if it fails
if oldErr != nil {
return result.TransactionResult{}, oldErr
}
return result.TransactionResult{}, err
}
desc := ss.Description()
if oldErr != nil && (!topo.SupportsSessions() || !description.SessionsSupported(desc.WireVersion)) {
// Assuming we are retrying (oldErr != nil),
// if server doesn't support retryable writes, return the original error
// Conditions for retry write support are the same as that of sessions
return result.TransactionResult{}, oldErr
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.TransactionResult{}, oldErr
}
return result.TransactionResult{}, err
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
// Add UnknownCommitTransaction Error label where appropriate
if err != nil {
var newLabels []string
if cerr, ok := err.(command.Error); ok {
// Replace the label TransientTransactionError with UnknownTransactionCommitResult
// if network error, write concern shutdown, or write concern failed errors
hasUnknownCommitErr := false
for _, label := range cerr.Labels {
if label == command.NetworkError {
hasUnknownCommitErr = true
break
}
}
// network error, retryable error, or write concern fail/timeout (64) get the unknown label
if hasUnknownCommitErr || cerr.Retryable() || cerr.Code == 64 {
for _, label := range cerr.Labels {
if label != command.TransientTransactionError {
newLabels = append(newLabels, label)
}
}
newLabels = append(newLabels, command.UnknownTransactionCommitResult)
cerr.Labels = newLabels
}
err = cerr
}
}
return res, err
}

View File

@@ -0,0 +1,97 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// Count handles the full cycle dispatch and execution of a count command against the provided
// topology.
func Count(
ctx context.Context,
cmd command.Count,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
registry *bsoncodec.Registry,
opts ...*options.CountOptions,
) (int64, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return 0, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return 0, err
}
defer conn.Close()
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return 0, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return 0, err
}
defer cmd.Session.EndSession()
}
countOpts := options.MergeCountOptions(opts...)
if countOpts.Limit != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"limit", bsonx.Int64(*countOpts.Limit)})
}
if countOpts.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{
"maxTimeMS", bsonx.Int64(int64(*countOpts.MaxTime / time.Millisecond)),
})
}
if countOpts.Skip != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"skip", bsonx.Int64(*countOpts.Skip)})
}
if countOpts.Collation != nil {
if desc.WireVersion.Max < 5 {
return 0, ErrCollation
}
collDoc, err := bsonx.ReadDoc(countOpts.Collation.ToDocument())
if err != nil {
return 0, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if countOpts.Hint != nil {
hintElem, err := interfaceToElement("hint", countOpts.Hint, registry)
if err != nil {
return 0, err
}
cmd.Opts = append(cmd.Opts, hintElem)
}
return cmd.RoundTrip(ctx, desc, conn)
}

View File

@@ -0,0 +1,94 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// CountDocuments handles the full cycle dispatch and execution of a countDocuments command against the provided
// topology.
func CountDocuments(
ctx context.Context,
cmd command.CountDocuments,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
registry *bsoncodec.Registry,
opts ...*options.CountOptions,
) (int64, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return 0, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return 0, err
}
defer conn.Close()
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return 0, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return 0, err
}
defer cmd.Session.EndSession()
}
countOpts := options.MergeCountOptions(opts...)
// ignore Skip and Limit because we already have these options in the pipeline
if countOpts.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{
"maxTimeMS", bsonx.Int64(int64(*countOpts.MaxTime / time.Millisecond)),
})
}
if countOpts.Collation != nil {
if desc.WireVersion.Max < 5 {
return 0, ErrCollation
}
collDoc, err := bsonx.ReadDoc(countOpts.Collation.ToDocument())
if err != nil {
return 0, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if countOpts.Hint != nil {
hintElem, err := interfaceToElement("hint", countOpts.Hint, registry)
if err != nil {
return 0, err
}
cmd.Opts = append(cmd.Opts, hintElem)
}
return cmd.RoundTrip(ctx, desc, conn)
}

View File

@@ -0,0 +1,77 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// CreateIndexes handles the full cycle dispatch and execution of a createIndexes
// command against the provided topology.
func CreateIndexes(
ctx context.Context,
cmd command.CreateIndexes,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.CreateIndexesOptions,
) (result.CreateIndexes, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.CreateIndexes{}, err
}
desc := ss.Description()
if desc.WireVersion.Max < 5 && hasCollation(cmd) {
return result.CreateIndexes{}, ErrCollation
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return result.CreateIndexes{}, err
}
defer conn.Close()
cio := options.MergeCreateIndexesOptions(opts...)
if cio.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*cio.MaxTime / time.Millisecond))})
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.CreateIndexes{}, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}
func hasCollation(cmd command.CreateIndexes) bool {
for _, ind := range cmd.Indexes {
if _, err := ind.Document().LookupErr("collation"); err == nil {
return true
}
}
return false
}

View File

@@ -0,0 +1,124 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// Delete handles the full cycle dispatch and execution of a delete command against the provided
// topology.
func Delete(
ctx context.Context,
cmd command.Delete,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
opts ...*options.DeleteOptions,
) (result.Delete, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.Delete{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() && writeconcern.AckWrite(cmd.WriteConcern) {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.Delete{}, err
}
defer cmd.Session.EndSession()
}
deleteOpts := options.MergeDeleteOptions(opts...)
if deleteOpts.Collation != nil {
if ss.Description().WireVersion.Max < 5 {
return result.Delete{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(deleteOpts.Collation.ToDocument())
if err != nil {
return result.Delete{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return delete(ctx, &cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := delete(ctx, &cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return delete(ctx, &cmd, ss, cerr)
}
return res, originalErr
}
func delete(
ctx context.Context,
cmd *command.Delete,
ss *topology.SelectedServer,
oldErr error,
) (result.Delete, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.Delete{}, oldErr
}
return result.Delete{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.Delete{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,62 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// DropIndexes handles the full cycle dispatch and execution of a dropIndexes
// command against the provided topology.
func DropIndexes(
ctx context.Context,
cmd command.DropIndexes,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.DropIndexesOptions,
) (bson.Raw, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
dio := options.MergeDropIndexesOptions(opts...)
if dio.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*dio.MaxTime / time.Millisecond))})
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}

View File

@@ -0,0 +1,67 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy // import "go.mongodb.org/mongo-driver/x/mongo/driverlegacy"
import (
"errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
)
// ErrCollation is caused if a collation is given for an invalid server version.
var ErrCollation = errors.New("collation cannot be set for server versions < 3.4")
// ErrArrayFilters is caused if array filters are given for an invalid server version.
var ErrArrayFilters = errors.New("array filters cannot be set for server versions < 3.6")
func interfaceToDocument(val interface{}, registry *bsoncodec.Registry) (bsonx.Doc, error) {
if val == nil {
return bsonx.Doc{}, nil
}
if registry == nil {
registry = bson.DefaultRegistry
}
if bs, ok := val.([]byte); ok {
// Slight optimization so we'll just use MarshalBSON and not go through the codec machinery.
val = bson.Raw(bs)
}
// TODO(skriptble): Use a pool of these instead.
buf := make([]byte, 0, 256)
b, err := bson.MarshalAppendWithRegistry(registry, buf, val)
if err != nil {
return nil, err
}
return bsonx.ReadDoc(b)
}
func interfaceToElement(key string, i interface{}, registry *bsoncodec.Registry) (bsonx.Elem, error) {
switch conv := i.(type) {
case string:
return bsonx.Elem{key, bsonx.String(conv)}, nil
case bsonx.Doc:
return bsonx.Elem{key, bsonx.Document(conv)}, nil
default:
doc, err := interfaceToDocument(i, registry)
if err != nil {
return bsonx.Elem{}, err
}
return bsonx.Elem{key, bsonx.Document(doc)}, nil
}
}
func closeImplicitSession(sess *session.Client) {
if sess != nil && sess.SessionType == session.Implicit {
sess.EndSession()
}
}

View File

@@ -0,0 +1,84 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// Distinct handles the full cycle dispatch and execution of a distinct command against the provided
// topology.
func Distinct(
ctx context.Context,
cmd command.Distinct,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.DistinctOptions,
) (result.Distinct, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.Distinct{}, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return result.Distinct{}, err
}
defer conn.Close()
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return result.Distinct{}, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.Distinct{}, err
}
defer cmd.Session.EndSession()
}
distinctOpts := options.MergeDistinctOptions(opts...)
if distinctOpts.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{
"maxTimeMS", bsonx.Int64(int64(*distinctOpts.MaxTime / time.Millisecond)),
})
}
if distinctOpts.Collation != nil {
if desc.WireVersion.Max < 5 {
return result.Distinct{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(distinctOpts.Collation.ToDocument())
if err != nil {
return result.Distinct{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
return cmd.RoundTrip(ctx, desc, conn)
}

View File

@@ -0,0 +1,52 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// DropCollection handles the full cycle dispatch and execution of a dropCollection
// command against the provided topology.
func DropCollection(
ctx context.Context,
cmd command.DropCollection,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
) (bson.Raw, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}

View File

@@ -0,0 +1,52 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// DropDatabase handles the full cycle dispatch and execution of a dropDatabase
// command against the provided topology.
func DropDatabase(
ctx context.Context,
cmd command.DropDatabase,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
) (bson.Raw, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}

View File

@@ -0,0 +1,40 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// EndSessions handles the full cycle dispatch and execution of an endSessions command against the provided
// topology.
func EndSessions(
ctx context.Context,
cmd command.EndSessions,
topo *topology.Topology,
selector description.ServerSelector,
) ([]result.EndSessions, []error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, []error{err}
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, []error{err}
}
defer conn.Close()
return cmd.RoundTrip(ctx, desc, conn)
}

View File

@@ -0,0 +1,538 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"errors"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/wiremessage"
)
// Find handles the full cycle dispatch and execution of a find command against the provided
// topology.
func Find(
ctx context.Context,
cmd command.Find,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
registry *bsoncodec.Registry,
opts ...*options.FindOptions,
) (*BatchCursor, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
if desc.WireVersion.Max < 4 {
return legacyFind(ctx, cmd, registry, ss, conn, opts...)
}
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return nil, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
}
fo := options.MergeFindOptions(opts...)
if fo.AllowPartialResults != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"allowPartialResults", bsonx.Boolean(*fo.AllowPartialResults)})
}
if fo.BatchSize != nil {
elem := bsonx.Elem{"batchSize", bsonx.Int32(*fo.BatchSize)}
cmd.Opts = append(cmd.Opts, elem)
cmd.CursorOpts = append(cmd.CursorOpts, elem)
if fo.Limit != nil && *fo.BatchSize != 0 && *fo.Limit <= int64(*fo.BatchSize) {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"singleBatch", bsonx.Boolean(true)})
}
}
if fo.Collation != nil {
if desc.WireVersion.Max < 5 {
return nil, ErrCollation
}
collDoc, err := bsonx.ReadDoc(fo.Collation.ToDocument())
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if fo.Comment != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"comment", bsonx.String(*fo.Comment)})
}
if fo.CursorType != nil {
switch *fo.CursorType {
case options.Tailable:
cmd.Opts = append(cmd.Opts, bsonx.Elem{"tailable", bsonx.Boolean(true)})
case options.TailableAwait:
cmd.Opts = append(cmd.Opts, bsonx.Elem{"tailable", bsonx.Boolean(true)}, bsonx.Elem{"awaitData", bsonx.Boolean(true)})
}
}
if fo.Hint != nil {
hintElem, err := interfaceToElement("hint", fo.Hint, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, hintElem)
}
if fo.Limit != nil {
limit := *fo.Limit
if limit < 0 {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"singleBatch", bsonx.Boolean(true)})
limit = -1 * limit
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"limit", bsonx.Int64(limit)})
}
if fo.Max != nil {
maxElem, err := interfaceToElement("max", fo.Max, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, maxElem)
}
if fo.MaxAwaitTime != nil {
// Specified as maxTimeMS on the in the getMore command and not given in initial find command.
cmd.CursorOpts = append(cmd.CursorOpts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*fo.MaxAwaitTime / time.Millisecond))})
}
if fo.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*fo.MaxTime / time.Millisecond))})
}
if fo.Min != nil {
minElem, err := interfaceToElement("min", fo.Min, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, minElem)
}
if fo.NoCursorTimeout != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"noCursorTimeout", bsonx.Boolean(*fo.NoCursorTimeout)})
}
if fo.OplogReplay != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"oplogReplay", bsonx.Boolean(*fo.OplogReplay)})
}
if fo.Projection != nil {
projElem, err := interfaceToElement("projection", fo.Projection, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, projElem)
}
if fo.ReturnKey != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"returnKey", bsonx.Boolean(*fo.ReturnKey)})
}
if fo.ShowRecordID != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"showRecordId", bsonx.Boolean(*fo.ShowRecordID)})
}
if fo.Skip != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"skip", bsonx.Int64(*fo.Skip)})
}
if fo.Snapshot != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"snapshot", bsonx.Boolean(*fo.Snapshot)})
}
if fo.Sort != nil {
sortElem, err := interfaceToElement("sort", fo.Sort, registry)
if err != nil {
return nil, err
}
cmd.Opts = append(cmd.Opts, sortElem)
}
res, err := cmd.RoundTrip(ctx, desc, conn)
if err != nil {
closeImplicitSession(cmd.Session)
return nil, err
}
return NewBatchCursor(bsoncore.Document(res), cmd.Session, cmd.Clock, ss.Server, cmd.CursorOpts...)
}
// legacyFind handles the dispatch and execution of a find operation against a pre-3.2 server.
func legacyFind(
ctx context.Context,
cmd command.Find,
registry *bsoncodec.Registry,
ss *topology.SelectedServer,
conn connection.Connection,
opts ...*options.FindOptions,
) (*BatchCursor, error) {
query := wiremessage.Query{
FullCollectionName: cmd.NS.DB + "." + cmd.NS.Collection,
}
fo := options.MergeFindOptions(opts...)
optsDoc, err := createLegacyOptionsDoc(fo, registry)
if err != nil {
return nil, err
}
if fo.Projection != nil {
projDoc, err := interfaceToDocument(fo.Projection, registry)
if err != nil {
return nil, err
}
projRaw, err := projDoc.MarshalBSON()
if err != nil {
return nil, err
}
query.ReturnFieldsSelector = projRaw
}
if fo.Skip != nil {
query.NumberToSkip = int32(*fo.Skip)
query.SkipSet = true
}
// batch size of 1 not possible with OP_QUERY because the cursor will be closed immediately
if fo.BatchSize != nil && *fo.BatchSize == 1 {
query.NumberToReturn = 2
} else {
query.NumberToReturn = calculateNumberToReturn(fo)
}
query.Flags = calculateLegacyFlags(fo)
query.BatchSize = fo.BatchSize
if fo.Limit != nil {
i := int32(*fo.Limit)
query.Limit = &i
}
// set read preference and/or slaveOK flag
desc := ss.SelectedDescription()
if slaveOkNeeded(cmd.ReadPref, desc) {
query.Flags |= wiremessage.SlaveOK
}
optsDoc = addReadPref(cmd.ReadPref, desc.Server.Kind, optsDoc)
if cmd.Filter == nil {
cmd.Filter = bsonx.Doc{}
}
// filter must be wrapped in $query if other $modifiers are used
var queryDoc bsonx.Doc
if len(optsDoc) == 0 {
queryDoc = cmd.Filter
} else {
filterDoc := bsonx.Doc{
{"$query", bsonx.Document(cmd.Filter)},
}
// $query should go first
queryDoc = append(filterDoc, optsDoc...)
}
queryRaw, err := queryDoc.MarshalBSON()
if err != nil {
return nil, err
}
query.Query = queryRaw
reply, err := roundTripQuery(ctx, query, conn)
if err != nil {
return nil, err
}
var cursorLimit int32
var cursorBatchSize int32
if query.Limit != nil {
cursorLimit = int32(*query.Limit)
if cursorLimit < 0 {
cursorLimit *= -1
}
}
if query.BatchSize != nil {
cursorBatchSize = int32(*query.BatchSize)
}
// TODO(GODRIVER-617): When the wiremessage package is updated, we should ensure we can get all
// of the documents as a single slice instead of having to reslice.
ds := new(bsoncore.DocumentSequence)
ds.Style = bsoncore.SequenceStyle
for _, doc := range reply.Documents {
ds.Data = append(ds.Data, doc...)
}
return NewLegacyBatchCursor(cmd.NS, reply.CursorID, ds, cursorLimit, cursorBatchSize, ss.Server)
}
func createLegacyOptionsDoc(fo *options.FindOptions, registry *bsoncodec.Registry) (bsonx.Doc, error) {
var optsDoc bsonx.Doc
if fo.Collation != nil {
return nil, ErrCollation
}
if fo.Comment != nil {
optsDoc = append(optsDoc, bsonx.Elem{"$comment", bsonx.String(*fo.Comment)})
}
if fo.Hint != nil {
hintElem, err := interfaceToElement("$hint", fo.Hint, registry)
if err != nil {
return nil, err
}
optsDoc = append(optsDoc, hintElem)
}
if fo.Max != nil {
maxElem, err := interfaceToElement("$max", fo.Max, registry)
if err != nil {
return nil, err
}
optsDoc = append(optsDoc, maxElem)
}
if fo.MaxTime != nil {
optsDoc = append(optsDoc, bsonx.Elem{"$maxTimeMS", bsonx.Int64(int64(*fo.MaxTime / time.Millisecond))})
}
if fo.Min != nil {
minElem, err := interfaceToElement("$min", fo.Min, registry)
if err != nil {
return nil, err
}
optsDoc = append(optsDoc, minElem)
}
if fo.ReturnKey != nil {
optsDoc = append(optsDoc, bsonx.Elem{"$returnKey", bsonx.Boolean(*fo.ReturnKey)})
}
if fo.ShowRecordID != nil {
optsDoc = append(optsDoc, bsonx.Elem{"$showDiskLoc", bsonx.Boolean(*fo.ShowRecordID)})
}
if fo.Snapshot != nil {
optsDoc = append(optsDoc, bsonx.Elem{"$snapshot", bsonx.Boolean(*fo.Snapshot)})
}
if fo.Sort != nil {
sortElem, err := interfaceToElement("$orderby", fo.Sort, registry)
if err != nil {
return nil, err
}
optsDoc = append(optsDoc, sortElem)
}
return optsDoc, nil
}
func calculateLegacyFlags(fo *options.FindOptions) wiremessage.QueryFlag {
var flags wiremessage.QueryFlag
if fo.AllowPartialResults != nil {
flags |= wiremessage.Partial
}
if fo.CursorType != nil {
switch *fo.CursorType {
case options.Tailable:
flags |= wiremessage.TailableCursor
case options.TailableAwait:
flags |= wiremessage.TailableCursor
flags |= wiremessage.AwaitData
}
}
if fo.NoCursorTimeout != nil {
flags |= wiremessage.NoCursorTimeout
}
if fo.OplogReplay != nil {
flags |= wiremessage.OplogReplay
}
return flags
}
// calculate the number to return for the first find query
func calculateNumberToReturn(opts *options.FindOptions) int32 {
var numReturn int32
var limit int32
var batchSize int32
if opts.Limit != nil {
limit = int32(*opts.Limit)
}
if opts.BatchSize != nil {
batchSize = int32(*opts.BatchSize)
}
if limit < 0 {
numReturn = limit
} else if limit == 0 {
numReturn = batchSize
} else if limit < batchSize {
numReturn = limit
} else {
numReturn = batchSize
}
return numReturn
}
func slaveOkNeeded(rp *readpref.ReadPref, desc description.SelectedServer) bool {
if desc.Kind == description.Single && desc.Server.Kind != description.Mongos {
return true
}
if rp == nil {
// assume primary
return false
}
return rp.Mode() != readpref.PrimaryMode
}
func addReadPref(rp *readpref.ReadPref, kind description.ServerKind, query bsonx.Doc) bsonx.Doc {
if !readPrefNeeded(rp, kind) {
return query
}
doc := createReadPref(rp)
if doc == nil {
return query
}
return query.Append("$readPreference", bsonx.Document(doc))
}
func readPrefNeeded(rp *readpref.ReadPref, kind description.ServerKind) bool {
if kind != description.Mongos || rp == nil {
return false
}
// simple Primary or SecondaryPreferred is communicated via slaveOk to Mongos.
if rp.Mode() == readpref.PrimaryMode || rp.Mode() == readpref.SecondaryPreferredMode {
if _, ok := rp.MaxStaleness(); !ok && len(rp.TagSets()) == 0 {
return false
}
}
return true
}
func createReadPref(rp *readpref.ReadPref) bsonx.Doc {
if rp == nil {
return nil
}
doc := bsonx.Doc{}
switch rp.Mode() {
case readpref.PrimaryMode:
doc = append(doc, bsonx.Elem{"mode", bsonx.String("primary")})
case readpref.PrimaryPreferredMode:
doc = append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
case readpref.SecondaryPreferredMode:
doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondaryPreferred")})
case readpref.SecondaryMode:
doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondary")})
case readpref.NearestMode:
doc = append(doc, bsonx.Elem{"mode", bsonx.String("nearest")})
}
sets := make([]bsonx.Val, 0, len(rp.TagSets()))
for _, ts := range rp.TagSets() {
if len(ts) == 0 {
continue
}
set := bsonx.Doc{}
for _, t := range ts {
set = append(set, bsonx.Elem{t.Name, bsonx.String(t.Value)})
}
sets = append(sets, bsonx.Document(set))
}
if len(sets) > 0 {
doc = append(doc, bsonx.Elem{"tags", bsonx.Array(sets)})
}
if d, ok := rp.MaxStaleness(); ok {
doc = append(doc, bsonx.Elem{"maxStalenessSeconds", bsonx.Int32(int32(d.Seconds()))})
}
return doc
}
func roundTripQuery(ctx context.Context, query wiremessage.Query, conn connection.Connection) (wiremessage.Reply, error) {
err := conn.WriteWireMessage(ctx, query)
if err != nil {
if _, ok := err.(command.Error); ok {
return wiremessage.Reply{}, err
}
return wiremessage.Reply{}, command.Error{
Message: err.Error(),
Labels: []string{command.NetworkError},
}
}
wm, err := conn.ReadWireMessage(ctx)
if err != nil {
if _, ok := err.(command.Error); ok {
return wiremessage.Reply{}, err
}
// Connection errors are transient
return wiremessage.Reply{}, command.Error{
Message: err.Error(),
Labels: []string{command.NetworkError},
}
}
reply, ok := wm.(wiremessage.Reply)
if !ok {
return wiremessage.Reply{}, errors.New("did not receive OP_REPLY response")
}
err = validateOpReply(reply)
if err != nil {
return wiremessage.Reply{}, err
}
return reply, nil
}
func validateOpReply(reply wiremessage.Reply) error {
if int(reply.NumberReturned) != len(reply.Documents) {
return command.NewCommandResponseError(command.ReplyDocumentMismatch, nil)
}
if reply.ResponseFlags&wiremessage.QueryFailure == wiremessage.QueryFailure {
return command.QueryFailureError{
Message: "query failure",
Response: reply.Documents[0],
}
}
return nil
}

View File

@@ -0,0 +1,147 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// FindOneAndDelete handles the full cycle dispatch and execution of a FindOneAndDelete command against the provided
// topology.
func FindOneAndDelete(
ctx context.Context,
cmd command.FindOneAndDelete,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
registry *bsoncodec.Registry,
opts ...*options.FindOneAndDeleteOptions,
) (result.FindAndModify, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.FindAndModify{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.FindAndModify{}, err
}
defer cmd.Session.EndSession()
}
do := options.MergeFindOneAndDeleteOptions(opts...)
if do.Collation != nil {
if ss.Description().WireVersion.Max < 5 {
return result.FindAndModify{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(do.Collation.ToDocument())
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if do.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMs", bsonx.Int64(int64(*do.MaxTime / time.Millisecond))})
}
if do.Projection != nil {
projElem, err := interfaceToElement("fields", do.Projection, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, projElem)
}
if do.Sort != nil {
sortElem, err := interfaceToElement("sort", do.Sort, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, sortElem)
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return findOneAndDelete(ctx, cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := findOneAndDelete(ctx, cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return findOneAndDelete(ctx, cmd, ss, cerr)
}
return res, originalErr
}
func findOneAndDelete(
ctx context.Context,
cmd command.FindOneAndDelete,
ss *topology.SelectedServer,
oldErr error,
) (result.FindAndModify, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.FindAndModify{}, oldErr
}
return result.FindAndModify{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.FindAndModify{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,156 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// FindOneAndReplace handles the full cycle dispatch and execution of a FindOneAndReplace command against the provided
// topology.
func FindOneAndReplace(
ctx context.Context,
cmd command.FindOneAndReplace,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
registry *bsoncodec.Registry,
opts ...*options.FindOneAndReplaceOptions,
) (result.FindAndModify, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.FindAndModify{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.FindAndModify{}, err
}
defer cmd.Session.EndSession()
}
ro := options.MergeFindOneAndReplaceOptions(opts...)
if ro.BypassDocumentValidation != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"byapssDocumentValidation", bsonx.Boolean(*ro.BypassDocumentValidation)})
}
if ro.Collation != nil {
if ss.Description().WireVersion.Max < 5 {
return result.FindAndModify{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(ro.Collation.ToDocument())
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if ro.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*ro.MaxTime / time.Millisecond))})
}
if ro.Projection != nil {
maxElem, err := interfaceToElement("fields", ro.Projection, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, maxElem)
}
if ro.ReturnDocument != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"new", bsonx.Boolean(*ro.ReturnDocument == options.After)})
}
if ro.Sort != nil {
sortElem, err := interfaceToElement("sort", ro.Sort, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, sortElem)
}
if ro.Upsert != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"upsert", bsonx.Boolean(*ro.Upsert)})
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return findOneAndReplace(ctx, cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := findOneAndReplace(ctx, cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return findOneAndReplace(ctx, cmd, ss, cerr)
}
return res, originalErr
}
func findOneAndReplace(
ctx context.Context,
cmd command.FindOneAndReplace,
ss *topology.SelectedServer,
oldErr error,
) (result.FindAndModify, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.FindAndModify{}, oldErr
}
return result.FindAndModify{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.FindAndModify{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,172 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// FindOneAndUpdate handles the full cycle dispatch and execution of a FindOneAndUpdate command against the provided
// topology.
func FindOneAndUpdate(
ctx context.Context,
cmd command.FindOneAndUpdate,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
registry *bsoncodec.Registry,
opts ...*options.FindOneAndUpdateOptions,
) (result.FindAndModify, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.FindAndModify{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.FindAndModify{}, err
}
defer cmd.Session.EndSession()
}
uo := options.MergeFindOneAndUpdateOptions(opts...)
if uo.ArrayFilters != nil {
filters, err := uo.ArrayFilters.ToArray()
if err != nil {
return result.FindAndModify{}, err
}
arr := make(bsonx.Arr, 0, len(filters))
for _, filter := range filters {
doc, err := bsonx.ReadDoc(filter)
if err != nil {
return result.FindAndModify{}, err
}
arr = append(arr, bsonx.Document(doc))
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"arrayFilters", bsonx.Array(arr)})
}
if uo.BypassDocumentValidation != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*uo.BypassDocumentValidation)})
}
if uo.Collation != nil {
if ss.Description().WireVersion.Max < 5 {
return result.FindAndModify{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(uo.Collation.ToDocument())
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if uo.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*uo.MaxTime / time.Millisecond))})
}
if uo.Projection != nil {
projElem, err := interfaceToElement("fields", uo.Projection, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, projElem)
}
if uo.ReturnDocument != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"new", bsonx.Boolean(*uo.ReturnDocument == options.After)})
}
if uo.Sort != nil {
sortElem, err := interfaceToElement("sort", uo.Sort, registry)
if err != nil {
return result.FindAndModify{}, err
}
cmd.Opts = append(cmd.Opts, sortElem)
}
if uo.Upsert != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"upsert", bsonx.Boolean(*uo.Upsert)})
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return findOneAndUpdate(ctx, cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := findOneAndUpdate(ctx, cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return findOneAndUpdate(ctx, cmd, ss, cerr)
}
return res, originalErr
}
func findOneAndUpdate(
ctx context.Context,
cmd command.FindOneAndUpdate,
ss *topology.SelectedServer,
oldErr error,
) (result.FindAndModify, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.FindAndModify{}, oldErr
}
return result.FindAndModify{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.FindAndModify{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,123 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// Insert handles the full cycle dispatch and execution of an insert command against the provided
// topology.
func Insert(
ctx context.Context,
cmd command.Insert,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
opts ...*options.InsertManyOptions,
) (result.Insert, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.Insert{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.Insert{}, err
}
defer cmd.Session.EndSession()
}
insertOpts := options.MergeInsertManyOptions(opts...)
if insertOpts.BypassDocumentValidation != nil && ss.Description().WireVersion.Includes(4) {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*insertOpts.BypassDocumentValidation)})
}
if insertOpts.Ordered != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"ordered", bsonx.Boolean(*insertOpts.Ordered)})
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return insert(ctx, &cmd, ss, nil)
}
// TODO figure out best place to put retry write. Command shouldn't have to know about this field.
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := insert(ctx, &cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return insert(ctx, &cmd, ss, cerr)
}
return res, originalErr
}
func insert(
ctx context.Context,
cmd *command.Insert,
ss *topology.SelectedServer,
oldErr error,
) (result.Insert, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.Insert{}, oldErr
}
return result.Insert{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.Insert{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,67 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/x/network/connection"
"go.mongodb.org/mongo-driver/x/network/wiremessage"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// KillCursors handles the full cycle dispatch and execution of an aggregate command against the provided
// topology.
func KillCursors(
ctx context.Context,
ns command.Namespace,
server driver.Server,
cursorID int64,
) (result.KillCursors, error) {
var conn connection.Connection
var desc description.SelectedServer
var err error
if legacyServer, ok := server.(*topology.Server); ok {
desc = legacyServer.SelectedDescription()
conn, err = legacyServer.ConnectionLegacy(ctx)
} else if legacySs, ok := server.(*topology.SelectedServer); ok {
desc = legacySs.Description()
conn, err = legacySs.ConnectionLegacy(ctx)
}
if err != nil {
return result.KillCursors{}, err
}
defer conn.Close()
if desc.WireVersion == nil || desc.WireVersion.Max < 4 {
return result.KillCursors{}, legacyKillCursors(ctx, ns, cursorID, conn)
}
cmd := command.KillCursors{
NS: ns,
IDs: []int64{cursorID},
}
return cmd.RoundTrip(ctx, desc, conn)
}
func legacyKillCursors(ctx context.Context, ns command.Namespace, cursorID int64, conn connection.Connection) error {
kc := wiremessage.KillCursors{
NumberOfCursorIDs: 1,
CursorIDs: []int64{cursorID},
CollectionName: ns.Collection,
DatabaseName: ns.DB,
}
return conn.WriteWireMessage(ctx, kc)
}

View File

@@ -0,0 +1,133 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"errors"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
)
// ErrFilterType is thrown when a non-string filter is specified.
var ErrFilterType = errors.New("filter must be a string")
// ListCollections handles the full cycle dispatch and execution of a listCollections command against the provided
// topology.
func ListCollections(
ctx context.Context,
cmd command.ListCollections,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.ListCollectionsOptions,
) (*ListCollectionsBatchCursor, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
if ss.Description().WireVersion.Max < 3 {
return legacyListCollections(ctx, cmd, ss, conn)
}
rp, err := getReadPrefBasedOnTransaction(cmd.ReadPref, cmd.Session)
if err != nil {
return nil, err
}
cmd.ReadPref = rp
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
}
lc := options.MergeListCollectionsOptions(opts...)
if lc.NameOnly != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"nameOnly", bsonx.Boolean(*lc.NameOnly)})
}
res, err := cmd.RoundTrip(ctx, ss.Description(), conn)
if err != nil {
closeImplicitSession(cmd.Session)
return nil, err
}
batchCursor, err := NewBatchCursor(bsoncore.Document(res), cmd.Session, cmd.Clock, ss.Server, cmd.CursorOpts...)
if err != nil {
closeImplicitSession(cmd.Session)
return nil, err
}
return NewListCollectionsBatchCursor(batchCursor)
}
func legacyListCollections(
ctx context.Context,
cmd command.ListCollections,
ss *topology.SelectedServer,
conn connection.Connection,
) (*ListCollectionsBatchCursor, error) {
filter, err := transformFilter(cmd.Filter, cmd.DB)
if err != nil {
return nil, err
}
findCmd := command.Find{
NS: command.NewNamespace(cmd.DB, "system.namespaces"),
ReadPref: cmd.ReadPref,
Filter: filter,
}
// don't need registry because it's used to create BSON docs for find options that don't exist in this case
batchCursor, err := legacyFind(ctx, findCmd, nil, ss, conn)
if err != nil {
return nil, err
}
return NewLegacyListCollectionsBatchCursor(batchCursor)
}
// modify the user-supplied filter to prefix the "name" field with the database name.
// returns the original filter if the name field is not present or a copy with the modified name field if it is
func transformFilter(filter bsonx.Doc, dbName string) (bsonx.Doc, error) {
if filter == nil {
return filter, nil
}
if nameVal, err := filter.LookupErr("name"); err == nil {
name, ok := nameVal.StringValueOK()
if !ok {
return nil, ErrFilterType
}
filterCopy := filter.Copy()
filterCopy.Set("name", bsonx.String(dbName+"."+name))
return filterCopy, nil
}
return filter, nil
}

View File

@@ -0,0 +1,124 @@
package driverlegacy
import (
"context"
"errors"
"io"
"strings"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
)
// ListCollectionsBatchCursor is a special batch cursor returned from ListCollections that properly
// handles current and legacy ListCollections operations.
type ListCollectionsBatchCursor struct {
legacy bool
bc *BatchCursor
currentBatch *bsoncore.DocumentSequence
err error
}
// NewListCollectionsBatchCursor creates a new non-legacy ListCollectionsCursor.
func NewListCollectionsBatchCursor(bc *BatchCursor) (*ListCollectionsBatchCursor, error) {
if bc == nil {
return nil, errors.New("batch cursor must not be nil")
}
return &ListCollectionsBatchCursor{bc: bc, currentBatch: new(bsoncore.DocumentSequence)}, nil
}
// NewLegacyListCollectionsBatchCursor creates a new legacy ListCollectionsCursor.
func NewLegacyListCollectionsBatchCursor(bc *BatchCursor) (*ListCollectionsBatchCursor, error) {
if bc == nil {
return nil, errors.New("batch cursor must not be nil")
}
return &ListCollectionsBatchCursor{legacy: true, bc: bc, currentBatch: new(bsoncore.DocumentSequence)}, nil
}
// ID returns the cursor ID for this batch cursor.
func (lcbc *ListCollectionsBatchCursor) ID() int64 {
return lcbc.bc.ID()
}
// Next indicates if there is another batch available. Returning false does not necessarily indicate
// that the cursor is closed. This method will return false when an empty batch is returned.
//
// If Next returns true, there is a valid batch of documents available. If Next returns false, there
// is not a valid batch of documents available.
func (lcbc *ListCollectionsBatchCursor) Next(ctx context.Context) bool {
if !lcbc.bc.Next(ctx) {
return false
}
if !lcbc.legacy {
lcbc.currentBatch.Style = lcbc.bc.currentBatch.Style
lcbc.currentBatch.Data = lcbc.bc.currentBatch.Data
lcbc.currentBatch.ResetIterator()
return true
}
lcbc.currentBatch.Style = bsoncore.SequenceStyle
lcbc.currentBatch.Data = lcbc.currentBatch.Data[:0]
var doc bsoncore.Document
for {
doc, lcbc.err = lcbc.bc.currentBatch.Next()
if lcbc.err != nil {
if lcbc.err == io.EOF {
lcbc.err = nil
break
}
return false
}
doc, lcbc.err = lcbc.projectNameElement(doc)
if lcbc.err != nil {
return false
}
lcbc.currentBatch.Data = append(lcbc.currentBatch.Data, doc...)
}
return true
}
// Batch will return a DocumentSequence for the current batch of documents. The returned
// DocumentSequence is only valid until the next call to Next or Close.
func (lcbc *ListCollectionsBatchCursor) Batch() *bsoncore.DocumentSequence { return lcbc.currentBatch }
// Server returns a pointer to the cursor's server.
func (lcbc *ListCollectionsBatchCursor) Server() driver.Server { return lcbc.bc.server }
// Err returns the latest error encountered.
func (lcbc *ListCollectionsBatchCursor) Err() error {
if lcbc.err != nil {
return lcbc.err
}
return lcbc.bc.Err()
}
// Close closes this batch cursor.
func (lcbc *ListCollectionsBatchCursor) Close(ctx context.Context) error { return lcbc.bc.Close(ctx) }
// project out the database name for a legacy server
func (*ListCollectionsBatchCursor) projectNameElement(rawDoc bsoncore.Document) (bsoncore.Document, error) {
elems, err := rawDoc.Elements()
if err != nil {
return nil, err
}
var filteredElems []byte
for _, elem := range elems {
key := elem.Key()
if key != "name" {
filteredElems = append(filteredElems, elem...)
continue
}
name := elem.Value().StringValue()
collName := name[strings.Index(name, ".")+1:]
filteredElems = bsoncore.AppendStringElement(filteredElems, "name", collName)
}
var filteredDoc []byte
filteredDoc = bsoncore.BuildDocument(filteredDoc, filteredElems)
return filteredDoc, nil
}

View File

@@ -0,0 +1,60 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// ListDatabases handles the full cycle dispatch and execution of a listDatabases command against the provided
// topology.
func ListDatabases(
ctx context.Context,
cmd command.ListDatabases,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.ListDatabasesOptions,
) (result.ListDatabases, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.ListDatabases{}, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return result.ListDatabases{}, err
}
defer conn.Close()
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.ListDatabases{}, err
}
defer cmd.Session.EndSession()
}
ld := options.MergeListDatabasesOptions(opts...)
if ld.NameOnly != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"nameOnly", bsonx.Boolean(*ld.NameOnly)})
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}

View File

@@ -0,0 +1,105 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/connection"
)
// ListIndexes handles the full cycle dispatch and execution of a listIndexes command against the provided
// topology.
func ListIndexes(
ctx context.Context,
cmd command.ListIndexes,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
opts ...*options.ListIndexesOptions,
) (*BatchCursor, error) {
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
if ss.Description().WireVersion.Max < 3 {
return legacyListIndexes(ctx, cmd, ss, conn, opts...)
}
lio := options.MergeListIndexesOptions(opts...)
if lio.BatchSize != nil {
elem := bsonx.Elem{"batchSize", bsonx.Int32(*lio.BatchSize)}
cmd.Opts = append(cmd.Opts, elem)
cmd.CursorOpts = append(cmd.CursorOpts, elem)
}
if lio.MaxTime != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*lio.MaxTime / time.Millisecond))})
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
}
res, err := cmd.RoundTrip(ctx, ss.Description(), conn)
if err != nil {
closeImplicitSession(cmd.Session)
return nil, err
}
return NewBatchCursor(bsoncore.Document(res), cmd.Session, cmd.Clock, ss.Server, cmd.CursorOpts...)
}
func legacyListIndexes(
ctx context.Context,
cmd command.ListIndexes,
ss *topology.SelectedServer,
conn connection.Connection,
opts ...*options.ListIndexesOptions,
) (*BatchCursor, error) {
lio := options.MergeListIndexesOptions(opts...)
ns := cmd.NS.DB + "." + cmd.NS.Collection
findCmd := command.Find{
NS: command.NewNamespace(cmd.NS.DB, "system.indexes"),
Filter: bsonx.Doc{
{"ns", bsonx.String(ns)},
},
}
findOpts := options.Find()
if lio.BatchSize != nil {
findOpts.SetBatchSize(*lio.BatchSize)
}
if lio.MaxTime != nil {
findOpts.SetMaxTime(*lio.MaxTime)
}
return legacyFind(ctx, findCmd, nil, ss, conn, findOpts)
}

View File

@@ -0,0 +1,81 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"go.mongodb.org/mongo-driver/mongo/options"
)
// WriteModel is the interface satisfied by all models for bulk writes.
type WriteModel interface {
writeModel()
}
// InsertOneModel is the write model for insert operations.
type InsertOneModel struct {
Document interface{}
}
func (InsertOneModel) writeModel() {}
// DeleteOneModel is the write model for delete operations.
type DeleteOneModel struct {
Filter interface{}
Collation *options.Collation
}
func (DeleteOneModel) writeModel() {}
// DeleteManyModel is the write model for deleteMany operations.
type DeleteManyModel struct {
Filter interface{}
Collation *options.Collation
}
func (DeleteManyModel) writeModel() {}
// UpdateModel contains the fields that are shared between the ReplaceOneModel, UpdateOneModel, and UpdateManyModel types
type UpdateModel struct {
Collation *options.Collation
Upsert bool
UpsertSet bool
}
// ReplaceOneModel is the write model for replace operations.
type ReplaceOneModel struct {
Filter interface{}
Replacement interface{}
UpdateModel
}
func (ReplaceOneModel) writeModel() {}
// UpdateOneModel is the write model for update operations.
type UpdateOneModel struct {
Filter interface{}
Update interface{}
// default is to not send a value. for servers < 3.6, error raised if value given. for unack writes using opcodes,
// error raised if value given
ArrayFilters options.ArrayFilters
ArrayFiltersSet bool
UpdateModel
}
func (UpdateOneModel) writeModel() {}
// UpdateManyModel is the write model for updateMany operations.
type UpdateManyModel struct {
Filter interface{}
Update interface{}
// default is to not send a value. for servers < 3.6, error raised if value given. for unack writes using opcodes,
// error raised if value given
ArrayFilters options.ArrayFilters
ArrayFiltersSet bool
UpdateModel
}
func (UpdateManyModel) writeModel() {}

View File

@@ -0,0 +1,88 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// Read handles the full cycle dispatch and execution of a read command against the provided
// topology.
func Read(
ctx context.Context,
cmd command.Read,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
) (bson.Raw, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
if cmd.Session != nil && cmd.Session.TransactionRunning() {
// When command.read is directly used, this implies an operation level
// read preference, so we do not override it with the transaction read pref.
err = checkTransactionReadPref(cmd.ReadPref)
if err != nil {
return nil, err
}
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, ss.Description(), conn)
}
func getReadPrefBasedOnTransaction(current *readpref.ReadPref, sess *session.Client) (*readpref.ReadPref, error) {
if sess != nil && sess.TransactionRunning() {
// Transaction's read preference always takes priority
current = sess.CurrentRp
err := checkTransactionReadPref(current)
if err != nil {
return nil, err
}
}
return current, nil
}
func checkTransactionReadPref(pref *readpref.ReadPref) error {
if pref != nil && (pref.Mode() == readpref.SecondaryMode ||
pref.Mode() == readpref.SecondaryPreferredMode ||
pref.Mode() == readpref.NearestMode ||
pref.Mode() == readpref.PrimaryPreferredMode) {
return command.ErrNonPrimaryRP
}
return nil
}

View File

@@ -0,0 +1,72 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// ReadCursor handles the full dispatch cycle and execution of a read command against the provided topology and returns
// a Cursor over the resulting BSON reader.
func ReadCursor(
ctx context.Context,
cmd command.Read,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
cursorOpts ...bsonx.Elem,
) (*BatchCursor, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
}
rdr, err := cmd.RoundTrip(ctx, desc, conn)
if err != nil {
if cmd.Session != nil && cmd.Session.SessionType == session.Implicit {
cmd.Session.EndSession()
}
return nil, err
}
cursor, err := NewBatchCursor(bsoncore.Document(rdr), cmd.Session, cmd.Clock, ss.Server, cursorOpts...)
if err != nil {
if cmd.Session != nil && cmd.Session.SessionType == session.Implicit {
cmd.Session.EndSession()
}
return nil, err
}
return cursor, nil
}

View File

@@ -0,0 +1,151 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
"go.mongodb.org/mongo-driver/x/network/result"
)
// Update handles the full cycle dispatch and execution of an update command against the provided
// topology.
func Update(
ctx context.Context,
cmd command.Update,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
retryWrite bool,
opts ...*options.UpdateOptions,
) (result.Update, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return result.Update{}, err
}
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return result.Update{}, err
}
defer cmd.Session.EndSession()
}
updateOpts := options.MergeUpdateOptions(opts...)
if updateOpts.ArrayFilters != nil {
if ss.Description().WireVersion.Max < 6 {
return result.Update{}, ErrArrayFilters
}
filters, err := updateOpts.ArrayFilters.ToArray()
if err != nil {
return result.Update{}, err
}
arr := make(bsonx.Arr, 0, len(filters))
for _, filter := range filters {
doc, err := bsonx.ReadDoc(filter)
if err != nil {
return result.Update{}, err
}
arr = append(arr, bsonx.Document(doc))
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"arrayFilters", bsonx.Array(arr)})
}
if updateOpts.BypassDocumentValidation != nil && ss.Description().WireVersion.Includes(4) {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*updateOpts.BypassDocumentValidation)})
}
if updateOpts.Collation != nil {
if ss.Description().WireVersion.Max < 5 {
return result.Update{}, ErrCollation
}
collDoc, err := bsonx.ReadDoc(updateOpts.Collation.ToDocument())
if err != nil {
return result.Update{}, err
}
cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(collDoc)})
}
if updateOpts.Upsert != nil {
cmd.Opts = append(cmd.Opts, bsonx.Elem{"upsert", bsonx.Boolean(*updateOpts.Upsert)})
}
// Execute in a single trip if retry writes not supported, or retry not enabled
if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
if cmd.Session != nil {
cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
}
return update(ctx, &cmd, ss, nil)
}
cmd.Session.RetryWrite = retryWrite
cmd.Session.IncrementTxnNumber()
res, originalErr := update(ctx, &cmd, ss, nil)
// Retry if appropriate
if cerr, ok := originalErr.(command.Error); (ok && cerr.Retryable()) ||
(res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError)) {
ss, err := topo.SelectServerLegacy(ctx, selector)
// Return original error if server selection fails or new server does not support retryable writes
if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
return res, originalErr
}
return update(ctx, &cmd, ss, cerr)
}
return res, originalErr
}
func update(
ctx context.Context,
cmd *command.Update,
ss *topology.SelectedServer,
oldErr error,
) (result.Update, error) {
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
if oldErr != nil {
return result.Update{}, oldErr
}
return result.Update{}, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return result.Update{}, command.ErrUnacknowledgedWrite
}
defer conn.Close()
res, err := cmd.RoundTrip(ctx, desc, conn)
ss.ProcessWriteConcernError(res.WriteConcernError)
return res, err
}

View File

@@ -0,0 +1,82 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package driverlegacy
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
"go.mongodb.org/mongo-driver/x/mongo/driver/uuid"
"go.mongodb.org/mongo-driver/x/network/command"
)
// Write handles the full cycle dispatch and execution of a write command against the provided
// topology.
func Write(
ctx context.Context,
cmd command.Write,
topo *topology.Topology,
selector description.ServerSelector,
clientID uuid.UUID,
pool *session.Pool,
) (bson.Raw, error) {
if cmd.Session != nil && cmd.Session.PinnedServer != nil {
selector = cmd.Session.PinnedServer
}
ss, err := topo.SelectServerLegacy(ctx, selector)
if err != nil {
return nil, err
}
desc := ss.Description()
conn, err := ss.ConnectionLegacy(ctx)
if err != nil {
return nil, err
}
if !writeconcern.AckWrite(cmd.WriteConcern) {
go func() {
defer func() { _ = recover() }()
defer conn.Close()
_, _ = cmd.RoundTrip(ctx, desc, conn)
}()
return nil, command.ErrUnacknowledgedWrite
}
defer conn.Close()
// If no explicit session and deployment supports sessions, start implicit session.
if cmd.Session == nil && topo.SupportsSessions() {
cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
if err != nil {
return nil, err
}
defer cmd.Session.EndSession()
}
return cmd.RoundTrip(ctx, desc, conn)
}
// Retryable writes are supported if the server supports sessions, the operation is not
// within a transaction, and the write is acknowledged
func retrySupported(
topo *topology.Topology,
desc description.SelectedServer,
sess *session.Client,
wc *writeconcern.WriteConcern,
) bool {
return topo.SupportsSessions() &&
description.SessionsSupported(desc.WireVersion) &&
!(sess.TransactionInProgress() || sess.TransactionStarting()) &&
writeconcern.AckWrite(wc)
}