Documentation ¶
Index ¶
- Constants
- Variables
- type Batcher
- type GidOplogReader
- type Module
- type OplogHandler
- type OplogReader
- func (reader *OplogReader) Next() (*bson.Raw, error)
- func (reader *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)
- func (reader *OplogReader) SetQueryTimestampOnEmpty(ts bson.MongoTimestamp)
- func (reader *OplogReader) StartFetcher()
- func (reader *OplogReader) UpdateQueryTimestamp(ts bson.MongoTimestamp)
- type OplogSyncer
- type ReplicationCoordinator
- type TransferEventListener
- type Worker
- type WriteController
Constants ¶
const ( QueryTs = "ts" QueryGid = "g" QueryOpGT = "$gt" QueryOpGTE = "$gte" )
const ( CollectionCapped = "CollectionScan died due to position in capped" // bigger than 3.0 CollectionCappedLowVersion = "UnknownError" // <= 3.0 version )
const ( SYNCMODE_ALL = "all" SYNCMODE_DOCUMENT = "document" SYNCMODE_OPLOG = "oplog" )
const ( // bson deserialize workload is CPU-intensive task PipelineQueueMaxNr = 4 PipelineQueueMinNr = 1 PipelineQueueLen = 64 DurationTime = 6000 //ms DDLCheckpointInterval = 300 // ms )
const MaxUnAckListLength = 128 * 256
Variables ¶
var CollectionCappedError = errors.New("collection capped error")
var TimeoutError = errors.New("read next log timeout, It shouldn't be happen")
TimeoutError indicates that a MongoDB query exceeded its execution timeout while reading the next log entry.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
As mentioned in syncer.go, Batcher is used to batch oplogs before sending in order to improve performance.
func NewBatcher ¶
func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain, handler OplogHandler, workerGroup []*Worker) *Batcher
type GidOplogReader ¶
type GidOplogReader struct {
OplogReader
}
GidOplogReader is an OplogReader whose queries additionally filter by gid.
func NewGidOplogReader ¶
func NewGidOplogReader(src string) *GidOplogReader
func (*GidOplogReader) SetQueryGid ¶
func (reader *GidOplogReader) SetQueryGid(gid string)
type Module ¶
type Module interface { IsRegistered() bool /** * Module install and initialize. return false on failed * and only invocation on WriteController is preparing */ Install() bool /** * Handle outstanding request message. and messages * are passed one by one. Any changes of message in * Handle() will be preserved and delivery to next * * @return tunnel's error code (<0) or ack value * */ Handle(message *tunnel.WMessage) int64 }
type OplogHandler ¶
type OplogHandler interface { // invocation on every oplog consumed Handle(log *oplog.PartialLog) }
type OplogReader ¶
type OplogReader struct {
// contains filtered or unexported fields
}
OplogReader represents a stream reader for oplogs from a MongoDB instance specified by a URL. With query options, the user can iterate over oplogs.
func NewOplogReader ¶
func NewOplogReader(src string) *OplogReader
NewOplogReader creates a reader from a MongoDB URL.
func (*OplogReader) Next ¶
func (reader *OplogReader) Next() (*bson.Raw, error)
Next returns the next oplog as raw bytes (*bson.Raw).
func (*OplogReader) NextOplog ¶
func (reader *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)
NextOplog returns the next oplog as an oplog.GenericOplog struct.
func (*OplogReader) SetQueryTimestampOnEmpty ¶
func (reader *OplogReader) SetQueryTimestampOnEmpty(ts bson.MongoTimestamp)
SetQueryTimestampOnEmpty sets the reader's internal timestamp only if one is not already set; this is typically used during the initial stage.
func (*OplogReader) StartFetcher ¶
func (reader *OplogReader) StartFetcher()
StartFetcher starts the fetcher if it is not already running.
func (*OplogReader) UpdateQueryTimestamp ¶
func (reader *OplogReader) UpdateQueryTimestamp(ts bson.MongoTimestamp)
type OplogSyncer ¶
type OplogSyncer struct { OplogHandler // contains filtered or unexported fields }
OplogSyncer polls oplogs from the original source MongoDB.
func NewOplogSyncer ¶
func NewOplogSyncer( coordinator *ReplicationCoordinator, replset string, startPosition int64, fullSyncFinishPosition int64, mongoUrl string, gid string) *OplogSyncer
Syncer is used to fetch oplogs from the source MongoDB and then send them to different workers, which can be seen as network senders. Several syncers coexist to improve fetching performance. The data flow in a syncer is: source MongoDB --> reader --> pending queue (raw data) --> logs queue (parsed data) --> worker. The pending queue and logs queue are split to improve performance.
func (*OplogSyncer) Handle ¶
func (sync *OplogSyncer) Handle(log *oplog.PartialLog)
func (*OplogSyncer) RestAPI ¶
func (sync *OplogSyncer) RestAPI()
type ReplicationCoordinator ¶
type ReplicationCoordinator struct { Sources []*utils.MongoSource // contains filtered or unexported fields }
ReplicationCoordinator is the global coordinator instance. It consists of one syncerGroup and a number of workers.
func (*ReplicationCoordinator) Run ¶
func (coordinator *ReplicationCoordinator) Run() error
type TransferEventListener ¶
type TransferEventListener struct {
// contains filtered or unexported fields
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(coordinator *ReplicationCoordinator, syncer *OplogSyncer, id uint32) *Worker
func (*Worker) IsAllAcked ¶
func (*Worker) Offer ¶
func (worker *Worker) Offer(batch []*oplog.GenericOplog)
type WriteController ¶
type WriteController struct { // current max lsn_ack value LatestLsnAck int64 // contains filtered or unexported fields }
func NewWriteController ¶
func NewWriteController(worker *Worker) *WriteController
func (*WriteController) Send ¶
func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64