docsyncer

package
v2.0.0-...-acbaf60 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
)

Variables

View Source
var (
	GlobalCollExecutorId int32 = -1
	GlobalDocExecutorId  int32 = -1
)

Functions

func Checkpoint

func Checkpoint(ckptMap map[string]utils.TimestampNode) error

func GenerateCollExecutorId

func GenerateCollExecutorId() int

func GenerateDocExecutorId

func GenerateDocExecutorId() int

func IsShardingToSharding

func IsShardingToSharding(fromIsSharding bool, toConn *utils.MongoCommunityConn) bool

func StartDropDestCollection

func StartDropDestCollection(nsSet map[utils.NS]struct{}, toConn *utils.MongoCommunityConn,
	nsTrans *transform.NamespaceTransform) error

func StartIndexSync

func StartIndexSync(indexMap map[utils.NS][]bson.D, toUrl string,
	nsTrans *transform.NamespaceTransform, background bool) (syncError error)

func StartNamespaceSpecSyncForSharding

func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoCommunityConn,
	nsTrans *transform.NamespaceTransform) error

Types

type CollectionExecutor

type CollectionExecutor struct {
	// contains filtered or unexported fields
}

func NewCollectionExecutor

func NewCollectionExecutor(id int, mongoUrl string, ns utils.NS, syncer *DBSyncer, sslRootFile string) *CollectionExecutor

func (*CollectionExecutor) Start

func (colExecutor *CollectionExecutor) Start() error

func (*CollectionExecutor) Sync

func (colExecutor *CollectionExecutor) Sync(docs []*bson.Raw)

func (*CollectionExecutor) Wait

func (colExecutor *CollectionExecutor) Wait() error

type CollectionMetric

type CollectionMetric struct {
	CollectionStatus Status
	TotalCount       uint64
	FinishCount      uint64
}

func NewCollectionMetric

func NewCollectionMetric() *CollectionMetric

func (*CollectionMetric) String

func (cm *CollectionMetric) String() string

type DBSyncer

type DBSyncer struct {

	// source mongodb url
	FromMongoUrl string

	// destination mongodb url
	ToMongoUrl string

	// whether the source is sharded
	FromIsSharding bool
	// contains filtered or unexported fields
}

DBSyncer: each source shard maps to one DBSyncer (1 shard -> 1 DBSyncer).

func NewDBSyncer

func NewDBSyncer(
	id int,
	fromMongoUrl string,
	fromReplset string,
	toMongoUrl string,
	nsTrans *transform.NamespaceTransform,
	orphanFilter *filter.OrphanFilter,
	qos *utils.Qos,
	fromIsSharding bool) *DBSyncer

func (*DBSyncer) Close

func (syncer *DBSyncer) Close()

func (*DBSyncer) Init

func (syncer *DBSyncer) Init()

func (*DBSyncer) RestAPI

func (syncer *DBSyncer) RestAPI()

RESTful API.

func (*DBSyncer) Start

func (syncer *DBSyncer) Start() (syncError error)

func (*DBSyncer) String

func (syncer *DBSyncer) String() string

type DocExecutor

type DocExecutor struct {
	// contains filtered or unexported fields
}

func NewDocExecutor

func NewDocExecutor(id int, colExecutor *CollectionExecutor, conn *utils.MongoCommunityConn,
	syncer *DBSyncer) *DocExecutor

func (*DocExecutor) String

func (exec *DocExecutor) String() string

type DocumentReader

type DocumentReader struct {
	// contains filtered or unexported fields
}

DocumentReader is the reader of a single piece.

func NewDocumentReader

func NewDocumentReader(id int, src string, ns utils.NS, key string, start, end interface{}, sslRootCaFile string) *DocumentReader

NewDocumentReader creates a reader from a MongoDB URL.

func (*DocumentReader) Close

func (reader *DocumentReader) Close()

func (*DocumentReader) NextDoc

func (reader *DocumentReader) NextDoc() (doc bson.Raw, err error)

NextDoc returns a document as raw bytes ([]byte). reader.docCursor.Current is valid only until the next call to docCursor.Next(), so it must be copied.

func (*DocumentReader) String

func (reader *DocumentReader) String() string

type DocumentSplitter

type DocumentSplitter struct {
	// contains filtered or unexported fields
}

DocumentSplitter pre-splits the collection into several pieces.

func NewDocumentSplitter

func NewDocumentSplitter(src, sslRootCaFile string, ns utils.NS) *DocumentSplitter

func (*DocumentSplitter) Close

func (ds *DocumentSplitter) Close()

func (*DocumentSplitter) Run

func (ds *DocumentSplitter) Run() error

TODO: add retry.

func (*DocumentSplitter) String

func (ds *DocumentSplitter) String() string

type Status

type Status string
const (
	StatusWaitStart  Status = "wait start"
	StatusProcessing Status = "in processing"
	StatusFinish     Status = "finish"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL