Documentation ¶
Index ¶
- func AddDifferentFieldLenColumns(lockID, ddl string, oldJoined, newJoined schemacmp.Table) (string, error)
- func CheckColumns(cli *clientv3.Client, source string, schemaMap map[string]string, ...) error
- func CheckDDLInfos(cli *clientv3.Client, source string, schemaMap map[string]string, ...) error
- func CheckOperations(cli *clientv3.Client, source string, schemaMap map[string]string, ...) error
- func CheckSourceTables(cli *clientv3.Client, source string, schemaMap map[string]string, ...) error
- func ClearTestInfoOperationColumn(cli *clientv3.Client) error
- func DeleteDroppedColumns(cli *clientv3.Client, lockID string, columns ...string) (rev int64, deleted bool, err error)
- func DeleteInfosOperationsColumns(cli *clientv3.Client, infos []Info, ops []Operation, lockID string) (int64, bool, error)
- func DeleteInfosOperationsTablesByTable(cli *clientv3.Client, task, source, upSchema, upTable, lockID string, ...) (int64, error)
- func DeleteInfosOperationsTablesByTask(cli *clientv3.Client, task string, lockIDSet map[string]struct{}) (int64, error)
- func DeleteInfosOperationsTablesByTaskAndSource(cli *clientv3.Client, task string, sources []string, ...) (int64, error)
- func DeleteSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)
- func DiffSourceTables(oldST, newST SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})
- func GetAllDroppedColumns(cli *clientv3.Client) (map[string]map[string]map[string]map[string]map[string]DropColumnStage, int64, ...)
- func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Info, int64, error)
- func GetAllOperations(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Operation, int64, error)
- func GetAllSourceTables(cli *clientv3.Client) (map[string]map[string]SourceTables, int64, error)
- func GetColumnName(lockID, ddl string, tp ast.AlterTableType) (string, error)
- func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)
- func PutDroppedColumns(cli *clientv3.Client, lockID, source, upSchema, upTable string, cols []string, ...) (int64, bool, error)
- func PutInfo(cli *clientv3.Client, info Info) (int64, error)
- func PutOperation(cli *clientv3.Client, skipDone bool, op Operation, infoModRev int64) (rev int64, putted bool, err error)
- func PutSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)
- func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, ...)
- func WatchOperationPut(ctx context.Context, cli *clientv3.Client, ...)
- func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64, ...)
- type ConflictStage
- type DownstreamMeta
- type DropColumnStage
- type Info
- type Lock
- func (l *Lock) AddDroppedColumns(source, schema, table string, cols []string) error
- func (l *Lock) AddTable(source, schema, table string, needLock bool)
- func (l *Lock) DeleteColumnsByOp(op Operation) error
- func (l *Lock) FetchTableInfos(task, source, schema, table string) (*model.TableInfo, error)
- func (l *Lock) GetVersion(source string, schema string, table string) int64
- func (l *Lock) HasTables() bool
- func (l *Lock) IsDone(source, schema, table string) bool
- func (l *Lock) IsDroppedColumn(source, upSchema, upTable, col string) bool
- func (l *Lock) IsResolved() bool
- func (l *Lock) IsSynced() (bool, int)
- func (l *Lock) Joined() (schemacmp.Table, error)
- func (l *Lock) Ready() map[string]map[string]map[string]bool
- func (l *Lock) TryMarkDone(source, schema, table string) bool
- func (l *Lock) TryRemoveTable(source, schema, table string) []string
- func (l *Lock) TryRemoveTableBySources(sources []string) []string
- func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, cols []string, err error)
- func (l *Lock) UpdateTableAfterUnlock(info Info)
- type LockKeeper
- func (lk *LockKeeper) Clear()
- func (lk *LockKeeper) FindLock(lockID string) *Lock
- func (lk *LockKeeper) FindLockByInfo(info Info) *Lock
- func (lk *LockKeeper) FindLocksByTask(task string) []*Lock
- func (lk *LockKeeper) Locks() map[string]*Lock
- func (lk *LockKeeper) RemoveDownstreamMeta(task string)
- func (lk *LockKeeper) RemoveLock(lockID string) bool
- func (lk *LockKeeper) RemoveLockByInfo(info Info) bool
- func (lk *LockKeeper) SetDropColumns(...)
- func (lk *LockKeeper) TrySync(cli *clientv3.Client, info Info, tts []TargetTable) (string, []string, []string, error)
- type LogInfo
- type OldInfo
- type Operation
- type RouteTable
- type SourceTables
- type TableKeeper
- func (tk *TableKeeper) AddTable(task, source, upSchema, upTable, downSchema, downTable string) bool
- func (tk *TableKeeper) FindTables(task, downSchema, downTable string) []TargetTable
- func (tk *TableKeeper) Init(stm map[string]map[string]SourceTables)
- func (tk *TableKeeper) RemoveTable(task, source, upSchema, upTable, downSchema, downTable string) bool
- func (tk *TableKeeper) RemoveTableByTask(task string) bool
- func (tk *TableKeeper) RemoveTableByTaskAndSources(task string, sources []string)
- func (tk *TableKeeper) SourceTableExist(task, source, upSchema, upTable, downSchema, downTable string) bool
- func (tk *TableKeeper) Update(st SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})
- type TargetTable
- type TargetTableSlice
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddDifferentFieldLenColumns ¶
func AddDifferentFieldLenColumns(lockID, ddl string, oldJoined, newJoined schemacmp.Table) (string, error)
AddDifferentFieldLenColumns checks whether dm adds columns with different field lengths.
func CheckColumns ¶
func CheckColumns(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error
CheckColumns try to check and fix all the schema and table names for delete columns infos.
func CheckDDLInfos ¶
func CheckDDLInfos(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error
CheckDDLInfos try to check and fix all the schema and table names for DDL info.
func CheckOperations ¶
func CheckOperations(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error
CheckOperations try to check and fix all the schema and table names for operation infos.
func CheckSourceTables ¶
func CheckSourceTables(cli *clientv3.Client, source string, schemaMap map[string]string, talesMap map[string]map[string]string) error
CheckSourceTables try to check and fix all the source schemas and table names.
func ClearTestInfoOperationColumn ¶
ClearTestInfoOperationColumns is used to clear all shard DDL information in optimism mode. it only used for testing now.
func DeleteDroppedColumns ¶
func DeleteDroppedColumns(cli *clientv3.Client, lockID string, columns ...string) (rev int64, deleted bool, err error)
DeleteDroppedColumns tries to delete the partially dropped columns for the specified lock ID. Only when this column is fully dropped in downstream database, in other words, **we receive all `Done` operation from dm-worker**, we can delete this column's name from the etcd.
func DeleteInfosOperationsColumns ¶
func DeleteInfosOperationsColumns(cli *clientv3.Client, infos []Info, ops []Operation, lockID string) (int64, bool, error)
DeleteInfosOperationsColumns deletes the shard DDL infos, operations, and dropped columns in etcd. This function should often be called by DM-master when removing the lock. Only delete when all info's version are greater or equal to etcd's version, otherwise it means new info was putted into etcd before.
func DeleteInfosOperationsTablesByTable ¶
func DeleteInfosOperationsTablesByTable(cli *clientv3.Client, task, source, upSchema, upTable, lockID string, dropCols []string) (int64, error)
DeleteInfosOperationsTablesByTable deletes the shard DDL infos and operations in etcd by table This function should often be called by DM-master when drop a table.
func DeleteInfosOperationsTablesByTask ¶
func DeleteInfosOperationsTablesByTask(cli *clientv3.Client, task string, lockIDSet map[string]struct{}) (int64, error)
DeleteInfosOperationsTablesByTask deletes the shard DDL infos and operations in etcd. This function should often be called by DM-master when stop a task for all sources.
func DeleteInfosOperationsTablesByTaskAndSource ¶
func DeleteInfosOperationsTablesByTaskAndSource(cli *clientv3.Client, task string, sources []string, dropColumns map[string][]string) (int64, error)
DeleteInfosOperationsTablesByTaskAndSource deletes the shard DDL infos and operations in etcd by task and source. This function should often be called by DM-master when stop a task for sources.
func DeleteSourceTables ¶
func DeleteSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)
DeleteSourceTables deletes the source tables in etcd. This function should often be called by DM-worker.
func DiffSourceTables ¶
func DiffSourceTables(oldST, newST SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})
func GetAllDroppedColumns ¶
func GetAllDroppedColumns(cli *clientv3.Client) (map[string]map[string]map[string]map[string]map[string]DropColumnStage, int64, error)
GetAllDroppedColumns gets the all partially dropped columns. return lockID -> column-name -> source-id -> upstream-schema-name -> upstream-table-name.
func GetAllInfo ¶
func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Info, int64, error)
GetAllInfo gets all shard DDL info in etcd currently. This function should often be called by DM-master. k/k/k/k/v: task-name -> source-ID -> upstream-schema-name -> upstream-table-name -> shard DDL info. ugly code, but have no better idea now.
func GetAllOperations ¶
func GetAllOperations(cli *clientv3.Client) (map[string]map[string]map[string]map[string]Operation, int64, error)
GetAllOperations gets all shard DDL operation in etcd currently. This function should often be called by DM-master. k/k/k/k/v: task-name -> source-ID -> upstream-schema-name -> upstream-table-name -> shard DDL operation.
func GetAllSourceTables ¶
GetAllSourceTables gets all source tables in etcd currently. This function should often be called by DM-master. k/k/v: task-name -> source-ID -> source tables.
func GetColumnName ¶
func GetColumnName(lockID, ddl string, tp ast.AlterTableType) (string, error)
GetColumnName checks whether dm adds/drops a column, and return this column's name.
func GetInfosOperationsByTask ¶
func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)
GetInfosOperationsByTask gets all shard DDL info and operation in etcd currently. This function should often be called by DM-master.
func PutDroppedColumns ¶
func PutDroppedColumns(cli *clientv3.Client, lockID, source, upSchema, upTable string, cols []string, done DropColumnStage) (int64, bool, error)
PutDroppedColumn puts the partially dropped column names into ectd. When we drop a column, we save this column's name in etcd.
func PutInfo ¶
PutInfo puts the shard DDL info into etcd. NOTE:
In some cases before the lock resolved, the same DDL info may be PUT multiple times: 1. start-task after stop-task. 2. resume-task after paused manually or automatically. 3. the task scheduled to another DM-worker instance (just like case-1). Then we need to ensure re-PUT is safe: 1. DM-master can construct the lock and do the coordination correctly. 2. DM-worker can re-PUT and comply with the coordination correctly.
This function should often be called by DM-worker.
func PutOperation ¶
func PutOperation(cli *clientv3.Client, skipDone bool, op Operation, infoModRev int64) (rev int64, putted bool, err error)
PutOperation puts the shard DDL operation into etcd.
func PutSourceTables ¶
func PutSourceTables(cli *clientv3.Client, st SourceTables) (int64, error)
PutSourceTables puts source tables into etcd. This function should often be called by DM-worker.
func WatchInfo ¶
func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, errCh chan<- error, )
WatchInfo watches PUT & DELETE operations for info. This function should often be called by DM-master.
func WatchOperationPut ¶
func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source, upSchema, upTable string, revision int64, outCh chan<- Operation, errCh chan<- error, )
WatchOperationPut watches PUT operations for DDL lock operation. If want to watch all operations matching, pass empty string for `task`, `source`, `upSchema` and `upTable`. This function can be called by DM-worker and DM-master.
func WatchSourceTables ¶
func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- SourceTables, errCh chan<- error, )
WatchSourceTables watches PUT & DELETE operations for source tables. This function should often be called by DM-master.
Types ¶
type ConflictStage ¶
type ConflictStage string
ConflictStage represents the current shard DDL conflict stage in the optimistic mode.
const ( // ConflictNone indicates no conflict exists, // DM-worker can execute DDL/DML to the downstream normally. ConflictNone ConflictStage = "none" // ConflictDetected indicates a conflict will exist after applied the shard DDL. // in this stage, DM-worker should not execute/skip DDL/DML, // but it should still try to find the DDL which can resolve the conflict in the binlog stream. ConflictDetected ConflictStage = "detected" // ConflictResolved indicates a conflict DDL be resolved. // in this stage, DM-worker should redirect to the conflict DDL. ConflictResolved ConflictStage = "resolved" // ConflictUnlocked indicates a conflict will be unlocked after applied the shard DDL. // in this stage, DM-worker should directly execute/skip DDLs. ConflictUnlocked ConflictStage = "unlocked" // ConflictSkipWaitRedirect indicates a conflict happened and will be skipped and redirected until all tables has no conflict // in this stage, DM-worker should skip all DML and DDL for the conflict table until redirect. ConflictSkipWaitRedirect ConflictStage = "skip and wait for redirect" // #nosec // ConflictError indicates an error happened when we try to sync the DDLs // in this stage, DM-worker should retry and can skip ddls for this error. ConflictError ConflictStage = "error" )
type DownstreamMeta ¶
type DownstreamMeta struct {
// contains filtered or unexported fields
}
DownstreamMeta used to fetch table info from downstream.
type DropColumnStage ¶
type DropColumnStage int
DropColumnStage represents whether drop column done for a sharding table.
const ( // DropNotDone represents master haven't received done for the col. DropNotDone DropColumnStage = iota // DropPartiallyDone represents master receive done for the col. DropPartiallyDone // DropDone represents master receive done and ddl for the col(executed in downstream). DropDone )
type Info ¶
type Info struct { Task string `json:"task"` // data migration task name Source string `json:"source"` // upstream source ID UpSchema string `json:"up-schema"` // upstream/source schema name, different sources can have the same schema name UpTable string `json:"up-table"` // upstream/source table name, different sources can have the same table name DownSchema string `json:"down-schema"` // downstream/target schema name DownTable string `json:"down-table"` // downstream/target table name DDLs []string `json:"ddls"` // DDL statements TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs TableInfosAfter []*model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the Info has been deleted in etcd. IsDeleted bool `json:"-"` // only set it when get/watch from etcd Version int64 `json:"-"` // only set it when get from etcd // use for sort infos in recoverlock Revision int64 `json:"-"` // use to resolve conflict IgnoreConflict bool `json:"ignore-conflict"` }
Info represents the shard DDL information. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. NOTE: `Task`, `Source`, `UpSchema` and `DownTable` are redundant in the etcd key path for convenient. Info is putted when receiving a shard DDL for a table in DM-worker, and is deleted when removing the lock by DM-master because we need the newest schema in Info to recover the lock when restarting DM-master. when new Info is putted to overwrite the old one, the DM-master should update the lock based on the new one.
func NewInfo ¶
func NewInfo(task, source, upSchema, upTable, downSchema, downTable string, ddls []string, tableInfoBefore *model.TableInfo, tableInfosAfter []*model.TableInfo, ) Info
NewInfo creates a new Info instance.
func (*Info) ShortString ¶
ShortString returns short string of Info.
type Lock ¶
type Lock struct { ID string // lock's ID Task string // lock's corresponding task name DownSchema string // downstream schema name DownTable string // downstream table name // contains filtered or unexported fields }
Lock represents the shard DDL lock in memory. This information does not need to be persistent, and can be re-constructed from the shard DDL info.
func NewLock ¶
func NewLock(cli *clientv3.Client, id, task, downSchema, downTable string, initTable schemacmp.Table, tts []TargetTable, downstreamMeta *DownstreamMeta) *Lock
NewLock creates a new Lock instance.
func (*Lock) AddDroppedColumns ¶
AddDroppedColumns adds a dropped column name in both etcd and lock's column map.
func (*Lock) DeleteColumnsByOp ¶
DeleteColumnsByOp deletes the partially dropped columns that extracted from operation. We can not remove columns from the partially dropped columns map unless: this column is dropped in the downstream database, all the upstream source done the delete column operation that is to say, columns all done.
func (*Lock) FetchTableInfos ¶
FetchTableInfos fetch all table infos for a lock.
func (*Lock) GetVersion ¶
GetVersion return version of info in lock.
func (*Lock) IsDroppedColumn ¶
IsDroppedColumn checks whether this column is a partially dropped column for this lock.
func (*Lock) IsResolved ¶
IsResolved returns whether the lock has resolved. return true if all tables have the same schema and all DDLs operations have done.
func (*Lock) IsSynced ¶
IsSynced returns whether the lock has synced. In the optimistic mode, we call it `synced` if table info of all tables are the same, and we define `remain` as the table count which have different table info with the joined one, e.g. for `ADD COLUMN`, it's the table count which have not added the column, for `DROP COLUMN`, it's the table count which have dropped the column.
func (*Lock) Ready ¶
Ready returns the source tables' sync status (whether they are ready). we define `ready` if the table's info is the same with the joined one, e.g for `ADD COLUMN`, it's true if it has added the column, for `DROP COLUMN`, it's true if it has not dropped the column.
func (*Lock) TryMarkDone ¶
TryMarkDone tries to mark the operation of the source table as done. it returns whether marked done. NOTE: this method can always mark a existing table as done, so the caller of this method should ensure that the table has done the DDLs operation. NOTE: a done table may revert to not-done if new table schema received and new DDLs operation need to be done.
func (*Lock) TryRemoveTable ¶
TryRemoveTable tries to remove a table in the lock. it returns whether the table has been removed. TODO: it does NOT try to rebuild the joined schema after the table removed now. try to support this if needed later. NOTE: if no table exists in the lock after removed the table, it's the caller's responsibility to decide whether remove the lock or not.
func (*Lock) TryRemoveTableBySources ¶
TryRemoveTable tries to remove tables in the lock by sources. return drop columns for later use.
func (*Lock) TrySync ¶
TrySync tries to sync the lock, re-entrant. new upstream sources may join when the DDL lock is in syncing, so we need to merge these new sources. NOTE: now, any error returned, we treat it as conflict detected. NOTE: now, DDLs (not empty) returned when resolved the conflict, but in fact these DDLs should not be replicated to the downstream. NOTE: now, `TrySync` can detect and resolve conflicts in both of the following modes:
- non-intrusive: update the schema of non-conflict tables to match the conflict tables. data from conflict tables are non-intrusive.
- intrusive: revert the schema of the conflict tables to match the non-conflict tables. data from conflict tables are intrusive.
TODO: but both of these modes are difficult to be implemented in DM-worker now, try to do that later. for non-intrusive, a broadcast mechanism needed to notify conflict tables after the conflict has resolved, or even a block mechanism needed. for intrusive, a DML prune or transform mechanism needed for two different schemas (before and after the conflict resolved).
func (*Lock) UpdateTableAfterUnlock ¶
UpdateTableAfterUnlock updates table's schema info after unlock exec action.
type LockKeeper ¶
type LockKeeper struct {
// contains filtered or unexported fields
}
LockKeeper used to keep and handle DDL lock conveniently. The lock information do not need to be persistent, and can be re-constructed from the shard DDL info. But the drop columns should be persistent.
func NewLockKeeper ¶
func NewLockKeeper(getDownstreamMetaFunc func(string) (*dbconfig.DBConfig, string)) *LockKeeper
NewLockKeeper creates a new LockKeeper instance.
func (*LockKeeper) FindLock ¶
func (lk *LockKeeper) FindLock(lockID string) *Lock
FindLock finds a lock.
func (*LockKeeper) FindLockByInfo ¶
func (lk *LockKeeper) FindLockByInfo(info Info) *Lock
FindLockByInfo finds a lock with a shard DDL info.
func (*LockKeeper) FindLocksByTask ¶
func (lk *LockKeeper) FindLocksByTask(task string) []*Lock
FindLocksByTask finds locks by task.
func (*LockKeeper) Locks ¶
func (lk *LockKeeper) Locks() map[string]*Lock
Locks return a copy of all Locks.
func (*LockKeeper) RemoveDownstreamMeta ¶
func (lk *LockKeeper) RemoveDownstreamMeta(task string)
RemoveDownstreamMeta removes downstream mate by task.
func (*LockKeeper) RemoveLock ¶
func (lk *LockKeeper) RemoveLock(lockID string) bool
RemoveLock removes a lock.
func (*LockKeeper) RemoveLockByInfo ¶
func (lk *LockKeeper) RemoveLockByInfo(info Info) bool
RemoveLockByInfo removes a lock.
func (*LockKeeper) SetDropColumns ¶
func (lk *LockKeeper) SetDropColumns(dropColumns map[string]map[string]map[string]map[string]map[string]DropColumnStage)
SetDropColumns set drop columns for lock keeper.
type LogInfo ¶
type LogInfo struct { Task string `json:"task"` Source string `json:"source"` UpSchema string `json:"up-schema"` UpTable string `json:"up-table"` DownSchema string `json:"down-schema"` DownTable string `json:"down-table"` DDLs []string `json:"ddls"` TableBefore string `json:"table-before"` TableAfter string `json:"table-after"` IsDeleted bool `json:"is-deleted"` Version int64 `json:"version"` Revision int64 `json:"revision"` IgnoreConflict bool `json:"ignore-conflict"` }
LogInfo replace TableInfo with schema.Table.String() for log.
type OldInfo ¶
type OldInfo struct { Task string `json:"task"` Source string `json:"source"` UpSchema string `json:"up-schema"` UpTable string `json:"up-table"` DownSchema string `json:"down-schema"` DownTable string `json:"down-table"` DDLs []string `json:"ddls"` TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs TableInfoAfter *model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs }
OldInfo represents info in etcd before v2.0.2.
type Operation ¶
type Operation struct { ID string `json:"id"` // the corresponding DDL lock ID Task string `json:"task"` // data migration task name Source string `json:"source"` // upstream source ID UpSchema string `json:"up-schema"` // upstream/source schema name, different sources can have the same schema name UpTable string `json:"up-table"` // upstream/source table name, different sources can have the same table name DDLs []string `json:"ddls"` // DDL statements need to apply to the downstream. ConflictStage ConflictStage `json:"conflict-stage"` // current conflict stage. ConflictMsg string `json:"conflict-message"` // current conflict message Done bool `json:"done"` // whether the operation has done Cols []string `json:"cols"` // drop columns' name // only set it when get from etcd // use for sort infos in recovering locks Revision int64 `json:"-"` }
Operation represents a shard DDL coordinate operation. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. NOTE: `Task`, `Source`, `UpSchema` and `UpTable` are redundant in the etcd key path for convenient. Operation is putted when coordinating a shard DDL operation for DM-worker by DM-master, and is updated (with `done`) after DM-worker has done the operation by DM-worker, and is deleted when removing the lock by DM-master. because we need the newest stage in Operation to recover the lock when restarting DM-master.
func GetOperation ¶
func GetOperation(cli *clientv3.Client, task, source, upSchema, upTable string) (Operation, int64, error)
GetOperation gets shard DDL operation in etcd currently. This function should often be called by DM-worker. (task-name, source-ID, upstream-schema-name, upstream-table-name) -> shard DDL operation.
func NewOperation ¶
func NewOperation(id, task, source, upSchema, upTable string, ddls []string, conflictStage ConflictStage, conflictMsg string, done bool, cols []string, ) Operation
NewOperation creates a new Operation instance.
type RouteTable ¶
RouteTable represents a table in upstream/downstream.
type SourceTables ¶
type SourceTables struct { Task string `json:"task"` // data migration task name Source string `json:"source"` // upstream source ID // downstream-schema-name -> downstream-table-name -> upstream-schema-name -> upstream-table-name -> struct{}, // multiple downstream/target tables (<downstream-schema-name, downstream-table-name> pair) may exist in one subtask. Tables map[string]map[string]map[string]map[string]struct{} `json:"tables"` // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the SourceTables has been deleted in etcd. IsDeleted bool `json:"-"` }
SourceTables represents the upstream/sources tables for a data migration **subtask**. This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed. We need this because only one shard group exists for **every** target table in the optimistic mode (in DM-master), so we need DM-worker to report its upstream table names to DM-master. NOTE: `Task` and `Source` are redundant in the etcd key path for convenient. SourceTables is putted when starting the subtask by DM-worker, and is updated when new tables added/removed in the upstream source by DM-worker, and **may be** deleted when stopping the subtask by DM-worker later.
func NewSourceTables ¶
func NewSourceTables(task, source string) SourceTables
NewSourceTables creates a new SourceTables instances.
func (*SourceTables) AddTable ¶
func (st *SourceTables) AddTable(upSchema, upTable, downSchema, downTable string) bool
AddTable adds a table into SourceTables. it returns whether added (not exist before).
func (*SourceTables) RemoveTable ¶
func (st *SourceTables) RemoveTable(upSchema, upTable, downSchema, downTable string) bool
RemoveTable removes a table from SourceTables. it returns whether removed (exist before).
func (SourceTables) String ¶
func (st SourceTables) String() string
String implements Stringer interface.
func (*SourceTables) TargetTable ¶
func (st *SourceTables) TargetTable(downSchema, downTable string) TargetTable
TargetTable returns a TargetTable instance for a specified downstream table, returns an empty TargetTable instance if no tables exist.
type TableKeeper ¶
type TableKeeper struct {
// contains filtered or unexported fields
}
TableKeeper used to keep initial tables for a task in optimism mode.
func NewTableKeeper ¶
func NewTableKeeper() *TableKeeper
NewTableKeeper creates a new TableKeeper instance.
func (*TableKeeper) AddTable ¶
func (tk *TableKeeper) AddTable(task, source, upSchema, upTable, downSchema, downTable string) bool
AddTable adds a table into the source tables. it returns whether added (not exist before). NOTE: we only add for existing task now.
func (*TableKeeper) FindTables ¶
func (tk *TableKeeper) FindTables(task, downSchema, downTable string) []TargetTable
FindTables finds source tables by task name and downstream table name.
func (*TableKeeper) Init ¶
func (tk *TableKeeper) Init(stm map[string]map[string]SourceTables)
Init (re-)initializes the keeper with initial source tables.
func (*TableKeeper) RemoveTable ¶
func (tk *TableKeeper) RemoveTable(task, source, upSchema, upTable, downSchema, downTable string) bool
RemoveTable removes a table from the source tables. it returns whether removed (exit before).
func (*TableKeeper) RemoveTableByTask ¶
func (tk *TableKeeper) RemoveTableByTask(task string) bool
RemoveTableByTask removes tables from the source tables through task name. it returns whether removed (exit before).
func (*TableKeeper) RemoveTableByTaskAndSources ¶
func (tk *TableKeeper) RemoveTableByTaskAndSources(task string, sources []string)
RemoveTableByTaskAndSource removes tables from the source tables through task name and sources.
func (*TableKeeper) SourceTableExist ¶
func (tk *TableKeeper) SourceTableExist(task, source, upSchema, upTable, downSchema, downTable string) bool
SourceTableExist check whether a source table exist.
func (*TableKeeper) Update ¶
func (tk *TableKeeper) Update(st SourceTables) (map[RouteTable]struct{}, map[RouteTable]struct{})
Update adds/updates tables into the keeper or removes tables from the keeper. it returns the newly added and dropped tables.
type TargetTable ¶
type TargetTable struct { Task string `json:"task"` // data migration task name Source string `json:"source"` // upstream source ID DownSchema string `json:"down-schema"` // downstream schema name DownTable string `json:"down-table"` // downstream table name // upstream-schema-name -> upstream-table-name -> struct{} UpTables map[string]map[string]struct{} `json:"up-tables"` }
TargetTable represents some upstream/sources tables for **one** target table. It is often generated from `SourceTables` for the specified downstream table.
func TargetTablesForTask ¶
func TargetTablesForTask(task, downSchema, downTable string, stm map[string]map[string]SourceTables) []TargetTable
TargetTablesForTask returns TargetTable list for a specified task and downstream table. stm: task name -> upstream source ID -> SourceTables.
func (TargetTable) IsEmpty ¶
func (tt TargetTable) IsEmpty() bool
IsEmpty returns whether the TargetTable instance is empty.
type TargetTableSlice ¶
type TargetTableSlice []TargetTable
TargetTableSlice attaches the methods of Interface to []TargetTable, sorting in increasing order according to `Source` field.
func (TargetTableSlice) Less ¶
func (t TargetTableSlice) Less(i, j int) bool
Less implements Sorter.Less.
func (TargetTableSlice) Swap ¶
func (t TargetTableSlice) Swap(i, j int)
Swap implements Sorter.Swap.