Documentation ¶
Index ¶
- Constants
- Variables
- func BuildFullWhereStatement(f MongoCollectionFilter) *bson.D
- func DriverConnectionOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)
- func DriverConnectionSrvOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)
- func EnableChangeStreams(ctx context.Context, client *MongoClientWrapper, colls []MongoCollection) error
- func ExtractKey(id interface{}, isHomo bool) (interface{}, error)
- func FromMongoTimestamp(t primitive.Timestamp) time.Time
- func GetDocument(columns []interface{}) bson.D
- func GetID(columns []interface{}) interface{}
- func GetIntFromEnv(varName string) int
- func GetLocalOplogInterval(ctx context.Context, client *MongoClientWrapper) (from, to primitive.Timestamp, _err error)
- func GetUpdatedDocument(columns []interface{}) bson.D
- func GetValueByPath(doc any, path string) (any, bool)
- func GetVersion(ctx context.Context, client *MongoClientWrapper, authSource string) (*semver.Version, error)
- func IsNativeMongoSchema(tableSchema []abstract.ColSchema) bool
- func IsUpdateDocumentSchema(tableSchema []abstract.ColSchema) bool
- func MarshalFilter(filter ShardingFilter) (string, error)
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewDatabaseDocumentKeyWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase) (*databaseDocumentKeyWatcher, error)
- func NewDatabaseFullDocumentWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase, rs MongoReplicationSource) (*databaseFullDocumentWatcher, error)
- func NewKeyBatcher(ctx context.Context, logger log.Logger, ...) (*keyBatcher, error)
- func NewOneshotNamespaceRetriever(s *mongoSource, resumeToken bson.Raw, dbName string) (*oneshotNamespaceRetriever, error)
- func NewSinker(lgr log.Logger, dst *MongoDestination, mtrcs metrics.Registry) (abstract.Sinker, error)
- func NewSource(src *MongoSource, transferID string, objects *model.DataObjects, ...) (abstract.Source, error)
- func SetValueByPath(doc any, path string, val any, emptyBsonContainerFactory func() any) (any, error)
- func SyncClusterTime(ctx context.Context, src *MongoSource, defaultCACertPaths []string) error
- func ToMongoTimestamp(t time.Time) primitive.Timestamp
- func ToRegexp(c MongoCollection) string
- func ValueInMongoFormat(item *abstract.ChangeItem, index int) (interface{}, error)
- type BatcherParameters
- type CACertificatePEMFilePaths
- type ChangeEvent
- type ChangeStreamWatcher
- type DExtension
- type DValue
- type DocumentKey
- type FullDocumentExtractor
- type InlineCACertificatePEM
- type KeyChangeEvent
- type MongoClientWrapper
- type MongoCollection
- type MongoCollectionFilter
- type MongoConnectionOptions
- type MongoContainer
- type MongoDestination
- func (d *MongoDestination) CleanupMode() model.CleanupType
- func (d *MongoDestination) ConnectionOptions(caCertPaths []string) MongoConnectionOptions
- func (d *MongoDestination) GetProviderType() abstract.ProviderType
- func (d *MongoDestination) HasTLS() bool
- func (MongoDestination) IsDestination()
- func (d *MongoDestination) MDBClusterID() string
- func (d *MongoDestination) ToStorageParams() *MongoStorageParams
- func (d *MongoDestination) Transformer() map[string]string
- func (d *MongoDestination) Validate() error
- func (d *MongoDestination) WithDefaults()
- type MongoReplicationSource
- type MongoSource
- func (s *MongoSource) AllIncludes() []string
- func (s *MongoSource) ConnectionOptions(defaultCACertPaths []string) MongoConnectionOptions
- func (s *MongoSource) FulfilledIncludes(tID abstract.TableID) (result []string)
- func (s *MongoSource) GetMongoCollectionFilter() MongoCollectionFilter
- func (s *MongoSource) GetProviderType() abstract.ProviderType
- func (s *MongoSource) HasTLS() bool
- func (s *MongoSource) Include(tID abstract.TableID) bool
- func (MongoSource) IsSource()
- func (MongoSource) IsStrictSource()
- func (s *MongoSource) MDBClusterID() string
- func (s *MongoSource) ToStorageParams() *MongoStorageParams
- func (s *MongoSource) Validate() error
- func (s *MongoSource) WithDefaults()
- type MongoStorageParams
- type Namespace
- type ParallelizationUnit
- type ParallelizationUnitDatabase
- func (p ParallelizationUnitDatabase) GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error)
- func (p ParallelizationUnitDatabase) Ping(ctx context.Context, client *MongoClientWrapper) error
- func (p ParallelizationUnitDatabase) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, ...) error
- func (p ParallelizationUnitDatabase) String() string
- type ParallelizationUnitOplog
- func (p ParallelizationUnitOplog) GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error)
- func (p ParallelizationUnitOplog) Ping(ctx context.Context, client *MongoClientWrapper) error
- func (p ParallelizationUnitOplog) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, ...) error
- func (p ParallelizationUnitOplog) String() string
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, ...) error
- func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
- func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
- func (p *Provider) Storage() (abstract.Storage, error)
- func (p *Provider) Type() abstract.ProviderType
- type RecipeOption
- type SchemaDescription
- type ShardKeysInfo
- type ShardingFilter
- type Storage
- func (s *Storage) Close()
- func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, ...) error
- func (s *Storage) LoadSchema() (dbSchema abstract.DBSchema, err error)
- func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) Ping() error
- func (s Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
- func (s *Storage) TableAccessible(table abstract.TableDescription) bool
- func (s *Storage) TableExists(table abstract.TableID) (bool, error)
- func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)
- type StorageOpt
- type TimeCollectionScheme
- type TruncatedArray
- type TrustedCACertificate
- type UpdateDescription
- type UpdateDocumentChangeItem
- func (u *UpdateDocumentChangeItem) CheckDiffByKeys(checkKeys []string) map[string]any
- func (u *UpdateDocumentChangeItem) FullDocument() bson.D
- func (u *UpdateDocumentChangeItem) HasTruncatedArrays() bool
- func (u *UpdateDocumentChangeItem) IsApplicablePatch() bool
- func (u *UpdateDocumentChangeItem) RemovedFields() []string
- func (u *UpdateDocumentChangeItem) TruncatedArrays() []TruncatedArray
- func (u *UpdateDocumentChangeItem) UpdatedFields() bson.D
Constants ¶
const ( // TODO(@kry127) tune constant, or make more deterministic approach of measuring pipeline BSON size DefaultKeySizeThreshold = 4 * 1024 * 1024 // soft upper border for batch size in bytes (if one key is bigger, it'll fit) DefaultBatchFlushInterval = 5 * time.Second // batch is guaranteed to be flushed every five seconds, but it may flush more frequently DefaultBatchSizeLimit = 500 // limit of amount of events inside one batch // desired part size of collection TablePartByteSize = 1024 * 1024 * 1024 DataTransferSystemDatabase = "__data_transfer" )
const ( SystemDatabase = "__data_transfer" // used only for old versions of mongo ClusterTimeCollName = "__dt_cluster_time" )
const ( ID = "_id" UpdatedFields = "updatedFields" RemovedFields = "removedFields" TruncatedArrays = "truncatedArrays" FullDocument = "fullDocument" Document = "document" )
const ( ChangeStreamFatalErrorCode = 280 ChangeStreamHistoryLostCode = 286 )
const DefaultAuthSource = "admin"
const OplogProtocolVersion = 2
const ProviderType = abstract.ProviderType("mongo")
Variables ¶
var ( DocumentSchema = newSchemaDescription(abstract.NewTableSchema([]abstract.ColSchema{{ ColumnName: ID, DataType: ytschema.TypeString.String(), PrimaryKey: true, OriginalType: "mongo:bson_id", }, { ColumnName: Document, DataType: ytschema.TypeAny.String(), OriginalType: "mongo:bson", }})) UpdateDocumentSchema = newSchemaDescription(abstract.NewTableSchema([]abstract.ColSchema{{ ColumnName: ID, DataType: ytschema.TypeString.String(), PrimaryKey: true, OriginalType: "mongo:bson", }, { ColumnName: UpdatedFields, DataType: ytschema.TypeAny.String(), OriginalType: "mongo:bson", }, { ColumnName: RemovedFields, DataType: ytschema.TypeAny.String(), OriginalType: "mongo:bson", }, { ColumnName: TruncatedArrays, DataType: ytschema.TypeAny.String(), OriginalType: "mongo:bson", }, { ColumnName: FullDocument, DataType: ytschema.TypeAny.String(), OriginalType: "mongo:bson", }})) CollectionFilter = bson.D{ { Key: "type", Value: "collection", }, } SystemDBs = []string{ "admin", "config", "local", "mdb_internal", } )
var (
ErrEmptyFilter = xerrors.New("Filters pass empty collection list")
)
var (
MongoVersion4_0 = must(semver.ParseTolerant("4.0"))
)
Functions ¶
func BuildFullWhereStatement ¶
func BuildFullWhereStatement(f MongoCollectionFilter) *bson.D
func DriverConnectionOptions ¶
func DriverConnectionOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)
func DriverConnectionSrvOptions ¶
func DriverConnectionSrvOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)
func EnableChangeStreams ¶
func EnableChangeStreams(ctx context.Context, client *MongoClientWrapper, colls []MongoCollection) error
func ExtractKey ¶
func GetDocument ¶
func GetIntFromEnv ¶
func GetLocalOplogInterval ¶
func GetUpdatedDocument ¶
func GetVersion ¶
func GetVersion(ctx context.Context, client *MongoClientWrapper, authSource string) (*semver.Version, error)
GetVersion tries to get the version from the database authSource. If authSource is an empty string, the default will be used.
func IsNativeMongoSchema ¶
func IsUpdateDocumentSchema ¶
func MarshalFilter ¶
func MarshalFilter(filter ShardingFilter) (string, error)
func New ¶
func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *model.Transfer) providers.Provider
func NewDatabaseDocumentKeyWatcher ¶
func NewDatabaseDocumentKeyWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase) (*databaseDocumentKeyWatcher, error)
func NewDatabaseFullDocumentWatcher ¶
func NewDatabaseFullDocumentWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase, rs MongoReplicationSource) (*databaseFullDocumentWatcher, error)
func NewKeyBatcher ¶
func NewKeyBatcher( ctx context.Context, logger log.Logger, fullDocumentExtractor FullDocumentExtractor, fullDocumentPusher changeEventPusher, parameters *BatcherParameters, ) (*keyBatcher, error)
NewKeyBatcher creates a key batcher. If parameters is nil, default parameters are used.
func NewOneshotNamespaceRetriever ¶
func NewOneshotNamespaceRetriever(s *mongoSource, resumeToken bson.Raw, dbName string) (*oneshotNamespaceRetriever, error)
NewOneshotNamespaceRetriever creates a namespace-only watcher. The recovery point (resume token) should be specified precisely, or the current time will be used.
func NewSource ¶
func NewSource(src *MongoSource, transferID string, objects *model.DataObjects, logger log.Logger, registry metrics.Registry, cp coordinator.Coordinator) (abstract.Source, error)
func SetValueByPath ¶
func SyncClusterTime ¶
func SyncClusterTime(ctx context.Context, src *MongoSource, defaultCACertPaths []string) error
SyncClusterTime is mongo version dependent code
func ToRegexp ¶
func ToRegexp(c MongoCollection) string
func ValueInMongoFormat ¶
func ValueInMongoFormat(item *abstract.ChangeItem, index int) (interface{}, error)
ValueInMongoFormat extracts the column value at the given index and converts it to MongoDB format
Types ¶
type BatcherParameters ¶
type CACertificatePEMFilePaths ¶
type CACertificatePEMFilePaths []string
type ChangeEvent ¶
type ChangeEvent struct { KeyChangeEvent `bson:",inline"` FullDocument bson.D `bson:"fullDocument"` UpdateDescription *UpdateDescription `bson:"updateDescription"` }
type ChangeStreamWatcher ¶
type ChangeStreamWatcher interface { // Watch is one-shot method. After return all allocated resources should be freed with calling Close Watch(context.Context, changeEventPusher) error Close(context.Context) // GetResumeToken returns last resume token to watch from, or nil if info is not available // this may be used in future for chaining watchers in the oplog during fallback GetResumeToken() bson.Raw }
ChangeStreamWatcher produces changeEvents. It encapsulates the method by which full documents of mongo collections are retrieved.
type DExtension ¶
func DExt ¶
func DExt(root bson.D) *DExtension
func (*DExtension) Map ¶
func (d *DExtension) Map() bson.M
func (*DExtension) RawValue ¶
func (d *DExtension) RawValue() bson.D
func (*DExtension) SetKey ¶
func (d *DExtension) SetKey(key string, val interface{})
func (*DExtension) Value ¶
func (d *DExtension) Value(isHomo, preventJSONRepack bool) DValue
type DValue ¶
DValue struct is used as document in change items in order to:
- Provide jsonSerializable interface in our type system
- Bring back legacy behaviour for typesystem < 7
func (DValue) MarshalJSON ¶
func (DValue) RepackValue ¶
type DocumentKey ¶
type DocumentKey struct {
ID interface{} `bson:"_id"`
}
type FullDocumentExtractor ¶
type FullDocumentExtractor func(ctx context.Context, ns Namespace, keyList bson.A) ([]sizedFullDocument, error)
FullDocumentExtractor: to get the default instance of this type, use MakeDefaultFullDocumentExtractor.
func MakeDefaultFullDocumentExtractor ¶
func MakeDefaultFullDocumentExtractor(client *MongoClientWrapper) FullDocumentExtractor
MakeDefaultFullDocumentExtractor constructs the default document extractor for the batcher that uses a Mongo connection.
type InlineCACertificatePEM ¶
type InlineCACertificatePEM []byte
type KeyChangeEvent ¶
type KeyChangeEvent struct { OperationType string `bson:"operationType"` DocumentKey DocumentKey `bson:"documentKey"` Namespace Namespace `bson:"ns"` ToNamespace Namespace `bson:"to"` ClusterTime primitive.Timestamp `bson:"clusterTime"` }
func (*KeyChangeEvent) ToChangeEvent ¶
func (k *KeyChangeEvent) ToChangeEvent() *ChangeEvent
type MongoClientWrapper ¶
type MongoClientWrapper struct { *mongo.Client IsDocDB bool // contains filtered or unexported fields }
func Connect ¶
func Connect(ctx context.Context, opts MongoConnectionOptions, lgr log.Logger) (*MongoClientWrapper, error)
Connect should be the one and only valid way of creating a mongo client. It logs the connection options to 'lgr'.
type MongoCollection ¶
func GetAllExistingCollections ¶
func GetAllExistingCollections(ctx context.Context, client *MongoClientWrapper) ([]MongoCollection, error)
func NewMongoCollection ¶
func NewMongoCollection(planeName string) *MongoCollection
func (MongoCollection) String ¶
func (c MongoCollection) String() string
type MongoCollectionFilter ¶
type MongoCollectionFilter struct { Collections []MongoCollection ExcludedCollections []MongoCollection }
func (MongoCollectionFilter) BuildPipeline ¶
func (f MongoCollectionFilter) BuildPipeline(forDatabaseName string) (mongo.Pipeline, error)
BuildPipeline returns mongo pipeline that should be able to filter the oplog from unwanted changes in database 'forDatabaseName'
type MongoConnectionOptions ¶
type MongoConnectionOptions struct { ClusterID string Hosts []string Port int ReplicaSet string AuthSource string User string Password string CACert TrustedCACertificate Direct bool SRVMode bool }
func (MongoConnectionOptions) IsDocDB ¶
func (o MongoConnectionOptions) IsDocDB() bool
IsDocDB checks whether we are connecting to Amazon DocumentDB.
type MongoContainer ¶
type MongoContainer struct { testcontainers.Container // contains filtered or unexported fields }
MongoContainer represents the mongo container type used in the module
func StartMongoContainer ¶
func StartMongoContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*MongoContainer, error)
type MongoDestination ¶
type MongoDestination struct { ClusterID string Hosts []string Port int Database string ReplicaSet string AuthSource string User string Password model.SecretString TransformerConfig map[string]string Cleanup model.CleanupType SubNetworkID string SecurityGroupIDs []string TLSFile string // make a `direct` connection to mongo, see: https://www.mongodb.com/docs/drivers/go/current/fundamentals/connections/connection-guide/ Direct bool RootCAFiles []string // indicates whether the mongoDB client uses a mongodb+srv connection SRVMode bool }
func RecipeTarget ¶
func RecipeTarget(options ...RecipeOption) *MongoDestination
func (*MongoDestination) CleanupMode ¶
func (d *MongoDestination) CleanupMode() model.CleanupType
func (*MongoDestination) ConnectionOptions ¶
func (d *MongoDestination) ConnectionOptions(caCertPaths []string) MongoConnectionOptions
func (*MongoDestination) GetProviderType ¶
func (d *MongoDestination) GetProviderType() abstract.ProviderType
func (*MongoDestination) HasTLS ¶
func (d *MongoDestination) HasTLS() bool
func (MongoDestination) IsDestination ¶
func (MongoDestination) IsDestination()
func (*MongoDestination) MDBClusterID ¶
func (d *MongoDestination) MDBClusterID() string
func (*MongoDestination) ToStorageParams ¶
func (d *MongoDestination) ToStorageParams() *MongoStorageParams
func (*MongoDestination) Transformer ¶
func (d *MongoDestination) Transformer() map[string]string
func (*MongoDestination) Validate ¶
func (d *MongoDestination) Validate() error
func (*MongoDestination) WithDefaults ¶
func (d *MongoDestination) WithDefaults()
type MongoReplicationSource ¶
type MongoReplicationSource string
var ( MongoReplicationSourceUnspecified MongoReplicationSource = "" MongoReplicationSourcePerDatabase MongoReplicationSource = "PerDatabase" MongoReplicationSourcePerDatabaseFullDocument MongoReplicationSource = "PerDatabase_FullDocument" MongoReplicationSourcePerDatabaseUpdateDocument MongoReplicationSource = "PerDatabase_UpdateDocument" MongoReplicationSourceOplog MongoReplicationSource = "Oplog" )
func ReplicationSourceFallback ¶
func ReplicationSourceFallback(logger log.Logger, pl MongoReplicationSource, mongoVersion *semver.Version) MongoReplicationSource
type MongoSource ¶
type MongoSource struct { ClusterID string Hosts []string Port int ReplicaSet string AuthSource string User string Password model.SecretString Collections []MongoCollection ExcludedCollections []MongoCollection SubNetworkID string SecurityGroupIDs []string TechnicalDatabase string // deprecated: should be always "" IsHomo bool SlotID string // It's synthetic entity. Always equal to transfer_id! SecondaryPreferredMode bool TLSFile string ReplicationSource MongoReplicationSource BatchingParams *BatcherParameters // for now this is private params DesiredPartSize uint64 PreventJSONRepack bool // should not be used, use cases for migration: DTSUPPORT-1596 // FilterOplogWithRegexp is matters when ReplicationSource==MongoReplicationSourceOplog // // When it is set to false (recommended default value): no filtering of oplog will happen // // When it is set to true, oplog events are filtered with regexp build according to // Collections and ExcludedCollections // Regexp unconditionally includes collections '$cmd.*' for technical events and ” for noops // + Advantage of this mode: network efficiency // - Disadvantage of this mode: when there are no changes on listened database, oplog will be lost // TODO(@kry127) Consider turning on consumer keeper in this case (separate tech db + filtering will be enough) FilterOplogWithRegexp bool // make a `direct` connection to mongo, see: https://www.mongodb.com/docs/drivers/go/current/fundamentals/connections/connection-guide/ Direct bool RootCAFiles []string // indicates whether the mongoDB client uses a mongodb+srv connection SRVMode bool }
func RecipeSource ¶
func RecipeSource(options ...RecipeOption) *MongoSource
func (*MongoSource) AllIncludes ¶
func (s *MongoSource) AllIncludes() []string
func (*MongoSource) ConnectionOptions ¶
func (s *MongoSource) ConnectionOptions(defaultCACertPaths []string) MongoConnectionOptions
func (*MongoSource) FulfilledIncludes ¶
func (s *MongoSource) FulfilledIncludes(tID abstract.TableID) (result []string)
func (*MongoSource) GetMongoCollectionFilter ¶
func (s *MongoSource) GetMongoCollectionFilter() MongoCollectionFilter
func (*MongoSource) GetProviderType ¶
func (s *MongoSource) GetProviderType() abstract.ProviderType
func (*MongoSource) HasTLS ¶
func (s *MongoSource) HasTLS() bool
func (MongoSource) IsSource ¶
func (MongoSource) IsSource()
func (MongoSource) IsStrictSource ¶
func (MongoSource) IsStrictSource()
func (*MongoSource) MDBClusterID ¶
func (s *MongoSource) MDBClusterID() string
func (*MongoSource) ToStorageParams ¶
func (s *MongoSource) ToStorageParams() *MongoStorageParams
func (*MongoSource) Validate ¶
func (s *MongoSource) Validate() error
func (*MongoSource) WithDefaults ¶
func (s *MongoSource) WithDefaults()
type MongoStorageParams ¶
type MongoStorageParams struct { TLSFile string ClusterID string Hosts []string Port int ReplicaSet string AuthSource string User string Password string Collections []MongoCollection DesiredPartSize uint64 PreventJSONRepack bool Direct bool RootCAFiles []string SRVMode bool }
func (*MongoStorageParams) ConnectionOptions ¶
func (s *MongoStorageParams) ConnectionOptions(defaultCACertPaths []string) MongoConnectionOptions
type ParallelizationUnit ¶
type ParallelizationUnit interface { fmt.Stringer Ping(ctx context.Context, client *MongoClientWrapper) error GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error }
type ParallelizationUnitDatabase ¶
type ParallelizationUnitDatabase struct { // SlotID -- identifier of resource associated with replication (e.g. transfer ID) SlotID string // UnitDatabase -- what database is a replication unit of parallelizm UnitDatabase string // contains filtered or unexported fields }
func MakeParallelizationUnitDatabase ¶
func MakeParallelizationUnitDatabase(technicalDatabase, slotID, dbName string) ParallelizationUnitDatabase
func (ParallelizationUnitDatabase) GetClusterTime ¶
func (p ParallelizationUnitDatabase) GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error)
func (ParallelizationUnitDatabase) Ping ¶
func (p ParallelizationUnitDatabase) Ping(ctx context.Context, client *MongoClientWrapper) error
func (ParallelizationUnitDatabase) SaveClusterTime ¶
func (p ParallelizationUnitDatabase) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error
func (ParallelizationUnitDatabase) String ¶
func (p ParallelizationUnitDatabase) String() string
type ParallelizationUnitOplog ¶
type ParallelizationUnitOplog struct { // SlotID -- identifier of resource associated with replication (e.g. transfer ID) SlotID string // contains filtered or unexported fields }
func MakeParallelizationUnitOplog ¶
func MakeParallelizationUnitOplog(technicalDatabase, slotID string) ParallelizationUnitOplog
func (ParallelizationUnitOplog) GetClusterTime ¶
func (p ParallelizationUnitOplog) GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error)
func (ParallelizationUnitOplog) Ping ¶
func (p ParallelizationUnitOplog) Ping(ctx context.Context, client *MongoClientWrapper) error
func (ParallelizationUnitOplog) SaveClusterTime ¶
func (p ParallelizationUnitOplog) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error
func (ParallelizationUnitOplog) String ¶
func (p ParallelizationUnitOplog) String() string
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) DestinationSampleableStorage ¶
func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
func (*Provider) SourceSampleableStorage ¶
func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type RecipeOption ¶
type RecipeOption func(opts *recipeOpts)
func WithCollections ¶
func WithCollections(collections ...MongoCollection) RecipeOption
func WithPrefix ¶
func WithPrefix(prefix string) RecipeOption
type SchemaDescription ¶
type SchemaDescription struct { Columns *abstract.TableSchema ColumnsNames []string Indexes map[string]int }
type ShardKeysInfo ¶
type ShardKeysInfo struct { ID string `bson:"_id"` Lastmod primitive.DateTime `bson:"lastmod"` Timestamp primitive.Timestamp `bson:"timestamp"` Key bson.D `bson:"key"` Unique bool `bson:"unique"` LastmodEpoch primitive.ObjectID `bson:"lastmodEpoch"` // UUID issue: https://stackoverflow.com/questions/64723089/how-to-store-a-uuid-in-mongodb-with-golang // UUID interface{} `bson:"uuid"` KeyFields []string // contains filtered or unexported fields }
func GetShardingKey ¶
func GetShardingKey(ctx context.Context, client *MongoClientWrapper, ns Namespace) (*ShardKeysInfo, error)
func (*ShardKeysInfo) ContainsID ¶
func (s *ShardKeysInfo) ContainsID() bool
func (*ShardKeysInfo) Fields ¶
func (s *ShardKeysInfo) Fields() []string
func (*ShardKeysInfo) GetNamespace ¶
func (s *ShardKeysInfo) GetNamespace() *Namespace
func (*ShardKeysInfo) IsTrivial ¶
func (s *ShardKeysInfo) IsTrivial() bool
type ShardingFilter ¶
func UnmarshalFilter ¶
func UnmarshalFilter(marshalledFilter string) (ShardingFilter, error)
type Storage ¶
type Storage struct { Client *MongoClientWrapper IsHomo bool // contains filtered or unexported fields }
func NewStorage ¶
func NewStorage(config *MongoStorageParams, opts ...StorageOpt) (*Storage, error)
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) LoadRandomSample ¶
func (*Storage) LoadSampleBySet ¶
func (*Storage) LoadTopBottomSample ¶
func (Storage) ShardTable ¶
func (s Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
func (*Storage) TableAccessible ¶
func (s *Storage) TableAccessible(table abstract.TableDescription) bool
func (*Storage) TableSchema ¶
type StorageOpt ¶
func WithMetrics ¶
func WithMetrics(registry metrics.Registry) StorageOpt
type TimeCollectionScheme ¶
type TruncatedArray ¶
type TrustedCACertificate ¶
type TrustedCACertificate interface {
// contains filtered or unexported methods
}
type UpdateDescription ¶
type UpdateDescription struct { UpdatedFields bson.D `bson:"updatedFields"` RemovedFields []string `bson:"removedFields"` TruncatedArrays []TruncatedArray `bson:"truncatedArrays"` }
type UpdateDocumentChangeItem ¶
type UpdateDocumentChangeItem struct {
// contains filtered or unexported fields
}
func NewUpdateDocumentChangeItem ¶
func NewUpdateDocumentChangeItem(item *abstract.ChangeItem) (*UpdateDocumentChangeItem, error)
func (*UpdateDocumentChangeItem) CheckDiffByKeys ¶
func (u *UpdateDocumentChangeItem) CheckDiffByKeys(checkKeys []string) map[string]any
func (*UpdateDocumentChangeItem) FullDocument ¶
func (u *UpdateDocumentChangeItem) FullDocument() bson.D
func (*UpdateDocumentChangeItem) HasTruncatedArrays ¶
func (u *UpdateDocumentChangeItem) HasTruncatedArrays() bool
func (*UpdateDocumentChangeItem) IsApplicablePatch ¶
func (u *UpdateDocumentChangeItem) IsApplicablePatch() bool
func (*UpdateDocumentChangeItem) RemovedFields ¶
func (u *UpdateDocumentChangeItem) RemovedFields() []string
func (*UpdateDocumentChangeItem) TruncatedArrays ¶
func (u *UpdateDocumentChangeItem) TruncatedArrays() []TruncatedArray
func (*UpdateDocumentChangeItem) UpdatedFields ¶
func (u *UpdateDocumentChangeItem) UpdatedFields() bson.D
Source Files ¶
- batcher.go
- bson.go
- bulk_splitter.go
- change_stream.go
- change_stream_watcher.go
- client.go
- convert.go
- database_document_key_watcher.go
- database_full_document_watcher.go
- deep_copy.go
- document.go
- fallback_dvalue_json_repack.go
- local_oplog_rs_watcher.go
- model_mongo_connection_options.go
- model_mongo_destination.go
- model_mongo_source.go
- model_mongo_storage_params.go
- mongo_recipe.go
- namespace_only_watcher.go
- oplog_v2_parser.go
- parallelization_unit.go
- parallelization_unit_database.go
- parallelization_unit_oplog.go
- provider.go
- sampleable_storage.go
- schema.go
- shard_key.go
- sharded_collection.go
- sharding_storage.go
- sink.go
- sink_bulk_operations.go
- source.go
- storage.go
- time.go
- typesystem.go
- utils.go
- version.go
- write_models.go