db

package
v0.0.0-...-6df37af Latest
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2023 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package db implements a generic connection to MongoDB and contains subpackages for specific methods of connection.

Constants

const (
	Snapshot = 1 << iota
	LogReplay
	Prefetch
)

Query flags
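
Because each query flag occupies its own bit, several flags can be combined into the single int that ApplyFlags and FindOne accept. The following is a minimal sketch of that idea; it re-declares the constants locally so it runs without the db package, and the has helper is hypothetical.

```go
package main

import "fmt"

// Local copies of the query flags listed above, so the sketch compiles
// without importing the db package.
const (
	Snapshot  = 1 << iota // 1
	LogReplay             // 2
	Prefetch              // 4
)

// has reports whether every bit of flag is set in flags.
// Hypothetical helper, not part of the db package.
func has(flags, flag int) bool {
	return flags&flag == flag
}

func main() {
	flags := Snapshot | Prefetch
	fmt.Println(has(flags, Snapshot), has(flags, LogReplay), has(flags, Prefetch))
	// prints "true false true"
}
```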

const (
	Mongos     NodeType = "mongos"
	Standalone          = "standalone"
	ReplSet             = "replset"
	Unknown             = "unknown"
)
const (
	None      sessionFlag = 0
	Monotonic sessionFlag = 1 << iota
	DisableSocketTimeout
)

Session flags.
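
One detail of the declaration above is easy to misread: iota counts lines within the const block, so it is already 1 on the Monotonic line. The sketch below reproduces the block locally to show the resulting values. The sessionFlag type is unexported in the package, so the underlying type used here (uint32) is an assumption.

```go
package main

import "fmt"

// sessionFlag mirrors the unexported type; uint32 is an assumption.
type sessionFlag uint32

const (
	None      sessionFlag = 0
	Monotonic sessionFlag = 1 << iota // iota is 1 on this line, so the value is 2
	DisableSocketTimeout              // iota is 2 on this line, so the value is 4
)

func main() {
	fmt.Println(None, Monotonic, DisableSocketTimeout) // prints "0 2 4"
}
```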

View Source
const (
	ErrLostConnection     = "lost connection to server"
	ErrNoReachableServers = "no reachable servers"
	ErrNsNotFound         = "ns not found"
	// replication errors list the replset name if we are talking to a mongos,
	// so we can only check for this universal prefix
	ErrReplTimeoutPrefix            = "waiting for replication timed out"
	ErrCouldNotContactPrimaryPrefix = "could not contact primary for replica set"
	ErrWriteResultsUnavailable      = "write results unavailable from"
	ErrCouldNotFindPrimaryPrefix    = `could not find host matching read preference { mode: "primary"`
	ErrUnableToTargetPrefix         = "unable to target"
	ErrNotMaster                    = "not master"
	ErrConnectionRefusedSuffix      = "Connection refused"

	// ignorable errors
	ErrDuplicateKeyCode         = 11000
	ErrFailedDocumentValidation = 121
	ErrUnacknowledgedWrite      = "unacknowledged write"
)
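
The constant names hint at how each string is meant to be matched: a Prefix constant against the start of a message and a Suffix constant against its end. The sketch below illustrates that convention with two of the constants copied locally. The helper names and the sample message are hypothetical, and how the package itself performs the matching is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Copies of two constants from the block above.
const (
	ErrReplTimeoutPrefix       = "waiting for replication timed out"
	ErrConnectionRefusedSuffix = "Connection refused"
)

// isReplTimeout checks only the universal prefix, since the rest of the
// message may name a replica set. Hypothetical helper.
func isReplTimeout(msg string) bool {
	return strings.HasPrefix(msg, ErrReplTimeoutPrefix)
}

// isConnectionRefused checks only the end of the message. Hypothetical helper.
func isConnectionRefused(msg string) bool {
	return strings.HasSuffix(msg, ErrConnectionRefusedSuffix)
}

func main() {
	// The trailing detail in this message is made up for illustration.
	err := errors.New("waiting for replication timed out at shard rs0")
	fmt.Println(isReplTimeout(err.Error()), isConnectionRefused(err.Error()))
	// prints "true false"
}
```
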
const (
	DefaultTestPort = "33333"
)

Default port for integration tests

const MAX_MESSAGE_SIZE_BYTES = 48000000

The default value of maxMessageSizeBytes. See: https://docs.mongodb.com/manual/reference/command/hello/#mongodb-data-hello.maxMessageSizeBytes

const (
	MaxBSONSize = 16 * 1024 * 1024 // 16MB - maximum BSON document size
)

MongoDB enforced limits.

const (
	WarningNonPrimaryMongosConnection = "Warning: using a non-primary readPreference with a " +
		"connection to mongos may produce inconsistent duplicates or miss some documents."
)

Variables

This section is empty.

Functions

func ApplyFlags

func ApplyFlags(opts *mopt.FindOneOptions, flags int)

ApplyFlags applies flags to the given query session.

func CanIgnoreError

func CanIgnoreError(err error) bool

Returns whether the tools can continue when encountering the given error. Currently, only DuplicateKeyErrors are ignorable.

func FilterError

func FilterError(stopOnError bool, err error) error

FilterError determines whether an error needs to be propagated back to the user or can be continued through. If an error cannot be ignored, a non-nil error is returned. If an error can be continued through, it is logged and nil is returned.
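
The decision described above can be sketched as follows. This is not the package's implementation: canIgnore stands in for CanIgnoreError, the two sample errors are made up, and the assumption that stopOnError forces even ignorable errors to be returned is inferred from the parameter name.

```go
package main

import (
	"errors"
	"log"
)

// Sample errors for the sketch; both are hypothetical stand-ins.
var (
	errDuplicateKey = errors.New("E11000 duplicate key error")
	errLostConn     = errors.New("lost connection to server")
)

// canIgnore stands in for CanIgnoreError and treats only the sample
// duplicate key error as ignorable.
func canIgnore(err error) bool {
	return errors.Is(err, errDuplicateKey)
}

// filterError returns err when it must be propagated, and logs it and
// returns nil when it can be continued through.
func filterError(stopOnError bool, err error) error {
	if err == nil {
		return nil
	}
	if stopOnError || !canIgnore(err) {
		return err
	}
	log.Printf("continuing through error: %v", err)
	return nil
}

func main() {
	log.Println(filterError(false, errDuplicateKey)) // logged, then nil
	log.Println(filterError(true, errDuplicateKey))  // propagated
	log.Println(filterError(false, errLostConn))     // propagated
}
```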

func GetCollections

func GetCollections(database *mongo.Database, name string) (*mongo.Cursor, error)

GetCollections assumes that mongo.Database will normalize legacy names to omit the database name, as required by the Enumerate Collections spec.

func GetIndexes

func GetIndexes(coll *mongo.Collection) (*mongo.Cursor, error)

GetIndexes returns an iterator to the raw index info for a collection by using the listIndexes command if available, or by falling back to querying against system.indexes (pre-3.0 systems). nil is returned if the collection does not exist.

func IsMMAPV1

func IsMMAPV1(database *mongo.Database, collectionName string) (bool, error)

IsMMAPV1 returns whether the storage engine is MMAPV1. Also returns false if the storage engine type cannot be determined for some reason.

func NewMongoWriteConcern

func NewMongoWriteConcern(writeConcern string, cs *connstring.ConnString) (wc *writeconcern.WriteConcern, err error)

NewMongoWriteConcern takes a string (from the command line writeConcern option) and a ConnString object (from the command line uri option) and returns a WriteConcern. If both are provided, preference is given to the command line writeConcern option. If neither is provided, the default 'majority' write concern is constructed.

func NewReadPreference

func NewReadPreference(rp string, cs *connstring.ConnString) (*readpref.ReadPref, error)

NewReadPreference takes a string (command line read preference argument) and a ConnString (from the command line URI argument) and returns a ReadPref. If both are provided, preference is given to the command line argument. If both are empty, a default read preference of primary will be returned.
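
NewMongoWriteConcern and NewReadPreference share the same precedence rule: the command line value wins over the URI value, and a default applies when both are empty. The sketch below shows only that rule, using plain strings in place of the driver's WriteConcern and ReadPref types; the resolve function is hypothetical.

```go
package main

import "fmt"

// resolve picks the command line value if set, then the URI value,
// then the fallback. Hypothetical helper working on plain strings.
func resolve(cliValue, uriValue, fallback string) string {
	switch {
	case cliValue != "":
		return cliValue
	case uriValue != "":
		return uriValue
	default:
		return fallback
	}
}

func main() {
	fmt.Println(resolve("", "", "majority"))                // prints "majority"
	fmt.Println(resolve("", "nearest", "primary"))          // prints "nearest"
	fmt.Println(resolve("secondary", "nearest", "primary")) // prints "secondary"
}
```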

func OpTimeEquals

func OpTimeEquals(lhs OpTime, rhs OpTime) bool

OpTimeEquals returns true if lhs equals rhs, false otherwise. We first check for nil / not nil mismatches between the terms and between the hashes. Then we check for equality between the terms and between the hashes (if they exist) before checking the timestamps.

func OpTimeGreaterThan

func OpTimeGreaterThan(lhs OpTime, rhs OpTime) bool

OpTimeGreaterThan returns true if lhs comes after rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.

func OpTimeIsEmpty

func OpTimeIsEmpty(opTime OpTime) bool

OpTimeIsEmpty returns true if opTime is uninitialized, false otherwise.

func OpTimeLessThan

func OpTimeLessThan(lhs OpTime, rhs OpTime) bool

OpTimeLessThan returns true if lhs comes before rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
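
The ordering rule for OpTimeGreaterThan and OpTimeLessThan can be sketched with local stand-in types. The sketch assumes that when both terms exist and differ, the terms alone decide the result (the text above only states the fallback), and it models primitive.Timestamp as a pair of uint32 fields T and I. The lowercase names are hypothetical.

```go
package main

import "fmt"

// timestamp stands in for primitive.Timestamp (seconds T, increment I).
type timestamp struct{ T, I uint32 }

// opTime stands in for db.OpTime, without the Hash field.
type opTime struct {
	Timestamp timestamp
	Term      *int64
}

// timestampLess orders by T first, then by I.
func timestampLess(a, b timestamp) bool {
	if a.T != b.T {
		return a.T < b.T
	}
	return a.I < b.I
}

// opTimeLessThan compares terms when both exist and differ; otherwise
// it falls back to comparing timestamps only.
func opTimeLessThan(lhs, rhs opTime) bool {
	if lhs.Term != nil && rhs.Term != nil && *lhs.Term != *rhs.Term {
		return *lhs.Term < *rhs.Term
	}
	return timestampLess(lhs.Timestamp, rhs.Timestamp)
}

// opTimeGreaterThan is the mirror image of opTimeLessThan.
func opTimeGreaterThan(lhs, rhs opTime) bool {
	return opTimeLessThan(rhs, lhs)
}

func main() {
	one, two := int64(1), int64(2)
	a := opTime{Timestamp: timestamp{T: 200, I: 1}, Term: &one}
	b := opTime{Timestamp: timestamp{T: 100, I: 1}, Term: &two}
	c := opTime{Timestamp: timestamp{T: 150, I: 1}} // no term

	fmt.Println(opTimeLessThan(a, b)) // terms differ, 1 < 2: prints "true"
	fmt.Println(opTimeLessThan(c, a)) // c has no term, 150 < 200: prints "true"
}
```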

func StripDBFromNamespace

func StripDBFromNamespace(namespace string, dbName string) (string, error)

Types

type ApplyOpsResponse

type ApplyOpsResponse struct {
	Ok     bool   `bson:"ok"`
	ErrMsg string `bson:"errmsg"`
}

ApplyOpsResponse represents the response from an 'applyOps' command.

type BSONSource

type BSONSource struct {
	Stream io.ReadCloser

	MaxBSONSize int32
	// contains filtered or unexported fields
}

BSONSource reads documents from the underlying io.ReadCloser, Stream, which wraps a stream of BSON documents.

func NewBSONSource

func NewBSONSource(in io.ReadCloser) *BSONSource

NewBSONSource creates a BSONSource with a reusable I/O buffer

func NewBufferlessBSONSource

func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource

NewBufferlessBSONSource creates a BSONSource without a reusable I/O buffer

func (*BSONSource) Close

func (bs *BSONSource) Close() error

Close closes the BSONSource, rendering it unusable for I/O. It returns an error, if any.

func (*BSONSource) Err

func (bs *BSONSource) Err() error

func (*BSONSource) LoadNext

func (bs *BSONSource) LoadNext() []byte

LoadNext reads and returns the next BSON document in the stream. If the BSONSource was created with NewBSONSource, each returned []byte will be a slice of a single reused I/O buffer. If the BSONSource was created with NewBufferlessBSONSource, each returned []byte will be individually allocated.
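
To make the framing concrete: a BSON document begins with its total length as a little-endian int32 and ends with a 0x00 byte, so a stream of documents can be split without decoding them. The sketch below reads documents this way using only the standard library and allocates a fresh slice per document, in the spirit of the bufferless variant. readDoc and countDocs are hypothetical names, and this is not the package's implementation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxBSONSize matches the MaxBSONSize limit documented above.
const maxBSONSize = 16 * 1024 * 1024

// readDoc reads one length-prefixed BSON document from r.
// It returns (nil, nil) at a clean end of stream.
func readDoc(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		if err == io.EOF {
			return nil, nil // no more documents
		}
		return nil, err // truncated header or read failure
	}
	size := int32(binary.LittleEndian.Uint32(header[:]))
	if size < 5 || size > maxBSONSize {
		return nil, errors.New("invalid BSON document size")
	}
	// Allocate per document; a buffered reader would reuse one slice.
	doc := make([]byte, size)
	copy(doc, header[:])
	if _, err := io.ReadFull(r, doc[4:]); err != nil {
		return nil, err
	}
	if doc[size-1] != 0 {
		return nil, errors.New("BSON document is missing its terminator")
	}
	return doc, nil
}

// countDocs reads every document in data and returns how many it saw.
func countDocs(data []byte) (int, error) {
	r := bytes.NewReader(data)
	n := 0
	for {
		doc, err := readDoc(r)
		if err != nil {
			return n, err
		}
		if doc == nil {
			return n, nil
		}
		n++
	}
}

func main() {
	// Two empty BSON documents: length 5, then the 0x00 terminator.
	n, err := countDocs([]byte{5, 0, 0, 0, 0, 5, 0, 0, 0, 0})
	fmt.Println(n, err) // prints "2 <nil>"
}
```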

func (*BSONSource) SetMaxBSONSize

func (bs *BSONSource) SetMaxBSONSize(size int32)

type BufferedBulkInserter

type BufferedBulkInserter struct {
	// contains filtered or unexported fields
}

BufferedBulkInserter implements a bufio.Writer-like design for queuing up documents and inserting them in bulk when the given doc limit (or max message size) is reached. Must be flushed at the end to ensure that all documents are written.
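
The buffering pattern described above can be sketched without a database. The type below queues documents and hands off a batch whenever the doc limit is reached, and a final Flush writes whatever remains. It records batches in memory instead of issuing bulk writes and ignores the max message size check, so it illustrates the pattern only; all names are hypothetical.

```go
package main

import "fmt"

// bufferedInserter is a hypothetical, in-memory stand-in for
// BufferedBulkInserter that records batches instead of writing them.
type bufferedInserter struct {
	docLimit int
	buffer   []string
	batches  [][]string
}

// Insert queues a document and flushes once the doc limit is reached.
func (b *bufferedInserter) Insert(doc string) {
	b.buffer = append(b.buffer, doc)
	if len(b.buffer) >= b.docLimit {
		b.Flush()
	}
}

// Flush hands off all buffered documents as one batch and resets the buffer.
func (b *bufferedInserter) Flush() {
	if len(b.buffer) == 0 {
		return
	}
	b.batches = append(b.batches, b.buffer)
	b.buffer = nil
}

func main() {
	ins := &bufferedInserter{docLimit: 2}
	for _, doc := range []string{"a", "b", "c", "d", "e"} {
		ins.Insert(doc)
	}
	ins.Flush() // without this, "e" would never be written
	fmt.Println(ins.batches) // prints "[[a b] [c d] [e]]"
}
```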

func NewOrderedBufferedBulkInserter

func NewOrderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter

NewOrderedBufferedBulkInserter returns an initialized BufferedBulkInserter for performing ordered bulk writes.

func NewUnorderedBufferedBulkInserter

func NewUnorderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter

NewUnorderedBufferedBulkInserter returns an initialized BufferedBulkInserter for performing unordered bulk writes.

func (*BufferedBulkInserter) Delete

func (bb *BufferedBulkInserter) Delete(selector, replacement bson.D) (*mongo.BulkWriteResult, error)

Delete adds a document to the buffer for bulk removal. If the buffer becomes full, the bulk delete is performed, returning any error that occurs.

func (*BufferedBulkInserter) Flush

func (bb *BufferedBulkInserter) Flush() (*mongo.BulkWriteResult, error)

Flush writes all buffered documents in one bulk write and then resets the buffer.

func (*BufferedBulkInserter) Insert

func (bb *BufferedBulkInserter) Insert(doc interface{}) (*mongo.BulkWriteResult, error)

Insert adds a document to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.

func (*BufferedBulkInserter) InsertRaw

func (bb *BufferedBulkInserter) InsertRaw(rawBytes []byte) (*mongo.BulkWriteResult, error)

InsertRaw adds a document, represented as raw bson bytes, to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.

func (*BufferedBulkInserter) Replace

func (bb *BufferedBulkInserter) Replace(selector, replacement bson.D) (*mongo.BulkWriteResult, error)

Replace adds a document to the buffer for bulk replacement. If the buffer becomes full, the bulk write is performed, returning any error that occurs.

func (*BufferedBulkInserter) SetBypassDocumentValidation

func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter

func (*BufferedBulkInserter) SetOrdered

func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter

func (*BufferedBulkInserter) SetUpsert

func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter

func (*BufferedBulkInserter) Update

func (bb *BufferedBulkInserter) Update(selector, update bson.D) (*mongo.BulkWriteResult, error)

Update adds a document to the buffer for bulk update. If the buffer becomes full, the bulk write is performed, returning any error that occurs.

type CollectionInfo

type CollectionInfo struct {
	Name    string `bson:"name"`
	Type    string `bson:"type"`
	Options bson.M `bson:"options"`
	Info    bson.M `bson:"info"`
}

func GetCollectionInfo

func GetCollectionInfo(coll *mongo.Collection) (*CollectionInfo, error)

func (*CollectionInfo) GetUUID

func (ci *CollectionInfo) GetUUID() string

func (*CollectionInfo) IsSystemCollection

func (ci *CollectionInfo) IsSystemCollection() bool

func (*CollectionInfo) IsTimeseries

func (ci *CollectionInfo) IsTimeseries() bool

func (*CollectionInfo) IsView

func (ci *CollectionInfo) IsView() bool

type CommandRunner

type CommandRunner interface {
	Run(command interface{}, out interface{}, database string) error
	RunString(commandName string, out interface{}, database string) error
	FindOne(db, collection string, skip int, query interface{}, sort []string, into interface{}, opts int) error
	Remove(db, collection string, query interface{}) error
	DatabaseNames() ([]string, error)
	CollectionNames(db string) ([]string, error)
}

CommandRunner exposes functions that can be run against a server. XXX: Does anything rely on this?

type DecodedBSONSource

type DecodedBSONSource struct {
	RawDocSource
	// contains filtered or unexported fields
}

DecodedBSONSource wraps a RawDocSource and decodes the BSON documents that it reads.

func NewDecodedBSONSource

func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource

func (*DecodedBSONSource) Err

func (dbs *DecodedBSONSource) Err() error

Err returns any error in the DecodedBSONSource or its RawDocSource.

func (*DecodedBSONSource) Next

func (dbs *DecodedBSONSource) Next(result interface{}) bool

Next unmarshals the next BSON document into result using the official Go driver. Returns true if no errors are encountered and false otherwise. This function does NOT zero out the result before writing to it.

type DeferredQuery

type DeferredQuery struct {
	Coll      *mongo.Collection
	Filter    interface{}
	Hint      interface{}
	LogReplay bool
}

DeferredQuery represents a deferred query

func (*DeferredQuery) Count

func (q *DeferredQuery) Count(isView bool) (int, error)

Count issues an EstimatedDocumentCount command when there is no Filter in the query and a CountDocuments command otherwise.

func (*DeferredQuery) Iter

func (q *DeferredQuery) Iter() (*mongo.Cursor, error)

Iter executes a find query and returns a cursor.

type NodeType

type NodeType string

type OpTime

type OpTime struct {
	Timestamp primitive.Timestamp `json:"timestamp"`
	Term      *int64              `json:"term"`
	Hash      *int64              `json:"hash"`
}

OpTime represents the values to uniquely identify an oplog entry. An OpTime must always have a timestamp, but may or may not have a term. The hash is set uniquely up until (and including) version 4.0, but is set to zero in version 4.2+ with plans to remove it soon (see SERVER-36334).

func GetLatestOplogOpTime

func GetLatestOplogOpTime(client *mongo.Client, query interface{}) (OpTime, error)

GetLatestOplogOpTime returns the optime of the most recent oplog record satisfying the given `query` or a zero-value db.OpTime{} if no oplog record matches. This method does not ensure that all prior oplog entries are visible (i.e. have been storage-committed).

func GetLatestVisibleOplogOpTime

func GetLatestVisibleOplogOpTime(client *mongo.Client) (OpTime, error)

GetLatestVisibleOplogOpTime returns the optime of the most recent "visible" oplog record. By "visible", we mean that all prior oplog entries have been storage-committed. See SERVER-30724 for a more detailed description.

func GetOldestActiveTransactionOpTime

func GetOldestActiveTransactionOpTime(client *mongo.Client) (OpTime, error)

GetOldestActiveTransactionOpTime returns the oldest active transaction optime from the config.transactions table or else a zero-value db.OpTime{}

func GetOpTimeFromOplogEntry

func GetOpTimeFromOplogEntry(oplogEntry *Oplog) OpTime

GetOpTimeFromOplogEntry returns an OpTime struct from the relevant fields in an Oplog struct.

func GetOpTimeFromRawOplogEntry

func GetOpTimeFromRawOplogEntry(rawOplogEntry bson.Raw) (OpTime, error)

GetOpTimeFromRawOplogEntry looks up the ts (timestamp), t (term), and h (hash) fields in a raw oplog entry, and assigns them to an OpTime. If the Timestamp can't be found or is in an invalid format, it returns an error. If the Term or Hash fields can't be found, it returns the OpTime without them.

func (OpTime) String

func (ot OpTime) String() string

type Oplog

type Oplog struct {
	Timestamp  primitive.Timestamp `bson:"ts"`
	Term       *int64              `bson:"t"`
	Hash       *int64              `bson:"h"`
	Version    int                 `bson:"v"`
	Operation  string              `bson:"op"`
	Namespace  string              `bson:"ns"`
	Object     bson.D              `bson:"o"`
	Query      bson.D              `bson:"o2,omitempty"`
	UI         *primitive.Binary   `bson:"ui,omitempty"`
	LSID       bson.Raw            `bson:"lsid,omitempty"`
	TxnNumber  *int64              `bson:"txnNumber,omitempty"`
	PrevOpTime bson.Raw            `bson:"prevOpTime,omitempty"`
}

Oplog represents a MongoDB oplog document.

type OplogTailTime

type OplogTailTime struct {
	Latest  OpTime
	Restart OpTime
}

OplogTailTime represents two ways of describing the "end" of the oplog at a point in time. The Latest field represents the last visible (storage committed) timestamp. The Restart field represents a (possibly older) timestamp that can be used to start tailing or copying the oplog without losing parts of transactions in progress.

func GetOplogTailTime

func GetOplogTailTime(client *mongo.Client) (OplogTailTime, error)

GetOplogTailTime constructs an OplogTailTime

type RawDocSource

type RawDocSource interface {
	LoadNext() []byte
	Close() error
	Err() error
}

RawDocSource wraps basic functions for reading a BSON source file.

type SessionProvider

type SessionProvider struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Used to manage database sessions

func NewSessionProvider

func NewSessionProvider(opts options.ToolOptions) (*SessionProvider, error)

NewSessionProvider constructs a session provider, including a connected client.

func (*SessionProvider) Close

func (sp *SessionProvider) Close()

Close closes the master session in the connection pool

func (*SessionProvider) CreateCollection

func (sp *SessionProvider) CreateCollection(dbName, collName string) error

func (*SessionProvider) DB

func (sp *SessionProvider) DB(name string) *mongo.Database

DB provides a database with the default read preference

func (*SessionProvider) DatabaseNames

func (sp *SessionProvider) DatabaseNames() ([]string, error)

DatabaseNames returns a slice containing the names of all the databases on the connected server.

func (*SessionProvider) DropDatabase

func (sp *SessionProvider) DropDatabase(dbName string) error

func (*SessionProvider) FindOne

func (sp *SessionProvider) FindOne(db, collection string, skip int, query interface{}, sort interface{}, into interface{}, flags int) error

FindOne returns the first document in the collection and database that matches the query after skip, sort and query flags are applied.

func (*SessionProvider) GetNodeType

func (sp *SessionProvider) GetNodeType() (NodeType, error)

GetNodeType checks if the connected SessionProvider is a mongos, standalone, or replset, by looking at the result of calling isMaster.

func (*SessionProvider) GetSession

func (sp *SessionProvider) GetSession() (*mongo.Client, error)

Returns a mongo.Client connected to the database server for which the session provider is configured.

func (*SessionProvider) IsAtlasProxy

func (sp *SessionProvider) IsAtlasProxy() (bool, error)

IsAtlasProxy checks if the connected SessionProvider is an atlas proxy.

func (*SessionProvider) IsMongos

func (sp *SessionProvider) IsMongos() (bool, error)

IsMongos returns true if the connected server is a mongos.

func (*SessionProvider) IsReplicaSet

func (sp *SessionProvider) IsReplicaSet() (bool, error)

IsReplicaSet returns a boolean which is true if the connected server is part of a replica set.

func (*SessionProvider) Run

func (sp *SessionProvider) Run(command interface{}, out interface{}, name string) error

func (*SessionProvider) RunApplyOpsCreateIndex

func (sp *SessionProvider) RunApplyOpsCreateIndex(C, DB string, index bson.D, UUID *primitive.Binary, result *interface{}) error

RunApplyOpsCreateIndex creates an index using applyOps. For versions that do not support collection UUIDs (<3.6), it uses an insert to system.indexes. Later versions use the createIndexes command.

func (*SessionProvider) RunString

func (sp *SessionProvider) RunString(commandName string, out interface{}, name string) error

func (*SessionProvider) ServerVersion

func (sp *SessionProvider) ServerVersion() (string, error)

func (*SessionProvider) ServerVersionArray

func (sp *SessionProvider) ServerVersionArray() (Version, error)

type Version

type Version [3]int

func (Version) Cmp

func (v1 Version) Cmp(v2 Version) int

func (Version) GT

func (v1 Version) GT(v2 Version) bool

func (Version) GTE

func (v1 Version) GTE(v2 Version) bool

func (Version) LT

func (v1 Version) LT(v2 Version) bool

func (Version) LTE

func (v1 Version) LTE(v2 Version) bool
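
The Version methods carry no descriptions on this page. A minimal sketch of how a [3]int version could be compared follows, assuming the elements are major, minor and patch numbers, that comparison runs element by element, and that Cmp returns -1, 0 or 1. The lowercase names are hypothetical stand-ins, not the package's code.

```go
package main

import "fmt"

// version is a local stand-in for db.Version.
type version [3]int

// cmp compares element by element and returns -1, 0 or 1.
// This ordering is an assumption, not confirmed by the page above.
func (v1 version) cmp(v2 version) int {
	for i := range v1 {
		switch {
		case v1[i] < v2[i]:
			return -1
		case v1[i] > v2[i]:
			return 1
		}
	}
	return 0
}

// gte reports whether v1 is at least v2.
func (v1 version) gte(v2 version) bool {
	return v1.cmp(v2) >= 0
}

func main() {
	server := version{4, 2, 0}
	fmt.Println(server.gte(version{3, 6, 0}), server.cmp(version{4, 4, 0}))
	// prints "true -1"
}
```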
