Documentation ¶
Index ¶
- Constants
- type Batcher
- type Module
- type OplogHandler
- type OplogSyncer
- type Persister
- func (p *Persister) GetFetchStage() int32
- func (p *Persister) GetQueryTsFromDiskQueue() bson.MongoTimestamp
- func (p *Persister) InitDiskQueue(dqName string)
- func (p *Persister) Inject(input []byte)
- func (p *Persister) PushToPendingQueue(input []byte)
- func (p *Persister) RestAPI()
- func (p *Persister) SetFetchStage(fetchStage int32)
- func (p *Persister) Start()
- type TransferEventListener
- type Worker
- func (worker *Worker) AllAcked(allAcked bool)
- func (worker *Worker) Init() bool
- func (worker *Worker) IsAllAcked() bool
- func (worker *Worker) Offer(batch []*oplog.GenericOplog)
- func (worker *Worker) RestAPI()
- func (worker *Worker) SetInitSyncFinishTs(fullSyncFinishPosition int64)
- func (worker *Worker) StartWorker()
- func (worker *Worker) String() string
- type WriteController
Constants ¶
const (
	// bson deserialize workload is CPU-intensive task
	PipelineQueueMaxNr    = 6
	PipelineQueueMiddleNr = 4
	PipelineQueueMinNr    = 1
	PipelineQueueLen      = 64

	DurationTime                  = 6000 // unit: ms.
	DDLCheckpointInterval         = 300  // unit: ms.
	FilterCheckpointGap           = 180  // unit: seconds. no checkpoint update, flush checkpoint mandatory
	FilterCheckpointCheckInterval = 180  // unit: seconds.
	CheckCheckpointUpdateTimes    = 10   // at most times of time check
)
const (
FullSyncReaderOplogStoreDiskReadBatch = 10000
)
const MaxUnAckListLength = 128 * 256
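The pipeline queue constants bound how many parsing queues a syncer uses, since bson deserialization is CPU-bound. As an illustration only (this is not necessarily the package's own selection logic), a caller could clamp a CPU-derived count into that range:

// pipelineQueueNr is a hypothetical helper that derives a pipeline queue
// count from the number of CPUs and clamps it into the documented range.
func pipelineQueueNr() int {
	nr := runtime.NumCPU() // requires "runtime"
	if nr > PipelineQueueMaxNr {
		return PipelineQueueMaxNr
	}
	if nr < PipelineQueueMinNr {
		return PipelineQueueMinNr
	}
	return nr
}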
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
As we mentioned in syncer.go, Batcher is used to batch oplogs before sending in order to improve performance.
func NewBatcher ¶
func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain, handler OplogHandler, workerGroup []*Worker) *Batcher
func (*Batcher) BatchMore ¶
This function is used to gather oplogs together. Honestly speaking, it's complicated, so reading the unit tests may help make it clearer; the reason it is so complicated is that there are too many corner cases here. It returns the batched oplogs and a barrier flag; the barrier is set when a DDL is found.
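As an illustration of the contract described above, the sketch below drives BatchMore in a loop and hands each batch to a worker. BatchMore's exact signature is not shown on this page, so the two-value (batch, barrier) return shape and the single-worker dispatch are assumptions, not the package's real dispatch logic.

// drainBatches is a hypothetical driver loop (assumed to live in this
// package). The (batch, barrier) return shape of BatchMore is inferred
// from the comment "return batched oplogs and barrier flag".
func drainBatches(batcher *Batcher, worker *Worker) {
	for {
		batch, barrier := batcher.BatchMore() // assumed return shape
		if len(batch) > 0 {
			worker.Offer(batch) // Offer accepts []*oplog.GenericOplog
		}
		if barrier {
			// a DDL was found: wait until everything before it is acked
			for !worker.IsAllAcked() {
				time.Sleep(10 * time.Millisecond) // requires "time"
			}
		}
	}
}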
type Module ¶
type Module interface {
	IsRegistered() bool

	/**
	 * Module install and initialize. Returns false on failure.
	 * Only invoked while the WriteController is preparing.
	 */
	Install() bool

	/**
	 * Handle an outstanding request message. Messages are passed
	 * one by one. Any changes to the message in Handle() will be
	 * preserved and delivered to the next module.
	 *
	 * @return tunnel's error code (<0) or ack value
	 */
	Handle(message *tunnel.WMessage) int64
}
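A minimal sketch of a type satisfying Module; the type itself is hypothetical and only illustrates the install/handle contract described in the comments above (tunnel.WMessage comes from the project's tunnel package).

// noopModule is a hypothetical Module implementation: it installs
// unconditionally and acks every message it sees.
type noopModule struct {
	registered bool
}

func (m *noopModule) IsRegistered() bool { return m.registered }

func (m *noopModule) Install() bool {
	m.registered = true
	return true
}

// Handle may inspect or rewrite the message; changes are preserved and
// delivered to the next module. A negative return value would be a tunnel
// error code, a non-negative one is the ack value.
func (m *noopModule) Handle(message *tunnel.WMessage) int64 {
	return 0
}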
type OplogHandler ¶
type OplogHandler interface {
	// invoked on every oplog consumed
	Handle(log *oplog.PartialLog)
}
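Any type with this Handle method satisfies OplogHandler; for example, a hypothetical handler that only counts consumed oplogs:

// countingHandler is a hypothetical OplogHandler that counts oplogs;
// Handle is invoked once per consumed oplog.
type countingHandler struct {
	consumed uint64
}

func (h *countingHandler) Handle(log *oplog.PartialLog) {
	atomic.AddUint64(&h.consumed, 1) // requires "sync/atomic"
}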
type OplogSyncer ¶
type OplogSyncer struct {
	OplogHandler

	// source mongodb replica set name
	Replset string

	// pending queue, used by raw log parsing. we buffer the target raw
	// oplogs and push them to the pending queue once the buffer is full,
	// then transfer them to the log queue.
	// buffer []*bson.Raw // moved to persister
	PendingQueue []chan [][]byte

	LastFetchTs bson.MongoTimestamp // the previous last fetch timestamp

	// can be closed
	CanClose  bool
	SyncGroup []*OplogSyncer
	// contains filtered or unexported fields
}
OplogSyncer polls oplogs from the source MongoDB.
func NewOplogSyncer ¶
func NewOplogSyncer(replset string, startPosition int64, fullSyncFinishPosition int64,
	mongoUrl string, gids []string, rateController *nimo.SimpleRateController) *OplogSyncer
Syncer is used to fetch oplogs from the source MongoDB and then send them to different workers, which can be seen as network senders. Several syncers coexist to improve fetching performance. The data flow in the syncer is:

source mongodb --> reader --> persister --> pending queue (raw data) --> logs queue (parsed data) --> worker

The reason we split the pending queue and the logs queue is to improve performance.
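A rough wiring sketch of the flow above: one syncer plus a few workers. Positions, the MongoDB URL, and the nil gids/rate controller are placeholders, and the call that actually starts fetching is not documented on this page, so it is omitted.

// buildSyncer is a hypothetical set-up helper; argument values are
// placeholders and error handling is omitted.
func buildSyncer(replset, mongoUrl string) (*OplogSyncer, []*Worker) {
	syncer := NewOplogSyncer(
		replset,
		0, // startPosition (placeholder)
		0, // fullSyncFinishPosition (placeholder)
		mongoUrl,
		nil, // gids
		nil, // rateController
	)
	syncer.Init()

	var workers []*Worker
	for i := uint32(0); i < 4; i++ { // 4 workers, arbitrary example count
		w := NewWorker(syncer, i)
		if !w.Init() {
			return nil, nil
		}
		w.SetInitSyncFinishTs(0) // placeholder position
		go w.StartWorker()       // assumed to run the worker's send loop
		workers = append(workers, w)
	}
	return syncer, workers
}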
func (*OplogSyncer) Handle ¶
func (sync *OplogSyncer) Handle(log *oplog.PartialLog)
func (*OplogSyncer) Init ¶
func (sync *OplogSyncer) Init()
func (*OplogSyncer) RestAPI ¶
func (sync *OplogSyncer) RestAPI()
func (*OplogSyncer) StartDiskApply ¶
func (sync *OplogSyncer) StartDiskApply()
func (*OplogSyncer) String ¶
func (sync *OplogSyncer) String() string
type Persister ¶
type Persister struct {
	// batch data ([]byte) together and send to downstream
	Buffer [][]byte

	// disk queue used to store oplog temporarily
	DiskQueue *diskQueue.DiskQueue
	// contains filtered or unexported fields
}
func NewPersister ¶
func NewPersister(replset string, sync *OplogSyncer) *Persister
func (*Persister) GetFetchStage ¶
func (p *Persister) GetFetchStage() int32
func (*Persister) GetQueryTsFromDiskQueue ¶
func (p *Persister) GetQueryTsFromDiskQueue() bson.MongoTimestamp
func (*Persister) InitDiskQueue ¶
func (p *Persister) InitDiskQueue(dqName string)
func (*Persister) PushToPendingQueue ¶
func (p *Persister) PushToPendingQueue(input []byte)
func (*Persister) SetFetchStage ¶
func (p *Persister) SetFetchStage(fetchStage int32)
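A sketch of exercising the Persister's disk-queue path: raw oplog bytes go through PushToPendingQueue, and the oldest timestamp still on disk can be read back. The replica set name, disk-queue name, call order, and input bytes are illustrative placeholders, not a documented start-up sequence.

// feedPersister is a hypothetical helper built only from the methods
// listed above.
func feedPersister(sync *OplogSyncer, rawOplogs [][]byte) {
	p := NewPersister("rs0", sync)     // "rs0" is a placeholder replset
	p.InitDiskQueue("shake-oplog-rs0") // placeholder disk-queue name
	p.Start()

	for _, doc := range rawOplogs {
		p.PushToPendingQueue(doc) // each doc is one raw bson oplog entry
	}

	_ = p.GetQueryTsFromDiskQueue() // oldest ts still sitting in the disk queue
}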
type TransferEventListener ¶
type TransferEventListener struct {
// contains filtered or unexported fields
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(syncer *OplogSyncer, id uint32) *Worker
func (*Worker) IsAllAcked ¶
func (worker *Worker) IsAllAcked() bool
func (*Worker) Offer ¶
func (worker *Worker) Offer(batch []*oplog.GenericOplog)
func (*Worker) SetInitSyncFinishTs ¶
func (worker *Worker) SetInitSyncFinishTs(fullSyncFinishPosition int64)
func (*Worker) StartWorker ¶
func (worker *Worker) StartWorker()
type WriteController ¶
type WriteController struct {
	// current max lsn_ack value
	LatestLsnAck int64
	// contains filtered or unexported fields
}
func NewWriteController ¶
func NewWriteController(worker *Worker) *WriteController
func (*WriteController) Send ¶
func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64
func (*WriteController) SetInitSyncFinishTs ¶
func (controller *WriteController) SetInitSyncFinishTs(fullSyncFinishPosition int64)
Set the init-sync finish timestamp if the tunnel is direct.
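A hypothetical send path tying Send and SetInitSyncFinishTs together; the tag value and position are placeholders, and the negative-means-error convention is taken from the Module.Handle comment above.

// sendBatch is a hypothetical helper: a negative return value is a tunnel
// error code, a non-negative one is the latest acked lsn (also exposed as
// controller.LatestLsnAck).
func sendBatch(worker *Worker, logs []*oplog.GenericOplog) int64 {
	controller := NewWriteController(worker)
	controller.SetInitSyncFinishTs(0) // placeholder full-sync finish position

	ack := controller.Send(logs, 0) // tag 0 is a placeholder
	if ack < 0 {
		// tunnel reported an error; a real caller would retry or bail out
		return ack
	}
	return ack
}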
Source Files ¶