Documentation ¶
Index ¶
- Constants
- func HashTx(entryCtx *common.EntryContext) (hashes []uint64)
- func NewDumper(ctx context.Context, db usql.QueryAble, table *common.Table, chunkSize int64, ...) *dumper
- func SelectAllGtidExecuted(db sql.QueryAble, jid string, gtidSet *mysql.MysqlGTIDSet) (itemMap base.GtidItemMap, err error)
- type Applier
- func (a *Applier) ApplyEventQueries(entry *common.DumpEntry) (err error)
- func (a *Applier) Finish1() error
- func (a *Applier) InitDB() (err error)
- func (a *Applier) Run()
- func (a *Applier) Shutdown() error
- func (a *Applier) Stats() (*common.TaskStatistics, error)
- func (a *Applier) ValidateConnection() error
- func (a *Applier) ValidateGrants() error
- type ApplierIncr
- type Extractor
- func (e *Extractor) CheckAndApplyLowerCaseTableNames()
- func (e *Extractor) CountTableRows(db sql.QueryAble, table *common.Table) (int64, error)
- func (e *Extractor) Finish1() (err error)
- func (e *Extractor) Run()
- func (e *Extractor) Shutdown() error
- func (e *Extractor) Stats() (*common.TaskStatistics, error)
- func (e *Extractor) StreamEvents() error
- type GtidExecutedCreater
- type Inspector
- func (i *Inspector) Close()
- func (i *Inspector) InitDB() (err error)
- func (i *Inspector) InitDBConnections() (err error)
- func (i *Inspector) InspectTableColumnsAndUniqueKeys(databaseName, tableName string) (columns *common.ColumnList, uniqueKeys []*common.UniqueKey, err error)
- func (i *Inspector) ValidateBinlogs() error
- func (i *Inspector) ValidateConnection() error
- func (i *Inspector) ValidateGTIDMode() error
- func (i *Inspector) ValidateGrants() error
- func (i *Inspector) ValidateOriginalTable(databaseName, tableName string, table *common.Table) (err error)
- func (i *Inspector) ValidateServerId() error
- type Int64PriQueue
- type MtsManager
- func (mm *MtsManager) Executed(binlogEntry *common.DataEntry)
- func (mm *MtsManager) Executed0(seq int64)
- func (mm *MtsManager) LcUpdater()
- func (mm *MtsManager) WaitForAllCommitted(logger g.LoggerType) bool
- func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool
- func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool
- type TimestampContext
- type WritesetManager
Constants ¶
const ( JobIncrCopy = "job_stage_incr" JobFullCopy = "job_stage_full" )
Variables ¶
This section is empty.
Functions ¶
func HashTx ¶
func HashTx(entryCtx *common.EntryContext) (hashes []uint64)
HashTx returns an empty slice if there is no row events (DDL TX), or there is a row event refering to a no-PK table.
func SelectAllGtidExecuted ¶
func SelectAllGtidExecuted(db sql.QueryAble, jid string, gtidSet *mysql.MysqlGTIDSet) ( itemMap base.GtidItemMap, err error)
return: normalized GtidSet
Types ¶
type Applier ¶
type Applier struct { NatsAddr string MySQLVersion string TotalRowsReplayed int64 // contains filtered or unexported fields }
func NewApplier ¶
func NewApplier( execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, natsAddr string, waitCh chan *drivers.ExitResult, event *eventer.Eventer, taskConfig *drivers.TaskConfig, ctx context.Context) (a *Applier, err error)
func (*Applier) ApplyEventQueries ¶
func (*Applier) ValidateGrants ¶
ValidateGrants verifies the user by which we're executing has necessary grants to do its thang.
type ApplierIncr ¶
type ApplierIncr struct { MySQLServerUuid string TotalDeltaCopied int64 EntryExecutedHook func(entry *common.DataEntry) OnError func(int, error) SkipGtidExecutedTable bool // contains filtered or unexported fields }
func NewApplierIncr ¶
func NewApplierIncr(applier *Applier, sourcetype string) (*ApplierIncr, error)
func (*ApplierIncr) ApplyBinlogEvent ¶
func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.EntryContext) (err error)
ApplyEventQueries applies multiple DML queries onto the dest table
func (*ApplierIncr) HasShutdown ¶
func (a *ApplierIncr) HasShutdown() bool
func (*ApplierIncr) MtsWorker ¶
func (a *ApplierIncr) MtsWorker(workerIndex int)
func (*ApplierIncr) Run ¶
func (a *ApplierIncr) Run() (err error)
func (*ApplierIncr) Shutdown ¶
func (a *ApplierIncr) Shutdown()
type Extractor ¶
type Extractor struct { MySQLVersion string TotalTransferredBytes int // Original comment: TotalRowsCopied returns the accurate number of rows being copied (affected) // This is not exactly the same as the rows being iterated via chunks, but potentially close enough. // TODO What is the difference between mysqlContext.RowsEstimate ? TotalRowsCopied int64 RevApplier *Applier // contains filtered or unexported fields }
Extractor is the main schema extract flow manager.
func NewExtractor ¶
func NewExtractor(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, waitCh chan *drivers.ExitResult, ctx context.Context) (*Extractor, error)
func (*Extractor) CheckAndApplyLowerCaseTableNames ¶
func (e *Extractor) CheckAndApplyLowerCaseTableNames()
func (*Extractor) CountTableRows ¶
CountTableRows counts exact number of rows on the original table
func (*Extractor) StreamEvents ¶
StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine
type GtidExecutedCreater ¶
type GtidExecutedCreater struct {
// contains filtered or unexported fields
}
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector reads data from the read-MySQL-server (typically a replica, but can be the master) It is used for gaining initial status and structure, and later also follow up on progress and changelog
func NewInspector ¶
func NewInspector(ctx *common.MySQLDriverConfig, logger g.LoggerType) *Inspector
func (*Inspector) InitDBConnections ¶
func (*Inspector) InspectTableColumnsAndUniqueKeys ¶
func (*Inspector) ValidateBinlogs ¶
ValidateBinlogs checks that binary log configuration is good to go
func (*Inspector) ValidateConnection ¶
func (*Inspector) ValidateGTIDMode ¶
func (*Inspector) ValidateGrants ¶
ValidateGrants verifies the user by which we're executing has necessary grants to do its thang.
func (*Inspector) ValidateOriginalTable ¶
func (*Inspector) ValidateServerId ¶
type Int64PriQueue ¶
type Int64PriQueue []int64
from container/heap/example_intheap_test.go
func (Int64PriQueue) Len ¶
func (q Int64PriQueue) Len() int
func (Int64PriQueue) Less ¶
func (q Int64PriQueue) Less(i, j int) bool
func (*Int64PriQueue) Pop ¶
func (q *Int64PriQueue) Pop() interface{}
func (*Int64PriQueue) Push ¶
func (q *Int64PriQueue) Push(x interface{})
func (Int64PriQueue) Swap ¶
func (q Int64PriQueue) Swap(i, j int)
type MtsManager ¶
type MtsManager struct {
// contains filtered or unexported fields
}
func NewMtsManager ¶
func NewMtsManager(shutdownCh chan struct{}, logger g.LoggerType) *MtsManager
shutdownCh: close to indicate a shutdown
func (*MtsManager) Executed ¶
func (mm *MtsManager) Executed(binlogEntry *common.DataEntry)
func (*MtsManager) Executed0 ¶
func (mm *MtsManager) Executed0(seq int64)
func (*MtsManager) LcUpdater ¶
func (mm *MtsManager) LcUpdater()
func (*MtsManager) WaitForAllCommitted ¶
func (mm *MtsManager) WaitForAllCommitted(logger g.LoggerType) bool
This function must be called sequentially.
func (*MtsManager) WaitForExecution ¶
func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool
block for waiting. return true for can_execute, false for abortion.
This function must be called sequentially.
func (*MtsManager) WaitForExecution0 ¶
func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool
type TimestampContext ¶
type TimestampContext struct { // Do not pass 0 to the chan. TimestampCh chan uint32 // contains filtered or unexported fields }
func NewTimestampContext ¶
func NewTimestampContext(stopCh chan struct{}, logger g.LoggerType, emptyQueueFunc func() bool) *TimestampContext
func (*TimestampContext) GetDelay ¶
func (tsc *TimestampContext) GetDelay() (d int64)
func (*TimestampContext) Handle ¶
func (tsc *TimestampContext) Handle()
type WritesetManager ¶
type WritesetManager struct {
// contains filtered or unexported fields
}
func NewWritesetManager ¶
func NewWritesetManager(historySize int) *WritesetManager
func (*WritesetManager) GatLastCommit ¶
func (wm *WritesetManager) GatLastCommit(entryCtx *common.EntryContext, logger g.LoggerType) int64