Documentation ¶
Index ¶
- func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) (int64, error)
- func DeleteInfosOperationsByTask(cli *clientv3.Client, task string) (int64, error)
- func DeleteOperations(cli *clientv3.Client, ops ...Operation) (int64, error)
- func GetAllInfo(cli *clientv3.Client) (map[string]map[string]Info, int64, error)
- func GetAllOperations(cli *clientv3.Client) (map[string]map[string]Operation, int64, error)
- func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)
- func PutInfo(cli *clientv3.Client, info Info) (int64, error)
- func PutInfoIfOpNotDone(cli *clientv3.Client, info Info) (rev int64, putted bool, err error)
- func PutOperationDeleteExistInfo(cli *clientv3.Client, op Operation, info Info) (done bool, rev int64, err error)
- func PutOperations(cli *clientv3.Client, skipDone bool, ops ...Operation) (rev int64, putted bool, err error)
- func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, ...)
- func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, ...)
- func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, ...)
- type Info
- type Lock
- func (l *Lock) ForceSynced()
- func (l *Lock) IsDone(source string) bool
- func (l *Lock) IsResolved() bool
- func (l *Lock) IsSynced() (bool, int)
- func (l *Lock) MarkDone(source string)
- func (l *Lock) Ready() map[string]bool
- func (l *Lock) RevertSynced(sources []string)
- func (l *Lock) TrySync(caller string, ddls, sources []string) (bool, int, error)
- type LockKeeper
- func (lk *LockKeeper) Clear()
- func (lk *LockKeeper) FindLock(lockID string) *Lock
- func (lk *LockKeeper) Locks() map[string]*Lock
- func (lk *LockKeeper) RemoveLock(lockID string) bool
- func (lk *LockKeeper) RemoveLockByInfo(info Info) bool
- func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, error)
- type Operation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DeleteInfosOperations ¶
DeleteInfosOperations deletes the shard DDL infos and operations in etcd. This function should often be called by DM-master when calling UnlockDDL.
func DeleteInfosOperationsByTask ¶
DeleteInfosOperationsByTask deletes the shard DDL infos and operations of a specified task in etcd. This function should often be called by DM-master when deleting DDL metadata.
func DeleteOperations ¶
DeleteOperations deletes the shard DDL operations in etcd. This function should often be called by DM-master.
func GetAllInfo ¶
GetAllInfo gets all shard DDL infos currently in etcd. k/k/v: task-name -> source-ID -> DDL info. This function should often be called by DM-master.
func GetAllOperations ¶
GetAllOperations gets all DDL lock operations currently in etcd. k/k/v: task-name -> source-ID -> lock operation.
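The nested `task-name -> source-ID -> value` layout described for GetAllInfo and GetAllOperations can be illustrated without etcd. The sketch below is only a model: `info` is a trimmed local stand-in for the package's Info type, and `groupByTask` and `sourcesOf` are hypothetical helpers that are not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// info is a local stand-in for the package's Info type (assumption: only the
// fields needed for grouping are reproduced here).
type info struct {
	Task   string
	Source string
	DDLs   []string
}

// groupByTask is a hypothetical helper that arranges infos in the same
// task-name -> source-ID -> info shape that GetAllInfo is documented to return.
func groupByTask(infos []info) map[string]map[string]info {
	out := make(map[string]map[string]info)
	for _, i := range infos {
		if _, ok := out[i.Task]; !ok {
			out[i.Task] = make(map[string]info)
		}
		out[i.Task][i.Source] = i
	}
	return out
}

// sourcesOf returns the sorted source IDs recorded for a task.
// A task that is not present yields an empty slice.
func sourcesOf(m map[string]map[string]info, task string) []string {
	sources := make([]string, 0, len(m[task]))
	for s := range m[task] {
		sources = append(sources, s)
	}
	sort.Strings(sources)
	return sources
}

func main() {
	m := groupByTask([]info{
		{Task: "task-1", Source: "source-1", DDLs: []string{"ALTER TABLE t ADD COLUMN c INT"}},
		{Task: "task-1", Source: "source-2", DDLs: []string{"ALTER TABLE t ADD COLUMN c INT"}},
		{Task: "task-2", Source: "source-1", DDLs: []string{"DROP TABLE t"}},
	})
	// Walk the outer level by task, then the inner level by source ID.
	fmt.Println(sourcesOf(m, "task-1"), len(m["task-2"]))
}
```

Looking up a value is then a two-step index, `m[task][source]`, which is why callers usually check the outer key first.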
func GetInfosOperationsByTask ¶
func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)
GetInfosOperationsByTask gets all DDL lock infos and operations of the specified task currently in etcd.
func PutInfo ¶
PutInfo puts the shard DDL info into etcd. NOTE:
Before the lock is resolved, the same DDL info may be PUT multiple times in some cases:
1. start-task after stop-task.
2. resume-task after the task was paused manually or automatically.
3. the task is scheduled to another DM-worker instance (just like case 1).
We then need to ensure that re-PUT is safe:
1. DM-master can construct the lock and do the coordination correctly.
2. DM-worker can re-PUT and comply with the coordination correctly.
This function should often be called by DM-worker.
func PutInfoIfOpNotDone ¶
PutInfoIfOpNotDone puts the shard DDL info into etcd if the operation does not exist or is not `done`.
func PutOperationDeleteExistInfo ¶
func PutOperationDeleteExistInfo(cli *clientv3.Client, op Operation, info Info) (done bool, rev int64, err error)
PutOperationDeleteExistInfo puts an operation and deletes an info in one txn, if the info already exists in etcd. This function should often be called by DM-worker.
func PutOperations ¶
func PutOperations(cli *clientv3.Client, skipDone bool, ops ...Operation) (rev int64, putted bool, err error)
PutOperations puts the shard DDL operations into etcd. If `skipDone` is `true`:
- PUT: all of kvs ("not exist" or "the `done` field is not `true`")
- skip PUT: any of kvs ("exist" and "the `done` field is `true`")
NOTE:
`clientv3.Value` has a strange behavior for *not-exist* kv, see https://github.com/etcd-io/etcd/issues/10566. In addition, etcd compare has no `OR` operator now, see https://github.com/etcd-io/etcd/issues/10571. So it is hard to implement the `skipDone` logic in one txn. We break the logic into two txns, but this may lead to problems when operations are PUT concurrently.
This function should often be called by DM-master.
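The `skipDone` rule can be modelled in memory to make the two cases concrete. This is a sketch under stated assumptions, not the package's implementation: `operation` is a trimmed stand-in for Operation, the `task/source` key format is illustrative only, and `shouldPut` is a hypothetical helper operating on a plain map instead of etcd.

```go
package main

import "fmt"

// operation is a local stand-in for the package's Operation type; only the
// fields used by the decision are kept.
type operation struct {
	Task, Source string
	Done         bool
}

// key mimics a per-task, per-source key (the format is an assumption made
// for this example, not the real etcd key path).
func key(op operation) string { return op.Task + "/" + op.Source }

// shouldPut is a hypothetical in-memory model of the documented skipDone rule:
// with skipDone set, the whole batch is skipped if any stored operation for
// the same key already has Done == true; otherwise the whole batch is written.
func shouldPut(stored map[string]operation, skipDone bool, ops ...operation) bool {
	if !skipDone {
		return true
	}
	for _, op := range ops {
		if old, ok := stored[key(op)]; ok && old.Done {
			return false
		}
	}
	return true
}

func main() {
	stored := map[string]operation{
		"task-1/source-1": {Task: "task-1", Source: "source-1", Done: true},
	}
	fresh := operation{Task: "task-1", Source: "source-2"}
	redo := operation{Task: "task-1", Source: "source-1"}

	fmt.Println(shouldPut(stored, true, fresh))       // no stored operation is done
	fmt.Println(shouldPut(stored, true, fresh, redo)) // one stored operation is done
	fmt.Println(shouldPut(stored, false, redo))       // skipDone disabled
}
```

In this model the check and the write are separate steps, which mirrors the two-txn caveat in the NOTE: another writer could change the stored state between them.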
func WatchInfoPut ¶
func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, errCh chan<- error)
WatchInfoPut watches PUT operations for info. This function should often be called by DM-master.
func WatchOperationDelete ¶
func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error)
WatchOperationDelete watches DELETE operations for DDL lock operations. To watch all operations, pass an empty string for both `task` and `source`. This function is often called by DM-worker.
func WatchOperationPut ¶
func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error)
WatchOperationPut watches PUT operations for DDL lock operations. To watch all operations, pass an empty string for both `task` and `source`. This function can be called by DM-worker and DM-master. TODO(csuzhangxc): report error and do some retry.
Types ¶
type Info ¶
type Info struct {
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	Schema string   `json:"schema"` // schema name of the DDL
	Table  string   `json:"table"`  // table name of the DDL
	DDLs   []string `json:"ddls"`   // DDL statements
}
Info represents the shard DDL information. This information should be persisted in etcd so that it can be retrieved after the DM-master leader is restarted or changed. NOTE: `Task` and `Source` are redundant with the etcd key path and are kept for convenience.
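The JSON tags on Info suggest how a value might look once serialized. The sketch below redeclares the struct locally so it compiles on its own; `encode` and `decode` are hypothetical helpers built on `encoding/json`, and the package itself may serialize Info differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Info mirrors the struct shown above so the example is self-contained.
type Info struct {
	Task   string   `json:"task"`
	Source string   `json:"source"`
	Schema string   `json:"schema"`
	Table  string   `json:"table"`
	DDLs   []string `json:"ddls"`
}

// encode is a hypothetical helper that serializes an Info using its JSON tags.
func encode(i Info) (string, error) {
	b, err := json.Marshal(i)
	return string(b), err
}

// decode is the hypothetical inverse of encode.
func decode(s string) (Info, error) {
	var i Info
	err := json.Unmarshal([]byte(s), &i)
	return i, err
}

func main() {
	s, err := encode(Info{
		Task:   "task-1",
		Source: "source-1",
		Schema: "db",
		Table:  "tbl",
		DDLs:   []string{"ALTER TABLE tbl ADD COLUMN c INT"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)

	back, err := decode(s)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(back.Task, back.Source, len(back.DDLs))
}
```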
type Lock ¶
type Lock struct {
	ID    string   // lock's ID
	Task  string   // lock's corresponding task name
	Owner string   // Owner's source ID (not DM-worker's name)
	DDLs  []string // DDL statements
	// contains filtered or unexported fields
}
Lock represents the shard DDL lock in memory. This information does not need to be persistent, and can be re-constructed from the shard DDL info.
func (*Lock) ForceSynced ¶
func (l *Lock) ForceSynced()
ForceSynced forces to mark the lock as synced.
func (*Lock) IsResolved ¶
IsResolved returns whether the lock has been resolved (all operations are done).
func (*Lock) MarkDone ¶
MarkDone marks the operation of the source as done. NOTE: reverting `done` after it has been marked is not supported now.
func (*Lock) RevertSynced ¶
RevertSynced reverts the synced stage of the sources.
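The Lock methods listed in the index imply two pieces of per-source bookkeeping: whether a source has synced and whether its operation is done. The toy model below illustrates that idea only. `toyLock` and all of its methods are hypothetical, the internal fields of the real Lock are unexported and not shown on this page, and the real TrySync logic is not reproduced.

```go
package main

import "fmt"

// toyLock is a hypothetical, simplified model; it is NOT the package's Lock.
type toyLock struct {
	ready map[string]bool // source ID -> whether the source has synced
	done  map[string]bool // source ID -> whether the source's operation is done
}

// newToyLock registers the given sources as neither synced nor done.
func newToyLock(sources ...string) *toyLock {
	l := &toyLock{ready: map[string]bool{}, done: map[string]bool{}}
	for _, s := range sources {
		l.ready[s] = false
		l.done[s] = false
	}
	return l
}

// markSynced records that a known source has synced.
func (l *toyLock) markSynced(source string) {
	if _, ok := l.ready[source]; ok {
		l.ready[source] = true
	}
}

// isSynced reports whether every source has synced and how many remain.
func (l *toyLock) isSynced() (bool, int) {
	remain := 0
	for _, r := range l.ready {
		if !r {
			remain++
		}
	}
	return remain == 0, remain
}

// forceSynced marks every source as synced.
func (l *toyLock) forceSynced() {
	for s := range l.ready {
		l.ready[s] = true
	}
}

// revertSynced marks the given known sources as not synced again.
func (l *toyLock) revertSynced(sources []string) {
	for _, s := range sources {
		if _, ok := l.ready[s]; ok {
			l.ready[s] = false
		}
	}
}

// markDone records that a known source finished its operation.
func (l *toyLock) markDone(source string) {
	if _, ok := l.done[source]; ok {
		l.done[source] = true
	}
}

// isResolved reports whether every source's operation is done.
func (l *toyLock) isResolved() bool {
	for _, d := range l.done {
		if !d {
			return false
		}
	}
	return true
}

func main() {
	l := newToyLock("source-1", "source-2")
	l.markSynced("source-1")
	synced, remain := l.isSynced()
	fmt.Println(synced, remain)

	l.forceSynced()
	l.markDone("source-1")
	l.markDone("source-2")
	fmt.Println(l.isResolved())
}
```

Because the state lives only in maps keyed by source ID, such a structure can be rebuilt from persisted infos, which is consistent with the statement that the lock does not need to be persistent.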
type LockKeeper ¶
type LockKeeper struct {
// contains filtered or unexported fields
}
LockKeeper is used to keep and handle DDL locks conveniently. The lock information does not need to be persistent, and can be re-constructed from the shard DDL info.
func NewLockKeeper ¶
func NewLockKeeper() *LockKeeper
NewLockKeeper creates a new LockKeeper instance.
func (*LockKeeper) FindLock ¶
func (lk *LockKeeper) FindLock(lockID string) *Lock
FindLock finds a lock.
func (*LockKeeper) Locks ¶
func (lk *LockKeeper) Locks() map[string]*Lock
Locks returns a copy of all Locks.
func (*LockKeeper) RemoveLock ¶
func (lk *LockKeeper) RemoveLock(lockID string) bool
RemoveLock removes a lock.
func (*LockKeeper) RemoveLockByInfo ¶
func (lk *LockKeeper) RemoveLockByInfo(info Info) bool
RemoveLockByInfo removes a lock through given info.
type Operation ¶
type Operation struct {
	ID     string   `json:"id"`     // the corresponding DDL lock ID
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	DDLs   []string `json:"ddls"`   // DDL statements
	Exec   bool     `json:"exec"`   // execute or skip the DDL statements
	Done   bool     `json:"done"`   // whether the `Exec` operation is done

	// only used to report to the caller of the watcher, do not marshal it.
	// if it's true, it means the Operation has been deleted in etcd.
	IsDeleted bool `json:"-"`
}
Operation represents a shard DDL coordinate operation. This information should be persisted in etcd so that it can be retrieved after the DM-master leader is restarted or changed. NOTE: `Task` and `Source` are redundant with the etcd key path and are kept for convenience.
func NewOperation ¶
NewOperation creates a new Operation instance.