Documentation ¶
Index ¶
- Variables
- func GetDMStorageType(bucketEnabled bool, bucketType resModel.ResourceType) resModel.ResourceType
- func NewDMResourceID(taskName, sourceName string, resType resModel.ResourceType) resModel.ResourceID
- func RegisterWorker()
- type CheckpointAgent
- type DDLCoordinator
- func (c *DDLCoordinator) ClearMetadata(ctx context.Context) error
- func (c *DDLCoordinator) Coordinate(ctx context.Context, item *metadata.DDLItem) ([]string, optimism.ConflictStage, error)
- func (c *DDLCoordinator) Reset(ctx context.Context) error
- func (c *DDLCoordinator) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse
- type DDLLock
- type JobMaster
- func (jm *JobMaster) Binlog(ctx context.Context, req *dmpkg.BinlogRequest) (*dmpkg.BinlogResponse, error)
- func (jm *JobMaster) BinlogSchema(ctx context.Context, req *dmpkg.BinlogSchemaRequest) *dmpkg.BinlogSchemaResponse
- func (jm *JobMaster) BinlogSchemaTask(ctx context.Context, taskID string, req *dmpkg.BinlogSchemaTaskRequest) *dmpkg.CommonTaskResponse
- func (jm *JobMaster) BinlogTask(ctx context.Context, taskID string, req *dmpkg.BinlogTaskRequest) *dmpkg.CommonTaskResponse
- func (jm *JobMaster) CloseImpl(ctx context.Context)
- func (jm *JobMaster) DMAPIDeleteBinlogOperator(c *gin.Context, taskName string, ...)
- func (jm *JobMaster) DMAPIGetBinlogOperator(c *gin.Context, taskName string, params openapi.DMAPIGetBinlogOperatorParams)
- func (jm *JobMaster) DMAPIGetJobConfig(c *gin.Context)
- func (jm *JobMaster) DMAPIGetJobStatus(c *gin.Context, params openapi.DMAPIGetJobStatusParams)
- func (jm *JobMaster) DMAPIGetSchema(c *gin.Context, taskname string, params openapi.DMAPIGetSchemaParams)
- func (jm *JobMaster) DMAPIOperateJob(c *gin.Context)
- func (jm *JobMaster) DMAPISetBinlogOperator(c *gin.Context, taskName string)
- func (jm *JobMaster) DMAPISetSchema(c *gin.Context, taskName string)
- func (jm *JobMaster) DMAPIUpdateJobConfig(c *gin.Context)
- func (jm *JobMaster) GetJobCfg(ctx context.Context) (*config.JobCfg, error)
- func (jm *JobMaster) InitImpl(ctx context.Context) error
- func (jm *JobMaster) IsJobMasterImpl()
- func (jm *JobMaster) OnCancel(ctx context.Context) error
- func (jm *JobMaster) OnJobManagerMessage(topic p2p.Topic, message interface{}) error
- func (jm *JobMaster) OnMasterMessage(ctx context.Context, topic p2p.Topic, message interface{}) error
- func (jm *JobMaster) OnMasterRecovered(ctx context.Context) error
- func (jm *JobMaster) OnOpenAPIInitialized(router *gin.RouterGroup)
- func (jm *JobMaster) OnWorkerDispatched(worker framework.WorkerHandle, result error) error
- func (jm *JobMaster) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error
- func (jm *JobMaster) OnWorkerOffline(worker framework.WorkerHandle, reason error) error
- func (jm *JobMaster) OnWorkerOnline(worker framework.WorkerHandle) error
- func (jm *JobMaster) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error
- func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobStatus, error)
- func (jm *JobMaster) QueryStatus(ctx context.Context, taskID string) *dmpkg.QueryStatusResponse
- func (jm *JobMaster) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse
- func (jm *JobMaster) StopImpl(ctx context.Context)
- func (jm *JobMaster) Tick(ctx context.Context) error
- func (jm *JobMaster) UpdateJobCfg(ctx context.Context, cfg *config.JobCfg) error
- type JobStatus
- type ShardTable
- type ShowDDLLocksResponse
- type TableAgent
- type TaskManager
- func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool)
- func (tm *TaskManager) OperateTask(ctx context.Context, op dmpkg.OperateType, jobCfg *config.JobCfg, ...) (err error)
- func (tm *TaskManager) TaskStatus() map[string]runtime.TaskStatus
- func (tm *TaskManager) TickImpl(ctx context.Context) error
- func (tm *TaskManager) UpdateTaskStatus(taskStatus runtime.TaskStatus)
- type TaskStatus
- type WorkerAgent
- type WorkerManager
Constants ¶
This section is empty.
Variables ¶
var ( // WorkerNormalInterval is check interval when no error returns in tick WorkerNormalInterval = time.Second * 30 // WorkerErrorInterval is check interval when any error returns in tick WorkerErrorInterval = time.Second * 10 )
Functions ¶
func GetDMStorageType ¶
func GetDMStorageType(bucketEnabled bool, bucketType resModel.ResourceType) resModel.ResourceType
GetDMStorageType returns the storage type that DM uses.
func NewDMResourceID ¶
func NewDMResourceID(taskName, sourceName string, resType resModel.ResourceType) resModel.ResourceID
NewDMResourceID returns a ResourceID in DM's style. Currently only S3/GCS resources are supported.
func RegisterWorker ¶
func RegisterWorker()
RegisterWorker is used to register dm job master to global registry
Types ¶
type CheckpointAgent ¶
type CheckpointAgent interface {
IsFresh(ctx context.Context, workerType frameModel.WorkerType, taskCfg *metadata.Task) (bool, error)
}
CheckpointAgent defines an interface for checkpoint.
type DDLCoordinator ¶
type DDLCoordinator struct {
// contains filtered or unexported fields
}
DDLCoordinator is a coordinator for ddl.
func NewDDLCoordinator ¶
func NewDDLCoordinator(jobID string, kvClient metaModel.KVClient, tableAgent TableAgent, jobStore *metadata.JobStore, pLogger *zap.Logger) *DDLCoordinator
NewDDLCoordinator creates a new DDLCoordinator.
func (*DDLCoordinator) ClearMetadata ¶
func (c *DDLCoordinator) ClearMetadata(ctx context.Context) error
ClearMetadata clears metadata.
func (*DDLCoordinator) Coordinate ¶
func (c *DDLCoordinator) Coordinate(ctx context.Context, item *metadata.DDLItem) ([]string, optimism.ConflictStage, error)
Coordinate coordinates ddls.
func (*DDLCoordinator) Reset ¶
func (c *DDLCoordinator) Reset(ctx context.Context) error
Reset resets the ddl coordinator.
func (*DDLCoordinator) ShowDDLLocks ¶
func (c *DDLCoordinator) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse
ShowDDLLocks shows the DDL locks.
type DDLLock ¶
type DDLLock struct { // source table -> [current table, pending table(conflict table)] ShardTables map[metadata.SourceTable]ShardTable }
DDLLock represents ddl lock of a target table.
type JobMaster ¶
type JobMaster struct { framework.BaseJobMaster // contains filtered or unexported fields }
JobMaster defines job master of dm job
func (*JobMaster) Binlog ¶
func (jm *JobMaster) Binlog(ctx context.Context, req *dmpkg.BinlogRequest) (*dmpkg.BinlogResponse, error)
Binlog implements the api of binlog request.
func (*JobMaster) BinlogSchema ¶
func (jm *JobMaster) BinlogSchema(ctx context.Context, req *dmpkg.BinlogSchemaRequest) *dmpkg.BinlogSchemaResponse
BinlogSchema implements the api of binlog schema request.
func (*JobMaster) BinlogSchemaTask ¶
func (jm *JobMaster) BinlogSchemaTask(ctx context.Context, taskID string, req *dmpkg.BinlogSchemaTaskRequest) *dmpkg.CommonTaskResponse
BinlogSchemaTask implements the api of binlog schema task request.
func (*JobMaster) BinlogTask ¶
func (jm *JobMaster) BinlogTask(ctx context.Context, taskID string, req *dmpkg.BinlogTaskRequest) *dmpkg.CommonTaskResponse
BinlogTask implements the api of binlog task request.
func (*JobMaster) DMAPIDeleteBinlogOperator ¶
func (jm *JobMaster) DMAPIDeleteBinlogOperator(c *gin.Context, taskName string, params openapi.DMAPIDeleteBinlogOperatorParams)
DMAPIDeleteBinlogOperator implements the api of delete binlog operator.
func (*JobMaster) DMAPIGetBinlogOperator ¶
func (jm *JobMaster) DMAPIGetBinlogOperator(c *gin.Context, taskName string, params openapi.DMAPIGetBinlogOperatorParams)
DMAPIGetBinlogOperator implements the api of get binlog operator. TODO: pagination support if needed
func (*JobMaster) DMAPIGetJobConfig ¶
DMAPIGetJobConfig implements the api of get job config.
func (*JobMaster) DMAPIGetJobStatus ¶
func (jm *JobMaster) DMAPIGetJobStatus(c *gin.Context, params openapi.DMAPIGetJobStatusParams)
DMAPIGetJobStatus implements the api of get job status.
func (*JobMaster) DMAPIGetSchema ¶
func (jm *JobMaster) DMAPIGetSchema(c *gin.Context, taskname string, params openapi.DMAPIGetSchemaParams)
DMAPIGetSchema implements the api of get schema.
func (*JobMaster) DMAPIOperateJob ¶
DMAPIOperateJob implements the api of operate job.
func (*JobMaster) DMAPISetBinlogOperator ¶
DMAPISetBinlogOperator implements the api of set binlog operator.
func (*JobMaster) DMAPISetSchema ¶
DMAPISetSchema implements the api of set schema.
func (*JobMaster) DMAPIUpdateJobConfig ¶
DMAPIUpdateJobConfig implements the api of update job config.
func (*JobMaster) IsJobMasterImpl ¶
func (jm *JobMaster) IsJobMasterImpl()
IsJobMasterImpl implements JobMasterImpl.IsJobMasterImpl
func (*JobMaster) OnJobManagerMessage ¶
OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage
func (*JobMaster) OnMasterMessage ¶
func (jm *JobMaster) OnMasterMessage(ctx context.Context, topic p2p.Topic, message interface{}) error
OnMasterMessage implements JobMasterImpl.OnMasterMessage
func (*JobMaster) OnMasterRecovered ¶
OnMasterRecovered implements JobMasterImpl.OnMasterRecovered. When it is called, the jobCfg may not be in the metadata, and we should not report an error.
func (*JobMaster) OnOpenAPIInitialized ¶
func (jm *JobMaster) OnOpenAPIInitialized(router *gin.RouterGroup)
OnOpenAPIInitialized implements JobMasterImpl.OnOpenAPIInitialized.
func (*JobMaster) OnWorkerDispatched ¶
func (jm *JobMaster) OnWorkerDispatched(worker framework.WorkerHandle, result error) error
OnWorkerDispatched implements JobMasterImpl.OnWorkerDispatched
func (*JobMaster) OnWorkerMessage ¶
func (jm *JobMaster) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error
OnWorkerMessage implements JobMasterImpl.OnWorkerMessage
func (*JobMaster) OnWorkerOffline ¶
func (jm *JobMaster) OnWorkerOffline(worker framework.WorkerHandle, reason error) error
OnWorkerOffline implements JobMasterImpl.OnWorkerOffline
func (*JobMaster) OnWorkerOnline ¶
func (jm *JobMaster) OnWorkerOnline(worker framework.WorkerHandle) error
OnWorkerOnline implements JobMasterImpl.OnWorkerOnline
func (*JobMaster) OnWorkerStatusUpdated ¶
func (jm *JobMaster) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error
OnWorkerStatusUpdated implements JobMasterImpl.OnWorkerStatusUpdated
func (*JobMaster) QueryJobStatus ¶
QueryJobStatus is the api of query job status.
func (*JobMaster) QueryStatus ¶
QueryStatus queries the status of a task.
func (*JobMaster) ShowDDLLocks ¶
func (jm *JobMaster) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse
ShowDDLLocks implements the api of show ddl locks request.
func (*JobMaster) StopImpl ¶
StopImpl implements JobMasterImpl.StopImpl. The checkpoint is removed when the job is stopped; this differs from OP DM, where `--remove-meta` is specified at start-task.
type JobStatus ¶
type JobStatus struct { JobID frameModel.MasterID `json:"job_id"` // taskID -> Status TaskStatus map[string]TaskStatus `json:"task_status"` // FinishedUnitStatus records the finished unit status of a task. This field // is not atomic with TaskStatus (current status). FinishedUnitStatus map[string][]*metadata.FinishedTaskStatus `json:"finished_unit_status,omitempty"` }
JobStatus represents status of a job
type ShardTable ¶
ShardTable represents create table statements of a source table.
type ShowDDLLocksResponse ¶
type ShowDDLLocksResponse struct {
Locks map[metadata.TargetTable]DDLLock
}
ShowDDLLocksResponse represents response of show ddl locks.
type TableAgent ¶
type TableAgent interface { FetchAllDoTables(ctx context.Context, cfg *config.JobCfg) (map[metadata.TargetTable][]metadata.SourceTable, error) FetchTableStmt(ctx context.Context, jobID string, cfg *config.JobCfg, sourceTable metadata.SourceTable) (string, error) }
TableAgent defines an interface for fetching the do-tables of a job and the create-table statement of a source table.
type TaskManager ¶
type TaskManager struct { *ticker.DefaultTicker // contains filtered or unexported fields }
TaskManager checks and operates task.
func NewTaskManager ¶
func NewTaskManager( jobID string, initTaskStatus []runtime.TaskStatus, jobStore *metadata.JobStore, messageAgent dmpkg.MessageAgent, pLogger *zap.Logger, metricFactory promutil.Factory, ) *TaskManager
NewTaskManager creates a new TaskManager instance
func (*TaskManager) GetTaskStatus ¶
func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool)
GetTaskStatus gets task status by taskID
func (*TaskManager) OperateTask ¶
func (tm *TaskManager) OperateTask(ctx context.Context, op dmpkg.OperateType, jobCfg *config.JobCfg, tasks []string) (err error)
OperateTask updates the task status in metadata and triggers the task manager to check and operate tasks. It is called on user request.
func (*TaskManager) TaskStatus ¶
func (tm *TaskManager) TaskStatus() map[string]runtime.TaskStatus
TaskStatus returns the task status.
func (*TaskManager) TickImpl ¶
func (tm *TaskManager) TickImpl(ctx context.Context) error
TickImpl removes tasks that are not in the job config. TickImpl checks and operates task if needed.
func (*TaskManager) UpdateTaskStatus ¶
func (tm *TaskManager) UpdateTaskStatus(taskStatus runtime.TaskStatus)
UpdateTaskStatus is called when a task status is received from a worker.
type TaskStatus ¶
type TaskStatus struct { ExpectedStage metadata.TaskStage `json:"expected_stage"` WorkerID frameModel.WorkerID `json:"worker_id"` ConfigOutdated bool `json:"config_outdated"` Status *dmpkg.QueryStatusResponse `json:"status"` Duration time.Duration `json:"duration"` }
TaskStatus represents status of a task
type WorkerAgent ¶
type WorkerAgent interface { // for create worker CreateWorker( workerType framework.WorkerType, config framework.WorkerConfig, opts ...framework.CreateWorkerOpt, ) (frameModel.WorkerID, error) }
WorkerAgent defines an interface for creating workers.
type WorkerManager ¶
type WorkerManager struct { *ticker.DefaultTicker // contains filtered or unexported fields }
WorkerManager checks and schedules workers.
func NewWorkerManager ¶
func NewWorkerManager( jobID string, initWorkerStatus []runtime.WorkerStatus, jobStore *metadata.JobStore, unitStore *metadata.UnitStateStore, workerAgent WorkerAgent, messageAgent dmpkg.MessageAgent, checkpointAgent CheckpointAgent, pLogger *zap.Logger, storageType resModel.ResourceType, ) *WorkerManager
NewWorkerManager creates a new WorkerManager instance
func (*WorkerManager) TickImpl ¶
func (wm *WorkerManager) TickImpl(ctx context.Context) error
TickImpl removes offline workers, stops unneeded workers, and creates new workers if needed.
func (*WorkerManager) UpdateWorkerStatus ¶
func (wm *WorkerManager) UpdateWorkerStatus(workerStatus runtime.WorkerStatus)
UpdateWorkerStatus is called when a worker status is received.
func (*WorkerManager) WorkerStatus ¶
func (wm *WorkerManager) WorkerStatus() map[string]runtime.WorkerStatus
WorkerStatus returns the worker status.