mongo

package
v0.0.0-rc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2024 License: Apache-2.0 Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TODO(@kry127) tune constant, or make more deterministic approach of measuring pipeline BSON size
	DefaultKeySizeThreshold   = 4 * 1024 * 1024 // soft upper border for batch size in bytes (if one key is bigger, it'll fit)
	DefaultBatchFlushInterval = 5 * time.Second // batch is guaranteed to be flushed every five seconds, but it may flush more frequently
	DefaultBatchSizeLimit     = 500             // limit of amount of events inside one batch

	// desired part size of collection
	TablePartByteSize = 1024 * 1024 * 1024

	DataTransferSystemDatabase = "__data_transfer"
)
View Source
const (
	SystemDatabase      = "__data_transfer" // used only for old versions of mongo
	ClusterTimeCollName = "__dt_cluster_time"
)
View Source
const (
	ID              = "_id"
	UpdatedFields   = "updatedFields"
	RemovedFields   = "removedFields"
	TruncatedArrays = "truncatedArrays"
	FullDocument    = "fullDocument"
	Document        = "document"
)
View Source
const (
	ChangeStreamFatalErrorCode  = 280
	ChangeStreamHistoryLostCode = 286
)
View Source
const DefaultAuthSource = "admin"
View Source
const OplogProtocolVersion = 2
View Source
const ProviderType = abstract.ProviderType("mongo")

Variables

View Source
var (
	DocumentSchema = newSchemaDescription(abstract.NewTableSchema([]abstract.ColSchema{{
		ColumnName:   ID,
		DataType:     ytschema.TypeString.String(),
		PrimaryKey:   true,
		OriginalType: "mongo:bson_id",
	}, {
		ColumnName:   Document,
		DataType:     ytschema.TypeAny.String(),
		OriginalType: "mongo:bson",
	}}))

	UpdateDocumentSchema = newSchemaDescription(abstract.NewTableSchema([]abstract.ColSchema{{
		ColumnName:   ID,
		DataType:     ytschema.TypeString.String(),
		PrimaryKey:   true,
		OriginalType: "mongo:bson",
	}, {
		ColumnName:   UpdatedFields,
		DataType:     ytschema.TypeAny.String(),
		OriginalType: "mongo:bson",
	}, {
		ColumnName:   RemovedFields,
		DataType:     ytschema.TypeAny.String(),
		OriginalType: "mongo:bson",
	}, {
		ColumnName:   TruncatedArrays,
		DataType:     ytschema.TypeAny.String(),
		OriginalType: "mongo:bson",
	}, {
		ColumnName:   FullDocument,
		DataType:     ytschema.TypeAny.String(),
		OriginalType: "mongo:bson",
	}}))

	CollectionFilter = bson.D{

		{
			Key:   "type",
			Value: "collection",
		},
	}

	SystemDBs = []string{
		"admin",
		"config",
		"local",
		"mdb_internal",
	}
)
View Source
var (
	ErrEmptyFilter = xerrors.New("Filters pass empty collection list")
)
View Source
var (
	MongoVersion4_0 = must(semver.ParseTolerant("4.0"))
)

Functions

func BuildFullWhereStatement

func BuildFullWhereStatement(f MongoCollectionFilter) *bson.D

func DriverConnectionOptions

func DriverConnectionOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)

func DriverConnectionSrvOptions

func DriverConnectionSrvOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error)

func EnableChangeStreams

func EnableChangeStreams(ctx context.Context, client *MongoClientWrapper, colls []MongoCollection) error

func ExtractKey

func ExtractKey(id interface{}, isHomo bool) (interface{}, error)

func FromMongoTimestamp

func FromMongoTimestamp(t primitive.Timestamp) time.Time

func GetDocument

func GetDocument(columns []interface{}) bson.D

func GetID

func GetID(columns []interface{}) interface{}

func GetIntFromEnv

func GetIntFromEnv(varName string) int

func GetLocalOplogInterval

func GetLocalOplogInterval(ctx context.Context, client *MongoClientWrapper) (from, to primitive.Timestamp, _err error)

func GetUpdatedDocument

func GetUpdatedDocument(columns []interface{}) bson.D

func GetValueByPath

func GetValueByPath(doc any, path string) (any, bool)

func GetVersion

func GetVersion(ctx context.Context, client *MongoClientWrapper, authSource string) (*semver.Version, error)

GetVersion tries to get version from database authSource. If authSource is empty string, default will be used

func IsNativeMongoSchema

func IsNativeMongoSchema(tableSchema []abstract.ColSchema) bool

func IsUpdateDocumentSchema

func IsUpdateDocumentSchema(tableSchema []abstract.ColSchema) bool

func MarshalFilter

func MarshalFilter(filter ShardingFilter) (string, error)

func New

func NewDatabaseDocumentKeyWatcher

func NewDatabaseDocumentKeyWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase) (*databaseDocumentKeyWatcher, error)

func NewDatabaseFullDocumentWatcher

func NewDatabaseFullDocumentWatcher(s *mongoSource, dbPu ParallelizationUnitDatabase, rs MongoReplicationSource) (*databaseFullDocumentWatcher, error)

func NewKeyBatcher

func NewKeyBatcher(
	ctx context.Context,
	logger log.Logger,
	fullDocumentExtractor FullDocumentExtractor,
	fullDocumentPusher changeEventPusher,
	parameters *BatcherParameters,
) (*keyBatcher, error)

NewKeyBatcher nil parameters are default parameters

func NewOneshotNamespaceRetriever

func NewOneshotNamespaceRetriever(s *mongoSource, resumeToken bson.Raw, dbName string) (*oneshotNamespaceRetriever, error)

NewOneshotNamespaceRetriever creates namespace-only watcher recovery point (resume token) should be specified precisely, or current time will be used

func NewSinker

func NewSinker(lgr log.Logger, dst *MongoDestination, mtrcs metrics.Registry) (abstract.Sinker, error)

func NewSource

func NewSource(src *MongoSource, transferID string, objects *model.DataObjects, logger log.Logger, registry metrics.Registry, cp coordinator.Coordinator) (abstract.Source, error)

func SetValueByPath

func SetValueByPath(doc any, path string, val any, emptyBsonContainerFactory func() any) (any, error)

func SyncClusterTime

func SyncClusterTime(ctx context.Context, src *MongoSource, defaultCACertPaths []string) error

SyncClusterTime is mongo version dependent code

func ToMongoTimestamp

func ToMongoTimestamp(t time.Time) primitive.Timestamp

func ToRegexp

func ToRegexp(c MongoCollection) string

func ValueInMongoFormat

func ValueInMongoFormat(item *abstract.ChangeItem, index int) (interface{}, error)

ValueInMongoFormat extracts the column value at the given index and converts it to MongoDB format

Types

type BatcherParameters

type BatcherParameters struct {
	BatchSizeLimit     uint
	KeySizeThreshold   uint64
	BatchFlushInterval time.Duration
}

type CACertificatePEMFilePaths

type CACertificatePEMFilePaths []string

type ChangeEvent

type ChangeEvent struct {
	KeyChangeEvent    `bson:",inline"`
	FullDocument      bson.D             `bson:"fullDocument"`
	UpdateDescription *UpdateDescription `bson:"updateDescription"`
}

type ChangeStreamWatcher

type ChangeStreamWatcher interface {
	// Watch is one-shot method. After return all allocated resources should be freed with calling Close
	Watch(context.Context, changeEventPusher) error
	Close(context.Context)
	// GetResumeToken returns last resume token to watch from, or nil if info is not available
	// this may be used in future for chaining watchers in the oplog during fallback
	GetResumeToken() bson.Raw
}

ChangeStreamWatcher produces changeEvents encapsulates method with which full documents of mongo collections are retrieved

type DExtension

type DExtension struct {
	bson.D
	// contains filtered or unexported fields
}

func DExt

func DExt(root bson.D) *DExtension

func (*DExtension) Map

func (d *DExtension) Map() bson.M

func (*DExtension) RawValue

func (d *DExtension) RawValue() bson.D

func (*DExtension) SetKey

func (d *DExtension) SetKey(key string, val interface{})

func (*DExtension) Value

func (d *DExtension) Value(isHomo, preventJSONRepack bool) DValue

type DValue

type DValue struct {
	bson.D
	// contains filtered or unexported fields
}

DValue struct is used as document in change items in order to:

  1. Provide jsonSerializable interface in our type system
  2. Bring back legacy behaviour for typesystem < 7

func MakeDValue

func MakeDValue(val bson.D, isHomo, preventJSONRepack bool) DValue

func (DValue) MarshalJSON

func (d DValue) MarshalJSON() ([]byte, error)

func (DValue) RepackValue

func (d DValue) RepackValue() (interface{}, error)

type DocumentKey

type DocumentKey struct {
	ID interface{} `bson:"_id"`
}

type FullDocumentExtractor

type FullDocumentExtractor func(ctx context.Context, ns Namespace, keyList bson.A) ([]sizedFullDocument, error)

FullDocumentExtractor to get default instance of this type use MakeDefaultFullDocumentExtractor

func MakeDefaultFullDocumentExtractor

func MakeDefaultFullDocumentExtractor(client *MongoClientWrapper) FullDocumentExtractor

MakeDefaultFullDocumentExtractor Constructs default document extractor for batcher that uses Mongo connection

type InlineCACertificatePEM

type InlineCACertificatePEM []byte

type KeyChangeEvent

type KeyChangeEvent struct {
	OperationType string              `bson:"operationType"`
	DocumentKey   DocumentKey         `bson:"documentKey"`
	Namespace     Namespace           `bson:"ns"`
	ToNamespace   Namespace           `bson:"to"`
	ClusterTime   primitive.Timestamp `bson:"clusterTime"`
}

func (*KeyChangeEvent) ToChangeEvent

func (k *KeyChangeEvent) ToChangeEvent() *ChangeEvent

type MongoClientWrapper

type MongoClientWrapper struct {
	*mongo.Client

	IsDocDB bool
	// contains filtered or unexported fields
}

func Connect

Connect function should be one and only one valid method of creation mongo client this method logs into 'lgr' about connection options

func (*MongoClientWrapper) Close

func (w *MongoClientWrapper) Close(ctx context.Context) error

type MongoCollection

type MongoCollection struct {
	DatabaseName   string
	CollectionName string
}

func GetAllExistingCollections

func GetAllExistingCollections(ctx context.Context, client *MongoClientWrapper) ([]MongoCollection, error)

func NewMongoCollection

func NewMongoCollection(planeName string) *MongoCollection

func (MongoCollection) String

func (c MongoCollection) String() string

type MongoCollectionFilter

type MongoCollectionFilter struct {
	Collections         []MongoCollection
	ExcludedCollections []MongoCollection
}

func (MongoCollectionFilter) BuildPipeline

func (f MongoCollectionFilter) BuildPipeline(forDatabaseName string) (mongo.Pipeline, error)

BuildPipeline returns mongo pipeline that should be able to filter the oplog from unwanted changes in database 'forDatabaseName'

type MongoConnectionOptions

type MongoConnectionOptions struct {
	ClusterID  string
	Hosts      []string
	Port       int
	ReplicaSet string
	AuthSource string
	User       string
	Password   string
	CACert     TrustedCACertificate
	Direct     bool
	SRVMode    bool
}

func (MongoConnectionOptions) IsDocDB

func (o MongoConnectionOptions) IsDocDB() bool

IsDocDB check if we connect to amazon doc DB

type MongoContainer

type MongoContainer struct {
	testcontainers.Container
	// contains filtered or unexported fields
}

PostgresContainer represents the postgres container type used in the module

func StartMongoContainer

func StartMongoContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*MongoContainer, error)

type MongoDestination

type MongoDestination struct {
	ClusterID         string
	Hosts             []string
	Port              int
	Database          string
	ReplicaSet        string
	AuthSource        string
	User              string
	Password          model.SecretString
	TransformerConfig map[string]string
	Cleanup           model.CleanupType
	SubNetworkID      string
	SecurityGroupIDs  []string
	TLSFile           string
	// make a `direct` connection to mongo, see: https://www.mongodb.com/docs/drivers/go/current/fundamentals/connections/connection-guide/
	Direct bool

	RootCAFiles []string
	// indicates whether the mongoDB client uses a mongodb+srv connection
	SRVMode bool
}

func RecipeTarget

func RecipeTarget(options ...RecipeOption) *MongoDestination

func (*MongoDestination) CleanupMode

func (d *MongoDestination) CleanupMode() model.CleanupType

func (*MongoDestination) ConnectionOptions

func (d *MongoDestination) ConnectionOptions(caCertPaths []string) MongoConnectionOptions

func (*MongoDestination) GetProviderType

func (d *MongoDestination) GetProviderType() abstract.ProviderType

func (*MongoDestination) HasTLS

func (d *MongoDestination) HasTLS() bool

func (MongoDestination) IsDestination

func (MongoDestination) IsDestination()

func (*MongoDestination) MDBClusterID

func (d *MongoDestination) MDBClusterID() string

func (*MongoDestination) ToStorageParams

func (d *MongoDestination) ToStorageParams() *MongoStorageParams

func (*MongoDestination) Transformer

func (d *MongoDestination) Transformer() map[string]string

func (*MongoDestination) Validate

func (d *MongoDestination) Validate() error

func (*MongoDestination) WithDefaults

func (d *MongoDestination) WithDefaults()

type MongoReplicationSource

type MongoReplicationSource string
var (
	MongoReplicationSourceUnspecified               MongoReplicationSource = ""
	MongoReplicationSourcePerDatabase               MongoReplicationSource = "PerDatabase"
	MongoReplicationSourcePerDatabaseFullDocument   MongoReplicationSource = "PerDatabase_FullDocument"
	MongoReplicationSourcePerDatabaseUpdateDocument MongoReplicationSource = "PerDatabase_UpdateDocument"
	MongoReplicationSourceOplog                     MongoReplicationSource = "Oplog"
)

func ReplicationSourceFallback

func ReplicationSourceFallback(logger log.Logger, pl MongoReplicationSource, mongoVersion *semver.Version) MongoReplicationSource

type MongoSource

type MongoSource struct {
	ClusterID              string
	Hosts                  []string
	Port                   int
	ReplicaSet             string
	AuthSource             string
	User                   string
	Password               model.SecretString
	Collections            []MongoCollection
	ExcludedCollections    []MongoCollection
	SubNetworkID           string
	SecurityGroupIDs       []string
	TechnicalDatabase      string // deprecated: should be always ""
	IsHomo                 bool
	SlotID                 string // It's synthetic entity. Always equal to transfer_id!
	SecondaryPreferredMode bool
	TLSFile                string
	ReplicationSource      MongoReplicationSource
	BatchingParams         *BatcherParameters // for now this is private params
	DesiredPartSize        uint64
	PreventJSONRepack      bool // should not be used, use cases for migration: DTSUPPORT-1596

	// FilterOplogWithRegexp is matters when ReplicationSource==MongoReplicationSourceOplog
	//
	// When it is set to false (recommended default value): no filtering of oplog will happen
	//
	// When it is set to true, oplog events are filtered with regexp build according to
	// Collections and ExcludedCollections
	// Regexp unconditionally includes collections '$cmd.*' for technical events and ” for noops
	// + Advantage of this mode: network efficiency
	// - Disadvantage of this mode: when there are no changes on listened database, oplog will be lost
	//   TODO(@kry127) Consider turning on consumer keeper in this case (separate tech db + filtering will be enough)
	FilterOplogWithRegexp bool
	// make a `direct` connection to mongo, see: https://www.mongodb.com/docs/drivers/go/current/fundamentals/connections/connection-guide/
	Direct bool

	RootCAFiles []string
	// indicates whether the mongoDB client uses a mongodb+srv connection
	SRVMode bool
}

func RecipeSource

func RecipeSource(options ...RecipeOption) *MongoSource

func (*MongoSource) AllIncludes

func (s *MongoSource) AllIncludes() []string

func (*MongoSource) ConnectionOptions

func (s *MongoSource) ConnectionOptions(defaultCACertPaths []string) MongoConnectionOptions

func (*MongoSource) FulfilledIncludes

func (s *MongoSource) FulfilledIncludes(tID abstract.TableID) (result []string)

func (*MongoSource) GetMongoCollectionFilter

func (s *MongoSource) GetMongoCollectionFilter() MongoCollectionFilter

func (*MongoSource) GetProviderType

func (s *MongoSource) GetProviderType() abstract.ProviderType

func (*MongoSource) HasTLS

func (s *MongoSource) HasTLS() bool

func (*MongoSource) Include

func (s *MongoSource) Include(tID abstract.TableID) bool

func (MongoSource) IsSource

func (MongoSource) IsSource()

func (MongoSource) IsStrictSource

func (MongoSource) IsStrictSource()

func (*MongoSource) MDBClusterID

func (s *MongoSource) MDBClusterID() string

func (*MongoSource) ToStorageParams

func (s *MongoSource) ToStorageParams() *MongoStorageParams

func (*MongoSource) Validate

func (s *MongoSource) Validate() error

func (*MongoSource) WithDefaults

func (s *MongoSource) WithDefaults()

type MongoStorageParams

type MongoStorageParams struct {
	TLSFile           string
	ClusterID         string
	Hosts             []string
	Port              int
	ReplicaSet        string
	AuthSource        string
	User              string
	Password          string
	Collections       []MongoCollection
	DesiredPartSize   uint64
	PreventJSONRepack bool
	Direct            bool
	RootCAFiles       []string
	SRVMode           bool
}

func (*MongoStorageParams) ConnectionOptions

func (s *MongoStorageParams) ConnectionOptions(defaultCACertPaths []string) MongoConnectionOptions

type Namespace

type Namespace struct {
	Database   string `bson:"db"`
	Collection string `bson:"coll"`
}

func MakeNamespace

func MakeNamespace(database, collection string) Namespace

func ParseNamespace

func ParseNamespace(rawNamespace string) *Namespace

func (*Namespace) GetFullName

func (namespace *Namespace) GetFullName() string

type ParallelizationUnit

type ParallelizationUnit interface {
	fmt.Stringer
	Ping(ctx context.Context, client *MongoClientWrapper) error
	GetClusterTime(ctx context.Context, client *MongoClientWrapper) (*primitive.Timestamp, error)
	SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error
}

type ParallelizationUnitDatabase

type ParallelizationUnitDatabase struct {

	// SlotID -- identifier of resource associated with replication (e.g. transfer ID)
	SlotID string
	// UnitDatabase --  what database is a replication unit of parallelizm
	UnitDatabase string
	// contains filtered or unexported fields
}

func MakeParallelizationUnitDatabase

func MakeParallelizationUnitDatabase(technicalDatabase, slotID, dbName string) ParallelizationUnitDatabase

func (ParallelizationUnitDatabase) GetClusterTime

func (ParallelizationUnitDatabase) Ping

func (ParallelizationUnitDatabase) SaveClusterTime

func (p ParallelizationUnitDatabase) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error

func (ParallelizationUnitDatabase) String

type ParallelizationUnitOplog

type ParallelizationUnitOplog struct {

	// SlotID -- identifier of resource associated with replication (e.g. transfer ID)
	SlotID string
	// contains filtered or unexported fields
}

func MakeParallelizationUnitOplog

func MakeParallelizationUnitOplog(technicalDatabase, slotID string) ParallelizationUnitOplog

func (ParallelizationUnitOplog) GetClusterTime

func (ParallelizationUnitOplog) Ping

func (ParallelizationUnitOplog) SaveClusterTime

func (p ParallelizationUnitOplog) SaveClusterTime(ctx context.Context, client *MongoClientWrapper, timestamp *primitive.Timestamp) error

func (ParallelizationUnitOplog) String

func (p ParallelizationUnitOplog) String() string

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error

func (*Provider) DestinationSampleableStorage

func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)

func (*Provider) Sink

func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) SourceSampleableStorage

func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type RecipeOption

type RecipeOption func(opts *recipeOpts)

func WithCollections

func WithCollections(collections ...MongoCollection) RecipeOption

func WithPrefix

func WithPrefix(prefix string) RecipeOption

type SchemaDescription

type SchemaDescription struct {
	Columns      *abstract.TableSchema
	ColumnsNames []string
	Indexes      map[string]int
}

type ShardKeysInfo

type ShardKeysInfo struct {
	ID           string              `bson:"_id"`
	Lastmod      primitive.DateTime  `bson:"lastmod"`
	Timestamp    primitive.Timestamp `bson:"timestamp"`
	Key          bson.D              `bson:"key"`
	Unique       bool                `bson:"unique"`
	LastmodEpoch primitive.ObjectID  `bson:"lastmodEpoch"`
	// UUID issue: https://stackoverflow.com/questions/64723089/how-to-store-a-uuid-in-mongodb-with-golang
	// UUID         interface{}          `bson:"uuid"`
	KeyFields []string
	// contains filtered or unexported fields
}

func GetShardingKey

func GetShardingKey(ctx context.Context, client *MongoClientWrapper, ns Namespace) (*ShardKeysInfo, error)

func (*ShardKeysInfo) ContainsID

func (s *ShardKeysInfo) ContainsID() bool

func (*ShardKeysInfo) Fields

func (s *ShardKeysInfo) Fields() []string

func (*ShardKeysInfo) GetNamespace

func (s *ShardKeysInfo) GetNamespace() *Namespace

func (*ShardKeysInfo) IsTrivial

func (s *ShardKeysInfo) IsTrivial() bool

type ShardingFilter

type ShardingFilter bson.D

func UnmarshalFilter

func UnmarshalFilter(marshalledFilter string) (ShardingFilter, error)

type Storage

type Storage struct {
	Client *MongoClientWrapper

	IsHomo bool
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(config *MongoStorageParams, opts ...StorageOpt) (*Storage, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) LoadRandomSample

func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadSampleBySet

func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, pusher abstract.Pusher) error

func (*Storage) LoadSchema

func (s *Storage) LoadSchema() (dbSchema abstract.DBSchema, err error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTopBottomSample

func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (Storage) ShardTable

func (*Storage) TableAccessible

func (s *Storage) TableAccessible(table abstract.TableDescription) bool

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

func (*Storage) TableSizeInBytes

func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)

type StorageOpt

type StorageOpt func(storage *Storage) *Storage

func WithMetrics

func WithMetrics(registry metrics.Registry) StorageOpt

type TimeCollectionScheme

type TimeCollectionScheme struct {
	Time       primitive.Timestamp `bson:"cluster_time"`
	WorkerTime time.Time           `bson:"worker_time"`
}

type TruncatedArray

type TruncatedArray struct {
	Field   string `bson:"field"`
	NewSize int    `bson:"newSize"`
}

type TrustedCACertificate

type TrustedCACertificate interface {
	// contains filtered or unexported methods
}

type UpdateDescription

type UpdateDescription struct {
	UpdatedFields   bson.D           `bson:"updatedFields"`
	RemovedFields   []string         `bson:"removedFields"`
	TruncatedArrays []TruncatedArray `bson:"truncatedArrays"`
}

type UpdateDocumentChangeItem

type UpdateDocumentChangeItem struct {
	// contains filtered or unexported fields
}

func NewUpdateDocumentChangeItem

func NewUpdateDocumentChangeItem(item *abstract.ChangeItem) (*UpdateDocumentChangeItem, error)

func (*UpdateDocumentChangeItem) CheckDiffByKeys

func (u *UpdateDocumentChangeItem) CheckDiffByKeys(checkKeys []string) map[string]any

func (*UpdateDocumentChangeItem) FullDocument

func (u *UpdateDocumentChangeItem) FullDocument() bson.D

func (*UpdateDocumentChangeItem) HasTruncatedArrays

func (u *UpdateDocumentChangeItem) HasTruncatedArrays() bool

func (*UpdateDocumentChangeItem) IsApplicablePatch

func (u *UpdateDocumentChangeItem) IsApplicablePatch() bool

func (*UpdateDocumentChangeItem) RemovedFields

func (u *UpdateDocumentChangeItem) RemovedFields() []string

func (*UpdateDocumentChangeItem) TruncatedArrays

func (u *UpdateDocumentChangeItem) TruncatedArrays() []TruncatedArray

func (*UpdateDocumentChangeItem) UpdatedFields

func (u *UpdateDocumentChangeItem) UpdatedFields() bson.D

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL