collector

package
v0.0.0-...-10178d1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2020 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StorageTypeAPI   = "api"
	StorageTypeDB    = "database"
	CheckpointName   = "name"
	CheckpointAckTs  = "ackTs"
	CheckpointSyncTs = "syncTs"

	CheckpointMoveChunkIntervalMS = 5000
)
View Source
const (
	CheckpointKeyNs     = "ns"
	CheckpointKeyObject = "obj"
	CheckpointBlocklog  = "blockLog"
	CheckpointDBMap     = "dbMap"

	DDLCheckInterval       = 1  // s
	DDLUnResponseThreshold = 30 // s
)
View Source
const (
	MoveChunkBarrierKey          = "barrierKey"
	MoveChunkKeyName             = "key"
	MoveChunkInsertMap           = "insertMap"
	MoveChunkDeleteItem          = "deleteItem"
	MoveChunkBufferSize          = 1000
	MoveChunkUnResponseThreshold = 30 // s
)
View Source
const (
	SYNCMODE_ALL      = "all"
	SYNCMODE_DOCUMENT = "document"
	SYNCMODE_OPLOG    = "oplog"
)
View Source
const (

	// bson deserialize workload is CPU-intensive task
	PipelineQueueMaxNr = 4
	PipelineQueueMinNr = 1
	PipelineQueueLen   = 64

	WaitBarrierAckLogTimes = 10

	DurationTime        = 6000 // unit: ms.
	DDLCheckpointGap    = 5    // unit: seconds.
	FilterCheckpointGap = 180  // unit: seconds. no checkpoint update, flush checkpoint mandatory

	ShardingWorkerId = 0
)
View Source
const (
	MaxUnAckListLength    = 128 * 256
	DDLCheckpointInterval = 300 // unit: ms
)
View Source
const (
	WaitAckIntervalMS = 1000
)

Variables

This section is empty.

Functions

func DDLSupportForSharding

func DDLSupportForSharding() bool

func TransformDDL

func TransformDDL(replset string, log *oplog.PartialLog, shardColSpec *utils.ShardCollectionSpec, toIsSharding bool) []*oplog.PartialLog

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

As mentioned in syncer.go, Batcher is used to batch oplogs before sending them, in order to improve performance.

func NewBatcher

func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain,
	handler OplogHandler, workerGroup []*Worker) *Batcher

func (*Batcher) Next

func (batcher *Batcher) Next() []*oplog.GenericOplog

Returns the batched oplogs and the barrier flag. Also returns the last oplog: if the current batch is empty (the first oplog in this batch is a DDL), it just returns the last oplog of the previous batch. If syncing has just started, this is nil.

func (*Batcher) WaitAllAck

func (batcher *Batcher) WaitAllAck()

type CheckpointManager

type CheckpointManager struct {
	FlushChan chan bool
	// contains filtered or unexported fields
}

func NewCheckpointManager

func NewCheckpointManager(startPosition int64) *CheckpointManager

func (*CheckpointManager) Flush

func (manager *CheckpointManager) Flush() error

func (*CheckpointManager) Get

func (manager *CheckpointManager) Get(replset string) bson.MongoTimestamp

func (*CheckpointManager) Load

func (manager *CheckpointManager) Load() error

First, load the checkpoint info into the CheckpointManager; this is done without concurrent access.

type DDLKey

type DDLKey struct {
	Namespace string
	ObjectStr string
}

type DDLManager

type DDLManager struct {
	FromCsConn   *utils.MongoConn // share config server url
	ToIsSharding bool
	// contains filtered or unexported fields
}

func NewDDLManager

func NewDDLManager(ckptManager *CheckpointManager) *DDLManager

func (*DDLManager) BlockDDL

func (manager *DDLManager) BlockDDL(replset string, log *oplog.PartialLog) bool

func (*DDLManager) Flush

func (manager *DDLManager) Flush() error

func (*DDLManager) Load

func (manager *DDLManager) Load() error

func (*DDLManager) UnBlockDDL

func (manager *DDLManager) UnBlockDDL(replset string, log *oplog.PartialLog)

type DDLValue

type DDLValue struct {
	// contains filtered or unexported fields
}

type MCIItem

type MCIItem struct {
	Replset   string              `bson:"replset"`
	Timestamp bson.MongoTimestamp `bson:"deleteTs"`
}

type Module

type Module interface {
	IsRegistered() bool

	/**
	 * Module install and initialize. return false on failed
	 * and only invocation on WriteController is preparing
	 */
	Install() bool

	/**
	 * Handle outstanding request message. and messages
	 * are passed one by one. Any changes of message in
	 * Handle() will be preserved and delivery to next
	 *
	 * @return tunnel's error code (<0) or ack value
	 *
	 */
	Handle(message *tunnel.WMessage) int64
}

type MoveChunkKey

type MoveChunkKey struct {
	Id        interface{} `bson:"docId"`
	Namespace string      `bson:"namespace"`
}

type MoveChunkManager

type MoveChunkManager struct {
	// contains filtered or unexported fields
}

func NewMoveChunkManager

func NewMoveChunkManager(ckptManager *CheckpointManager) *MoveChunkManager

func (*MoveChunkManager) BarrierOplog

func (manager *MoveChunkManager) BarrierOplog(replset string, partialLog *oplog.PartialLog) (bool, bool, interface{})

func (*MoveChunkManager) Flush

func (manager *MoveChunkManager) Flush() error

func (*MoveChunkManager) Load

func (manager *MoveChunkManager) Load() error

type MoveChunkValue

type MoveChunkValue struct {
	// contains filtered or unexported fields
}

type OplogHandler

type OplogHandler interface {
	// invocation on every oplog consumed
	Handle(log *oplog.PartialLog)
}

type OplogSyncer

type OplogSyncer struct {
	OplogHandler
	// contains filtered or unexported fields
}

OplogSyncer polls oplogs from the original source MongoDB.

func NewOplogSyncer

func NewOplogSyncer(
	coordinator *ReplicationCoordinator,
	replset string,
	fullSyncFinishPosition int64,
	mongoUrl string,
	gids []string,
	ckptManager *CheckpointManager,
	mvckManager *MoveChunkManager,
	ddlManager *DDLManager) *OplogSyncer

Syncer is used to fetch oplogs from the source MongoDB and then send them to different workers, which can be seen as network senders. Several syncers coexist to improve fetching performance. The data flow in a syncer is: source mongodb --> reader --> pending queue (raw data) --> logs queue (parsed data) --> worker. The reason we split the pending queue and the logs queue is to improve performance.

func (*OplogSyncer) Handle

func (sync *OplogSyncer) Handle(log *oplog.PartialLog)

func (*OplogSyncer) RestAPI

func (sync *OplogSyncer) RestAPI()

type Persist

type Persist interface {
	Load() error
	Flush() error
}

type ReplicationCoordinator

type ReplicationCoordinator struct {
	Sources []*utils.MongoSource
	// contains filtered or unexported fields
}

ReplicationCoordinator is the global coordinator instance. It consists of one syncerGroup and a number of workers.

func (*ReplicationCoordinator) Run

func (coordinator *ReplicationCoordinator) Run() error

type SyncerMoveChunk

type SyncerMoveChunk struct {
	// contains filtered or unexported fields
}

type TransferEventListener

type TransferEventListener struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(coordinator *ReplicationCoordinator, syncer *OplogSyncer, id uint32) *Worker

func (*Worker) AllAcked

func (worker *Worker) AllAcked(allAcked bool)

func (*Worker) IsAllAcked

func (worker *Worker) IsAllAcked() bool

func (*Worker) Offer

func (worker *Worker) Offer(batch []*oplog.GenericOplog)

func (*Worker) RestAPI

func (worker *Worker) RestAPI()

type WriteController

type WriteController struct {

	// current max lsn_ack value
	LatestLsnAck int64
	// contains filtered or unexported fields
}

func NewWriteController

func NewWriteController(worker *Worker) *WriteController

func (*WriteController) Send

func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL