syncer

package
v0.0.0-...-4624acb
Published: Dec 24, 2024 License: Apache-2.0 Imports: 102 Imported by: 1

Documentation

Constants

This section is empty.

Variables

var (

	// MaxDDLConnectionTimeoutMinute is also used by SubTask.ExecuteDDL.
	MaxDDLConnectionTimeoutMinute = 5
)
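
As a rough illustration of how a minute-valued setting like this might be turned into a timeout, the sketch below converts it to a `time.Duration`. The helper `ddlTimeout` and the local copy of the variable are hypothetical and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// maxDDLConnectionTimeoutMinute is a local, hypothetical copy of the exported
// variable so that this sketch compiles on its own.
var maxDDLConnectionTimeoutMinute = 5

// ddlTimeout converts a timeout expressed in minutes into a time.Duration.
func ddlTimeout(minutes int) time.Duration {
	return time.Duration(minutes) * time.Minute
}

func main() {
	fmt.Println(ddlTimeout(maxDDLConnectionTimeoutMinute))
}
```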

Functions

func SkipDMLByExpression

func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr expression.Expression, upstreamCols []*model.ColumnInfo) (bool, error)

SkipDMLByExpression returns true when given row matches the expr, which means this row should be skipped.
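
The skip decision can be sketched in a self-contained way as follows. This is not the package's implementation: the hypothetical `rowPredicate` type stands in for `expression.Expression`, and `skipRow` stands in for `SkipDMLByExpression`.

```go
package main

import "fmt"

// rowPredicate is a hypothetical stand-in for expression.Expression:
// it reports whether a row matches the filter expression.
type rowPredicate func(row []interface{}) (bool, error)

// skipRow mirrors the documented contract: it returns true when the row
// matches the predicate, which means the row should be skipped.
func skipRow(row []interface{}, pred rowPredicate) (bool, error) {
	matched, err := pred(row)
	if err != nil {
		return false, err
	}
	return matched, nil
}

func main() {
	// Hypothetical filter: skip rows whose first column is an int above 100.
	pred := func(row []interface{}) (bool, error) {
		v, ok := row[0].(int)
		if !ok {
			return false, fmt.Errorf("column 0: expected int, got %T", row[0])
		}
		return v > 100, nil
	}
	skip, err := skipRow([]interface{}{150, "a"}, pred)
	fmt.Println(skip, err)
}
```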

Types

type CheckPoint

type CheckPoint interface {
	// Init initializes the CheckPoint
	Init(tctx *tcontext.Context) error

	// Close closes the CheckPoint
	Close()

	// ResetConn resets database connections owned by the Checkpoint
	ResetConn(tctx *tcontext.Context) error

	// Clear clears all checkpoints
	Clear(tctx *tcontext.Context) error

	// Load loads all checkpoints saved by CheckPoint
	Load(tctx *tcontext.Context) error

	// LoadMeta loads checkpoints from meta config item or file
	LoadMeta(ctx context.Context) error

	// SaveTablePoint saves checkpoint for specified table in memory
	SaveTablePoint(table *filter.Table, point binlog.Location, ti *model.TableInfo)

	// DeleteTablePoint deletes checkpoint for specified table in memory and storage
	DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error

	// DeleteAllTablePoint deletes all checkpoints for table in memory and storage
	DeleteAllTablePoint(tctx *tcontext.Context) error

	// DeleteSchemaPoint deletes checkpoint for specified schema
	DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

	// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
	IsOlderThanTablePoint(table *filter.Table, point binlog.Location) bool

	// SaveGlobalPoint saves the global binlog stream's checkpoint
	// corresponding to Meta.Save
	SaveGlobalPoint(point binlog.Location)

	// SaveGlobalPointForcibly saves the global binlog stream's checkpoint forcibly.
	SaveGlobalPointForcibly(location binlog.Location)

	// Snapshot makes a snapshot of the current checkpoint. If it returns nil, nothing has changed since the last call.
	Snapshot(isSyncFlush bool) *SnapshotInfo

	// DiscardPendingSnapshots discards all pending snapshots. It's used when we create a snapshot but are unable to
	// call FlushPointsExcept() to flush the snapshot due to some error.
	DiscardPendingSnapshots()

	// FlushPointsExcept flushes the global checkpoint and tables'
	// checkpoints except exceptTables. It also flushes SQLs with args provided
	// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
	// @exceptTables: [[schema, table]... ]
	// corresponding to Meta.Flush
	FlushPointsExcept(tctx *tcontext.Context, snapshotID int, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error

	// FlushPointsWithTableInfos flushes the table points with the given table infos
	FlushPointsWithTableInfos(tctx *tcontext.Context, tables []*filter.Table, tis []*model.TableInfo) error

	// FlushSafeModeExitPoint flushes the safe mode exit point of the global checkpoint
	FlushSafeModeExitPoint(tctx *tcontext.Context) error

	// GlobalPoint returns the global binlog stream's checkpoint
	// corresponding to Meta.Pos and Meta.GTID
	GlobalPoint() binlog.Location

	// GlobalPointSaveTime returns the time the global point was saved, used for tests only
	GlobalPointSaveTime() time.Time

	// SaveSafeModeExitPoint saves the pointer to location which indicates safe mode exit
	// this location is used when dump unit can't assure consistency
	SaveSafeModeExitPoint(point *binlog.Location)

	// SafeModeExitPoint returns the location after which safe mode can safely be turned off
	SafeModeExitPoint() *binlog.Location

	// TablePoint returns the stream checkpoints of all tables
	TablePoint() map[string]map[string]binlog.Location

	// GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found
	GetTableInfo(schema string, table string) *model.TableInfo

	// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
	// corresponding to Meta.Pos and Meta.GTID
	FlushedGlobalPoint() binlog.Location

	// LastFlushOutdated checks the start time of a flush (when Snapshot is called) and the finish time of a flush.
	// If both times are outdated, LastFlushOutdated returns true.
	LastFlushOutdated() bool

	// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
	Rollback()

	// String returns the text of the global position
	String() string

	// LoadIntoSchemaTracker loads table infos of all points into schema tracker.
	LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error

	// CheckAndUpdate checks the checkpoint data consistency and tries to fix it if possible
	CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error
}

CheckPoint represents the checkpoint status for the syncer, including the global binlog checkpoint and every table's checkpoint.

When saving a checkpoint, we must distinguish saving in memory from saving (flushing) to the DB (or a file) permanently. For sharding merging, we must save the checkpoint in memory to support skipping events when re-syncing for the special streamer. However, before all DDLs for a sharding group have been synced and executed, we should not save the checkpoint permanently, because when restarting to continue the sync, all sharding DDLs must be try-synced again.
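
The difference between saving in memory and flushing can be sketched with a deliberately small model. The `memCheckpoint` type is hypothetical, and a plain `uint32` stands in for `binlog.Location`; the real interface tracks per-table points and much more.

```go
package main

import "fmt"

// memCheckpoint is a hypothetical, simplified model: positions are first
// saved in memory and only become durable when flushed.
type memCheckpoint struct {
	current uint32 // saved in memory, may be ahead of flushed
	flushed uint32 // last position persisted to storage
}

// Save records a new position in memory only.
func (c *memCheckpoint) Save(pos uint32) { c.current = pos }

// Flush makes the in-memory position durable.
func (c *memCheckpoint) Flush() { c.flushed = c.current }

// Rollback discards unflushed progress by returning to the flushed position.
func (c *memCheckpoint) Rollback() { c.current = c.flushed }

func main() {
	cp := &memCheckpoint{}
	cp.Save(100)
	cp.Flush()
	cp.Save(250) // e.g. a sharding DDL is still unresolved, so do not flush yet
	cp.Rollback()
	fmt.Println(cp.current, cp.flushed)
}
```

In this model, a restart after the rollback resumes from position 100, so any events between 100 and 250 would be processed again.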

func NewRemoteCheckPoint

func NewRemoteCheckPoint(
	tctx *tcontext.Context,
	cfg *config.SubTaskConfig,
	metricProxies *metrics.Proxies,
	id string,
) CheckPoint

NewRemoteCheckPoint creates a new RemoteCheckPoint.

type Cond

type Cond struct {
	TargetTbl string
	Columns   []*model.ColumnInfo
	PK        *model.IndexInfo
	PkValues  [][]string
}

func (*Cond) GetArgs

func (c *Cond) GetArgs() []interface{}

func (*Cond) GetWhere

func (c *Cond) GetWhere() string

type DDLWorker

type DDLWorker struct {
	// contains filtered or unexported fields
}

func NewDDLWorker

func NewDDLWorker(pLogger *log.Logger, syncer *Syncer) *DDLWorker

NewDDLWorker creates a new DDLWorker instance.

func (*DDLWorker) AstToDDLEvent

func (ddl *DDLWorker) AstToDDLEvent(qec *queryEventContext, info *ddlInfo) (et bf.EventType)

AstToDDLEvent returns filter.DDLEvent.

func (*DDLWorker) HandleQueryEvent

func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventContext, originSQL string) (err error)

type DMLWorker

type DMLWorker struct {
	// contains filtered or unexported fields
}

DMLWorker is used to sync dml.

type DataValidator

type DataValidator struct {
	// used to sync Stop and Start operations.
	sync.RWMutex

	L log.Logger
	// contains filtered or unexported fields
}

DataValidator is used to continuously validate incremental data migrated to the downstream by DM. The validator can be started when there is a syncer unit in the subtask and the validation mode is not none; it is terminated when the subtask is terminated. The stage of the validator is independent of the subtask: pausing or resuming the subtask does not affect the stage of the validator.

The validator can be in the running or stopped stage:

  • running: when it is started with the subtask or started later on the fly.
  • stopped: when validation stop is executed.

For each subtask, only one DataValidator object is created before the subtask is closed or killed; "dmctl validation stop/start" calls Stop and Start on the same object.
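
The two-stage lifecycle on a single object can be sketched as follows. The `validator` and `stage` types are hypothetical simplifications: the real `Start` takes an expected `pb.Stage` and launches background work, which is omitted here.

```go
package main

import (
	"fmt"
	"sync"
)

// stage is a hypothetical stand-in for pb.Stage with only two values.
type stage int

const (
	stageStopped stage = iota
	stageRunning
)

// validator is a simplified model: one object per subtask whose stage is
// toggled by Start and Stop and guarded by an embedded RWMutex.
type validator struct {
	sync.RWMutex
	stage stage
}

// Start moves the validator to the running stage.
func (v *validator) Start() {
	v.Lock()
	defer v.Unlock()
	v.stage = stageRunning
}

// Stop moves the validator to the stopped stage.
func (v *validator) Stop() {
	v.Lock()
	defer v.Unlock()
	v.stage = stageStopped
}

// Started reports whether the validator is in the running stage.
func (v *validator) Started() bool {
	v.RLock()
	defer v.RUnlock()
	return v.stage == stageRunning
}

func main() {
	v := &validator{}
	v.Start()
	fmt.Println(v.Started())
	v.Stop()
	fmt.Println(v.Started())
}
```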

func NewContinuousDataValidator

func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer, startWithSubtask bool) *DataValidator

func (*DataValidator) GetValidatorError

func (v *DataValidator) GetValidatorError(errState pb.ValidateErrorState) ([]*pb.ValidationError, error)

func (*DataValidator) GetValidatorStatus

func (v *DataValidator) GetValidatorStatus() *pb.ValidationStatus

func (*DataValidator) GetValidatorTableStatus

func (v *DataValidator) GetValidatorTableStatus(filterStatus pb.Stage) []*pb.ValidationTableStatus

func (*DataValidator) OperateValidatorError

func (v *DataValidator) OperateValidatorError(validateOp pb.ValidationErrOp, errID uint64, isAll bool) error

func (*DataValidator) Stage

func (v *DataValidator) Stage() pb.Stage

func (*DataValidator) Start

func (v *DataValidator) Start(expect pb.Stage)

func (*DataValidator) Started

func (v *DataValidator) Started() bool

func (*DataValidator) Stop

func (v *DataValidator) Stop()

func (*DataValidator) UpdateValidator

func (v *DataValidator) UpdateValidator(req *pb.UpdateValidationWorkerRequest) error

type ExprFilterGroup

type ExprFilterGroup struct {
	// contains filtered or unexported fields
}

ExprFilterGroup groups many related fields about expression filter.

func NewExprFilterGroup

func NewExprFilterGroup(logCtx *tcontext.Context, tidbCtx sessionctx.Context, exprConfig []*config.ExpressionFilter) *ExprFilterGroup

NewExprFilterGroup creates an ExprFilterGroup.

func (*ExprFilterGroup) GetDeleteExprs

func (g *ExprFilterGroup) GetDeleteExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)

GetDeleteExprs returns the expression filters for the given table to filter DELETE events. This function lazily calculates the expressions if they are not initialized.

func (*ExprFilterGroup) GetInsertExprs

func (g *ExprFilterGroup) GetInsertExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, error)

GetInsertExprs returns the expression filters for the given table to filter INSERT events. This function lazily calculates the expressions if they are not initialized.

func (*ExprFilterGroup) GetUpdateExprs

func (g *ExprFilterGroup) GetUpdateExprs(table *filter.Table, ti *model.TableInfo) ([]expression.Expression, []expression.Expression, error)

GetUpdateExprs returns two lists of expression filters for the given table, to filter UPDATE events by old values and new values respectively. The two lists should have the same length, and corresponding expressions are combined with AND logic. This function lazily calculates the expressions if they are not initialized.
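
One way a caller might combine the two lists is sketched here. The `rowPred` type and `skipUpdate` helper are hypothetical; in particular, skipping the event as soon as any old/new pair matches is an assumption of this sketch and is not stated in the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// rowPred is a hypothetical stand-in for expression.Expression.
type rowPred func(row []interface{}) bool

// skipUpdate pairs oldPreds[i] with newPreds[i] using AND logic.
// Assumption: the UPDATE is skipped as soon as any pair matches.
func skipUpdate(oldRow, newRow []interface{}, oldPreds, newPreds []rowPred) (bool, error) {
	if len(oldPreds) != len(newPreds) {
		return false, errors.New("old and new predicate lists differ in length")
	}
	for i := range oldPreds {
		if oldPreds[i](oldRow) && newPreds[i](newRow) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	isZero := func(row []interface{}) bool { return row[0] == 0 }
	isOne := func(row []interface{}) bool { return row[0] == 1 }
	skip, err := skipUpdate([]interface{}{0}, []interface{}{1}, []rowPred{isZero}, []rowPred{isOne})
	fmt.Println(skip, err)
}
```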

func (*ExprFilterGroup) ResetExprs

func (g *ExprFilterGroup) ResetExprs(table *filter.Table)

ResetExprs deletes the expressions generated before. This should be called after the table structure has changed.
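
The lazy-calculate-then-reset pattern can be sketched with a small per-table cache. The `exprCache` type is hypothetical and uses strings in place of compiled expressions.

```go
package main

import "fmt"

// exprCache is a hypothetical, simplified model of a per-table lazy cache.
type exprCache struct {
	build  func(table string) []string // compiles the filter expressions for a table
	cache  map[string][]string
	builds int // number of times build was invoked, for illustration
}

func newExprCache(build func(string) []string) *exprCache {
	return &exprCache{build: build, cache: make(map[string][]string)}
}

// Get returns the cached expressions, computing them on first use.
func (c *exprCache) Get(table string) []string {
	if exprs, ok := c.cache[table]; ok {
		return exprs
	}
	c.builds++
	exprs := c.build(table)
	c.cache[table] = exprs
	return exprs
}

// Reset drops the cached expressions so the next Get recomputes them.
func (c *exprCache) Reset(table string) {
	delete(c.cache, table)
}

func main() {
	c := newExprCache(func(table string) []string { return []string{table + ".c > 1"} })
	c.Get("db.t")
	c.Get("db.t")   // served from the cache
	c.Reset("db.t") // e.g. after a DDL changed the table structure
	c.Get("db.t")
	fmt.Println(c.builds)
}
```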

type Normal

type Normal struct {
	// contains filtered or unexported fields
}

func NewNormalDDL

func NewNormalDDL(pLogger *log.Logger, syncer *Syncer) *Normal

type OptShardingGroup

type OptShardingGroup struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OptShardingGroup represents an optimistic sharding DDL sync group.

func NewOptShardingGroup

func NewOptShardingGroup(firstConflictLocation binlog.Location, flavor string, enableGTID bool) *OptShardingGroup

func (*OptShardingGroup) Remove

func (s *OptShardingGroup) Remove(sourceTableIDs []string)

type OptShardingGroupKeeper

type OptShardingGroupKeeper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OptShardingGroupKeeper is used to keep OptShardingGroup. It keeps sharding group metadata to make sure that optimistic sharding resync redirection works correctly.

                                                 newer
│                       ───────────────────────► time
│
│ tb1 conflict DDL1     │  ▲      │
│                       │  │      │
│       ...             │  │      │
│                       │  │      │
│ tb1 conflict DDL2     │  │      │  ▲     │
│                       │  │      │  │     │
│       ...             │  │      │  │     │
│                       │  │      │  │     │
│ tb2 conflict DDL1     ▼  │      │  │     │
│                                 │  │     │
│       ...           redirect    │  │     │
│                                 │  │     │
│ tb2 conflict DDL2               ▼  │     │
│                                          │
│       ...                     redirect   │
│                                          │
│  other dml events                        ▼
│
│                                       continue
▼                                      replicating
newer binlog

One redirection example is shown above.

func NewOptShardingGroupKeeper

func NewOptShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *OptShardingGroupKeeper

NewOptShardingGroupKeeper creates a new OptShardingGroupKeeper.

func (*OptShardingGroupKeeper) AdjustGlobalLocation

func (k *OptShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location

AdjustGlobalLocation adjusts globalLocation with sharding groups' lowest first point.
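
Read as "take the lowest of the global location and each group's first location", the adjustment can be sketched as follows. The `location` type is a hypothetical stand-in for `binlog.Location`, and comparing file names lexicographically assumes equal-width binlog file names.

```go
package main

import "fmt"

// location is a hypothetical stand-in for binlog.Location.
type location struct {
	file string
	pos  uint32
}

// less reports whether a is strictly before b.
// Assumption: file names have equal width, so string order is binlog order.
func (a location) less(b location) bool {
	if a.file != b.file {
		return a.file < b.file
	}
	return a.pos < b.pos
}

// adjustGlobalLocation returns the lowest of global and the groups' first locations.
func adjustGlobalLocation(global location, groupFirst []location) location {
	lowest := global
	for _, loc := range groupFirst {
		if loc.less(lowest) {
			lowest = loc
		}
	}
	return lowest
}

func main() {
	global := location{"mysql-bin.000003", 400}
	groups := []location{{"mysql-bin.000002", 900}, {"mysql-bin.000003", 120}}
	fmt.Println(adjustGlobalLocation(global, groups))
}
```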

func (*OptShardingGroupKeeper) RemoveGroup

func (k *OptShardingGroupKeeper) RemoveGroup(targetTable *filter.Table, sourceTableIDs []string)

func (*OptShardingGroupKeeper) RemoveSchema

func (k *OptShardingGroupKeeper) RemoveSchema(schema string)

func (*OptShardingGroupKeeper) Reset

func (k *OptShardingGroupKeeper) Reset()

Reset resets the keeper.

type Optimist

type Optimist struct {
	// contains filtered or unexported fields
}

func NewOptimistDDL

func NewOptimistDDL(pLogger *log.Logger, syncer *Syncer) *Optimist

type Pessimist

type Pessimist struct {
	// contains filtered or unexported fields
}

func NewPessimistDDL

func NewPessimistDDL(pLogger *log.Logger, syncer *Syncer) *Pessimist

type RemoteCheckPoint

type RemoteCheckPoint struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RemoteCheckPoint implements CheckPoint, using the target database to store checkpoint info.

NOTE: we currently sync from the relay log, so GTID support has not been added yet. It is not thread-safe.

func (*RemoteCheckPoint) CheckAndUpdate

func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error

CheckAndUpdate checks the checkpoint data consistency and tries to fix it if possible.

func (*RemoteCheckPoint) Clear

func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error

Clear implements CheckPoint.Clear.

func (*RemoteCheckPoint) Close

func (cp *RemoteCheckPoint) Close()

Close implements CheckPoint.Close.

func (*RemoteCheckPoint) DeleteAllTablePoint

func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error

DeleteAllTablePoint implements CheckPoint.DeleteAllTablePoint.

func (*RemoteCheckPoint) DeleteSchemaPoint

func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

DeleteSchemaPoint implements CheckPoint.DeleteSchemaPoint.

func (*RemoteCheckPoint) DeleteTablePoint

func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error

DeleteTablePoint implements CheckPoint.DeleteTablePoint.

func (*RemoteCheckPoint) DiscardPendingSnapshots

func (cp *RemoteCheckPoint) DiscardPendingSnapshots()

DiscardPendingSnapshots discards all pending snapshots.

func (*RemoteCheckPoint) FlushPointsExcept

func (cp *RemoteCheckPoint) FlushPointsExcept(
	tctx *tcontext.Context,
	snapshotID int,
	exceptTables []*filter.Table,
	extraSQLs []string,
	extraArgs [][]interface{},
) error

FlushPointsExcept implements CheckPoint.FlushPointsExcept.

func (*RemoteCheckPoint) FlushPointsWithTableInfos

func (cp *RemoteCheckPoint) FlushPointsWithTableInfos(tctx *tcontext.Context, tables []*filter.Table, tis []*model.TableInfo) error

FlushPointsWithTableInfos implements CheckPoint.FlushPointsWithTableInfos.

func (*RemoteCheckPoint) FlushSafeModeExitPoint

func (cp *RemoteCheckPoint) FlushSafeModeExitPoint(tctx *tcontext.Context) error

FlushSafeModeExitPoint implements CheckPoint.FlushSafeModeExitPoint.

func (*RemoteCheckPoint) FlushedGlobalPoint

func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location

FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint.

func (*RemoteCheckPoint) GetTableInfo

func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo

func (*RemoteCheckPoint) GlobalPoint

func (cp *RemoteCheckPoint) GlobalPoint() binlog.Location

GlobalPoint implements CheckPoint.GlobalPoint.

func (*RemoteCheckPoint) GlobalPointSaveTime

func (cp *RemoteCheckPoint) GlobalPointSaveTime() time.Time

GlobalPointSaveTime implements CheckPoint.GlobalPointSaveTime.

func (*RemoteCheckPoint) Init

func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) (err error)

Init implements CheckPoint.Init.

func (*RemoteCheckPoint) IsOlderThanTablePoint

func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location) bool

IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. This function is used to skip old binlog events. The table checkpoint is saved after dispatching a binlog event.

  • For GTID-based and position-based replication, DML handling is a bit different, but the comparison is the same here. With position-based replication, each event has a unique position, so we can confidently skip any event that is <= the table checkpoint. With GTID-based replication, there may be more than one event with the same GTID, but we still skip any event that is <= the table checkpoint; to make this correct, we save the table point for the tables affected by a transaction only after the whole transaction is processed.
  • DDL will not have unique position or GTID, so we can always skip events <= table checkpoint.
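
The "skip when <= table checkpoint" rule can be sketched for the position-based case. The `position` type and both helpers are hypothetical; GTID comparison is left out, and the file-name comparison assumes equal-width binlog file names.

```go
package main

import "fmt"

// position is a hypothetical stand-in for binlog.Location (file name + offset).
type position struct {
	name string
	pos  uint32
}

// compare returns -1, 0 or 1 depending on whether a is before, equal to or after b.
func compare(a, b position) int {
	switch {
	case a.name < b.name:
		return -1
	case a.name > b.name:
		return 1
	case a.pos < b.pos:
		return -1
	case a.pos > b.pos:
		return 1
	}
	return 0
}

// isOlderThanTablePoint reports whether an event at p can be skipped because
// it is not newer than the checkpoint saved for the table.
func isOlderThanTablePoint(points map[string]position, table string, p position) bool {
	tp, ok := points[table]
	if !ok {
		return false // no checkpoint yet, so nothing can be skipped
	}
	return compare(p, tp) <= 0
}

func main() {
	points := map[string]position{"db.t": {"mysql-bin.000002", 500}}
	fmt.Println(isOlderThanTablePoint(points, "db.t", position{"mysql-bin.000002", 500}))
	fmt.Println(isOlderThanTablePoint(points, "db.t", position{"mysql-bin.000002", 501}))
}
```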

func (*RemoteCheckPoint) LastFlushOutdated

func (cp *RemoteCheckPoint) LastFlushOutdated() bool

LastFlushOutdated implements CheckPoint.LastFlushOutdated.

func (*RemoteCheckPoint) Load

func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error

Load implements CheckPoint.Load.

func (*RemoteCheckPoint) LoadIntoSchemaTracker

func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error

LoadIntoSchemaTracker loads table infos of all points into schema tracker.

func (*RemoteCheckPoint) LoadMeta

func (cp *RemoteCheckPoint) LoadMeta(ctx context.Context) error

LoadMeta implements CheckPoint.LoadMeta.

func (*RemoteCheckPoint) ResetConn

func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error

ResetConn implements CheckPoint.ResetConn.

func (*RemoteCheckPoint) Rollback

func (cp *RemoteCheckPoint) Rollback()

Rollback implements CheckPoint.Rollback.

func (*RemoteCheckPoint) SafeModeExitPoint

func (cp *RemoteCheckPoint) SafeModeExitPoint() *binlog.Location

SafeModeExitPoint implements CheckPoint.SafeModeExitPoint.

func (*RemoteCheckPoint) SaveGlobalPoint

func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location)

SaveGlobalPoint implements CheckPoint.SaveGlobalPoint.

func (*RemoteCheckPoint) SaveGlobalPointForcibly

func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location)

SaveGlobalPointForcibly implements CheckPoint.SaveGlobalPointForcibly.

func (*RemoteCheckPoint) SaveSafeModeExitPoint

func (cp *RemoteCheckPoint) SaveSafeModeExitPoint(point *binlog.Location)

SaveSafeModeExitPoint implements CheckPoint.SaveSafeModeExitPoint. It should not be called concurrently (it is only called before the loop in Syncer.Run and in the loop to reset).

func (*RemoteCheckPoint) SaveTablePoint

func (cp *RemoteCheckPoint) SaveTablePoint(table *filter.Table, point binlog.Location, ti *model.TableInfo)

SaveTablePoint implements CheckPoint.SaveTablePoint.

func (*RemoteCheckPoint) Snapshot

func (cp *RemoteCheckPoint) Snapshot(isSyncFlush bool) *SnapshotInfo

Snapshot makes a snapshot of the checkpoint and returns the snapshot info.

func (*RemoteCheckPoint) String

func (cp *RemoteCheckPoint) String() string

String implements CheckPoint.String.

func (*RemoteCheckPoint) TablePoint

func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location

TablePoint implements CheckPoint.TablePoint.

type ShardingGroup

type ShardingGroup struct {
	sync.RWMutex

	IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later
	// contains filtered or unexported fields
}

ShardingGroup represents a sharding DDL sync group.

func NewShardingGroup

func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources []string, meta *shardmeta.ShardingMeta, isSchemaOnly bool, flavor string, enableGTID bool) *ShardingGroup

NewShardingGroup creates a new ShardingGroup.

func (*ShardingGroup) ActiveDDLFirstLocation

func (sg *ShardingGroup) ActiveDDLFirstLocation() (binlog.Location, error)

ActiveDDLFirstLocation returns the first binlog position of active DDL.

func (*ShardingGroup) CheckSyncing

func (sg *ShardingGroup) CheckSyncing(source string, location binlog.Location) (beforeActiveDDL bool)

CheckSyncing checks the source table's syncing status. It returns:

beforeActiveDDL: whether the position is before the active DDL

func (*ShardingGroup) FirstEndPosUnresolved

func (sg *ShardingGroup) FirstEndPosUnresolved() *binlog.Location

FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil.

func (*ShardingGroup) FirstLocationUnresolved

func (sg *ShardingGroup) FirstLocationUnresolved() *binlog.Location

FirstLocationUnresolved returns the first DDL pos if un-resolved, else nil.

func (*ShardingGroup) FlushData

func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})

FlushData returns sharding meta flush SQLs and args.

func (*ShardingGroup) Leave

func (sg *ShardingGroup) Leave(sources []string) error

Leave removes the given sources from the sharding group. It does not affect the in-syncing process. Use cases:

  • drop a database
  • drop table

func (*ShardingGroup) Merge

func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)

Merge merges new sources into the existing group. Use cases:

  • add a new table to an existing sharding group
  • add new table(s) to the parent database's sharding group

If the group is in sequence sharding, Merge returns an error directly; otherwise it adds each new source, marks it as un-synced (false), and increments remain.

func (*ShardingGroup) Reset

func (sg *ShardingGroup) Reset()

Reset resets all sources to the un-synced state. It must be called once the previous sharding DDL has been synced and resolved.

func (*ShardingGroup) ResolveShardingDDL

func (sg *ShardingGroup) ResolveShardingDDL() bool

ResolveShardingDDL resolves sharding DDL in sharding group.

func (*ShardingGroup) Sources

func (sg *ShardingGroup) Sources() map[string]bool

Sources returns all sources (and whether synced).

func (*ShardingGroup) String

func (sg *ShardingGroup) String() string

String implements Stringer.String.

func (*ShardingGroup) Tables

func (sg *ShardingGroup) Tables() []*filter.Table

Tables returns all source tables' <schema, table> pair.

func (*ShardingGroup) TrySync

func (sg *ShardingGroup) TrySync(source string, location, endLocation binlog.Location, ddls []string) (bool, bool, int, error)

TrySync tries to sync the sharding group. It returns:

synced: whether the source table's sharding group is synced
active: whether the DDL will be processed in this round
remain: the count of source tables that remain un-synced
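
The bookkeeping behind `synced` and `remain` can be sketched with a small model. The `shardGroup` type is hypothetical; it omits locations, DDL contents and the `active` result, and only tracks which sources have reached the DDL.

```go
package main

import "fmt"

// shardGroup is a hypothetical, simplified model of a sharding DDL group.
type shardGroup struct {
	sources map[string]bool // source table ID -> whether it has synced the DDL
	remain  int             // number of sources that have not synced yet
}

func newShardGroup(ids []string) *shardGroup {
	g := &shardGroup{sources: make(map[string]bool, len(ids))}
	for _, id := range ids {
		g.sources[id] = false
	}
	g.remain = len(g.sources)
	return g
}

// trySync marks source as synced and reports whether the whole group is
// synced, together with the number of sources still remaining.
func (g *shardGroup) trySync(source string) (synced bool, remain int, err error) {
	done, ok := g.sources[source]
	if !ok {
		return false, g.remain, fmt.Errorf("unknown source %q", source)
	}
	if !done {
		g.sources[source] = true
		g.remain--
	}
	return g.remain == 0, g.remain, nil
}

// reset returns every source to the un-synced state.
func (g *shardGroup) reset() {
	for id := range g.sources {
		g.sources[id] = false
	}
	g.remain = len(g.sources)
}

func main() {
	g := newShardGroup([]string{"db1.t", "db2.t"})
	fmt.Println(g.trySync("db1.t"))
	fmt.Println(g.trySync("db2.t"))
}
```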

func (*ShardingGroup) UnresolvedGroupInfo

func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup

UnresolvedGroupInfo returns pb.ShardingGroup if the group is unresolved, else returns nil.

func (*ShardingGroup) UnresolvedTables

func (sg *ShardingGroup) UnresolvedTables() []*filter.Table

UnresolvedTables returns all source tables' <schema, table> pairs if the group is unresolved, else returns nil.

type ShardingGroupKeeper

type ShardingGroupKeeper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ShardingGroupKeeper is used to keep ShardingGroup.

func NewShardingGroupKeeper

func NewShardingGroupKeeper(
	tctx *tcontext.Context,
	cfg *config.SubTaskConfig,
	metricProxies *metrics.Proxies,
) *ShardingGroupKeeper

NewShardingGroupKeeper creates a new ShardingGroupKeeper.

func (*ShardingGroupKeeper) ActiveDDLFirstLocation

func (k *ShardingGroupKeeper) ActiveDDLFirstLocation(targetTable *filter.Table) (binlog.Location, error)

ActiveDDLFirstLocation returns the binlog position of active DDL.

func (*ShardingGroupKeeper) AddGroup

func (k *ShardingGroupKeeper) AddGroup(targetTable *filter.Table, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)

AddGroup adds new group(s) according to target schema, table and source IDs.

func (*ShardingGroupKeeper) AdjustGlobalLocation

func (k *ShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location

AdjustGlobalLocation adjusts globalLocation with sharding groups' lowest first point.

func (*ShardingGroupKeeper) CheckAndFix

func (k *ShardingGroupKeeper) CheckAndFix(metas map[string]*shardmeta.ShardingMeta, schemaMap map[string]string, tablesMap map[string]map[string]string) error

CheckAndFix tries to check and fix the schema/table case-sensitivity issue.

NOTE: CheckAndFix is called before sharding groups are initialized.

func (*ShardingGroupKeeper) Close

func (k *ShardingGroupKeeper) Close()

Close closes sharding group keeper.

func (*ShardingGroupKeeper) Group

func (k *ShardingGroupKeeper) Group(targetTable *filter.Table) *ShardingGroup

Group returns target table's group, nil if not exist.

func (*ShardingGroupKeeper) Groups

func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup

Groups returns all sharding groups; it is often used for debugging. Caution: do not modify the returned groups directly.

func (*ShardingGroupKeeper) InSyncing

func (k *ShardingGroupKeeper) InSyncing(sourceTable, targetTable *filter.Table, location binlog.Location) bool

InSyncing checks whether the source is in sharding syncing, that is to say not before active DDL.

func (*ShardingGroupKeeper) Init

func (k *ShardingGroupKeeper) Init() (err error)

Init performs initialization.

func (*ShardingGroupKeeper) LeaveGroup

func (k *ShardingGroupKeeper) LeaveGroup(targetTable *filter.Table, sources []string) error

LeaveGroup leaves the group according to the target schema, table and source IDs. LeaveGroup does not affect the in-syncing process.

func (*ShardingGroupKeeper) LoadShardMeta

func (k *ShardingGroupKeeper) LoadShardMeta(flavor string, enableGTID bool) (map[string]*shardmeta.ShardingMeta, error)

LoadShardMeta implements CheckPoint.LoadShardMeta.

func (*ShardingGroupKeeper) PrepareFlushSQLs

func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})

PrepareFlushSQLs returns the flush SQLs for all sharding meta, except for the given table IDs.

func (*ShardingGroupKeeper) ResetGroups

func (k *ShardingGroupKeeper) ResetGroups()

ResetGroups resets group's sync status.

func (*ShardingGroupKeeper) ResolveShardingDDL

func (k *ShardingGroupKeeper) ResolveShardingDDL(targetTable *filter.Table) (bool, error)

ResolveShardingDDL resolves one sharding DDL in specific group.

func (*ShardingGroupKeeper) TrySync

func (k *ShardingGroupKeeper) TrySync(
	sourceTable, targetTable *filter.Table, location, endLocation binlog.Location, ddls []string) (
	needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error,
)

TrySync tries to sync the sharding group. It returns:

needShardingHandle: whether the source table is in a sharding group
group: the sharding group
synced: whether the source table's sharding group is synced
active: whether it is the active DDL in a sequence sharding DDL
remain: the count of source tables that remain un-synced

func (*ShardingGroupKeeper) UnresolvedGroups

func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup

UnresolvedGroups returns the sharding groups that are un-resolved. Caution: do not modify the returned groups directly.

func (*ShardingGroupKeeper) UnresolvedTables

func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, []*filter.Table)

UnresolvedTables returns:

  • all `target-schema.target-table` that have an unresolved sharding DDL,
  • all source tables whose DDLs are un-resolved.

NOTE: this func only ensures that the returned tables are currently un-resolved. If the returned tables are passed to another func (like checkpoint), the caller must ensure their sync state does not change in the process.

type ShardingReSync

type ShardingReSync struct {
	// contains filtered or unexported fields
}

ShardingReSync represents re-sync info for a sharding DDL group.

func (*ShardingReSync) String

func (s *ShardingReSync) String() string

String implements stringer.String.

type SnapshotInfo

type SnapshotInfo struct {
	// contains filtered or unexported fields
}

SnapshotInfo contains:

  • the checkpoint snapshot ID, used for retrieving the checkpoint snapshot in the flush phase.
  • the global checkpoint position, used for updating the current active relay log after the checkpoint flush.
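
How a snapshot ID might tie a snapshot to a later flush can be sketched as follows. The `snapshotStore` type is hypothetical: a `uint32` stands in for the checkpoint position, and resetting `lastSnap` on discard is a design choice of this sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// snapshotStore is a hypothetical, simplified model of snapshot-then-flush.
type snapshotStore struct {
	current  uint32         // in-memory checkpoint position
	lastSnap uint32         // position captured by the latest snapshot
	flushed  uint32         // last position persisted to storage
	nextID   int            // last snapshot ID handed out
	pending  map[int]uint32 // snapshot ID -> captured position
}

func newSnapshotStore() *snapshotStore {
	return &snapshotStore{pending: make(map[int]uint32)}
}

// Snapshot captures the current position. ok is false when nothing has
// changed since the last call.
func (s *snapshotStore) Snapshot() (id int, ok bool) {
	if s.current == s.lastSnap {
		return 0, false
	}
	s.nextID++
	s.pending[s.nextID] = s.current
	s.lastSnap = s.current
	return s.nextID, true
}

// Flush persists the position captured by the given snapshot.
func (s *snapshotStore) Flush(id int) error {
	pos, ok := s.pending[id]
	if !ok {
		return errors.New("snapshot not found")
	}
	s.flushed = pos
	delete(s.pending, id)
	return nil
}

// DiscardPending drops snapshots that will never be flushed, so that the
// next Snapshot call captures the unflushed progress again.
func (s *snapshotStore) DiscardPending() {
	s.pending = make(map[int]uint32)
	s.lastSnap = s.flushed
}

func main() {
	s := newSnapshotStore()
	s.current = 10
	id, _ := s.Snapshot()
	fmt.Println(s.Flush(id), s.flushed)
}
```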

type Syncer

type Syncer struct {
	sync.RWMutex

	// `lower_case_table_names` setting of upstream db
	SourceTableNamesFlavor conn.LowerCaseTableNamesFlavor
	// contains filtered or unexported fields
}

Syncer can sync your MySQL data to another MySQL database.

func NewSyncer

func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay relay.Process) *Syncer

NewSyncer creates a new Syncer.

func (*Syncer) CheckCanUpdateCfg

func (s *Syncer) CheckCanUpdateCfg(newCfg *config.SubTaskConfig) error

CheckCanUpdateCfg checks whether the task config can be updated:

  1. the task must not be in a pessimistic DDL state.
  2. only the block-allow-list (balist), route/filter rules and syncerConfig can be updated at this moment.
  3. some config fields from sourceCfg can also be updated; see func `copyConfigFromSource` for details.

func (*Syncer) Close

func (s *Syncer) Close()

Close closes syncer.

func (*Syncer) HandleError

func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (string, error)

HandleError handles errors for the syncer.

func (*Syncer) Init

func (s *Syncer) Init(ctx context.Context) (err error)

Init initializes the syncer for a sync task, but does not start Process. If it fails, it should not call s.Close. Some checks may move to the checker later.

func (*Syncer) IsFreshTask

func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error)

IsFreshTask implements Unit.IsFreshTask.

func (*Syncer) IsRunning

func (s *Syncer) IsRunning() bool

func (*Syncer) Kill

func (s *Syncer) Kill()

Kill kills the syncer without a graceful shutdown.

func (*Syncer) OperateSchema

func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (msg string, err error)

OperateSchema operates schema for an upstream table.

func (*Syncer) Pause

func (s *Syncer) Pause()

Pause implements Unit.Pause.

func (*Syncer) Process

func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)

Process implements the dm.Unit interface.

func (*Syncer) Resume

func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)

Resume resumes the paused process.

func (*Syncer) Run

func (s *Syncer) Run(ctx context.Context) (err error)

Run starts running the sync. We should guarantee that it can rerun after being paused.

func (*Syncer) ShardDDLOperation

func (s *Syncer) ShardDDLOperation() *pessimism.Operation

ShardDDLOperation returns the shard DDL lock operation that is currently pending handling.

func (*Syncer) Status

func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{}

Status implements Unit.Status.

func (*Syncer) Type

func (s *Syncer) Type() pb.UnitType

Type implements Unit.Type.

func (*Syncer) Update

func (s *Syncer) Update(ctx context.Context, cfg *config.SubTaskConfig) error

Update implements Unit.Update. For now, it only supports updating the config for routes, filters, column-mappings and block-allow-list. No config diff is implemented yet, so it simply re-initializes with the new config.

type TestInjector

type TestInjector struct {
	// contains filtered or unexported fields
}

TestInjector is used to support injecting test cases into the syncer. In some cases, we use failpoint to control the test flow, but we may need to control the flow based on some previous status, so we add this TestInjector to record that status. NOTE: if HTTP for failpoint works well, then we may remove this.

Directories

Path Synopsis
Package binlogstream is used by syncer to read binlog.