Documentation ¶
Overview ¶
Package db implements generic connection to MongoDB, and contains subpackages for specific methods of connection.
Index ¶
- Constants
- func ApplyFlags(opts *mopt.FindOneOptions, flags int)
- func CanIgnoreError(err error) bool
- func FilterError(stopOnError bool, err error) error
- func GetCollections(database *mongo.Database, name string) (*mongo.Cursor, error)
- func GetIndexes(coll *mongo.Collection) (*mongo.Cursor, error)
- func IsMMAPV1(database *mongo.Database, collectionName string) (bool, error)
- func NewMongoWriteConcern(writeConcern string, cs *connstring.ConnString) (wc *writeconcern.WriteConcern, err error)
- func NewReadPreference(rp string, cs *connstring.ConnString) (*readpref.ReadPref, error)
- func OpTimeEquals(lhs OpTime, rhs OpTime) bool
- func OpTimeGreaterThan(lhs OpTime, rhs OpTime) bool
- func OpTimeIsEmpty(opTime OpTime) bool
- func OpTimeLessThan(lhs OpTime, rhs OpTime) bool
- func StripDBFromNamespace(namespace string, dbName string) (string, error)
- type ApplyOpsResponse
- type BSONSource
- type BufferedBulkInserter
- func (bb *BufferedBulkInserter) Delete(selector, replacement bson.D) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Flush() (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Insert(doc interface{}) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) InsertRaw(rawBytes []byte) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Replace(selector, replacement bson.D) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) Update(selector, update bson.D) (*mongo.BulkWriteResult, error)
- type CollectionInfo
- type CommandRunner
- type DecodedBSONSource
- type DeferredQuery
- type NodeType
- type OpTime
- func GetLatestOplogOpTime(client *mongo.Client, query interface{}) (OpTime, error)
- func GetLatestVisibleOplogOpTime(client *mongo.Client) (OpTime, error)
- func GetOldestActiveTransactionOpTime(client *mongo.Client) (OpTime, error)
- func GetOpTimeFromOplogEntry(oplogEntry *Oplog) OpTime
- func GetOpTimeFromRawOplogEntry(rawOplogEntry bson.Raw) (OpTime, error)
- type Oplog
- type OplogTailTime
- type RawDocSource
- type SessionProvider
- func (sp *SessionProvider) Close()
- func (sp *SessionProvider) CreateCollection(dbName, collName string) error
- func (sp *SessionProvider) DB(name string) *mongo.Database
- func (sp *SessionProvider) DatabaseNames() ([]string, error)
- func (sp *SessionProvider) DropDatabase(dbName string) error
- func (sp *SessionProvider) FindOne(db, collection string, skip int, query interface{}, sort interface{}, ...) error
- func (sp *SessionProvider) GetNodeType() (NodeType, error)
- func (sp *SessionProvider) GetSession() (*mongo.Client, error)
- func (sp *SessionProvider) IsAtlasProxy() (bool, error)
- func (sp *SessionProvider) IsMongos() (bool, error)
- func (sp *SessionProvider) IsReplicaSet() (bool, error)
- func (sp *SessionProvider) Run(command interface{}, out interface{}, name string) error
- func (sp *SessionProvider) RunApplyOpsCreateIndex(C, DB string, index bson.D, UUID *primitive.Binary, result *interface{}) error
- func (sp *SessionProvider) RunString(commandName string, out interface{}, name string) error
- func (sp *SessionProvider) ServerVersion() (string, error)
- func (sp *SessionProvider) ServerVersionArray() (Version, error)
- type Version
Constants ¶
const ( Snapshot = 1 << iota LogReplay Prefetch )
Query flags
const ( Mongos NodeType = "mongos" Standalone = "standalone" ReplSet = "replset" Unknown = "unknown" )
const ( None sessionFlag = 0 Monotonic sessionFlag = 1 << iota DisableSocketTimeout )
Session flags.
const ( ErrLostConnection = "lost connection to server" ErrNoReachableServers = "no reachable servers" ErrNsNotFound = "ns not found" // replication errors list the replset name if we are talking to a mongos, // so we can only check for this universal prefix ErrReplTimeoutPrefix = "waiting for replication timed out" ErrCouldNotContactPrimaryPrefix = "could not contact primary for replica set" ErrCouldNotFindPrimaryPrefix = `could not find host matching read preference { mode: "primary"` ErrUnableToTargetPrefix = "unable to target" ErrNotMaster = "not master" ErrConnectionRefusedSuffix = "Connection refused" // ignorable errors ErrDuplicateKeyCode = 11000 ErrFailedDocumentValidation = 121 ErrUnacknowledgedWrite = "unacknowledged write" )
const (
DefaultTestPort = "33333"
)
Default port for integration tests
const MAX_MESSAGE_SIZE_BYTES = 48000000
The default value of maxMessageSizeBytes See: https://docs.mongodb.com/manual/reference/command/hello/#mongodb-data-hello.maxMessageSizeBytes
const (
MaxBSONSize = 16 * 1024 * 1024 // 16MB - maximum BSON document size
)
MongoDB enforced limits.
const (
WarningNonPrimaryMongosConnection = "Warning: using a non-primary readPreference with a " +
"connection to mongos may produce inconsistent duplicates or miss some documents."
)
Variables ¶
This section is empty.
Functions ¶
func ApplyFlags ¶
func ApplyFlags(opts *mopt.FindOneOptions, flags int)
ApplyFlags applies flags to the given query session.
func CanIgnoreError ¶
Returns whether the tools can continue when encountering the given error. Currently, only DuplicateKeyErrors are ignorable.
func FilterError ¶
FilterError determines whether an error needs to be propagated back to the user or can be continued through. If an error cannot be ignored, a non-nil error is returned. If an error can be continued through, it is logged and nil is returned.
func GetCollections ¶
Assumes that mongo.Database will normalize legacy names to omit database name as required by the Enumerate Collections spec
func GetIndexes ¶
func GetIndexes(coll *mongo.Collection) (*mongo.Cursor, error)
GetIndexes returns an iterator to thethe raw index info for a collection by using the listIndexes command if available, or by falling back to querying against system.indexes (pre-3.0 systems). nil is returned if the collection does not exist.
func IsMMAPV1 ¶
IsMMAPV1 returns whether the storage engine is MMAPV1. Also returns false if the storage engine type cannot be determined for some reason.
func NewMongoWriteConcern ¶
func NewMongoWriteConcern(writeConcern string, cs *connstring.ConnString) (wc *writeconcern.WriteConcern, err error)
NewMongoWriteConcern takes a string (from the command line writeConcern option) and a ConnString object (from the command line uri option) and returns a WriteConcern. If both are provided, preference is given to the command line writeConcern option. If neither is provided, the default 'majority' write concern is constructed.
func NewReadPreference ¶
func NewReadPreference(rp string, cs *connstring.ConnString) (*readpref.ReadPref, error)
NewReadPreference takes a string (command line read preference argument) and a ConnString (from the command line URI argument) and returns a ReadPref. If both are provided, preference is given to the command line argument. If both are empty, a default read preference of primary will be returned.
func OpTimeEquals ¶
OpTimeEquals returns true if lhs equals rhs, false otherwise. We first check for nil / not nil mismatches between the terms and between the hashes. Then we check for equality between the terms and between the hashes (if they exist) before checking the timestamps.
func OpTimeGreaterThan ¶
OpTimeGreaterThan returns true if lhs comes after rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
func OpTimeIsEmpty ¶
OpTimeIsEmpty returns true if opTime is uninitialized, false otherwise.
func OpTimeLessThan ¶
OpTimeLessThan returns true if lhs comes before rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
Types ¶
type ApplyOpsResponse ¶
ApplyOpsResponse represents the response from an 'applyOps' command.
type BSONSource ¶
type BSONSource struct { Stream io.ReadCloser MaxBSONSize int32 // contains filtered or unexported fields }
BSONSource reads documents from the underlying io.ReadCloser, Stream which wraps a stream of BSON documents.
func NewBSONSource ¶
func NewBSONSource(in io.ReadCloser) *BSONSource
NewBSONSource creates a BSONSource with a reusable I/O buffer
func NewBufferlessBSONSource ¶
func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource
NewBufferlessBSONSource creates a BSONSource without a reusable I/O buffer
func (*BSONSource) Close ¶
func (bs *BSONSource) Close() error
Close closes the BSONSource, rendering it unusable for I/O. It returns an error, if any.
func (*BSONSource) Err ¶
func (bs *BSONSource) Err() error
func (*BSONSource) LoadNext ¶
func (bs *BSONSource) LoadNext() []byte
LoadNext reads and returns the next BSON document in the stream. If the BSONSource was created with NewBSONSource then each returned []byte will be a slice of a single reused I/O buffer. If the BSONSource was created with NewBufferlessBSONSource then each returend []byte will be individually allocated
func (*BSONSource) SetMaxBSONSize ¶
func (bs *BSONSource) SetMaxBSONSize(size int32)
type BufferedBulkInserter ¶
type BufferedBulkInserter struct {
// contains filtered or unexported fields
}
BufferedBulkInserter implements a bufio.Writer-like design for queuing up documents and inserting them in bulk when the given doc limit (or max message size) is reached. Must be flushed at the end to ensure that all documents are written.
func NewOrderedBufferedBulkInserter ¶
func NewOrderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter
NewOrderedBufferedBulkInserter returns an initialized BufferedBulkInserter for performing ordered bulk writes.
func NewUnorderedBufferedBulkInserter ¶
func NewUnorderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter
NewOrderedBufferedBulkInserter returns an initialized BufferedBulkInserter for performing unordered bulk writes.
func (*BufferedBulkInserter) Delete ¶
func (bb *BufferedBulkInserter) Delete(selector, replacement bson.D) (*mongo.BulkWriteResult, error)
Delete adds a document to the buffer for bulk removal. If the buffer becomes full, the bulk delete is performed, returning any error that occurs.
func (*BufferedBulkInserter) Flush ¶
func (bb *BufferedBulkInserter) Flush() (*mongo.BulkWriteResult, error)
Flush writes all buffered documents in one bulk write and then resets the buffer.
func (*BufferedBulkInserter) Insert ¶
func (bb *BufferedBulkInserter) Insert(doc interface{}) (*mongo.BulkWriteResult, error)
Insert adds a document to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) InsertRaw ¶
func (bb *BufferedBulkInserter) InsertRaw(rawBytes []byte) (*mongo.BulkWriteResult, error)
InsertRaw adds a document, represented as raw bson bytes, to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) Replace ¶
func (bb *BufferedBulkInserter) Replace(selector, replacement bson.D) (*mongo.BulkWriteResult, error)
Replace adds a document to the buffer for bulk replacement. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) SetBypassDocumentValidation ¶
func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter
func (*BufferedBulkInserter) SetOrdered ¶
func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter
func (*BufferedBulkInserter) SetUpsert ¶
func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter
func (*BufferedBulkInserter) Update ¶
func (bb *BufferedBulkInserter) Update(selector, update bson.D) (*mongo.BulkWriteResult, error)
Update adds a document to the buffer for bulk update. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
type CollectionInfo ¶
type CollectionInfo struct { Name string `bson:"name"` Type string `bson:"type"` Options bson.M `bson:"options"` Info bson.M `bson:"info"` }
func GetCollectionInfo ¶
func GetCollectionInfo(coll *mongo.Collection) (*CollectionInfo, error)
func (*CollectionInfo) GetUUID ¶
func (ci *CollectionInfo) GetUUID() string
func (*CollectionInfo) IsSystemCollection ¶
func (ci *CollectionInfo) IsSystemCollection() bool
func (*CollectionInfo) IsTimeseries ¶
func (ci *CollectionInfo) IsTimeseries() bool
func (*CollectionInfo) IsView ¶
func (ci *CollectionInfo) IsView() bool
type CommandRunner ¶
type CommandRunner interface { Run(command interface{}, out interface{}, database string) error RunString(commandName string, out interface{}, database string) error FindOne(db, collection string, skip int, query interface{}, sort []string, into interface{}, opts int) error Remove(db, collection string, query interface{}) error DatabaseNames() ([]string, error) CollectionNames(db string) ([]string, error) }
CommandRunner exposes functions that can be run against a server XXX Does anything rely on this?
type DecodedBSONSource ¶
type DecodedBSONSource struct { RawDocSource // contains filtered or unexported fields }
DecodedBSONSource reads documents from the underlying io.ReadCloser, Stream which wraps a stream of BSON documents.
func NewDecodedBSONSource ¶
func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource
func (*DecodedBSONSource) Err ¶
func (dbs *DecodedBSONSource) Err() error
Err returns any error in the DecodedBSONSource or its RawDocSource.
func (*DecodedBSONSource) Next ¶
func (dbs *DecodedBSONSource) Next(result interface{}) bool
NextGBSON unmarshals the next BSON document into result using the official go driver. Returns true if no errors are encountered and false otherwise. This function does NOT zero out the result before writing to it.
type DeferredQuery ¶
type DeferredQuery struct { Coll *mongo.Collection Filter interface{} Hint interface{} LogReplay bool }
DeferredQuery represents a deferred query
type OpTime ¶
type OpTime struct { Timestamp primitive.Timestamp `json:"timestamp"` Term *int64 `json:"term"` Hash *int64 `json:"hash"` }
OpTime represents the values to uniquely identify an oplog entry. An OpTime must always have a timestamp, but may or may not have a term. The hash is set uniquely up until (and including) version 4.0, but is set to zero in version 4.2+ with plans to remove it soon (see SERVER-36334).
func GetLatestOplogOpTime ¶
GetLatestOplogOpTime returns the optime of the most recent oplog record satisfying the given `query` or a zero-value db.OpTime{} if no oplog record matches. This method does not ensure that all prior oplog entries are visible (i.e. have been storage-committed).
func GetLatestVisibleOplogOpTime ¶
GetLatestVisibleOplogOpTime returns the optime of the most recent "visible" oplog record. By "visible", we mean that all prior oplog entries have been storage-committed. See SERVER-30724 for a more detailed description.
func GetOldestActiveTransactionOpTime ¶
GetOldestActiveTransactionOpTime returns the oldest active transaction optime from the config.transactions table or else a zero-value db.OpTime{}
func GetOpTimeFromOplogEntry ¶
GetOpTimeFromOplogEntry returns an OpTime struct from the relevant fields in an Oplog struct.
func GetOpTimeFromRawOplogEntry ¶
GetOpTimeFromRawOplogEntry looks up the ts (timestamp), t (term), and h (hash) fields in a raw oplog entry, and assigns them to an OpTime. If the Timestamp can't be found or is an invalid format, it throws an error. If the Term or Hash fields can't be found, it returns the OpTime without them.
type Oplog ¶
type Oplog struct { Timestamp primitive.Timestamp `bson:"ts"` Term *int64 `bson:"t"` Hash *int64 `bson:"h"` Version int `bson:"v"` Operation string `bson:"op"` Namespace string `bson:"ns"` Object bson.D `bson:"o"` Query bson.D `bson:"o2,omitempty"` UI *primitive.Binary `bson:"ui,omitempty"` LSID bson.Raw `bson:"lsid,omitempty"` TxnNumber *int64 `bson:"txnNumber,omitempty"` PrevOpTime bson.Raw `bson:"prevOpTime,omitempty"` }
Oplog represents a MongoDB oplog document.
type OplogTailTime ¶
OplogTailTime represents two ways of describing the "end" of the oplog at a point in time. The Latest field represents the last visible (storage committed) timestamp. The Restart field represents a (possibly older) timestamp that can be used to start tailing or copying the oplog without losing parts of transactions in progress.
func GetOplogTailTime ¶
func GetOplogTailTime(client *mongo.Client) (OplogTailTime, error)
GetOplogTailTime constructs an OplogTailTime
type RawDocSource ¶
RawDocSource wraps basic functions for reading a BSON source file.
type SessionProvider ¶
Used to manage database sessions
func NewSessionProvider ¶
func NewSessionProvider(opts options.ToolOptions) (*SessionProvider, error)
NewSessionProvider constructs a session provider, including a connected client.
func (*SessionProvider) Close ¶
func (sp *SessionProvider) Close()
Close closes the master session in the connection pool
func (*SessionProvider) CreateCollection ¶
func (sp *SessionProvider) CreateCollection(dbName, collName string) error
func (*SessionProvider) DB ¶
func (sp *SessionProvider) DB(name string) *mongo.Database
DB provides a database with the default read preference
func (*SessionProvider) DatabaseNames ¶
func (sp *SessionProvider) DatabaseNames() ([]string, error)
DatabaseNames returns a slice containing the names of all the databases on the connected server.
func (*SessionProvider) DropDatabase ¶
func (sp *SessionProvider) DropDatabase(dbName string) error
func (*SessionProvider) FindOne ¶
func (sp *SessionProvider) FindOne(db, collection string, skip int, query interface{}, sort interface{}, into interface{}, flags int) error
FindOne retuns the first document in the collection and database that matches the query after skip, sort and query flags are applied.
func (*SessionProvider) GetNodeType ¶
func (sp *SessionProvider) GetNodeType() (NodeType, error)
GetNodeType checks if the connected SessionProvider is a mongos, standalone, or replset, by looking at the result of calling isMaster.
func (*SessionProvider) GetSession ¶
func (sp *SessionProvider) GetSession() (*mongo.Client, error)
Returns a mongo.Client connected to the database server for which the session provider is configured.
func (*SessionProvider) IsAtlasProxy ¶
func (sp *SessionProvider) IsAtlasProxy() (bool, error)
IsAtlasProxy checks if the connected SessionProvider is an atlas proxy.
func (*SessionProvider) IsMongos ¶
func (sp *SessionProvider) IsMongos() (bool, error)
IsMongos returns true if the connected server is a mongos.
func (*SessionProvider) IsReplicaSet ¶
func (sp *SessionProvider) IsReplicaSet() (bool, error)
IsReplicaSet returns a boolean which is true if the connected server is part of a replica set.
func (*SessionProvider) Run ¶
func (sp *SessionProvider) Run(command interface{}, out interface{}, name string) error
func (*SessionProvider) RunApplyOpsCreateIndex ¶
func (sp *SessionProvider) RunApplyOpsCreateIndex(C, DB string, index bson.D, UUID *primitive.Binary, result *interface{}) error
RunApplyOpsCreateIndex will create index using applyOps. For versions that support collection UUIDs (<3.6) it uses an insert to system indexes. Later versions use the createIndexes command.
func (*SessionProvider) RunString ¶
func (sp *SessionProvider) RunString(commandName string, out interface{}, name string) error
func (*SessionProvider) ServerVersion ¶
func (sp *SessionProvider) ServerVersion() (string, error)
func (*SessionProvider) ServerVersionArray ¶
func (sp *SessionProvider) ServerVersionArray() (Version, error)