Documentation ¶
Index ¶
- type ReleaseFunc
- type Scheduler
- func (s *Scheduler) AcquireSubtaskLatch(name string) (ReleaseFunc, error)
- func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error
- func (s *Scheduler) AddSourceCfgWithWorker(cfg *config.SourceConfig, workerName string) error
- func (s *Scheduler) AddSubTasks(latched bool, expectStage pb.Stage, cfgs ...config.SubTaskConfig) error
- func (s *Scheduler) AddWorker(name, addr string) error
- func (s *Scheduler) BatchOperateTaskOnWorker(ctx context.Context, worker *Worker, tasks []string, source string, ...) error
- func (s *Scheduler) BoundSources() []string
- func (s *Scheduler) Close()
- func (s *Scheduler) CloseAllWorkers()
- func (s *Scheduler) GetALlSubTaskCfgs() map[string]map[string]*config.SubTaskConfig
- func (s *Scheduler) GetAllWorkers() ([]*Worker, error)
- func (s *Scheduler) GetDownstreamMetaByTask(task string) (*dbconfig.DBConfig, string)
- func (s *Scheduler) GetExpectRelayStage(source string) ha.Stage
- func (s *Scheduler) GetExpectSubTaskStage(task, source string) ha.Stage
- func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error)
- func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig
- func (s *Scheduler) GetSourceCfgIDs() []string
- func (s *Scheduler) GetSourceCfgs() map[string]*config.SourceConfig
- func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig
- func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTaskConfig
- func (s *Scheduler) GetSubTaskCfgsByTaskAndSource(taskName string, sources []string) map[string]map[string]config.SubTaskConfig
- func (s *Scheduler) GetTaskNameListBySourceName(sourceName string, expectStage *pb.Stage) []string
- func (s *Scheduler) GetValidatorStage(task, source string) *ha.Stage
- func (s *Scheduler) GetWorkerByName(name string) *Worker
- func (s *Scheduler) GetWorkerBySource(source string) *Worker
- func (s *Scheduler) OperateValidationTask(validatorStages []ha.Stage, changedSubtaskCfgs []config.SubTaskConfig) error
- func (s *Scheduler) RemoveLoadTaskAndLightningStatus(task string) error
- func (s *Scheduler) RemoveSourceCfg(source string) error
- func (s *Scheduler) RemoveSubTasks(task string, sources ...string) error
- func (s *Scheduler) RemoveWorker(name string) error
- func (s *Scheduler) SetWorkerClientForTest(name string, mockCli workerrpc.Client)
- func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err error)
- func (s *Scheduler) StartRelay(source string, workers []string) error
- func (s *Scheduler) Started() bool
- func (s *Scheduler) StopRelay(source string, workers []string) error
- func (s *Scheduler) TransferSource(ctx context.Context, source, worker string) error
- func (s *Scheduler) TryResolveLoadTask(sources []string)
- func (s *Scheduler) UnboundSources() []string
- func (s *Scheduler) UpdateExpectRelayStage(newStage pb.Stage, sources ...string) error
- func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string, sources ...string) error
- func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error
- func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskConfig) error
- func (s *Scheduler) ValidatorEnabled(task, source string) bool
- type Worker
- func (w *Worker) BaseInfo() ha.WorkerInfo
- func (w *Worker) Bound() ha.SourceBound
- func (w *Worker) Close()
- func (w *Worker) RelaySourceID() string
- func (w *Worker) SendRequest(ctx context.Context, req *workerrpc.Request, d time.Duration) (*workerrpc.Response, error)
- func (w *Worker) Stage() WorkerStage
- func (w *Worker) StartRelay(sourceID string) error
- func (w *Worker) StopRelay()
- func (w *Worker) ToBound(bound ha.SourceBound) error
- func (w *Worker) ToFree()
- func (w *Worker) ToOffline()
- func (w *Worker) Unbound() error
- type WorkerStage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ReleaseFunc ¶
type ReleaseFunc func()
ReleaseFunc wraps the release of a latch. It is safe to call multiple times. Because the caller receives it as a variable, the compiler also flags a ReleaseFunc that is declared but never used.
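One way to get the "safe to call multiple times" property is to guard the release with `sync.Once`. The following is a minimal sketch under that assumption, not the scheduler's actual implementation; the `latches` type and the `newLatches` and `tryAcquire` names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// ReleaseFunc mirrors the documented type: a func() that releases a latch.
type ReleaseFunc func()

// latches is a hypothetical name-keyed latch set used only for illustration.
type latches struct {
	mu   sync.Mutex
	held map[string]struct{}
}

func newLatches() *latches {
	return &latches{held: make(map[string]struct{})}
}

// tryAcquire fails if name is already latched. Otherwise it marks name as
// held and returns a ReleaseFunc that only takes effect on its first call.
func (l *latches) tryAcquire(name string) (ReleaseFunc, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.held[name]; ok {
		return nil, fmt.Errorf("latch %q is already held", name)
	}
	l.held[name] = struct{}{}

	var once sync.Once
	return func() {
		once.Do(func() {
			l.mu.Lock()
			defer l.mu.Unlock()
			delete(l.held, name)
		})
	}, nil
}

func main() {
	l := newLatches()
	release, err := l.tryAcquire("task-1")
	if err != nil {
		panic(err)
	}
	_, err = l.tryAcquire("task-1")
	fmt.Println("second acquire failed:", err != nil)

	release()
	release() // repeated calls are no-ops

	_, err = l.tryAcquire("task-1")
	fmt.Println("acquire after release failed:", err != nil)
}
```

Because each ReleaseFunc owns its own `sync.Once`, a stale release that is called again after another caller has re-acquired the same name does not free the new holder's latch.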
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler schedules tasks for DM-worker instances, including:
- register/unregister DM-worker instances.
- observe the online/offline status of DM-worker instances.
- observe add/remove operations for upstream sources' config.
- schedule upstream sources to DM-worker instances.
- schedule data migration subtask operations.
- hold agents of DM-worker instances.
NOTE: the DM-master server MUST wait for this scheduler to become started before handling client requests.
Cases that trigger a source-to-worker bound try:
- a worker from Offline to Free:
  - receive keep-alive.
- a worker from Bound to Free:
  - triggered by unbound: `a source removed`.
- a new source added:
  - add source request from user.
- a source unbound from another worker:
  - triggered by unbound: `a worker from Bound to Offline`.
  - TODO(csuzhangxc): design a strategy to ensure the old worker has already shut down its work.
Cases that trigger a source-to-worker unbound try:
- a worker from Bound to Offline:
  - lost keep-alive.
- a source removed:
  - remove source request from user.
TODO: try to handle the returned `err` of etcd operations, because the data may have been put into etcd even though the response to the etcd client was interrupted.
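The bound and unbound triggers listed above can be pictured as four events acting on a small in-memory model. This is a sketch only: `miniScheduler` and its methods are hypothetical names, the first-come-first-served pairing is an assumption, and the real Scheduler persists its state in etcd and may choose workers differently.

```go
package main

import "fmt"

// miniScheduler is a hypothetical in-memory model of source-to-worker bounds.
type miniScheduler struct {
	free    []string          // names of Free workers
	unbound []string          // IDs of sources waiting for a worker
	bounds  map[string]string // source ID -> bound worker name
}

func newMiniScheduler() *miniScheduler {
	return &miniScheduler{bounds: make(map[string]string)}
}

// tryBound pairs waiting sources with Free workers, first come first served.
func (m *miniScheduler) tryBound() {
	for len(m.free) > 0 && len(m.unbound) > 0 {
		m.bounds[m.unbound[0]] = m.free[0]
		m.unbound, m.free = m.unbound[1:], m.free[1:]
	}
}

// remove returns list without the first occurrence of item.
func remove(list []string, item string) []string {
	for i, v := range list {
		if v == item {
			return append(list[:i:i], list[i+1:]...)
		}
	}
	return list
}

// workerOnline models Offline -> Free (keep-alive received): bound try.
func (m *miniScheduler) workerOnline(worker string) {
	m.free = append(m.free, worker)
	m.tryBound()
}

// sourceAdded models a user's add-source request: bound try.
func (m *miniScheduler) sourceAdded(source string) {
	m.unbound = append(m.unbound, source)
	m.tryBound()
}

// workerOffline models lost keep-alive: a Bound worker is unbound, and its
// source then goes through a bound try with the remaining Free workers.
func (m *miniScheduler) workerOffline(worker string) {
	m.free = remove(m.free, worker)
	for source, w := range m.bounds {
		if w == worker {
			delete(m.bounds, source)
			m.unbound = append(m.unbound, source)
		}
	}
	m.tryBound()
}

// sourceRemoved models a user's remove-source request: the source is unbound,
// and its worker (now Free) goes through a bound try with any waiting source.
func (m *miniScheduler) sourceRemoved(source string) {
	m.unbound = remove(m.unbound, source)
	if w, ok := m.bounds[source]; ok {
		delete(m.bounds, source)
		m.free = append(m.free, w)
	}
	m.tryBound()
}

func main() {
	m := newMiniScheduler()
	m.sourceAdded("mysql-01")   // no worker yet, so the source waits
	m.workerOnline("worker-1")  // mysql-01 is bound to worker-1
	m.workerOnline("worker-2")  // worker-2 stays Free
	m.workerOffline("worker-1") // mysql-01 is rebound to worker-2
	fmt.Println(m.bounds["mysql-01"])
}
```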
Relay scheduling:
- scheduled by source: DM-worker enables relay according to its bound source. In the current implementation, it reads `enable-relay` of the source config to decide whether to enable relay.
  turn on `enable-relay`:
  - use `enable-relay: true` when creating the source
  - use `start-relay -s source` to dynamically change `enable-relay`
  turn off `enable-relay`:
  - use `enable-relay: false` when creating the source
  - use `stop-relay -s source` to dynamically change `enable-relay`
  - a conflicting schedule type with (source, worker) is found when the scheduler bootstraps
- scheduled by (source, worker): DM-worker checks whether relay is assigned to it, no matter whether it is bound or not. In the current implementation, it reads UpstreamRelayWorkerKeyAdapter in etcd.
  add UpstreamRelayWorkerKeyAdapter:
  - use `start-relay -s source -w worker`
  remove UpstreamRelayWorkerKeyAdapter:
  - use `stop-relay -s source -w worker`
  - remove the worker by `offline-member`
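The two relay scheduling modes can be contrasted with a small lookup. This is a sketch under stated assumptions: `sourceConfig`, `relayState`, and `relaySourceFor` are hypothetical names, the maps stand in for state the scheduler keeps in etcd, and a worker is assumed to be assigned to at most one relay source.

```go
package main

import "fmt"

// sourceConfig keeps only the field this sketch needs; EnableRelay is a
// hypothetical stand-in for the `enable-relay` source config item.
type sourceConfig struct {
	EnableRelay bool
}

// relayState is a hypothetical snapshot of relay-related scheduler state.
type relayState struct {
	sourceCfgs map[string]sourceConfig // source ID -> config
	bounds     map[string]string       // worker name -> bound source ID
	// relayWorkers maps a source ID to the workers assigned with
	// `start-relay -s source -w worker`.
	relayWorkers map[string]map[string]struct{}
}

// relaySourceFor reports which source, if any, the worker should pull relay
// log from.
func (r *relayState) relaySourceFor(worker string) (string, bool) {
	// Scheduled by source: follow the bound source's enable-relay setting.
	if source, ok := r.bounds[worker]; ok && r.sourceCfgs[source].EnableRelay {
		return source, true
	}
	// Scheduled by (source, worker): applies whether or not the worker is bound.
	for source, workers := range r.relayWorkers {
		if _, ok := workers[worker]; ok {
			return source, true
		}
	}
	return "", false
}

func main() {
	r := &relayState{
		sourceCfgs: map[string]sourceConfig{
			"mysql-01": {EnableRelay: true},
			"mysql-02": {},
		},
		bounds: map[string]string{
			"worker-1": "mysql-01",
			"worker-2": "mysql-02",
		},
		relayWorkers: map[string]map[string]struct{}{
			"mysql-02": {"worker-3": {}},
		},
	}
	for _, w := range []string{"worker-1", "worker-2", "worker-3"} {
		source, ok := r.relaySourceFor(w)
		fmt.Println(w, source, ok)
	}
}
```

In this example, worker-1 relays for its bound source because `enable-relay` is on, worker-2 is bound but does not relay, and worker-3 relays for mysql-02 without being bound to it.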
func NewScheduler ¶
NewScheduler creates a new scheduler instance.
func (*Scheduler) AcquireSubtaskLatch ¶
func (s *Scheduler) AcquireSubtaskLatch(name string) (ReleaseFunc, error)
AcquireSubtaskLatch tries acquiring a latch for subtask name.
func (*Scheduler) AddSourceCfg ¶
func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error
AddSourceCfg adds the upstream source config to the cluster and tries to bind the source to a worker. NOTE: please verify the config before calling this.
func (*Scheduler) AddSourceCfgWithWorker ¶
func (s *Scheduler) AddSourceCfgWithWorker(cfg *config.SourceConfig, workerName string) error
AddSourceCfgWithWorker adds the upstream source config to the cluster and tries to bind the source to the specified worker. NOTE: please verify the config before calling this.
func (*Scheduler) AddSubTasks ¶
func (s *Scheduler) AddSubTasks(latched bool, expectStage pb.Stage, cfgs ...config.SubTaskConfig) error
AddSubTasks adds the information of one or more subtasks for one task. It uses s.mu.RLock() to protect s.bound, and s.subtaskLatch to protect subtask-related members. Setting `latched` to true means the caller has already acquired the latch.
func (*Scheduler) AddWorker ¶
AddWorker adds the information of the DM-worker when registering a new instance. This only adds the information of the DM-worker. To know whether it is online (ready to handle work), we need to wait for its healthy status through keep-alive.
func (*Scheduler) BatchOperateTaskOnWorker ¶
func (s *Scheduler) BatchOperateTaskOnWorker( ctx context.Context, worker *Worker, tasks []string, source string, stage pb.Stage, needWait bool, ) error
BatchOperateTaskOnWorker operates tasks in batch on one worker and, if needWait=true, uses query-status to make sure all tasks are in the expected stage.
func (*Scheduler) BoundSources ¶
BoundSources returns all bound source IDs in increasing order.
func (*Scheduler) CloseAllWorkers ¶
func (s *Scheduler) CloseAllWorkers()
CloseAllWorkers closes all the scheduler's workers.
func (*Scheduler) GetALlSubTaskCfgs ¶
func (s *Scheduler) GetALlSubTaskCfgs() map[string]map[string]*config.SubTaskConfig
GetALlSubTaskCfgs gets pointers to all subtask configs, and returns nil when an error happens.
func (*Scheduler) GetAllWorkers ¶
GetAllWorkers gets all worker agents.
func (*Scheduler) GetDownstreamMetaByTask ¶
GetDownstreamMetaByTask gets downstream db config and meta config by task name.
func (*Scheduler) GetExpectRelayStage ¶
GetExpectRelayStage returns the current expect relay stage. If the stage does not exist, an invalid stage is returned. This func is used for testing.
func (*Scheduler) GetExpectSubTaskStage ¶
GetExpectSubTaskStage returns the current expect subtask stage. If the stage does not exist, an invalid stage is returned.
func (*Scheduler) GetRelayWorkers ¶
GetRelayWorkers returns all alive worker instances for a relay source.
func (*Scheduler) GetSourceCfgByID ¶
func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig
GetSourceCfgByID gets source config by source ID.
func (*Scheduler) GetSourceCfgIDs ¶
GetSourceCfgIDs gets the IDs of all added sources.
func (*Scheduler) GetSourceCfgs ¶
func (s *Scheduler) GetSourceCfgs() map[string]*config.SourceConfig
GetSourceCfgs gets all source configs, and returns nil when an error happens.
func (*Scheduler) GetSubTaskCfgs ¶
func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig
GetSubTaskCfgs gets all subtask configs, and returns nil when an error happens.
func (*Scheduler) GetSubTaskCfgsByTask ¶
func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTaskConfig
GetSubTaskCfgsByTask gets subtask configs' map by task name.
func (*Scheduler) GetSubTaskCfgsByTaskAndSource ¶
func (*Scheduler) GetTaskNameListBySourceName ¶
GetTaskNameListBySourceName gets task name list by source name.
func (*Scheduler) GetValidatorStage ¶
GetValidatorStage gets the validator stage of a task-source pair.
func (*Scheduler) GetWorkerByName ¶
GetWorkerByName gets worker agent by worker name.
func (*Scheduler) GetWorkerBySource ¶
GetWorkerBySource gets the currently bound worker agent by source ID, and returns nil if the source is not bound.
func (*Scheduler) OperateValidationTask ¶
func (s *Scheduler) OperateValidationTask(validatorStages []ha.Stage, changedSubtaskCfgs []config.SubTaskConfig) error
OperateValidationTask operates the validators of subtasks.
- tasks: tasks that need to be operated
- validatorStages: stage info of subtask validators
- changedSubtaskCfgs: changed subtask configs
See server.StartValidation/StopValidation for more detail.
func (*Scheduler) RemoveLoadTaskAndLightningStatus ¶
RemoveLoadTaskAndLightningStatus removes the loadtask and lightning status by task.
func (*Scheduler) RemoveSourceCfg ¶
RemoveSourceCfg removes the upstream source config from the cluster. When removing the upstream source config, it should also remove:
- any existing relay stage.
- any source-worker bound relationship.
func (*Scheduler) RemoveSubTasks ¶
RemoveSubTasks removes the information of one or more subtasks for one task.
func (*Scheduler) RemoveWorker ¶
RemoveWorker removes the information of the DM-worker when removing the instance manually. The user should shut down the DM-worker instance before removing its information.
func (*Scheduler) SetWorkerClientForTest ¶
SetWorkerClientForTest sets a mock worker client for the specified worker. It is only used in tests.
func (*Scheduler) Start ¶
Start starts the scheduler for work. NOTE: for logic errors, it should start without returning errors (but report via metrics or log) so that the user can fix them.
func (*Scheduler) StartRelay ¶
StartRelay puts etcd key-value pairs to start relay on some workers.
func (*Scheduler) TransferSource ¶
TransferSource unbinds the `source` and binds it to a free or same-source-relay `worker`. If it fails halfway, the old worker should try to recover.
func (*Scheduler) TryResolveLoadTask ¶
TryResolveLoadTask checks whether there are sources whose load task has local files but which are not bound to the worker that can access those files. If so, it triggers a source transfer.
func (*Scheduler) UnboundSources ¶
UnboundSources returns all unbound source IDs in increasing order.
func (*Scheduler) UpdateExpectRelayStage ¶
UpdateExpectRelayStage updates the current expect relay stage. Currently, only these updates are supported:
- from `Running` to `Paused`.
- from `Paused` to `Running`.
NOTE: updates from `Running` to `Running` and from `Paused` to `Paused` still write the data to etcd, because some users may want to update `{Running, Paused, ...}` to `{Running, Running, ...}`. So this should also be supported in DM-worker.
func (*Scheduler) UpdateExpectSubTaskStage ¶
func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string, sources ...string) error
UpdateExpectSubTaskStage updates the current expect subtask stage. Currently, only these updates are supported:
- from `Running` to `Paused/Stopped`.
- from `Paused/Stopped` to `Running`.
NOTE: updates from `Running` to `Running` and from `Paused` to `Paused` still write the data to etcd, because some users may want to update `{Running, Paused, ...}` to `{Running, Running, ...}`. So this should also be supported in DM-worker.
func (*Scheduler) UpdateSourceCfg ¶
func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error
UpdateSourceCfg updates the upstream source config in the cluster.
func (*Scheduler) UpdateSubTasks ¶
UpdateSubTasks updates the information of one or more subtasks for one task.
func (*Scheduler) ValidatorEnabled ¶
ValidatorEnabled returns true when the validator of a task-source pair is enabled, i.e. the validation mode is not none. An enabled validator can be in the running or stopped stage.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is an agent for a DM-worker instance.
func NewMockWorker ¶
NewMockWorker is used in tests.
func (*Worker) BaseInfo ¶
func (w *Worker) BaseInfo() ha.WorkerInfo
BaseInfo returns the base info of the worker. No lock is needed because baseInfo should not be modified after the instance is created.
func (*Worker) Bound ¶
func (w *Worker) Bound() ha.SourceBound
Bound returns the source bound that the worker currently holds, or an empty value if the worker is not bound.
func (*Worker) RelaySourceID ¶
RelaySourceID returns the ID of the source from which this worker is pulling relay log, or an empty string if relay has not been started.
func (*Worker) SendRequest ¶
func (w *Worker) SendRequest(ctx context.Context, req *workerrpc.Request, d time.Duration) (*workerrpc.Response, error)
SendRequest sends request to the DM-worker instance.
func (*Worker) StartRelay ¶
StartRelay adds relay source information to a bound worker and calculates the stage.
func (*Worker) StopRelay ¶
func (w *Worker) StopRelay()
StopRelay clears relay source information of a bound worker and calculates the stage.
func (*Worker) ToBound ¶
func (w *Worker) ToBound(bound ha.SourceBound) error
ToBound transforms to Bound. All available transitions can be found at the beginning of this file.
func (*Worker) ToFree ¶
func (w *Worker) ToFree()
ToFree transforms to Free and clears the bound and relay information. All available transitions can be found at the beginning of this file.
type WorkerStage ¶
type WorkerStage string
WorkerStage represents the stage of a DM-worker instance.
const (
	WorkerOffline WorkerStage = "offline" // the worker is not online yet.
	WorkerFree    WorkerStage = "free"    // the worker is online, but no upstream source assigned to it yet.
	WorkerBound   WorkerStage = "bound"   // the worker is online, and one upstream source already assigned to it.
	WorkerRelay   WorkerStage = "relay"   // the worker is online, pulling relay log but not responsible for migrating.
)
the stage of DM-worker instances. valid transformation:
- Offline -> Free, receive keep-alive.
- Free -> Offline, lost keep-alive.
- Free -> Bound, bind source.
- Free -> Relay, start relay for a source.
- Bound -> Offline, lost keep-alive, when receive keep-alive again, it should become Free.
- Bound -> Free, unbind source.
- Bound -> Relay, commands like transfer-source that gracefully unbind a worker which has started relay.
- Relay -> Offline, lost keep-alive.
- Relay -> Free, stop relay.
- Relay -> Bound, old bound worker becomes offline so bind source to this worker, which has started relay.
invalid transformation:
- Offline -> Bound, must become Free first.
- Offline -> Relay, must become Free first.
In the Bound stage, relay can be turned on/off. The difference from the Bound-Relay transformation is:
- turning relay on/off in the Bound stage means a bound DM-worker receives start-relay/stop-relay; the source bound relation is not changed.
- the Bound-Relay transformation means the source bound relation is changed.
The caller should ensure correctness when invoking the transformation methods below successively, for example, when calling ToBound twice with different arguments.
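The valid and invalid transformations listed above can be written down as a lookup table. The following is a minimal sketch, not the package's own code: the constants repeat the documented values, while `validTransitions` and `canTransit` are hypothetical names, and same-stage changes (such as turning relay on/off while Bound) are deliberately not modeled as transitions.

```go
package main

import "fmt"

// WorkerStage and its constants repeat the documented declarations.
type WorkerStage string

const (
	WorkerOffline WorkerStage = "offline"
	WorkerFree    WorkerStage = "free"
	WorkerBound   WorkerStage = "bound"
	WorkerRelay   WorkerStage = "relay"
)

// validTransitions encodes the "valid transformation" list: each key maps to
// the stages that can be reached from it in one step.
var validTransitions = map[WorkerStage][]WorkerStage{
	WorkerOffline: {WorkerFree},
	WorkerFree:    {WorkerOffline, WorkerBound, WorkerRelay},
	WorkerBound:   {WorkerOffline, WorkerFree, WorkerRelay},
	WorkerRelay:   {WorkerOffline, WorkerFree, WorkerBound},
}

// canTransit reports whether moving from one stage to another is listed as
// valid. It returns false for same-stage pairs and for unknown stages.
func canTransit(from, to WorkerStage) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransit(WorkerOffline, WorkerFree))  // keep-alive received
	fmt.Println(canTransit(WorkerOffline, WorkerBound)) // must become Free first
	fmt.Println(canTransit(WorkerRelay, WorkerBound))   // old bound worker went offline
}
```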