collector

package
v0.0.0-...-04f94e4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2022 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Keys and comparison operators used in queries. "$gt" and "$gte" are the
// MongoDB greater-than / greater-than-or-equal operators.
// NOTE(review): QueryTs and QueryGid look like the timestamp and gid field
// names used by the oplog readers — confirm against the reader code.
const (
	QueryTs    = "ts"
	QueryGid   = "g"
	QueryOpGT  = "$gt"
	QueryOpGTE = "$gte"
)
View Source
// Error message texts, one per version range.
// NOTE(review): presumably matched against errors returned by the source
// MongoDB, with "3.0" being the server version — confirm.
const (
	CollectionCapped           = "CollectionScan died due to position in capped" // versions newer than 3.0
	CollectionCappedLowVersion = "UnknownError"                                  // versions 3.0 and older
)
View Source
// Names of the supported sync modes.
// NOTE(review): presumably "all" combines the "document" and "oplog"
// modes — confirm against the code that reads the sync mode.
const (
	SYNCMODE_ALL      = "all"
	SYNCMODE_DOCUMENT = "document"
	SYNCMODE_OPLOG    = "oplog"
)
View Source
const (

	// BSON deserialization is a CPU-intensive workload.
	// NOTE(review): the names suggest these are the maximum and minimum
	// number of pipeline queues and the length of each queue — confirm.
	PipelineQueueMaxNr = 4
	PipelineQueueMinNr = 1
	PipelineQueueLen   = 64

	DurationTime          = 6000 // ms
	DDLCheckpointInterval = 300  // ms
)
View Source
const MaxUnAckListLength = 128 * 256

Variables

View Source
var CollectionCappedError = errors.New("collection capped error")
View Source
var TimeoutError = errors.New("read next log timeout, It shouldn't be happen")

TimeoutError indicates that a MongoDB query timed out while executing.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

As mentioned in syncer.go, Batcher is used to batch oplogs before sending them, in order to improve performance.

func NewBatcher

func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain,
	handler OplogHandler, workerGroup []*Worker) *Batcher

type GidOplogReader

type GidOplogReader struct {
	OplogReader
}

GidOplogReader is an OplogReader whose query also includes a gid.

func NewGidOplogReader

func NewGidOplogReader(src string) *GidOplogReader

func (*GidOplogReader) SetQueryGid

func (reader *GidOplogReader) SetQueryGid(gid string)

type Module

// Module is the interface for components that can be installed and that
// handle tunnel.WMessage values.
type Module interface {
	// NOTE(review): IsRegistered presumably reports whether this module
	// has been registered — confirm against the implementations.
	IsRegistered() bool

	/**
	 * Install installs and initializes the module. It returns false
	 * on failure, and is only invoked while the WriteController is
	 * preparing.
	 */
	Install() bool

	/**
	 * Handle processes an outstanding request message. Messages are
	 * passed in one by one. Any change made to the message inside
	 * Handle() is preserved and delivered onward.
	 * NOTE(review): the original sentence stops at "delivery to next";
	 * presumably it means the next Module — confirm.
	 *
	 * @return the tunnel's error code (<0) or the ack value
	 *
	 */
	Handle(message *tunnel.WMessage) int64
}

type OplogHandler

// OplogHandler receives consumed oplogs one at a time.
type OplogHandler interface {
	// Handle is invoked for every oplog consumed.
	Handle(log *oplog.PartialLog)
}

type OplogReader

type OplogReader struct {
	// contains filtered or unexported fields
}

OplogReader represents a stream reader for the MongoDB instance specified by a URL. Together with query options, it lets the user iterate over oplogs.

func NewOplogReader

func NewOplogReader(src string) *OplogReader

NewOplogReader creates reader with mongodb url

func (*OplogReader) Next

func (reader *OplogReader) Next() (*bson.Raw, error)

Next returns an oplog as raw bytes, i.e. []byte.

func (*OplogReader) NextOplog

func (reader *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)

NextOplog returns an oplog as an oplog.GenericOplog struct.

func (*OplogReader) SetQueryTimestampOnEmpty

func (reader *OplogReader) SetQueryTimestampOnEmpty(ts bson.MongoTimestamp)

SetQueryTimestampOnEmpty sets the internal timestamp if this reader does not have one yet, which is mostly the case during the initial stage.

func (*OplogReader) StartFetcher

func (reader *OplogReader) StartFetcher()

StartFetcher starts the fetcher if it does not exist yet.

func (*OplogReader) UpdateQueryTimestamp

func (reader *OplogReader) UpdateQueryTimestamp(ts bson.MongoTimestamp)

type OplogSyncer

type OplogSyncer struct {
	OplogHandler
	// contains filtered or unexported fields
}

OplogSyncer polls oplogs from the original source MongoDB.

func NewOplogSyncer

func NewOplogSyncer(
	coordinator *ReplicationCoordinator,
	replset string,
	startPosition int64,
	fullSyncFinishPosition int64,
	mongoUrl string,
	gid string) *OplogSyncer

Syncer is used to fetch oplogs from the source MongoDB and then send them to different workers, which can be seen as network senders. Several syncers coexist to improve fetching performance. The data flow in a syncer is: source mongodb --> reader --> pending queue (raw data) --> logs queue (parsed data) --> worker. The pending queue and the logs queue are kept separate to improve performance.

func (*OplogSyncer) Handle

func (sync *OplogSyncer) Handle(log *oplog.PartialLog)

func (*OplogSyncer) RestAPI

func (sync *OplogSyncer) RestAPI()

type ReplicationCoordinator

type ReplicationCoordinator struct {
	Sources []*utils.MongoSource
	// contains filtered or unexported fields
}

ReplicationCoordinator is the global coordinator instance. It consists of one syncerGroup and a number of workers.

func (*ReplicationCoordinator) Run

func (coordinator *ReplicationCoordinator) Run() error

type TransferEventListener

type TransferEventListener struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(coordinator *ReplicationCoordinator, syncer *OplogSyncer, id uint32) *Worker

func (*Worker) AllAcked

func (worker *Worker) AllAcked(allAcked bool)

func (*Worker) IsAllAcked

func (worker *Worker) IsAllAcked() bool

func (*Worker) Offer

func (worker *Worker) Offer(batch []*oplog.GenericOplog)

func (*Worker) RestAPI

func (worker *Worker) RestAPI()

type WriteController

type WriteController struct {

	// current max lsn_ack value
	LatestLsnAck int64
	// contains filtered or unexported fields
}

func NewWriteController

func NewWriteController(worker *Worker) *WriteController

func (*WriteController) Send

func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL