Documentation ¶
Index ¶
- Variables
- func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr expression.Expression, ...) (bool, error)
- type CheckPoint
- type Cond
- type DDLWorker
- type DMLWorker
- type DataValidator
- func (v *DataValidator) GetValidatorError(errState pb.ValidateErrorState) ([]*pb.ValidationError, error)
- func (v *DataValidator) GetValidatorStatus() *pb.ValidationStatus
- func (v *DataValidator) GetValidatorTableStatus(filterStatus pb.Stage) []*pb.ValidationTableStatus
- func (v *DataValidator) OperateValidatorError(validateOp pb.ValidationErrOp, errID uint64, isAll bool) error
- func (v *DataValidator) Stage() pb.Stage
- func (v *DataValidator) Start(expect pb.Stage)
- func (v *DataValidator) Started() bool
- func (v *DataValidator) Stop()
- func (v *DataValidator) UpdateValidator(req *pb.UpdateValidationWorkerRequest) error
- type ExprFilterGroup
- func (g *ExprFilterGroup) GetDeleteExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)
- func (g *ExprFilterGroup) GetInsertExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)
- func (g *ExprFilterGroup) GetUpdateExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, []expression.Expression, error)
- func (g *ExprFilterGroup) ResetExprs(table *filter.Table)
- type Normal
- type OptShardingGroup
- type OptShardingGroupKeeper
- func (k *OptShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location
- func (k *OptShardingGroupKeeper) RemoveGroup(targetTable *filter.Table, sourceTableIDs []string)
- func (k *OptShardingGroupKeeper) RemoveSchema(schema string)
- func (k *OptShardingGroupKeeper) Reset()
- type Optimist
- type Pessimist
- type RemoteCheckPoint
- func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, ...) error
- func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) Close()
- func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error
- func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error
- func (cp *RemoteCheckPoint) DiscardPendingSnapshots()
- func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, snapshotID int, exceptTables []*filter.Table, ...) error
- func (cp *RemoteCheckPoint) FlushPointsWithTableInfos(tctx *tcontext.Context, tables []*filter.Table, tis []*model.TableInfo) error
- func (cp *RemoteCheckPoint) FlushSafeModeExitPoint(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location
- func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo
- func (cp *RemoteCheckPoint) GlobalPoint() binlog.Location
- func (cp *RemoteCheckPoint) GlobalPointSaveTime() time.Time
- func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) (err error)
- func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location) bool
- func (cp *RemoteCheckPoint) LastFlushOutdated() bool
- func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error
- func (cp *RemoteCheckPoint) LoadMeta(ctx context.Context) error
- func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) Rollback()
- func (cp *RemoteCheckPoint) SafeModeExitPoint() *binlog.Location
- func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location)
- func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location)
- func (cp *RemoteCheckPoint) SaveSafeModeExitPoint(point *binlog.Location)
- func (cp *RemoteCheckPoint) SaveTablePoint(table *filter.Table, point binlog.Location, ti *model.TableInfo)
- func (cp *RemoteCheckPoint) Snapshot(isSyncFlush bool) *SnapshotInfo
- func (cp *RemoteCheckPoint) String() string
- func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location
- type ShardingGroup
- func (sg *ShardingGroup) ActiveDDLFirstLocation() (binlog.Location, error)
- func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) (beforeActiveDDL bool)
- func (sg *ShardingGroup) FirstEndPosUnresolved() *binlog.Location
- func (sg *ShardingGroup) FirstLocationUnresolved() *binlog.Location
- func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})
- func (sg *ShardingGroup) Leave(sources []string) error
- func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)
- func (sg *ShardingGroup) Reset()
- func (sg *ShardingGroup) ResolveShardingDDL() bool
- func (sg *ShardingGroup) Sources() map[string]bool
- func (sg *ShardingGroup) String() string
- func (sg *ShardingGroup) Tables() []*filter.Table
- func (sg *ShardingGroup) TrySync(source string, location, endLocation binlog.Location, ddls []string) (bool, bool, int, error)
- func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
- func (sg *ShardingGroup) UnresolvedTables() []*filter.Table
- type ShardingGroupKeeper
- func (k *ShardingGroupKeeper) ActiveDDLFirstLocation(targetTable *filter.Table) (binlog.Location, error)
- func (k *ShardingGroupKeeper) AddGroup(targetTable *filter.Table, sourceIDs []string, meta *shardmeta.ShardingMeta, ...) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, ...)
- func (k *ShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location
- func (k *ShardingGroupKeeper) CheckAndFix(metas map[string]*shardmeta.ShardingMeta, schemaMap map[string]string, ...) error
- func (k *ShardingGroupKeeper) Close()
- func (k *ShardingGroupKeeper) Group(targetTable *filter.Table) *ShardingGroup
- func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
- func (k *ShardingGroupKeeper) InSyncing(sourceTable, targetTable *filter.Table, location binlog.Location) bool
- func (k *ShardingGroupKeeper) Init() (err error)
- func (k *ShardingGroupKeeper) LeaveGroup(targetTable *filter.Table, sources []string) error
- func (k *ShardingGroupKeeper) LoadShardMeta(flavor string, enableGTID bool) (map[string]*shardmeta.ShardingMeta, error)
- func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})
- func (k *ShardingGroupKeeper) ResetGroups()
- func (k *ShardingGroupKeeper) ResolveShardingDDL(targetTable *filter.Table) (bool, error)
- func (k *ShardingGroupKeeper) TrySync(sourceTable, targetTable *filter.Table, location, endLocation binlog.Location, ...) (needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, ...)
- func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
- func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, []*filter.Table)
- type ShardingReSync
- type SnapshotInfo
- type Syncer
- func (s *Syncer) CheckCanUpdateCfg(newCfg *config.SubTaskConfig) error
- func (s *Syncer) Close()
- func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (string, error)
- func (s *Syncer) Init(ctx context.Context) (err error)
- func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error)
- func (s *Syncer) IsRunning() bool
- func (s *Syncer) Kill()
- func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (msg string, err error)
- func (s *Syncer) Pause()
- func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Run(ctx context.Context) (err error)
- func (s *Syncer) ShardDDLOperation() *pessimism.Operation
- func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{}
- func (s *Syncer) Type() pb.UnitType
- func (s *Syncer) Update(ctx context.Context, cfg *config.SubTaskConfig) error
- type TestInjector
Constants ¶
This section is empty.
Variables ¶
var (
// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL.
MaxDDLConnectionTimeoutMinute = 5
)
Functions ¶
func SkipDMLByExpression ¶
func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr expression.Expression, upstreamCols []*model.ColumnInfo) (bool, error)
SkipDMLByExpression returns true when given row matches the expr, which means this row should be skipped.
Types ¶
type CheckPoint ¶
type CheckPoint interface { // Init initializes the CheckPoint Init(tctx *tcontext.Context) error // Close closes the CheckPoint Close() // ResetConn resets database connections owned by the Checkpoint ResetConn(tctx *tcontext.Context) error // Clear clears all checkpoints Clear(tctx *tcontext.Context) error // Load loads all checkpoints saved by CheckPoint Load(tctx *tcontext.Context) error // LoadMeta loads checkpoints from meta config item or file LoadMeta(ctx context.Context) error // SaveTablePoint saves checkpoint for specified table in memory SaveTablePoint(table *filter.Table, point binlog.Location, ti *model.TableInfo) // DeleteTablePoint deletes checkpoint for specified table in memory and storage DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error // DeleteAllTablePoint deletes all checkpoints for table in memory and storage DeleteAllTablePoint(tctx *tcontext.Context) error // DeleteSchemaPoint deletes checkpoint for specified schema DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error // IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint IsOlderThanTablePoint(table *filter.Table, point binlog.Location) bool // SaveGlobalPoint saves the global binlog stream's checkpoint // corresponding to Meta.Save SaveGlobalPoint(point binlog.Location) // SaveGlobalPointForcibly saves the global binlog stream's checkpoint forcibly. SaveGlobalPointForcibly(location binlog.Location) // Snapshot make a snapshot of current checkpoint. If returns nil, it means nothing has changed since last call. Snapshot(isSyncFlush bool) *SnapshotInfo // DiscardPendingSnapshots discards all pending snapshots. It's used when we create a snapshot but are unable to // call FlushPointsExcept() to flush the snapshot due to some error. DiscardPendingSnapshots() // FlushPointsExcept flushes the global checkpoint and tables' // checkpoints except exceptTables, it also flushes SQLs with Args providing // by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only. // @exceptTables: [[schema, table]... ] // corresponding to Meta.Flush FlushPointsExcept(tctx *tcontext.Context, snapshotID int, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error // FlushPointsWithTableInfos flushed the table points with given table infos FlushPointsWithTableInfos(tctx *tcontext.Context, tables []*filter.Table, tis []*model.TableInfo) error // FlushSafeModeExitPoint flushed the global checkpoint's with given table info FlushSafeModeExitPoint(tctx *tcontext.Context) error // GlobalPoint returns the global binlog stream's checkpoint // corresponding to Meta.Pos and Meta.GTID GlobalPoint() binlog.Location // GlobalPointSaveTime return the global point saved time, used for test only GlobalPointSaveTime() time.Time // SaveSafeModeExitPoint saves the pointer to location which indicates safe mode exit // this location is used when dump unit can't assure consistency SaveSafeModeExitPoint(point *binlog.Location) // SafeModeExitPoint returns the location where safe mode could safely turn off after SafeModeExitPoint() *binlog.Location // TablePoint returns all table's stream checkpoint TablePoint() map[string]map[string]binlog.Location // GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found GetTableInfo(schema string, table string) *model.TableInfo // FlushedGlobalPoint returns the flushed global binlog stream's checkpoint // corresponding to to Meta.Pos and gtid FlushedGlobalPoint() binlog.Location // LastFlushOutdated checks the start time of a flush (when call Snapshot) and finish time of a flush, if both of // the two times are outdated, LastFlushOutdated returns true. LastFlushOutdated() bool // Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints Rollback() // String return text of global position String() string // LoadIntoSchemaTracker loads table infos of all points into schema tracker. LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error // CheckAndUpdate check the checkpoint data consistency and try to fix them if possible CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error }
CheckPoint represents checkpoints status for syncer including global binlog's checkpoint and every table's checkpoint when save checkpoint, we must differ saving in memory from saving (flushing) to DB (or file) permanently for sharding merging, we must save checkpoint in memory to support skip when re-syncing for the special streamer but before all DDLs for a sharding group to be synced and executed, we should not save checkpoint permanently because, when restarting to continue the sync, all sharding DDLs must try-sync again.
func NewRemoteCheckPoint ¶
func NewRemoteCheckPoint( tctx *tcontext.Context, cfg *config.SubTaskConfig, metricProxies *metrics.Proxies, id string, ) CheckPoint
NewRemoteCheckPoint creates a new RemoteCheckPoint.
type Cond ¶
type DDLWorker ¶
type DDLWorker struct {
// contains filtered or unexported fields
}
func NewDDLWorker ¶
NewDDLWorker creates a new DDLWorker instance.
func (*DDLWorker) AstToDDLEvent ¶
AstToDDLEvent returns filter.DDLEvent.
func (*DDLWorker) HandleQueryEvent ¶
func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventContext, originSQL string) (err error)
type DMLWorker ¶
type DMLWorker struct {
// contains filtered or unexported fields
}
DMLWorker is used to sync dml.
type DataValidator ¶
type DataValidator struct { // used to sync Stop and Start operations. sync.RWMutex L log.Logger // contains filtered or unexported fields }
DataValidator is used to continuously validate incremental data migrated to downstream by dm. validator can be start when there's syncer unit in the subtask and validation mode is not none, it's terminated when the subtask is terminated. stage of validator is independent of subtask, pause/resume subtask doesn't affect the stage of validator.
validator can be in running or stopped stage - in running when it's started with subtask or started later on the fly. - in stopped when validation stop is executed.
for each subtask, before it's closed/killed, only one DataValidator object is created, on "dmctl validation stop/start", will call Stop and Start on the same object.
func NewContinuousDataValidator ¶
func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer, startWithSubtask bool) *DataValidator
func (*DataValidator) GetValidatorError ¶
func (v *DataValidator) GetValidatorError(errState pb.ValidateErrorState) ([]*pb.ValidationError, error)
func (*DataValidator) GetValidatorStatus ¶
func (v *DataValidator) GetValidatorStatus() *pb.ValidationStatus
func (*DataValidator) GetValidatorTableStatus ¶
func (v *DataValidator) GetValidatorTableStatus(filterStatus pb.Stage) []*pb.ValidationTableStatus
func (*DataValidator) OperateValidatorError ¶
func (v *DataValidator) OperateValidatorError(validateOp pb.ValidationErrOp, errID uint64, isAll bool) error
func (*DataValidator) Stage ¶
func (v *DataValidator) Stage() pb.Stage
func (*DataValidator) Start ¶
func (v *DataValidator) Start(expect pb.Stage)
func (*DataValidator) Started ¶
func (v *DataValidator) Started() bool
func (*DataValidator) Stop ¶
func (v *DataValidator) Stop()
func (*DataValidator) UpdateValidator ¶
func (v *DataValidator) UpdateValidator(req *pb.UpdateValidationWorkerRequest) error
type ExprFilterGroup ¶
type ExprFilterGroup struct {
// contains filtered or unexported fields
}
ExprFilterGroup groups many related fields about expression filter.
func NewExprFilterGroup ¶
func NewExprFilterGroup(logCtx *tcontext.Context, tidbCtx sessionctx.Context, exprConfig []*config.ExpressionFilter) *ExprFilterGroup
NewExprFilterGroup creates an ExprFilterGroup.
func (*ExprFilterGroup) GetDeleteExprs ¶
func (g *ExprFilterGroup) GetDeleteExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)
GetDeleteExprs returns the expression filters for given table to filter DELETE events. This function will lazy calculate expressions if not initialized.
func (*ExprFilterGroup) GetInsertExprs ¶
func (g *ExprFilterGroup) GetInsertExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)
GetInsertExprs returns the expression filters for given table to filter INSERT events. This function will lazy calculate expressions if not initialized.
func (*ExprFilterGroup) GetUpdateExprs ¶
func (g *ExprFilterGroup) GetUpdateExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, []expression.Expression, error)
GetUpdateExprs returns two lists of expression filters for given table, to filter UPDATE events by old values and new values respectively. The two lists should have same length, and the corresponding expressions is AND logic. This function will lazy calculate expressions if not initialized.
func (*ExprFilterGroup) ResetExprs ¶
func (g *ExprFilterGroup) ResetExprs(table *filter.Table)
ResetExprs deletes the expressions generated before. This should be called after table structure changed.
type OptShardingGroup ¶
OptShardingGroup represents a optimistic sharding DDL sync group.
func NewOptShardingGroup ¶
func NewOptShardingGroup(firstConflictLocation binlog.Location, flavor string, enableGTID bool) *OptShardingGroup
func (*OptShardingGroup) Remove ¶
func (s *OptShardingGroup) Remove(sourceTableIDs []string)
type OptShardingGroupKeeper ¶
OptShardingGroupKeeper used to keep OptShardingGroup. It's used to keep sharding group meta data to make sure optimistic sharding resync redirection works correctly.
newer │ ───────────────────────► time │ │ tb1 conflict DDL1 │ ▲ │ │ │ │ │ │ ... │ │ │ │ │ │ │ │ tb1 conflict DDL2 │ │ │ ▲ │ │ │ │ │ │ │ │ ... │ │ │ │ │ │ │ │ │ │ │ │ tb2 conflict DDL1 ▼ │ │ │ │ │ │ │ │ │ ... redirect │ │ │ │ │ │ │ │ tb2 conflict DDL2 ▼ │ │ │ │ │ ... redirect │ │ │ │ other dml events ▼ │ │ continue ▼ replicating
newer binlog One redirection example is listed as above.
func NewOptShardingGroupKeeper ¶
func NewOptShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *OptShardingGroupKeeper
NewOptShardingGroupKeeper creates a new OptShardingGroupKeeper.
func (*OptShardingGroupKeeper) AdjustGlobalLocation ¶
func (k *OptShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location
AdjustGlobalLocation adjusts globalLocation with sharding groups' lowest first point.
func (*OptShardingGroupKeeper) RemoveGroup ¶
func (k *OptShardingGroupKeeper) RemoveGroup(targetTable *filter.Table, sourceTableIDs []string)
func (*OptShardingGroupKeeper) RemoveSchema ¶
func (k *OptShardingGroupKeeper) RemoveSchema(schema string)
func (*OptShardingGroupKeeper) Reset ¶
func (k *OptShardingGroupKeeper) Reset()
Reset resets the keeper.
type RemoteCheckPoint ¶
RemoteCheckPoint implements CheckPoint which using target database to store info NOTE: now we sync from relay log, so not add GTID support yet it's not thread-safe.
func (*RemoteCheckPoint) CheckAndUpdate ¶
func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error
CheckAndUpdate check the checkpoint data consistency and try to fix them if possible.
func (*RemoteCheckPoint) Clear ¶
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error
Clear implements CheckPoint.Clear.
func (*RemoteCheckPoint) Close ¶
func (cp *RemoteCheckPoint) Close()
Close implements CheckPoint.Close.
func (*RemoteCheckPoint) DeleteAllTablePoint ¶
func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error
DeleteAllTablePoint implements CheckPoint.DeleteAllTablePoint.
func (*RemoteCheckPoint) DeleteSchemaPoint ¶
func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error
DeleteSchemaPoint implements CheckPoint.DeleteSchemaPoint.
func (*RemoteCheckPoint) DeleteTablePoint ¶
DeleteTablePoint implements CheckPoint.DeleteTablePoint.
func (*RemoteCheckPoint) DiscardPendingSnapshots ¶
func (cp *RemoteCheckPoint) DiscardPendingSnapshots()
DiscardPendingSnapshots discard all pending snapshots.
func (*RemoteCheckPoint) FlushPointsExcept ¶
func (cp *RemoteCheckPoint) FlushPointsExcept( tctx *tcontext.Context, snapshotID int, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}, ) error
FlushPointsExcept implements CheckPoint.FlushPointsExcept.
func (*RemoteCheckPoint) FlushPointsWithTableInfos ¶
func (cp *RemoteCheckPoint) FlushPointsWithTableInfos(tctx *tcontext.Context, tables []*filter.Table, tis []*model.TableInfo) error
FlushPointsWithTableInfos implements CheckPoint.FlushPointsWithTableInfos.
func (*RemoteCheckPoint) FlushSafeModeExitPoint ¶
func (cp *RemoteCheckPoint) FlushSafeModeExitPoint(tctx *tcontext.Context) error
FlushSafeModeExitPoint implements CheckPoint.FlushSafeModeExitPoint.
func (*RemoteCheckPoint) FlushedGlobalPoint ¶
func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location
FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint.
func (*RemoteCheckPoint) GetTableInfo ¶
func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo
func (*RemoteCheckPoint) GlobalPoint ¶
func (cp *RemoteCheckPoint) GlobalPoint() binlog.Location
GlobalPoint implements CheckPoint.GlobalPoint.
func (*RemoteCheckPoint) GlobalPointSaveTime ¶
func (cp *RemoteCheckPoint) GlobalPointSaveTime() time.Time
GlobalPointSaveTime implements CheckPoint.GlobalPointSaveTime.
func (*RemoteCheckPoint) Init ¶
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) (err error)
Init implements CheckPoint.Init.
func (*RemoteCheckPoint) IsOlderThanTablePoint ¶
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location) bool
IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. This function is used to skip old binlog events. Table checkpoint is saved after dispatching a binlog event.
- For GTID based and position based replication, DML handling is a bit different but comparison is same here. When using position based, each event has unique position so we have confident to skip event which is <= table checkpoint. When using GTID based, there may be more than one event with same GTID, but we still skip event which is <= table checkpoint, to make this right we only save table point for the transaction affected tables only after the whole transaction is processed
- DDL will not have unique position or GTID, so we can always skip events <= table checkpoint.
func (*RemoteCheckPoint) LastFlushOutdated ¶
func (cp *RemoteCheckPoint) LastFlushOutdated() bool
LastFlushOutdated implements CheckPoint.LastFlushOutdated.
func (*RemoteCheckPoint) Load ¶
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error
Load implements CheckPoint.Load.
func (*RemoteCheckPoint) LoadIntoSchemaTracker ¶
func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error
LoadIntoSchemaTracker loads table infos of all points into schema tracker.
func (*RemoteCheckPoint) LoadMeta ¶
func (cp *RemoteCheckPoint) LoadMeta(ctx context.Context) error
LoadMeta implements CheckPoint.LoadMeta.
func (*RemoteCheckPoint) ResetConn ¶
func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error
ResetConn implements CheckPoint.ResetConn.
func (*RemoteCheckPoint) Rollback ¶
func (cp *RemoteCheckPoint) Rollback()
Rollback implements CheckPoint.Rollback.
func (*RemoteCheckPoint) SafeModeExitPoint ¶
func (cp *RemoteCheckPoint) SafeModeExitPoint() *binlog.Location
SafeModeExitPoint implements CheckPoint.SafeModeExitPoint.
func (*RemoteCheckPoint) SaveGlobalPoint ¶
func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location)
SaveGlobalPoint implements CheckPoint.SaveGlobalPoint.
func (*RemoteCheckPoint) SaveGlobalPointForcibly ¶
func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location)
SaveGlobalPointForcibly implements CheckPoint.SaveGlobalPointForcibly.
func (*RemoteCheckPoint) SaveSafeModeExitPoint ¶
func (cp *RemoteCheckPoint) SaveSafeModeExitPoint(point *binlog.Location)
SaveSafeModeExitPoint implements CheckPoint.SaveSafeModeExitPoint shouldn't call concurrently (only called before loop in Syncer.Run and in loop to reset).
func (*RemoteCheckPoint) SaveTablePoint ¶
func (cp *RemoteCheckPoint) SaveTablePoint(table *filter.Table, point binlog.Location, ti *model.TableInfo)
SaveTablePoint implements CheckPoint.SaveTablePoint.
func (*RemoteCheckPoint) Snapshot ¶
func (cp *RemoteCheckPoint) Snapshot(isSyncFlush bool) *SnapshotInfo
Snapshot make a snapshot of checkpoint and return the snapshot info.
func (*RemoteCheckPoint) String ¶
func (cp *RemoteCheckPoint) String() string
String implements CheckPoint.String.
func (*RemoteCheckPoint) TablePoint ¶
func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location
TablePoint implements CheckPoint.TablePoint.
type ShardingGroup ¶
type ShardingGroup struct { sync.RWMutex IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later // contains filtered or unexported fields }
ShardingGroup represents a sharding DDL sync group.
func NewShardingGroup ¶
func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources []string, meta *shardmeta.ShardingMeta, isSchemaOnly bool, flavor string, enableGTID bool) *ShardingGroup
NewShardingGroup creates a new ShardingGroup.
func (*ShardingGroup) ActiveDDLFirstLocation ¶
func (sg *ShardingGroup) ActiveDDLFirstLocation() (binlog.Location, error)
ActiveDDLFirstLocation returns the first binlog position of active DDL.
func (*ShardingGroup) CheckSyncing ¶
func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) (beforeActiveDDL bool)
CheckSyncing checks the source table syncing status returns
beforeActiveDDL: whether the position is before active DDL
func (*ShardingGroup) FirstEndPosUnresolved ¶
func (sg *ShardingGroup) FirstEndPosUnresolved() *binlog.Location
FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil.
func (*ShardingGroup) FirstLocationUnresolved ¶
func (sg *ShardingGroup) FirstLocationUnresolved() *binlog.Location
FirstLocationUnresolved returns the first DDL pos if un-resolved, else nil.
func (*ShardingGroup) FlushData ¶
func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})
FlushData returns sharding meta flush SQLs and args.
func (*ShardingGroup) Leave ¶
func (sg *ShardingGroup) Leave(sources []string) error
Leave leaves from sharding group it, doesn't affect in syncing process used cases
- drop a database
- drop table
func (*ShardingGroup) Merge ¶
Merge merges new sources to exists used cases
- add a new table to exists sharding group
- add new table(s) to parent database's sharding group if group is in sequence sharding, return error directly otherwise add it in source, set it false and increment remain
func (*ShardingGroup) Reset ¶
func (sg *ShardingGroup) Reset()
Reset resets all sources to un-synced state when the previous sharding DDL synced and resolved, we need reset it.
func (*ShardingGroup) ResolveShardingDDL ¶
func (sg *ShardingGroup) ResolveShardingDDL() bool
ResolveShardingDDL resolves sharding DDL in sharding group.
func (*ShardingGroup) Sources ¶
func (sg *ShardingGroup) Sources() map[string]bool
Sources returns all sources (and whether synced).
func (*ShardingGroup) String ¶
func (sg *ShardingGroup) String() string
String implements Stringer.String.
func (*ShardingGroup) Tables ¶
func (sg *ShardingGroup) Tables() []*filter.Table
Tables returns all source tables' <schema, table> pair.
func (*ShardingGroup) TrySync ¶
func (sg *ShardingGroup) TrySync(source string, location, endLocation binlog.Location, ddls []string) (bool, bool, int, error)
TrySync tries to sync the sharding group returns
synced: whether the source table's sharding group synced active: whether the DDL will be processed in this round remain: remain un-synced source table's count
func (*ShardingGroup) UnresolvedGroupInfo ¶
func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil.
func (*ShardingGroup) UnresolvedTables ¶
func (sg *ShardingGroup) UnresolvedTables() []*filter.Table
UnresolvedTables returns all source tables' <schema, table> pair if is unresolved, else returns nil.
type ShardingGroupKeeper ¶
ShardingGroupKeeper used to keep ShardingGroup.
func NewShardingGroupKeeper ¶
func NewShardingGroupKeeper( tctx *tcontext.Context, cfg *config.SubTaskConfig, metricProxies *metrics.Proxies, ) *ShardingGroupKeeper
NewShardingGroupKeeper creates a new ShardingGroupKeeper.
func (*ShardingGroupKeeper) ActiveDDLFirstLocation ¶
func (k *ShardingGroupKeeper) ActiveDDLFirstLocation(targetTable *filter.Table) (binlog.Location, error)
ActiveDDLFirstLocation returns the binlog position of active DDL.
func (*ShardingGroupKeeper) AddGroup ¶
func (k *ShardingGroupKeeper) AddGroup(targetTable *filter.Table, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)
AddGroup adds new group(s) according to target schema, table and source IDs.
func (*ShardingGroupKeeper) AdjustGlobalLocation ¶
func (k *ShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location
AdjustGlobalLocation adjusts globalLocation with sharding groups' lowest first point.
func (*ShardingGroupKeeper) CheckAndFix ¶
func (k *ShardingGroupKeeper) CheckAndFix(metas map[string]*shardmeta.ShardingMeta, schemaMap map[string]string, tablesMap map[string]map[string]string) error
CheckAndFix try to check and fix the schema/table case-sensitive issue.
NOTE: CheckAndFix is called before sharding groups are inited.
func (*ShardingGroupKeeper) Close ¶
func (k *ShardingGroupKeeper) Close()
Close closes sharding group keeper.
func (*ShardingGroupKeeper) Group ¶
func (k *ShardingGroupKeeper) Group(targetTable *filter.Table) *ShardingGroup
Group returns target table's group, nil if not exist.
func (*ShardingGroupKeeper) Groups ¶
func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
Groups returns all sharding groups, often used for debug caution: do not modify the returned groups directly
func (*ShardingGroupKeeper) InSyncing ¶
func (k *ShardingGroupKeeper) InSyncing(sourceTable, targetTable *filter.Table, location binlog.Location) bool
InSyncing checks whether the source is in sharding syncing, that is to say not before active DDL.
func (*ShardingGroupKeeper) Init ¶
func (k *ShardingGroupKeeper) Init() (err error)
Init does initialization staff.
func (*ShardingGroupKeeper) LeaveGroup ¶
func (k *ShardingGroupKeeper) LeaveGroup(targetTable *filter.Table, sources []string) error
LeaveGroup leaves group according to target schema, table and source IDs LeaveGroup doesn't affect in syncing process.
func (*ShardingGroupKeeper) LoadShardMeta ¶
func (k *ShardingGroupKeeper) LoadShardMeta(flavor string, enableGTID bool) (map[string]*shardmeta.ShardingMeta, error)
LoadShardMeta implements CheckPoint.LoadShardMeta.
func (*ShardingGroupKeeper) PrepareFlushSQLs ¶
func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})
PrepareFlushSQLs returns all sharding meta flushed SQLs except for given table IDs.
func (*ShardingGroupKeeper) ResetGroups ¶
func (k *ShardingGroupKeeper) ResetGroups()
ResetGroups resets group's sync status.
func (*ShardingGroupKeeper) ResolveShardingDDL ¶
func (k *ShardingGroupKeeper) ResolveShardingDDL(targetTable *filter.Table) (bool, error)
ResolveShardingDDL resolves one sharding DDL in specific group.
func (*ShardingGroupKeeper) TrySync ¶
func (k *ShardingGroupKeeper) TrySync( sourceTable, targetTable *filter.Table, location, endLocation binlog.Location, ddls []string) ( needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error, )
TrySync tries to sync the sharding group returns
isSharding: whether the source table is in a sharding group group: the sharding group synced: whether the source table's sharding group synced active: whether is active DDL in sequence sharding DDL remain: remain un-synced source table's count
func (*ShardingGroupKeeper) UnresolvedGroups ¶
func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
UnresolvedGroups returns sharding groups which are un-resolved caution: do not modify the returned groups directly
func (*ShardingGroupKeeper) UnresolvedTables ¶
func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, []*filter.Table)
UnresolvedTables returns
all `target-schema.target-table` that has unresolved sharding DDL, all source tables which with DDLs are un-resolved
NOTE: this func only ensure the returned tables are current un-resolved if passing the returned tables to other func (like checkpoint), must ensure their sync state not changed in this progress.
type ShardingReSync ¶
type ShardingReSync struct {
// contains filtered or unexported fields
}
ShardingReSync represents re-sync info for a sharding DDL group.
func (*ShardingReSync) String ¶
func (s *ShardingReSync) String() string
String implements stringer.String.
type SnapshotInfo ¶
type SnapshotInfo struct {
// contains filtered or unexported fields
}
SnapshotInfo contains: - checkpoint snapshot id, it's for retrieving checkpoint snapshot in flush phase - global checkpoint position, it's for updating current active relay log after checkpoint flush.
type Syncer ¶
type Syncer struct { sync.RWMutex // `lower_case_table_names` setting of upstream db SourceTableNamesFlavor conn.LowerCaseTableNamesFlavor // contains filtered or unexported fields }
Syncer can sync your MySQL data to another MySQL database.
func (*Syncer) CheckCanUpdateCfg ¶
func (s *Syncer) CheckCanUpdateCfg(newCfg *config.SubTaskConfig) error
CheckCanUpdateCfg check if task config can be updated. 1. task must not in a pessimistic ddl state. 2. only balist, route/filter rules and syncerConfig can be updated at this moment. 3. some config fields from sourceCfg also can be updated, see more in func `copyConfigFromSource`.
func (*Syncer) HandleError ¶
HandleError handle error for syncer.
func (*Syncer) Init ¶
Init initializes syncer for a sync task, but not start Process. if fail, it should not call s.Close. some check may move to checker later.
func (*Syncer) IsFreshTask ¶
IsFreshTask implements Unit.IsFreshTask.
func (*Syncer) OperateSchema ¶
func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (msg string, err error)
OperateSchema operates schema for an upstream table.
func (*Syncer) Process ¶
func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
Process implements the dm.Unit interface.
func (*Syncer) Resume ¶
func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
Resume resumes the paused process.
func (*Syncer) ShardDDLOperation ¶
ShardDDLOperation returns the current pending to handle shard DDL lock operation.
func (*Syncer) Status ¶
func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{}
Status implements Unit.Status.
type TestInjector ¶
type TestInjector struct {
// contains filtered or unexported fields
}
TestInjector is used to support inject test cases into syncer. In some cases, we use failpoint to control the test flow, but we may need to control the flow based on some previous status, so we add this TestInjector to record these status. NOTE: if HTTP for failpoint works well, then we may remove this.
Source Files ¶
- causality.go
- checkpoint.go
- checkpoint_flush_worker.go
- compactor.go
- data_validator.go
- ddl.go
- dml.go
- dml_worker.go
- error.go
- expr_filter_group.go
- filter.go
- handle_error.go
- job.go
- opt_sharding_group.go
- optimist.go
- relay.go
- safe_mode.go
- schema.go
- sharding_group.go
- status.go
- syncer.go
- test_injector.go
- util.go
- validate_worker.go
- validator_checkpoint.go
- validator_cond.go
Directories ¶
Path | Synopsis |
---|---|
Package binlogstream is used by syncer to read binlog.
|
Package binlogstream is used by syncer to read binlog. |