Versions in this module Expand all Collapse all v0 v0.0.14 Jan 31, 2025 Changes in this version + const MAX_BUFFER_BYTE_SIZE + func SynchronizeCollection(ctx context.Context, batchSize int, sourceReader ItemReader, ...) error + type DeltaReplication struct + type DocumentReader struct + Batch int + Collection string + Database string + Progress *SyncProgress + Source *mdb.MDB + Writer *DocumentWriter + func NewDocumentReader(database string, collection string, source *mdb.MDB, batch int, ...) *DocumentReader + func (r *DocumentReader) Replicate(ctx context.Context) error + func (r *DocumentReader) ReportResult(result WriteResult) + func (r *DocumentReader) SetProgress(progress *SyncProgress) + type DocumentWriter struct + Collection string + Database string + Progress *SyncProgress + Target *mdb.MDB + func NewDocumentWriter(database string, collection string, target *mdb.MDB) *DocumentWriter + func (r *DocumentWriter) SetProgress(progress *SyncProgress) + func (r *DocumentWriter) WriteDocuments(docs []*bson.Raw) (WriteResult, error) + type ItemReader interface + ReadItems func(ctx context.Context, batchSize int, startId primitive.ObjectID) ([]*bson.D, error) + type MongoItemReader struct + Collection string + Database string + Source *mdb.MDB + func NewMongoItemReader(source *mdb.MDB, database string, collection string) *MongoItemReader + func (r *MongoItemReader) ReadItems(ctx context.Context, batchSize int, first primitive.ObjectID) ([]*bson.D, error) + type MongoSynchronizer struct + Collection string + Database string + Target *mdb.MDB + func NewMongoSynchronizer(target *mdb.MDB, database string, collection string) *MongoSynchronizer + func (s *MongoSynchronizer) Delete(ctx context.Context, id primitive.ObjectID) error + func (s *MongoSynchronizer) Insert(ctx context.Context, item *primitive.D) error + func (s *MongoSynchronizer) Update(ctx context.Context, source *primitive.D, target *primitive.D) error + type QpsLimit struct + MaxQps uint + Start time.Time + func NewQpsLimit(maxQps uint) *QpsLimit + func (q *QpsLimit) Incr(incr int) + func (q *QpsLimit) Reset() + func (q *QpsLimit) Wait() + type Snapshot struct + func NewSnapshot(ckpt checkpoint.CheckpointManager) *Snapshot + func (s *Snapshot) ReplicateIndexes(ctx context.Context, database string, collection string) error + func (s *Snapshot) RunSnapshot(ctx context.Context, database string, collection string) error + func (s *Snapshot) RunSnapshots(ctx context.Context, dbAndCollections map[string][]string) + type SyncProgress struct + Collection string + Database string + func NewSyncProgress(database string, collection string) *SyncProgress + func (f *SyncProgress) Increment(incr int) + func (f *SyncProgress) Progress() float64 + func (f *SyncProgress) SetTotal(total uint64) + type Synchronizer interface + Delete func(ctx context.Context, id primitive.ObjectID) error + Insert func(ctx context.Context, item *primitive.D) error + Update func(ctx context.Context, source *primitive.D, target *primitive.D) error + type WriteResult struct + ErrorCount int + InsertedCount int + SkippedOnDuplicateCount int + UpdatedCount int