snapshot

package
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
)

Variables

This section is empty.

Functions

func SynchronizeCollection

func SynchronizeCollection(ctx context.Context, batchSize int, sourceReader ItemReader, targetReader ItemReader,
	synchronizer Synchronizer, database string, collection string) error

Types

type DeltaReplication

type DeltaReplication struct {
}

type DocumentReader

type DocumentReader struct {
	// The database name
	Database string
	// The name of the collection
	Collection string
	// The batch size
	Batch int
	// The source database
	Source *mdb.MDB
	// The document writer
	Writer *DocumentWriter
	// Progression state
	Progress *SyncProgress
}

func NewDocumentReader

func NewDocumentReader(database string, collection string, source *mdb.MDB, batch int, writer *DocumentWriter) *DocumentReader

func (*DocumentReader) Replicate

func (r *DocumentReader) Replicate(ctx context.Context) error

Read a batch of documents from the source

func (*DocumentReader) ReportResult

func (r *DocumentReader) ReportResult(result WriteResult)

Report the result of the write operation

func (*DocumentReader) SetProgress

func (r *DocumentReader) SetProgress(progress *SyncProgress)

Set the total count of documents to sync

type DocumentWriter

type DocumentWriter struct {
	// The database name
	Database string
	// The name of the collection
	Collection string
	// The source database
	Target *mdb.MDB
	// Progression state
	Progress *SyncProgress
}

func NewDocumentWriter

func NewDocumentWriter(database string, collection string, target *mdb.MDB) *DocumentWriter

func (*DocumentWriter) SetProgress

func (r *DocumentWriter) SetProgress(progress *SyncProgress)

Set the total count of documents to sync

func (*DocumentWriter) WriteDocuments

func (r *DocumentWriter) WriteDocuments(docs []*bson.Raw) (WriteResult, error)

Sync the documents to the target

type ItemReader

type ItemReader interface {
	ReadItems(ctx context.Context, batchSize int, startId primitive.ObjectID) ([]*bson.D, error)
}

type MongoItemReader

type MongoItemReader struct {
	Database   string
	Collection string
	Source     *mdb.MDB
}

func NewMongoItemReader

func NewMongoItemReader(source *mdb.MDB, database string, collection string) *MongoItemReader

func (*MongoItemReader) ReadItems

func (r *MongoItemReader) ReadItems(ctx context.Context, batchSize int,
	first primitive.ObjectID) ([]*bson.D, error)

Reads a batch of items from the database starting with the next ID after the `first` and sorted ascendingly by ID

type MongoSynchronizer

type MongoSynchronizer struct {
	Target     *mdb.MDB
	Database   string
	Collection string
}

func NewMongoSynchronizer

func NewMongoSynchronizer(target *mdb.MDB, database string, collection string) *MongoSynchronizer

func (*MongoSynchronizer) Delete

func (*MongoSynchronizer) Insert

func (s *MongoSynchronizer) Insert(ctx context.Context, item *primitive.D) error

func (*MongoSynchronizer) Update

func (s *MongoSynchronizer) Update(ctx context.Context, source *primitive.D, target *primitive.D) error

type QpsLimit

type QpsLimit struct {
	Start  time.Time
	MaxQps uint
	// contains filtered or unexported fields
}

func NewQpsLimit

func NewQpsLimit(maxQps uint) *QpsLimit

func (*QpsLimit) Incr

func (q *QpsLimit) Incr(incr int)

func (*QpsLimit) Reset

func (q *QpsLimit) Reset()

func (*QpsLimit) Wait

func (q *QpsLimit) Wait()

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot

func NewSnapshot(ckpt checkpoint.CheckpointManager) *Snapshot

func (*Snapshot) ReplicateIndexes

func (s *Snapshot) ReplicateIndexes(ctx context.Context, database string, collection string) error

Replicates the indexes from the source to the target

func (*Snapshot) RunSnapshot

func (s *Snapshot) RunSnapshot(ctx context.Context, database string, collection string) error

func (*Snapshot) RunSnapshots

func (s *Snapshot) RunSnapshots(ctx context.Context, dbAndCollections map[string][]string)

type SyncProgress

type SyncProgress struct {
	Database   string
	Collection string
	// contains filtered or unexported fields
}

func NewSyncProgress

func NewSyncProgress(database string, collection string) *SyncProgress

func (*SyncProgress) Increment

func (f *SyncProgress) Increment(incr int)

func (*SyncProgress) Progress

func (f *SyncProgress) Progress() float64

func (*SyncProgress) SetTotal

func (f *SyncProgress) SetTotal(total uint64)

type Synchronizer

type Synchronizer interface {
	Insert(ctx context.Context, item *primitive.D) error
	Update(ctx context.Context, source *primitive.D, target *primitive.D) error
	Delete(ctx context.Context, id primitive.ObjectID) error
}

type WriteResult

type WriteResult struct {
	InsertedCount           int
	UpdatedCount            int
	SkippedOnDuplicateCount int
	ErrorCount              int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL