scheduler

package
v0.0.0-...-beee317
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ReleaseFunc

type ReleaseFunc func()

ReleaseFunc wraps the release of a latch. It is safe to call multiple times. Because it is returned as a value, the compiler also reports an unused ReleaseFunc variable, which helps catch a forgotten release.
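Idempotent release is commonly built with `sync.Once`. The sketch below shows one way a ReleaseFunc can be made safe to call multiple times; `newReleaseFunc` is a hypothetical helper for illustration, not part of this package's API.

```go
package main

import (
	"fmt"
	"sync"
)

// ReleaseFunc mirrors the documented type: a function that releases a latch.
type ReleaseFunc func()

// newReleaseFunc is a hypothetical helper: it wraps release so that only the
// first call has any effect and later calls are no-ops.
func newReleaseFunc(release func()) ReleaseFunc {
	var once sync.Once
	return func() {
		once.Do(release)
	}
}

func main() {
	releases := 0
	release := newReleaseFunc(func() { releases++ })
	release()
	release()             // second call does nothing
	fmt.Println(releases) // 1
}
```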

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules tasks for DM-worker instances. Its responsibilities include:

- registering and unregistering DM-worker instances.
- observing the online/offline status of DM-worker instances.
- observing add/remove operations on upstream source configs.
- scheduling upstream sources to DM-worker instances.
- scheduling data migration subtask operations.
- holding agents for DM-worker instances.

NOTE: the DM-master server MUST wait for this scheduler to start before handling client requests.

Cases that trigger a source-to-worker bind attempt:

- a worker goes from Offline to Free:
  • it receives a keep-alive.
- a worker goes from Bound to Free:
  • triggered by the unbind case `a source removed`.
- a new source is added:
  • an add-source request from the user.
- a source is unbound from another worker:
  • triggered by the unbind case `a worker from Bound to Offline`.
  • TODO(csuzhangxc): design a strategy to ensure the old worker has already shut down its work.

Cases that trigger a source-to-worker unbind attempt:

- a worker goes from Bound to Offline:
  • it loses its keep-alive.
- a source is removed:
  • a remove-source request from the user.

TODO: try to handle the returned `err` of etcd operations, because a value may already have been put into etcd even though the response to the etcd client was interrupted.

Relay scheduling:

- scheduled by source: the DM-worker enables relay according to its bound source. In the current implementation, it reads `enable-relay` from the source config to decide whether to enable relay.
  Turn on `enable-relay`:
  • use `enable-relay: true` when creating the source.
  • use `start-relay -s source` to change `enable-relay` dynamically.
  Turn off `enable-relay`:
  • use `enable-relay: false` when creating the source.
  • use `stop-relay -s source` to change `enable-relay` dynamically.
  • a conflicting (source, worker) schedule type is found when the scheduler bootstraps.
- scheduled by (source, worker): the DM-worker checks whether relay is assigned to it, whether or not it is bound. In the current implementation, it reads UpstreamRelayWorkerKeyAdapter in etcd.
  Add UpstreamRelayWorkerKeyAdapter:
  • use `start-relay -s source -w worker`.
  Remove UpstreamRelayWorkerKeyAdapter:
  • use `stop-relay -s source -w worker`.
  • remove the worker with `offline-member`.
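The bind-try triggers above can be pictured with a small in-memory model. This is a sketch under simplifying assumptions: `miniScheduler`, `tryBound`, `workerOnline`, `addSource`, and `workerOffline` are hypothetical names, there is no etcd and no relay, and sources are paired with free workers in sorted order, which is not necessarily the real scheduler's policy.

```go
package main

import (
	"fmt"
	"sort"
)

// miniScheduler is a hypothetical in-memory stand-in for Scheduler.
type miniScheduler struct {
	free    map[string]bool   // worker name -> worker is Free
	unbound map[string]bool   // source ID -> source is not bound yet
	bounds  map[string]string // source ID -> bound worker name
}

func newMiniScheduler() *miniScheduler {
	return &miniScheduler{free: map[string]bool{}, unbound: map[string]bool{}, bounds: map[string]string{}}
}

func sortedKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// tryBound pairs unbound sources with free workers (sorted, for determinism).
func (s *miniScheduler) tryBound() {
	sources, workers := sortedKeys(s.unbound), sortedKeys(s.free)
	for i := 0; i < len(sources) && i < len(workers); i++ {
		s.bounds[sources[i]] = workers[i]
		delete(s.unbound, sources[i])
		delete(s.free, workers[i])
	}
}

// workerOnline models "a worker from Offline to Free: receive keep-alive".
func (s *miniScheduler) workerOnline(name string) {
	s.free[name] = true
	s.tryBound()
}

// addSource models "a new source added: add source request from user".
func (s *miniScheduler) addSource(id string) {
	s.unbound[id] = true
	s.tryBound()
}

// workerOffline models "a worker from Bound to Offline: lost keep-alive";
// its source becomes unbound and is retried on the remaining free workers.
func (s *miniScheduler) workerOffline(name string) {
	delete(s.free, name)
	for src, w := range s.bounds {
		if w == name {
			delete(s.bounds, src)
			s.unbound[src] = true
		}
	}
	s.tryBound()
}

func main() {
	s := newMiniScheduler()
	s.addSource("mysql-replica-01") // no free worker yet, stays unbound
	s.workerOnline("worker-1")      // keep-alive received, bind attempt succeeds
	fmt.Println(s.bounds)           // map[mysql-replica-01:worker-1]
}
```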

func NewScheduler

func NewScheduler(pLogger *log.Logger, securityCfg security.Security) *Scheduler

NewScheduler creates a new scheduler instance.

func (*Scheduler) AcquireSubtaskLatch

func (s *Scheduler) AcquireSubtaskLatch(name string) (ReleaseFunc, error)

AcquireSubtaskLatch tries to acquire a latch for the given subtask name.
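A non-blocking, per-name latch table with an idempotent release might look like the following. The `latches` type, `tryAcquire`, and `errLatchHeld` are hypothetical, and the sketch assumes that "tries to acquire" means the call fails immediately, rather than waiting, when the latch is already held.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ReleaseFunc releases a previously acquired latch.
type ReleaseFunc func()

var errLatchHeld = errors.New("latch is already held")

// latches is a hypothetical per-name, non-blocking latch table.
type latches struct {
	mu   sync.Mutex
	held map[string]struct{}
}

func newLatches() *latches {
	return &latches{held: make(map[string]struct{})}
}

// tryAcquire returns an error at once if name is latched instead of blocking.
func (l *latches) tryAcquire(name string) (ReleaseFunc, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.held[name]; ok {
		return nil, fmt.Errorf("%s: %w", name, errLatchHeld)
	}
	l.held[name] = struct{}{}
	var once sync.Once
	// sync.Once ensures a stale, repeated release cannot free a later holder.
	return func() {
		once.Do(func() {
			l.mu.Lock()
			delete(l.held, name)
			l.mu.Unlock()
		})
	}, nil
}

func main() {
	l := newLatches()
	release, err := l.tryAcquire("task-1")
	if err != nil {
		panic(err)
	}
	_, err = l.tryAcquire("task-1")
	fmt.Println(errors.Is(err, errLatchHeld)) // true
	release()
	release() // safe: only the first call deletes the entry
	_, err = l.tryAcquire("task-1")
	fmt.Println(err == nil) // true
}
```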

func (*Scheduler) AddSourceCfg

func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error

AddSourceCfg adds the upstream source config to the cluster and tries to bind the source to a worker. NOTE: verify the config before calling this.

func (*Scheduler) AddSourceCfgWithWorker

func (s *Scheduler) AddSourceCfgWithWorker(cfg *config.SourceConfig, workerName string) error

AddSourceCfgWithWorker adds the upstream source config to the cluster and tries to bind the source to the specified worker. NOTE: verify the config before calling this.

func (*Scheduler) AddSubTasks

func (s *Scheduler) AddSubTasks(latched bool, expectStage pb.Stage, cfgs ...config.SubTaskConfig) error

AddSubTasks adds the information of one or more subtasks for one task. It uses s.mu.RLock() to protect s.bound, and s.subtaskLatch to protect subtask-related members. Setting `latched` to true means the caller has already acquired the latch.
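The `latched` flag lets a caller that already holds the latch across several steps avoid taking it a second time. A minimal sketch of that convention follows; the `store` type, `acquire`, and `addSubTasks` are hypothetical, and a simplified per-task latch stands in for the real `s.subtaskLatch`.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in that only illustrates the `latched` flag.
type store struct {
	mu       sync.Mutex
	latched  map[string]bool     // task names whose latch is currently held
	subtasks map[string][]string // task name -> source IDs
}

func newStore() *store {
	return &store{latched: map[string]bool{}, subtasks: map[string][]string{}}
}

// acquire takes the per-task latch without blocking; it fails if already held.
func (s *store) acquire(task string) (func(), error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.latched[task] {
		return nil, fmt.Errorf("latch for %q is already held", task)
	}
	s.latched[task] = true
	return func() {
		s.mu.Lock()
		delete(s.latched, task)
		s.mu.Unlock()
	}, nil
}

// addSubTasks acquires the latch itself only when the caller has not.
func (s *store) addSubTasks(latched bool, task string, sources ...string) error {
	if !latched {
		release, err := s.acquire(task)
		if err != nil {
			return err
		}
		defer release()
	}
	s.mu.Lock()
	s.subtasks[task] = append(s.subtasks[task], sources...)
	s.mu.Unlock()
	return nil
}

func main() {
	s := newStore()
	fmt.Println(s.addSubTasks(false, "task-1", "source-1")) // <nil>

	release, _ := s.acquire("task-1") // caller holds the latch itself
	fmt.Println(s.addSubTasks(true, "task-1", "source-2"))         // <nil>
	fmt.Println(s.addSubTasks(false, "task-1", "source-3") != nil) // true: latch busy
	release()
	fmt.Println(s.subtasks["task-1"]) // [source-1 source-2]
}
```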

func (*Scheduler) AddWorker

func (s *Scheduler) AddWorker(name, addr string) error

AddWorker adds the information of the DM-worker when registering a new instance. This only adds the DM-worker's information; to know whether it is online (ready to handle work), the scheduler must wait for its healthy status via keep-alive.

func (*Scheduler) BatchOperateTaskOnWorker

func (s *Scheduler) BatchOperateTaskOnWorker(
	ctx context.Context, worker *Worker, tasks []string, source string, stage pb.Stage, needWait bool,
) error

BatchOperateTaskOnWorker operates on a batch of tasks on one worker and, if needWait is true, uses query-status to make sure all tasks are in the expected stage.

func (*Scheduler) BoundSources

func (s *Scheduler) BoundSources() []string

BoundSources returns all bound source IDs in increasing order.

func (*Scheduler) Close

func (s *Scheduler) Close()

Close closes the scheduler.

func (*Scheduler) CloseAllWorkers

func (s *Scheduler) CloseAllWorkers()

CloseAllWorkers closes all the scheduler's workers.

func (*Scheduler) GetALlSubTaskCfgs

func (s *Scheduler) GetALlSubTaskCfgs() map[string]map[string]*config.SubTaskConfig

GetALlSubTaskCfgs gets pointers to all subtask configs; it returns nil when an error happens.

func (*Scheduler) GetAllWorkers

func (s *Scheduler) GetAllWorkers() ([]*Worker, error)

GetAllWorkers gets all worker agents.

func (*Scheduler) GetDownstreamMetaByTask

func (s *Scheduler) GetDownstreamMetaByTask(task string) (*dbconfig.DBConfig, string)

GetDownstreamMetaByTask gets the downstream DB config and meta config by task name.

func (*Scheduler) GetExpectRelayStage

func (s *Scheduler) GetExpectRelayStage(source string) ha.Stage

GetExpectRelayStage returns the current expected relay stage. If the stage does not exist, an invalid stage is returned. This function is used for testing.

func (*Scheduler) GetExpectSubTaskStage

func (s *Scheduler) GetExpectSubTaskStage(task, source string) ha.Stage

GetExpectSubTaskStage returns the current expected subtask stage. If the stage does not exist, an invalid stage is returned.

func (*Scheduler) GetRelayWorkers

func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error)

GetRelayWorkers returns all alive worker instances for a relay source.

func (*Scheduler) GetSourceCfgByID

func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig

GetSourceCfgByID gets source config by source ID.

func (*Scheduler) GetSourceCfgIDs

func (s *Scheduler) GetSourceCfgIDs() []string

GetSourceCfgIDs gets all added source IDs.

func (*Scheduler) GetSourceCfgs

func (s *Scheduler) GetSourceCfgs() map[string]*config.SourceConfig

GetSourceCfgs gets all source configs; it returns nil when an error happens.

func (*Scheduler) GetSubTaskCfgs

func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig

GetSubTaskCfgs gets all subtask configs; it returns nil when an error happens.

func (*Scheduler) GetSubTaskCfgsByTask

func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTaskConfig

GetSubTaskCfgsByTask gets subtask configs' map by task name.

func (*Scheduler) GetSubTaskCfgsByTaskAndSource

func (s *Scheduler) GetSubTaskCfgsByTaskAndSource(taskName string, sources []string) map[string]map[string]config.SubTaskConfig

func (*Scheduler) GetTaskNameListBySourceName

func (s *Scheduler) GetTaskNameListBySourceName(sourceName string, expectStage *pb.Stage) []string

GetTaskNameListBySourceName gets task name list by source name.

func (*Scheduler) GetValidatorStage

func (s *Scheduler) GetValidatorStage(task, source string) *ha.Stage

GetValidatorStage gets the validator stage of a task-source pair.

func (*Scheduler) GetWorkerByName

func (s *Scheduler) GetWorkerByName(name string) *Worker

GetWorkerByName gets worker agent by worker name.

func (*Scheduler) GetWorkerBySource

func (s *Scheduler) GetWorkerBySource(source string) *Worker

GetWorkerBySource gets the currently bound worker agent by source ID; it returns nil if the source is not bound.

func (*Scheduler) OperateValidationTask

func (s *Scheduler) OperateValidationTask(validatorStages []ha.Stage, changedSubtaskCfgs []config.SubTaskConfig) error

OperateValidationTask operates the validators of subtasks.

tasks: tasks to operate on
validatorStages: stage info of subtask validators
changedSubtaskCfgs: changed subtask configs

See server.StartValidation/StopValidation for more details.

func (*Scheduler) RemoveLoadTaskAndLightningStatus

func (s *Scheduler) RemoveLoadTaskAndLightningStatus(task string) error

RemoveLoadTaskAndLightningStatus removes the load task and lightning status of a task.

func (*Scheduler) RemoveSourceCfg

func (s *Scheduler) RemoveSourceCfg(source string) error

RemoveSourceCfg removes the upstream source config from the cluster. When removing the upstream source config, it should also remove:

- any existing relay stage.
- any source-worker bound relationship.

func (*Scheduler) RemoveSubTasks

func (s *Scheduler) RemoveSubTasks(task string, sources ...string) error

RemoveSubTasks removes the information of one or more subtasks for one task.

func (*Scheduler) RemoveWorker

func (s *Scheduler) RemoveWorker(name string) error

RemoveWorker removes the information of the DM-worker when the instance is removed manually. The user should shut down the DM-worker instance before removing its information.

func (*Scheduler) SetWorkerClientForTest

func (s *Scheduler) SetWorkerClientForTest(name string, mockCli workerrpc.Client)

SetWorkerClientForTest sets a mock worker client for the specified worker. It is only used in tests.

func (*Scheduler) Start

func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err error)

Start starts the scheduler. NOTE: on logic errors, it should still start without returning an error (reporting the problem via metrics or logs instead) so that the user can fix them.

func (*Scheduler) StartRelay

func (s *Scheduler) StartRelay(source string, workers []string) error

StartRelay puts etcd key-value pairs to start relay on some workers.

func (*Scheduler) Started

func (s *Scheduler) Started() bool

Started returns whether the scheduler has started.

func (*Scheduler) StopRelay

func (s *Scheduler) StopRelay(source string, workers []string) error

StopRelay deletes etcd key-value pairs to stop relay on some workers.

func (*Scheduler) TransferSource

func (s *Scheduler) TransferSource(ctx context.Context, source, worker string) error

TransferSource unbinds the `source` and binds it to a free or same-source-relay `worker`. If it fails halfway, the old worker should try to recover.
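A transfer is an unbind followed by a bind, and the interesting part is what happens when the second step fails. The sketch below uses a hypothetical `binder` interface with an in-memory `memBinder`, and, as a simplification, rolls back by rebinding the old worker from the caller's side; the documented behavior is that the old worker itself tries to recover.

```go
package main

import (
	"errors"
	"fmt"
)

// binder is a hypothetical stand-in for the scheduler's bind/unbind writes.
type binder interface {
	Bind(source, worker string) error
	Unbind(source, worker string) error
}

// transferSource moves source from oldWorker to newWorker. If binding to the
// new worker fails, it rebinds the old worker so the source is not left unbound.
func transferSource(b binder, source, oldWorker, newWorker string) error {
	if err := b.Unbind(source, oldWorker); err != nil {
		return fmt.Errorf("unbind %s from %s: %w", source, oldWorker, err)
	}
	if err := b.Bind(source, newWorker); err != nil {
		if rbErr := b.Bind(source, oldWorker); rbErr != nil {
			return errors.Join(err, fmt.Errorf("recover %s on %s: %w", source, oldWorker, rbErr))
		}
		return fmt.Errorf("bind %s to %s: %w", source, newWorker, err)
	}
	return nil
}

// memBinder records bounds in memory; binding to a worker in failOn fails.
type memBinder struct {
	bounds map[string]string
	failOn map[string]bool
}

func (m *memBinder) Bind(source, worker string) error {
	if m.failOn[worker] {
		return errors.New("worker unavailable")
	}
	m.bounds[source] = worker
	return nil
}

func (m *memBinder) Unbind(source, worker string) error {
	if m.bounds[source] != worker {
		return errors.New("source not bound to this worker")
	}
	delete(m.bounds, source)
	return nil
}

func main() {
	m := &memBinder{bounds: map[string]string{"source-1": "worker-1"}, failOn: map[string]bool{"worker-3": true}}
	err := transferSource(m, "source-1", "worker-1", "worker-2")
	fmt.Println(err, m.bounds["source-1"]) // <nil> worker-2
	err = transferSource(m, "source-1", "worker-2", "worker-3")
	fmt.Println(err != nil, m.bounds["source-1"]) // true worker-2
}
```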

func (*Scheduler) TryResolveLoadTask

func (s *Scheduler) TryResolveLoadTask(sources []string)

TryResolveLoadTask checks whether there are sources whose load task has local files but which are not bound to the worker that can access those files. If so, it triggers a source transfer.
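The detection step can be reduced to a pure function over two maps. `pendingTransfers`, `filesOn`, and `bound` are hypothetical; the sketch ignores whether the target worker is currently online or free, which a real implementation would need to check before triggering a transfer.

```go
package main

import "fmt"

// pendingTransfers returns source -> target worker for every source whose
// load-task files live on a worker (filesOn) other than its bound worker.
func pendingTransfers(sources []string, filesOn, bound map[string]string) map[string]string {
	moves := map[string]string{}
	for _, src := range sources {
		target, hasFiles := filesOn[src]
		if !hasFiles || bound[src] == target {
			continue // no local files, or already on the right worker
		}
		moves[src] = target
	}
	return moves
}

func main() {
	filesOn := map[string]string{"source-1": "worker-1", "source-2": "worker-3"}
	bound := map[string]string{"source-1": "worker-1", "source-2": "worker-2", "source-3": "worker-4"}
	fmt.Println(pendingTransfers([]string{"source-1", "source-2", "source-3"}, filesOn, bound))
	// map[source-2:worker-3]
}
```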

func (*Scheduler) UnboundSources

func (s *Scheduler) UnboundSources() []string

UnboundSources returns all unbound source IDs in increasing order.

func (*Scheduler) UpdateExpectRelayStage

func (s *Scheduler) UpdateExpectRelayStage(newStage pb.Stage, sources ...string) error

UpdateExpectRelayStage updates the current expected relay stage. Currently, only these updates are supported:

- from `Running` to `Paused`.
- from `Paused` to `Running`.

NOTE: updates from `Running` to `Running` and from `Paused` to `Paused` still write the data to etcd, because a user may want to update `{Running, Paused, ...}` to `{Running, Running, ...}`. DM-worker should support this as well.

func (*Scheduler) UpdateExpectSubTaskStage

func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string, sources ...string) error

UpdateExpectSubTaskStage updates the current expected subtask stage. Currently, only these updates are supported:

- from `Running` to `Paused/Stopped`.
- from `Paused/Stopped` to `Running`.

NOTE: updates from `Running` to `Running` and from `Paused` to `Paused` still write the data to etcd, because a user may want to update `{Running, Paused, ...}` to `{Running, Running, ...}`. DM-worker should support this as well.
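The supported updates can be checked with a small predicate. Stages here are hypothetical string constants rather than `pb.Stage`, and `allowedSubTaskUpdate` is an illustrative name; the sketch accepts exactly the transitions listed above, plus the `Running` to `Running` and `Paused` to `Paused` rewrites, and rejects everything else.

```go
package main

import "fmt"

// Hypothetical stage values; the real code uses pb.Stage.
const (
	Running = "Running"
	Paused  = "Paused"
	Stopped = "Stopped"
)

// allowedSubTaskUpdate reports whether the expected subtask stage may change
// from `from` to `to` under the documented rules.
func allowedSubTaskUpdate(from, to string) bool {
	switch {
	case from == to:
		// Same-stage rewrites are kept so mixed stages can be normalized.
		return from == Running || from == Paused
	case from == Running:
		return to == Paused || to == Stopped
	case from == Paused || from == Stopped:
		return to == Running
	}
	return false
}

func main() {
	fmt.Println(allowedSubTaskUpdate(Running, Stopped)) // true
	fmt.Println(allowedSubTaskUpdate(Stopped, Running)) // true
	fmt.Println(allowedSubTaskUpdate(Paused, Paused))   // true
	fmt.Println(allowedSubTaskUpdate(Paused, Stopped))  // false
}
```

Updating `{Running, Paused}` to `{Running, Running}` passes because each per-source update (`Running` to `Running`, `Paused` to `Running`) is accepted individually.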

func (*Scheduler) UpdateSourceCfg

func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error

UpdateSourceCfg updates the upstream source config in the cluster.

func (*Scheduler) UpdateSubTasks

func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskConfig) error

UpdateSubTasks updates the information of one or more subtasks for one task.

func (*Scheduler) ValidatorEnabled

func (s *Scheduler) ValidatorEnabled(task, source string) bool

ValidatorEnabled returns true when the validator of a task-source pair is enabled, i.e. the validation mode is not none. An enabled validator can be in the running or stopped stage.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is an agent for a DM-worker instance.

func NewMockWorker

func NewMockWorker(cli workerrpc.Client) *Worker

NewMockWorker is used in tests.

func NewWorker

func NewWorker(baseInfo ha.WorkerInfo, securityCfg security.Security) (*Worker, error)

NewWorker creates a new Worker instance with Offline stage.

func (*Worker) BaseInfo

func (w *Worker) BaseInfo() ha.WorkerInfo

BaseInfo returns the base info of the worker. No lock is needed because baseInfo should not be modified after the instance is created.

func (*Worker) Bound

func (w *Worker) Bound() ha.SourceBound

Bound returns the source bound to the worker, or the zero value if the worker is not bound.

func (*Worker) Close

func (w *Worker) Close()

Close closes the worker and releases its resources.

func (*Worker) RelaySourceID

func (w *Worker) RelaySourceID() string

RelaySourceID returns the ID of the source from which this worker is pulling the relay log, or an empty string if relay has not started.

func (*Worker) SendRequest

func (w *Worker) SendRequest(ctx context.Context, req *workerrpc.Request, d time.Duration) (*workerrpc.Response, error)

SendRequest sends a request to the DM-worker instance.

func (*Worker) Stage

func (w *Worker) Stage() WorkerStage

Stage returns the current stage.

func (*Worker) StartRelay

func (w *Worker) StartRelay(sourceID string) error

StartRelay adds relay source information to a bound worker and calculates the stage.

func (*Worker) StopRelay

func (w *Worker) StopRelay()

StopRelay clears relay source information of a bound worker and calculates the stage.

func (*Worker) ToBound

func (w *Worker) ToBound(bound ha.SourceBound) error

ToBound transforms the worker to Bound. All available transitions are listed in the WorkerStage documentation.

func (*Worker) ToFree

func (w *Worker) ToFree()

ToFree transforms the worker to Free and clears the bound and relay information. All available transitions are listed in the WorkerStage documentation.

func (*Worker) ToOffline

func (w *Worker) ToOffline()

ToOffline transforms the worker to Offline. All available transitions are listed in the WorkerStage documentation.

func (*Worker) Unbound

func (w *Worker) Unbound() error

Unbound changes the worker's stage from Bound to Free or Relay.

type WorkerStage

type WorkerStage string

WorkerStage represents the stage of a DM-worker instance.

const (
	WorkerOffline WorkerStage = "offline" // the worker is not online yet.
	WorkerFree    WorkerStage = "free"    // the worker is online, but no upstream source assigned to it yet.
	WorkerBound   WorkerStage = "bound"   // the worker is online, and one upstream source already assigned to it.
	WorkerRelay   WorkerStage = "relay"   // the worker is online, pulling relay log but not responsible for migrating.
)

The stages of DM-worker instances. Valid transformations:

  • Offline -> Free, receive keep-alive.
  • Free -> Offline, lost keep-alive.
  • Free -> Bound, bind source.
  • Free -> Relay, start relay for a source.
  • Bound -> Offline, lost keep-alive; when it receives a keep-alive again, it should become Free.
  • Bound -> Free, unbind source.
  • Bound -> Relay, commands like transfer-source that gracefully unbind a worker which has started relay.
  • Relay -> Offline, lost keep-alive.
  • Relay -> Free, stop relay.
  • Relay -> Bound, the old bound worker becomes offline, so the source is bound to this worker, which has already started relay.

Invalid transformations:

  • Offline -> Bound, must become Free first.
  • Offline -> Relay, must become Free first.

In the Bound stage, relay can be turned on or off. The difference from the Bound-Relay transformation is:

  • Turning relay on/off in the Bound stage means a bound DM-worker receives start-relay/stop-relay; the source bound relationship does not change.
  • A Bound-Relay transformation means the source bound relationship changes.

The caller should ensure correctness when invoking the transformation methods below in succession, for example, when calling ToBound twice with different arguments.
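The valid and invalid transformations above can be encoded as a lookup table. `validTransitions` and `checkTransition` are hypothetical helpers; the stage constants repeat the documented `WorkerStage` values so the example is self-contained, and the real Worker methods (ToBound, ToFree, and so on) perform their own checks.

```go
package main

import "fmt"

type WorkerStage string

const (
	WorkerOffline WorkerStage = "offline"
	WorkerFree    WorkerStage = "free"
	WorkerBound   WorkerStage = "bound"
	WorkerRelay   WorkerStage = "relay"
)

// validTransitions lists the allowed next stages for each stage.
// Offline -> Bound and Offline -> Relay are deliberately absent.
var validTransitions = map[WorkerStage][]WorkerStage{
	WorkerOffline: {WorkerFree},
	WorkerFree:    {WorkerOffline, WorkerBound, WorkerRelay},
	WorkerBound:   {WorkerOffline, WorkerFree, WorkerRelay},
	WorkerRelay:   {WorkerOffline, WorkerFree, WorkerBound},
}

// checkTransition returns an error for any transformation not in the table.
func checkTransition(from, to WorkerStage) error {
	for _, next := range validTransitions[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("invalid worker stage transformation %s -> %s", from, to)
}

func main() {
	fmt.Println(checkTransition(WorkerOffline, WorkerFree))  // <nil>
	fmt.Println(checkTransition(WorkerOffline, WorkerBound)) // invalid worker stage transformation offline -> bound
}
```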
