pessimism

package
v0.0.0-...-6a53270
Published: Dec 16, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeleteInfosOperations

func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) (int64, error)

DeleteInfosOperations deletes the shard DDL infos and operations in etcd. This function is typically called by DM-master when calling UnlockDDL.

func DeleteInfosOperationsByTask

func DeleteInfosOperationsByTask(cli *clientv3.Client, task string) (int64, error)

DeleteInfosOperationsByTask deletes the shard DDL infos and operations of a specified task in etcd. This function is typically called by DM-master when deleting DDL metadata.

func DeleteOperations

func DeleteOperations(cli *clientv3.Client, ops ...Operation) (int64, error)

DeleteOperations deletes the shard DDL operations in etcd. This function is typically called by DM-master.

func GetAllInfo

func GetAllInfo(cli *clientv3.Client) (map[string]map[string]Info, int64, error)

GetAllInfo gets all shard DDL infos currently in etcd. The result is a nested map (k/k/v): task-name -> source-ID -> DDL info. This function is typically called by DM-master.
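
The nested task-name -> source-ID -> DDL info layout can be sketched without etcd. The following is a minimal illustration, not the package's implementation: `Info` is a local copy of the documented fields, and `groupInfos` and the sample task and source names are hypothetical.

```go
package main

import "fmt"

// Info is a local stand-in that mirrors the fields of the documented Info type.
type Info struct {
	Task   string
	Source string
	Schema string
	Table  string
	DDLs   []string
}

// groupInfos is a hypothetical helper that arranges infos as
// task-name -> source-ID -> Info, the shape returned by GetAllInfo.
func groupInfos(infos []Info) map[string]map[string]Info {
	out := make(map[string]map[string]Info)
	for _, info := range infos {
		if _, ok := out[info.Task]; !ok {
			out[info.Task] = make(map[string]Info)
		}
		out[info.Task][info.Source] = info
	}
	return out
}

func main() {
	ddls := []string{"ALTER TABLE tbl ADD COLUMN c1 INT"}
	grouped := groupInfos([]Info{
		{Task: "task-1", Source: "mysql-replica-1", Schema: "db", Table: "tbl", DDLs: ddls},
		{Task: "task-1", Source: "mysql-replica-2", Schema: "db", Table: "tbl", DDLs: ddls},
		{Task: "task-2", Source: "mysql-replica-1", Schema: "db", Table: "tbl", DDLs: ddls},
	})
	// Walk the outer map by task, then the inner map by source.
	for task, bySource := range grouped {
		for source, info := range bySource {
			fmt.Println(task, source, info.DDLs)
		}
	}
}
```

Map iteration order in Go is not deterministic, so the printed order may vary between runs.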

func GetAllOperations

func GetAllOperations(cli *clientv3.Client) (map[string]map[string]Operation, int64, error)

GetAllOperations gets all DDL lock operations currently in etcd. The result is a nested map (k/k/v): task-name -> source-ID -> lock operation.

func GetInfosOperationsByTask

func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Operation, int64, error)

GetInfosOperationsByTask gets all shard DDL infos and lock operations of the specified task currently in etcd.

func PutInfo

func PutInfo(cli *clientv3.Client, info Info) (int64, error)

PutInfo puts the shard DDL info into etcd. NOTE:

In some cases, the same DDL info may be PUT multiple times before the lock is resolved:
  1. start-task after stop-task.
  2. resume-task after the task was paused manually or automatically.
  3. the task is scheduled to another DM-worker instance (just like case 1).
So re-PUT must be safe:
  1. DM-master can construct the lock and do the coordination correctly.
  2. DM-worker can re-PUT and comply with the coordination correctly.

This function is typically called by DM-worker.

func PutInfoIfOpNotDone

func PutInfoIfOpNotDone(cli *clientv3.Client, info Info) (rev int64, putted bool, err error)

PutInfoIfOpNotDone puts the shard DDL info into etcd if the operation does not exist or is not `done`.

func PutOperationDeleteExistInfo

func PutOperationDeleteExistInfo(cli *clientv3.Client, op Operation, info Info) (done bool, rev int64, err error)

PutOperationDeleteExistInfo puts an operation and deletes an info in one txn, if the info already exists in etcd. This function is typically called by DM-worker.

func PutOperations

func PutOperations(cli *clientv3.Client, skipDone bool, ops ...Operation) (rev int64, putted bool, err error)

PutOperations puts the shard DDL operations into etcd. If `skipDone` is `true`:

  • PUT: every kv either does not exist or has a `done` field that is not `true`.
  • skip PUT: any kv exists and has a `done` field that is `true`.

NOTE:

`clientv3.Value` has a strange behavior for a kv that does not exist,
see https://github.com/etcd-io/etcd/issues/10566.
In addition, etcd compare currently has no `OR` operator,
see https://github.com/etcd-io/etcd/issues/10571.
So it is hard to implement the `skipDone` logic in one txn.
The logic is split into two txns, which may cause problems when operations are PUT concurrently.

This function is typically called by DM-master.
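
The `skipDone` rule can be sketched against an in-memory map instead of etcd. This is only an illustration of the decision described above, not the package's two-txn implementation: `opKey`, `shouldPut`, `putOperations`, and the key layout are hypothetical, and the sketch assumes that a `false` value for `skipDone` means an unconditional PUT.

```go
package main

import "fmt"

// Operation is a local stand-in with only the fields this sketch needs.
type Operation struct {
	ID     string
	Task   string
	Source string
	Done   bool
}

// opKey is a hypothetical key layout used only by this sketch.
func opKey(op Operation) string { return op.Task + "/" + op.Source }

// shouldPut applies the skipDone rule: skip the whole batch if any
// existing entry is already done; otherwise PUT.
// Assumption: skipDone == false always PUTs.
func shouldPut(store map[string]Operation, skipDone bool, ops ...Operation) bool {
	if !skipDone {
		return true
	}
	for _, op := range ops {
		if existing, ok := store[opKey(op)]; ok && existing.Done {
			return false
		}
	}
	return true
}

// putOperations checks first and writes second. The two steps are not atomic,
// which mirrors the concurrency caveat in the NOTE above.
func putOperations(store map[string]Operation, skipDone bool, ops ...Operation) bool {
	if !shouldPut(store, skipDone, ops...) {
		return false
	}
	for _, op := range ops {
		store[opKey(op)] = op
	}
	return true
}

func main() {
	store := map[string]Operation{
		"task-1/mysql-replica-1": {ID: "lock-1", Task: "task-1", Source: "mysql-replica-1", Done: true},
	}
	fresh := Operation{ID: "lock-1", Task: "task-1", Source: "mysql-replica-2"}
	redo := Operation{ID: "lock-1", Task: "task-1", Source: "mysql-replica-1"}

	fmt.Println(putOperations(store, true, fresh))       // no done entry for this key
	fmt.Println(putOperations(store, true, fresh, redo)) // one entry is already done
	fmt.Println(putOperations(store, false, redo))       // skipDone disabled
}
```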

func WatchInfoPut

func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, errCh chan<- error)

WatchInfoPut watches PUT operations for info. This function is typically called by DM-master.

func WatchOperationDelete

func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error)

WatchOperationDelete watches DELETE operations for DDL lock operations. To watch all operations, pass an empty string for both `task` and `source`. This function is often called by DM-worker.

func WatchOperationPut

func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error)

WatchOperationPut watches PUT operations for DDL lock operations. To watch all operations, pass an empty string for both `task` and `source`. This function can be called by DM-worker and DM-master. TODO(csuzhangxc): report error and do some retry.
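
The watch functions deliver results through `outCh` and `errCh`, so a caller usually runs the watcher in a goroutine and selects on both channels. The sketch below shows that consumer pattern with a fake producer instead of etcd. `fakeWatch`, `matches`, `collect`, and `runDemo` are hypothetical names; treating an empty `task` or `source` as "match everything" follows the description above, while the final error sent by the fake producer exists only to end the demo.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Operation is a local stand-in with only the fields this sketch needs.
type Operation struct {
	ID     string
	Task   string
	Source string
}

// matches treats an empty task or source as a wildcard.
func matches(op Operation, task, source string) bool {
	return (task == "" || op.Task == task) && (source == "" || op.Source == source)
}

// fakeWatch is a hypothetical producer with the same channel shape as the
// watch functions. It sends matching events, then one error to end the demo.
func fakeWatch(ctx context.Context, events []Operation, task, source string,
	outCh chan<- Operation, errCh chan<- error) {
	for _, op := range events {
		if !matches(op, task, source) {
			continue
		}
		select {
		case outCh <- op:
		case <-ctx.Done():
			return
		}
	}
	select {
	case errCh <- errors.New("fake watch finished"):
	case <-ctx.Done():
	}
}

// collect drains outCh until an error arrives or the context is cancelled.
func collect(ctx context.Context, outCh <-chan Operation, errCh <-chan error) ([]Operation, error) {
	var got []Operation
	for {
		select {
		case op := <-outCh:
			got = append(got, op)
		case err := <-errCh:
			return got, err
		case <-ctx.Done():
			return got, ctx.Err()
		}
	}
}

// runDemo wires the fake producer to the consumer over unbuffered channels.
func runDemo(task, source string) ([]Operation, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	events := []Operation{
		{ID: "lock-1", Task: "task-1", Source: "mysql-replica-1"},
		{ID: "lock-1", Task: "task-1", Source: "mysql-replica-2"},
		{ID: "lock-2", Task: "task-2", Source: "mysql-replica-1"},
	}
	outCh := make(chan Operation)
	errCh := make(chan error)
	go fakeWatch(ctx, events, task, source, outCh, errCh)
	return collect(ctx, outCh, errCh)
}

func main() {
	all, _ := runDemo("", "")
	one, _ := runDemo("task-1", "mysql-replica-2")
	fmt.Println(len(all), len(one))
}
```

Cancelling the context is what stops a real watcher; the consumer should keep receiving from both channels until then so the producer never blocks.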

Types

type Info

type Info struct {
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	Schema string   `json:"schema"` // schema name of the DDL
	Table  string   `json:"table"`  // table name of the DDL
	DDLs   []string `json:"ddls"`   // DDL statements
}

Info represents the shard DDL information. This information should be persisted in etcd so that it can be retrieved after the DM-master leader is restarted or changed. NOTE: `Task` and `Source` duplicate what is already in the etcd key path; they are kept in the value for convenience.
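
The struct tags suggest that an Info is stored as JSON. As a minimal sketch under that assumption, the following encodes and decodes a local copy of the struct with `encoding/json`. `encodeInfo` and `decodeInfo` are hypothetical helpers, not functions of this package, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Info is a local copy of the documented struct, including its JSON tags.
type Info struct {
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	Schema string   `json:"schema"` // schema name of the DDL
	Table  string   `json:"table"`  // table name of the DDL
	DDLs   []string `json:"ddls"`   // DDL statements
}

// encodeInfo is a hypothetical helper that renders an Info as a JSON string.
func encodeInfo(info Info) (string, error) {
	data, err := json.Marshal(info)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// decodeInfo is a hypothetical helper that parses the JSON form back.
func decodeInfo(s string) (Info, error) {
	var info Info
	err := json.Unmarshal([]byte(s), &info)
	return info, err
}

func main() {
	s, err := encodeInfo(Info{
		Task:   "task-1",
		Source: "mysql-replica-1",
		Schema: "db",
		Table:  "tbl",
		DDLs:   []string{"ALTER TABLE tbl ADD COLUMN c1 INT"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	back, err := decodeInfo(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.Task, back.Source, back.DDLs)
}
```

The first line printed is `{"task":"task-1","source":"mysql-replica-1","schema":"db","table":"tbl","ddls":["ALTER TABLE tbl ADD COLUMN c1 INT"]}`.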

func NewInfo

func NewInfo(task, source, schema, table string, ddls []string) Info

NewInfo creates a new Info instance.

func (Info) String

func (i Info) String() string

String implements Stringer interface.

type Lock

type Lock struct {
	ID    string   // lock's ID
	Task  string   // lock's corresponding task name
	Owner string   // Owner's source ID (not DM-worker's name)
	DDLs  []string // DDL statements
	// contains filtered or unexported fields
}

Lock represents the shard DDL lock in memory. This information does not need to be persistent, and can be re-constructed from the shard DDL info.

func NewLock

func NewLock(id, task, owner string, ddls, sources []string) *Lock

NewLock creates a new Lock instance.

func (*Lock) ForceSynced

func (l *Lock) ForceSynced()

ForceSynced forcibly marks the lock as synced.

func (*Lock) IsDone

func (l *Lock) IsDone(source string) bool

IsDone returns whether the operation of the given source is done.

func (*Lock) IsResolved

func (l *Lock) IsResolved() bool

IsResolved returns whether the lock is resolved (all operations are done).

func (*Lock) IsSynced

func (l *Lock) IsSynced() (bool, int)

IsSynced returns whether the lock has synced.

func (*Lock) MarkDone

func (l *Lock) MarkDone(source string)

MarkDone marks the operation of the source as done. NOTE: reverting `done` after it has been marked is not currently supported.

func (*Lock) Ready

func (l *Lock) Ready() map[string]bool

Ready returns the sources sync status or whether they are ready.

func (*Lock) RevertSynced

func (l *Lock) RevertSynced(sources []string)

RevertSynced reverts the synced stage of the sources.

func (*Lock) TrySync

func (l *Lock) TrySync(caller string, ddls, sources []string) (bool, int, error)

TrySync tries to sync the lock and decreases the remaining count; it is re-entrant. New upstream sources may join while the DDL lock is syncing, so these new sources need to be merged.
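
The re-entrant counting and source merging described for TrySync can be modeled with a small in-memory structure. The sketch below is a simplified, hypothetical model and not the package's `Lock`: `sketchLock`, its fields, and the choice to return an error when the DDLs differ are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// sketchLock is a hypothetical, simplified model of a shard DDL lock.
type sketchLock struct {
	ddls   []string
	ready  map[string]bool // source ID -> whether it has synced
	remain int             // number of sources that have not synced yet
}

func newSketchLock(ddls, sources []string) *sketchLock {
	l := &sketchLock{ddls: ddls, ready: make(map[string]bool)}
	l.addSources(sources)
	return l
}

// addSources merges sources that the lock has not seen yet.
func (l *sketchLock) addSources(sources []string) {
	for _, s := range sources {
		if _, ok := l.ready[s]; !ok {
			l.ready[s] = false
			l.remain++
		}
	}
}

func sameDDLs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// trySync marks the caller as synced at most once, so repeated calls from
// the same caller do not decrease remain again (re-entrant).
// Assumption: differing DDLs are reported as an error.
func (l *sketchLock) trySync(caller string, ddls, sources []string) (bool, int, error) {
	if !sameDDLs(l.ddls, ddls) {
		return l.remain <= 0, l.remain, errors.New("DDLs differ from the lock's DDLs")
	}
	l.addSources(sources)
	l.addSources([]string{caller}) // the caller itself may be a new source
	if !l.ready[caller] {
		l.ready[caller] = true
		l.remain--
	}
	return l.remain <= 0, l.remain, nil
}

func main() {
	ddls := []string{"ALTER TABLE tbl ADD COLUMN c1 INT"}
	l := newSketchLock(ddls, []string{"s1", "s2"})

	fmt.Println(l.trySync("s1", ddls, []string{"s1", "s2"}))       // one source left
	fmt.Println(l.trySync("s1", ddls, []string{"s1", "s2"}))       // same call again, no change
	fmt.Println(l.trySync("s2", ddls, []string{"s1", "s2", "s3"})) // s3 joins while syncing
	fmt.Println(l.trySync("s3", ddls, []string{"s1", "s2", "s3"})) // all sources synced
}
```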

type LockKeeper

type LockKeeper struct {
	// contains filtered or unexported fields
}

LockKeeper is used to keep and handle DDL locks conveniently. The lock information does not need to be persistent, and can be re-constructed from the shard DDL info.

func NewLockKeeper

func NewLockKeeper() *LockKeeper

NewLockKeeper creates a new LockKeeper instance.

func (*LockKeeper) Clear

func (lk *LockKeeper) Clear()

Clear clears all Locks.

func (*LockKeeper) FindLock

func (lk *LockKeeper) FindLock(lockID string) *Lock

FindLock finds a lock.

func (*LockKeeper) Locks

func (lk *LockKeeper) Locks() map[string]*Lock

Locks returns a copy of all Locks.

func (*LockKeeper) RemoveLock

func (lk *LockKeeper) RemoveLock(lockID string) bool

RemoveLock removes a lock.

func (*LockKeeper) RemoveLockByInfo

func (lk *LockKeeper) RemoveLockByInfo(info Info) bool

RemoveLockByInfo removes the lock that corresponds to the given info.

func (*LockKeeper) TrySync

func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, error)

TrySync tries to sync the lock.

type Operation

type Operation struct {
	ID     string   `json:"id"`     // the corresponding DDL lock ID
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	DDLs   []string `json:"ddls"`   // DDL statements
	Exec   bool     `json:"exec"`   // execute or skip the DDL statements
	Done   bool     `json:"done"`   // whether the `Exec` operation is done

	// only used to report to the caller of the watcher; do not marshal it.
	// if it is true, the Operation has been deleted in etcd.
	IsDeleted bool `json:"-"`
}

Operation represents a shard DDL coordination operation. This information should be persisted in etcd so that it can be retrieved after the DM-master leader is restarted or changed. NOTE: `Task` and `Source` duplicate what is already in the etcd key path; they are kept in the value for convenience.
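
The `json:"-"` tag on `IsDeleted` means the field is left out of the encoded form. As a minimal sketch, assuming the value is stored as JSON as the struct tags suggest, the following encodes a local copy of the struct with `encoding/json`. `encodeOperation` and `decodeOperation` are hypothetical helpers, and the lock ID and other sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Operation is a local copy of the documented struct, including its JSON tags.
type Operation struct {
	ID     string   `json:"id"`     // the corresponding DDL lock ID
	Task   string   `json:"task"`   // data migration task name
	Source string   `json:"source"` // upstream source ID
	DDLs   []string `json:"ddls"`   // DDL statements
	Exec   bool     `json:"exec"`   // execute or skip the DDL statements
	Done   bool     `json:"done"`   // whether the operation is done

	// IsDeleted is reported to watcher callers only and is never encoded.
	IsDeleted bool `json:"-"`
}

// encodeOperation is a hypothetical helper that renders an Operation as JSON.
func encodeOperation(op Operation) (string, error) {
	data, err := json.Marshal(op)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// decodeOperation is a hypothetical helper that parses the JSON form back.
func decodeOperation(s string) (Operation, error) {
	var op Operation
	err := json.Unmarshal([]byte(s), &op)
	return op, err
}

func main() {
	s, err := encodeOperation(Operation{
		ID:        "lock-1",
		Task:      "task-1",
		Source:    "mysql-replica-1",
		DDLs:      []string{"ALTER TABLE tbl ADD COLUMN c1 INT"},
		Exec:      true,
		IsDeleted: true, // set here to show that it does not reach the JSON
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	back, err := decodeOperation(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.Exec, back.Done, back.IsDeleted)
}
```

The first line printed is `{"id":"lock-1","task":"task-1","source":"mysql-replica-1","ddls":["ALTER TABLE tbl ADD COLUMN c1 INT"],"exec":true,"done":false}`, and the decoded value has `IsDeleted` set to `false` because the field was never written.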

func NewOperation

func NewOperation(id, task, source string, ddls []string, exec, done bool) Operation

NewOperation creates a new Operation instance.

func (Operation) String

func (o Operation) String() string

String implements Stringer interface.
