Documentation ¶
Index ¶
- Constants
- Variables
- func ClearTestInfoOperation(cli *clientv3.Client) error
- func DelLoadTask(cli *clientv3.Client, task, sourceID string) (int64, bool, error)
- func DelLoadTaskByTask(cli *clientv3.Client, task string) (int64, bool, error)
- func DeleteAllTaskCliArgs(cli *clientv3.Client, taskName string) error
- func DeleteLightningStatusForTask(cli *clientv3.Client, task string) (int64, error)
- func DeleteOpenAPITaskTemplate(cli *clientv3.Client, taskName string) error
- func DeleteRelayConfig(cli *clientv3.Client, workers ...string) (int64, error)
- func DeleteRelayStage(cli *clientv3.Client, source string) (int64, error)
- func DeleteSourceBound(cli *clientv3.Client, workers ...string) (int64, error)
- func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker string) (int64, error)
- func DeleteSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, ...) (int64, error)
- func DeleteSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error)
- func DeleteTaskCliArgs(cli *clientv3.Client, taskName string, sources []string) error
- func DeleteWorkerInfoRelayConfig(cli *clientv3.Client, worker string) (int64, error)
- func GetAllLightningStatus(cli *clientv3.Client, task string) ([]string, error)
- func GetAllLoadTask(cli *clientv3.Client) (map[string]map[string]string, int64, error)
- func GetAllOpenAPITaskTemplate(cli *clientv3.Client) ([]*openapi.Task, error)
- func GetAllRelayConfig(cli *clientv3.Client) (map[string]map[string]struct{}, int64, error)
- func GetAllRelayStage(cli *clientv3.Client) (map[string]Stage, int64, error)
- func GetAllSourceCfgBeforeV202(cli *clientv3.Client) (map[string]*config.SourceConfig, int64, error)
- func GetAllSubTaskCfg(cli *clientv3.Client) (map[string]map[string]config.SubTaskConfig, int64, error)
- func GetAllSubTaskStage(cli *clientv3.Client) (map[string]map[string]Stage, int64, error)
- func GetAllValidatorStage(cli *clientv3.Client) (map[string]map[string]Stage, int64, error)
- func GetAllWorkerInfo(cli *clientv3.Client) (map[string]WorkerInfo, int64, error)
- func GetKeepAliveWorkers(cli *clientv3.Client) (map[string]WorkerEvent, int64, error)
- func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, error)
- func GetLoadTask(cli *clientv3.Client, task, sourceID string) (string, int64, error)
- func GetOpenAPITaskTemplate(cli *clientv3.Client, taskName string) (*openapi.Task, error)
- func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig, int64, error)
- func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound, int64, error)
- func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]*config.SourceConfig, int64, error)
- func GetSubTaskCfg(cli *clientv3.Client, source, task string, rev int64) (map[string]config.SubTaskConfig, int64, error)
- func GetSubTaskStage(cli *clientv3.Client, source, task string) (map[string]Stage, int64, error)
- func GetSubTaskStageConfig(cli *clientv3.Client, source string) (map[string]Stage, map[string]Stage, map[string]config.SubTaskConfig, int64, ...)
- func GetTaskCliArgs(cli *clientv3.Client, taskName, source string) (*config.TaskCliArgs, error)
- func GetValidatorStage(cli *clientv3.Client, source, task string, revision int64) (map[string]Stage, int64, error)
- func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, ...) error
- func PutLightningNotReadyForAllSources(cli *clientv3.Client, task string, sources []string) (int64, error)
- func PutLightningStatus(cli *clientv3.Client, task, sourceID, status string) (int64, error)
- func PutLoadTask(cli *clientv3.Client, task, sourceID, worker string) (int64, error)
- func PutOpenAPITaskTemplate(cli *clientv3.Client, task openapi.Task, overWrite bool) error
- func PutRelayConfig(cli *clientv3.Client, source string, workers ...string) (int64, error)
- func PutRelayStage(cli *clientv3.Client, stages ...Stage) (int64, error)
- func PutRelayStageRelayConfigSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error)
- func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error)
- func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error)
- func PutSourceCfg(cli *clientv3.Client, cfg *config.SourceConfig) (int64, error)
- func PutSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, ...) (int64, error)
- func PutSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error)
- func PutTaskCliArgs(cli *clientv3.Client, taskName string, sources []string, ...) error
- func PutWorkerInfo(cli *clientv3.Client, info WorkerInfo) (int64, error)
- func ReplaceSourceBound(cli *clientv3.Client, source, oldWorker, newWorker string) (int64, error)
- func UpdateOpenAPITaskTemplate(cli *clientv3.Client, task openapi.Task) error
- func WatchLoadTask(ctx context.Context, cli *clientv3.Client, revision int64, ...)
- func WatchRelayConfig(ctx context.Context, cli *clientv3.Client, worker string, revision int64, ...)
- func WatchRelayStage(ctx context.Context, cli *clientv3.Client, source string, revision int64, ...)
- func WatchSourceBound(ctx context.Context, cli *clientv3.Client, worker string, revision int64, ...)
- func WatchSubTaskStage(ctx context.Context, cli *clientv3.Client, source string, revision int64, ...)
- func WatchValidatorStage(ctx context.Context, cli *clientv3.Client, source string, rev int64, ...)
- func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outCh chan<- WorkerEvent, ...)
- type LoadTask
- type RelaySource
- type SourceBound
- type Stage
- type WorkerEvent
- type WorkerInfo
Constants ¶
const ( LightningNotReady = "not-ready" LightningReady = "ready" LightningFinished = "finished" )
Simple status transition to make all lightning synchronized in physical import mode. When DM-master creates the task, it will write LightningNotReady for all subtasks. When DM-worker enters load unit, it will mark itself as LightningReady. When all DM-workers are ready, lightning can be started so its parallel import can work. When lightning is finished and its returned error is also resolved, it will mark itself as LightningFinished. When all DM-workers are finished, sync unit can be continued.
Variables ¶
var ( // KeepAliveUpdateCh is used to notify keepalive TTL changing, in order to let watcher not see a DELETE of old key. KeepAliveUpdateCh = make(chan int64, 10) )
Functions ¶
func ClearTestInfoOperation ¶
ClearTestInfoOperation is used to clear all DM-HA relative etcd keys' information this function shouldn't be used in development environment.
func DelLoadTask ¶
DelLoadTask dels the worker in load stage for the source of the subtask. k/v: (task, sourceID) -> worker.
func DelLoadTaskByTask ¶
DelLoadTaskByTask del the worker in load stage for the source by task.
func DeleteAllTaskCliArgs ¶
DeleteAllTaskCliArgs deleted the command line arguments of this task.
func DeleteLightningStatusForTask ¶
DeleteLightningStatusForTask deletes the status for all sources of the task.
func DeleteOpenAPITaskTemplate ¶
DeleteOpenAPITaskTemplate deletes the openapi task config of task-name.
func DeleteRelayConfig ¶
DeleteRelayConfig deletes the relay config for given workers.
func DeleteRelayStage ¶
DeleteRelayStage deleted the relay stage of this source.
func DeleteSourceBound ¶
DeleteSourceBound deletes the bound relationship in etcd for the specified worker.
func DeleteSourceCfgRelayStageSourceBound ¶
func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker string) (int64, error)
DeleteSourceCfgRelayStageSourceBound deletes the following data in one txn. - upstream source config. - relay stage. - source bound relationship.
func DeleteSubTaskCfgStage ¶
func DeleteSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, validatorStages []Stage) (int64, error)
DeleteSubTaskCfgStage deletes the following data in one txn. - subtask config. - subtask stage. NOTE: golang can't use two `...` in the func, so use `[]` instead.
func DeleteSubTaskStage ¶
DeleteSubTaskStage deletes the subtask stage.
func DeleteTaskCliArgs ¶
DeleteTaskCliArgs deleted the command line arguments of this task.
func DeleteWorkerInfoRelayConfig ¶
DeleteWorkerInfoRelayConfig deletes the specified DM-worker information and its relay config.
func GetAllLightningStatus ¶
GetAllLightningStatus gets the status for all source of the task.
func GetAllLoadTask ¶
GetAllLoadTask gets all the worker which in load stage. k/v: (task, sourceID) -> worker-name.
func GetAllOpenAPITaskTemplate ¶
GetAllOpenAPITaskTemplate gets all openapi task config s.
func GetAllRelayConfig ¶
GetAllRelayConfig gets all source and its relay worker. k/v: source ID -> set(workers).
func GetAllRelayStage ¶
GetAllRelayStage gets all relay stages. k/v: source ID -> relay stage.
func GetAllSourceCfgBeforeV202 ¶
func GetAllSourceCfgBeforeV202(cli *clientv3.Client) (map[string]*config.SourceConfig, int64, error)
GetAllSourceCfgBeforeV202 gets all upstream source configs before v2.0.2. This func only use for config export command.
func GetAllSubTaskCfg ¶
func GetAllSubTaskCfg(cli *clientv3.Client) (map[string]map[string]config.SubTaskConfig, int64, error)
GetAllSubTaskCfg gets all subtask configs. k/v: source ID -> task name -> subtask config.
func GetAllSubTaskStage ¶
GetAllSubTaskStage gets all subtask stages. k/v: source ID -> task name -> subtask stage.
func GetAllValidatorStage ¶
func GetAllWorkerInfo ¶
GetAllWorkerInfo gets all DM-worker info in etcd currently. k/v: worker-name -> worker information.
func GetKeepAliveWorkers ¶
GetKeepAliveWorkers gets current alive workers, and returns a map{workerName: WorkerEvent}, revision and error.
func GetLastSourceBounds ¶
GetLastSourceBounds gets all last source bound relationship. Different with GetSourceBound, "last source bound" will not be deleted when worker offline.
func GetLoadTask ¶
GetLoadTask gets the worker which in load stage for the source of the subtask. k/v: (task, sourceID) -> worker-name.
func GetOpenAPITaskTemplate ¶
GetOpenAPITaskTemplate gets the openapi task config of task-name.
func GetRelayConfig ¶
GetRelayConfig returns the source config which the given worker need to pull relay log from etcd, with revision.
func GetSourceBound ¶
GetSourceBound gets the source bound relationship for the specified DM-worker. if the bound relationship for the worker name not exist, return with `err == nil`. if the worker name is "", it will return all bound relationships as a map{worker-name: bound}. if the worker name is given, it will return a map{worker-name: bound} whose length is 1.
func GetSourceCfg ¶
func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]*config.SourceConfig, int64, error)
GetSourceCfg gets upstream source configs. k/v: source ID -> source config. if the source config for the sourceID not exist, return with `err == nil`. if the source name is "", it will return all source configs as a map{sourceID: config}. if the source name is given, it will return a map{sourceID: config} whose length is 1.
func GetSubTaskCfg ¶
func GetSubTaskCfg(cli *clientv3.Client, source, task string, rev int64) (map[string]config.SubTaskConfig, int64, error)
GetSubTaskCfg gets the subtask config of the specified source and task name. if the config for the source not exist, return with `err == nil` and `revision=0`. if task name is "", will return all the subtaskConfigs as a map{taskName: subtaskConfig} of the source if task name if given, will return a map{taskName: subtaskConfig} whose length is 1.
func GetSubTaskStage ¶
GetSubTaskStage gets the subtask stage for the specified upstream source and task name. if the stage for the source and task name not exist, return with `err == nil` and `revision=0`. if task name is "", it will return all subtasks' stage as a map{task-name: stage} for the source. if task name is given, it will return a map{task-name: stage} whose length is 1.
func GetSubTaskStageConfig ¶
func GetSubTaskStageConfig(cli *clientv3.Client, source string) (map[string]Stage, map[string]Stage, map[string]config.SubTaskConfig, int64, error)
GetSubTaskStageConfig gets source's subtask stages and configs at the same time source **must not be empty** return map{task name -> subtask stage}, map{task name -> validator stage}, map{task name -> subtask config}, revision, error.
func GetTaskCliArgs ¶
GetTaskCliArgs gets the command line arguments for the specified task.
func GetValidatorStage ¶
func KeepAlive ¶
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error
KeepAlive puts the join time of the workerName into etcd. this key will be kept in etcd until the worker is blocked or failed k/v: workerName -> join time.
func PutLightningNotReadyForAllSources ¶
func PutLightningNotReadyForAllSources(cli *clientv3.Client, task string, sources []string) (int64, error)
PutLightningNotReadyForAllSources puts LightningNotReady for all sources of the subtask. This function should be called by DM-master.
func PutLightningStatus ¶
PutLightningStatus puts the status for the source of the subtask. k/v: (task, sourceID) -> status. This function should be called by DM-worker.
func PutLoadTask ¶
PutLoadTask puts the worker which load stage for the source of the subtask. k/v: (task, sourceID) -> worker. This function should often be called by DM-worker.
func PutOpenAPITaskTemplate ¶
PutOpenAPITaskTemplate puts the openapi task config of task-name.
func PutRelayConfig ¶
PutRelayConfig puts the relay config for given workers. k/v: worker-name -> source-id. TODO: let caller wait until worker has enabled relay.
func PutRelayStage ¶
PutRelayStage puts the stage of the relay into etcd. k/v: sourceID -> the running stage of the relay.
func PutRelayStageRelayConfigSourceBound ¶
func PutRelayStageRelayConfigSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error)
PutRelayStageRelayConfigSourceBound puts the following data in one txn. - relay stage. - relay config for a worker - source bound relationship.
func PutRelayStageSourceBound ¶
PutRelayStageSourceBound puts the following data in one txn. - relay stage. - source bound relationship.
func PutSourceBound ¶
func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error)
PutSourceBound puts the bound relationship into etcd. k/v: worker-name -> bound relationship.
func PutSourceCfg ¶
PutSourceCfg puts the config of the upstream source into etcd. k/v: sourceID -> source config.
func PutSubTaskCfgStage ¶
func PutSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, validatorStages []Stage) (int64, error)
PutSubTaskCfgStage puts the following data in one txn. - subtask config. - subtask stage. NOTE: golang can't use two `...` in the func, so use `[]` instead.
func PutSubTaskStage ¶
PutSubTaskStage puts the stage of the subtask into etcd. k/v: sourceID, task -> the running stage of the subtask.
func PutTaskCliArgs ¶
func PutTaskCliArgs(cli *clientv3.Client, taskName string, sources []string, args config.TaskCliArgs) error
PutTaskCliArgs puts TaskCliArgs into etcd.
func PutWorkerInfo ¶
func PutWorkerInfo(cli *clientv3.Client, info WorkerInfo) (int64, error)
PutWorkerInfo puts the DM-worker info into etcd. k/v: worker-name -> worker information.
func ReplaceSourceBound ¶
ReplaceSourceBound deletes an old bound and puts a new bound in one transaction, so a bound source will not become unbound because of failing halfway.
func UpdateOpenAPITaskTemplate ¶
UpdateOpenAPITaskTemplate updates the openapi task config by task-name.
func WatchLoadTask ¶
func WatchLoadTask(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- LoadTask, errCh chan<- error, )
WatchLoadTask watches PUT & DELETE operations for worker in load stage. This function should often be called by DM-master.
func WatchRelayConfig ¶
func WatchRelayConfig(ctx context.Context, cli *clientv3.Client, worker string, revision int64, outCh chan<- RelaySource, errCh chan<- error, )
WatchRelayConfig watches PUT & DELETE operations for the relay relationship of the specified DM-worker. For the DELETE operations, it returns an nil source config.
func WatchRelayStage ¶
func WatchRelayStage(ctx context.Context, cli *clientv3.Client, source string, revision int64, outCh chan<- Stage, errCh chan<- error, )
WatchRelayStage watches PUT & DELETE operations for the relay stage. for the DELETE stage, it returns an empty stage.
func WatchSourceBound ¶
func WatchSourceBound(ctx context.Context, cli *clientv3.Client, worker string, revision int64, outCh chan<- SourceBound, errCh chan<- error)
WatchSourceBound watches PUT & DELETE operations for the bound relationship of the specified DM-worker. For the DELETE operations, it returns an empty bound relationship. nolint:dupl
func WatchSubTaskStage ¶
func WatchSubTaskStage(ctx context.Context, cli *clientv3.Client, source string, revision int64, outCh chan<- Stage, errCh chan<- error, )
WatchSubTaskStage watches PUT & DELETE operations for the subtask stage. for the DELETE stage, it returns an empty stage.
func WatchValidatorStage ¶
func WatchWorkerEvent ¶
func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outCh chan<- WorkerEvent, errCh chan<- error)
WatchWorkerEvent watches the online and offline of workers from etcd. this function will output the worker event to evCh, output the error to errCh.
Types ¶
type RelaySource ¶
type RelaySource struct { Source string // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the bound has been deleted in etcd. IsDeleted bool // record the etcd ModRevision of this bound Revision int64 }
RelaySource represents the bound relationship between the DM-worker instance and its upstream relay source.
type SourceBound ¶
type SourceBound struct { Source string `json:"source"` // the source ID of the upstream. Worker string `json:"worker"` // the name of the bound DM-worker for the source. // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the bound has been deleted in etcd. IsDeleted bool `json:"-"` // record the etcd Revision of this bound Revision int64 `json:"-"` }
SourceBound represents the bound relationship between the DM-worker instance and the upstream MySQL source.
func GetSourceBoundConfig ¶
func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error)
GetSourceBoundConfig gets the source bound relationship and relative source config at the same time for the specified DM-worker. The index worker **must not be empty**: if source bound is empty, will return an empty sourceBound and an empty source config if source bound is not empty but sourceConfig is empty, will return an error if the source bound is different for over retryNum times, will return an error.
func NewSourceBound ¶
func NewSourceBound(source, worker string) SourceBound
NewSourceBound creates a new SourceBound instance.
func (SourceBound) IsEmpty ¶
func (b SourceBound) IsEmpty() bool
IsEmpty returns true when this bound has no value.
func (SourceBound) String ¶
func (b SourceBound) String() string
String implements Stringer interface.
type Stage ¶
type Stage struct { Expect pb.Stage `json:"expect"` // the expectant stage. Source string `json:"source"` // the source ID of the upstream. Task string `json:"task,omitempty"` // the task name for subtask; empty for relay. // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the stage has been deleted in etcd. IsDeleted bool `json:"-"` // record the etcd Revision of this Stage Revision int64 `json:"-"` }
Stage represents the running stage for a relay or subtask.
func GetRelayStage ¶
GetRelayStage gets the relay stage for the specified upstream source. if the stage for the source not exist, return with `err == nil` and `revision=0`.
func NewRelayStage ¶
NewRelayStage creates a new Stage instance for relay.
func NewSubTaskStage ¶
NewSubTaskStage creates a new Stage instance for subtask.
type WorkerEvent ¶
type WorkerEvent struct { WorkerName string `json:"worker-name"` // the worker name of the worker. JoinTime time.Time `json:"join-time"` // the time when worker start to keepalive with etcd // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the worker has been deleted in etcd. IsDeleted bool `json:"-"` }
WorkerEvent represents the PUT/DELETE keepalive event of DM-worker.
func (WorkerEvent) String ¶
func (w WorkerEvent) String() string
String implements Stringer interface.
type WorkerInfo ¶
type WorkerInfo struct { Name string `json:"name"` // the name of the node. Addr string `json:"addr"` // the client address of the node to advertise. }
WorkerInfo represents the node information of the DM-worker.
func NewWorkerInfo ¶
func NewWorkerInfo(name, addr string) WorkerInfo
NewWorkerInfo creates a new WorkerInfo instance.
func (WorkerInfo) String ¶
func (i WorkerInfo) String() string
String implements Stringer interface.