full

package
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MAX_BUFFER_BYTE_SIZE is 12 * 1024 * 1024 = 12,582,912; going by its
	// name it is a size in bytes, i.e. 12 MiB.
	// NOTE(review): presumably the upper bound on a buffered batch of
	// documents — its use is not visible here; confirm against the callers.
	// NOTE(review): the name is not Go MixedCaps, but it is exported, so
	// renaming it would break existing importers.
	MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
)

Variables

This section is empty.

Functions

func StartFullReplication

func StartFullReplication(ctx context.Context, checkpointManager checkpoint.CheckpointManager,
	dbAndCollections map[string][]string)

Types

type DocumentReader

// DocumentReader holds the state for syncing one collection: the
// database and collection names, the batch size, the source connection,
// the writer and the progress state.
type DocumentReader struct {
	// Database is the name of the database.
	Database string
	// Collection is the name of the collection.
	Collection string
	// Batch is the batch size.
	// NOTE(review): presumably the number of documents per read batch — confirm.
	Batch int
	// Source is the source database.
	Source *mong.Mong
	// Writer is the document writer.
	Writer *DocumentWriter
	// Progress is the progression state.
	Progress *SyncProgress
}

func NewDocumentReader

func NewDocumentReader(database string, collection string, source *mong.Mong, batch int, writer *DocumentWriter) *DocumentReader

func (*DocumentReader) ReportResult

func (r *DocumentReader) ReportResult(result WriteResult)

Report the result of the write operation

func (*DocumentReader) SetProgress

func (r *DocumentReader) SetProgress(progress *SyncProgress)

Set the progression state (a *SyncProgress) used by this reader

func (*DocumentReader) StartSync

func (r *DocumentReader) StartSync(ctx context.Context) error

Read a batch of documents from the source

type DocumentWriter

// DocumentWriter holds the state for writing one collection: the
// database and collection names, the target connection and the
// progress state.
type DocumentWriter struct {
	// Database is the name of the database.
	Database string
	// Collection is the name of the collection.
	Collection string
	// Target is the target database.
	Target *mong.Mong
	// Progress is the progression state.
	Progress *SyncProgress
}

func NewDocumentWriter

func NewDocumentWriter(database string, collection string, target *mong.Mong) *DocumentWriter

func (*DocumentWriter) SetProgress

func (r *DocumentWriter) SetProgress(progress *SyncProgress)

Set the progression state (a *SyncProgress) used by this writer

func (*DocumentWriter) WriteDocuments

func (r *DocumentWriter) WriteDocuments(docs []*bson.Raw) (WriteResult, error)

Sync the documents to the target

type QpsLimit

// QpsLimit carries a start time and a maximum QPS value, plus unexported
// state not shown here.
// NOTE(review): presumably a rate limiter driven through Incr, Wait and
// Reset — the exact algorithm is not visible; confirm.
type QpsLimit struct {
	// Start is a point in time.
	// NOTE(review): presumably the start of the current measuring window — confirm.
	Start time.Time
	// MaxQps is the maximum QPS.
	// NOTE(review): presumably the value passed to NewQpsLimit — confirm.
	MaxQps uint
	// contains filtered or unexported fields
}

func NewQpsLimit

func NewQpsLimit(maxQps uint) *QpsLimit

func (*QpsLimit) Incr

func (q *QpsLimit) Incr(incr int)

func (*QpsLimit) Reset

func (q *QpsLimit) Reset()

func (*QpsLimit) Wait

func (q *QpsLimit) Wait()

type SyncProgress

// SyncProgress identifies a database/collection pair and carries
// unexported state not shown here.
// NOTE(review): presumably the total and synced document counts behind
// SetTotal, Increment and Progress — confirm.
type SyncProgress struct {
	// Database is the name of the database.
	Database string
	// Collection is the name of the collection.
	Collection string
	// contains filtered or unexported fields
}

func NewSyncProgress

func NewSyncProgress(database string, collection string) *SyncProgress

func (*SyncProgress) Increment

func (f *SyncProgress) Increment(incr int)

func (*SyncProgress) Progress

func (f *SyncProgress) Progress() float64

func (*SyncProgress) SetTotal

func (f *SyncProgress) SetTotal(total uint64)

type WriteResult

// WriteResult is the set of counters returned by
// DocumentWriter.WriteDocuments and accepted by
// DocumentReader.ReportResult.
// NOTE(review): the field meanings below are read from the field names
// only; confirm against the writer implementation.
type WriteResult struct {
	// InsertedCount is presumably the number of documents inserted.
	InsertedCount int
	// UpdatedCount is presumably the number of documents updated.
	UpdatedCount int
	// SkippedOnDuplicateCount is presumably the number of documents
	// skipped because they already existed (duplicate key).
	SkippedOnDuplicateCount int
	// ErrorCount is presumably the number of documents that failed to write.
	ErrorCount int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL