dm

package
v0.0.0-...-beee317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// WorkerNormalInterval is check interval when no error returns in tick
	WorkerNormalInterval = time.Second * 30
	// WorkerErrorInterval is check interval when any error returns in tick
	WorkerErrorInterval = time.Second * 10
)

Functions

func GetDMStorageType

func GetDMStorageType(bucketEnabled bool, bucketType resModel.ResourceType) resModel.ResourceType

GetDMStorageType return the storage type that DM uses

func NewDMResourceID

func NewDMResourceID(taskName, sourceName string, resType resModel.ResourceType) resModel.ResourceID

NewDMResourceID returns a ResourceID in DM's style. Currently only support s3/gcs resource.

func RegisterWorker

func RegisterWorker()

RegisterWorker is used to register dm job master to global registry

Types

type CheckpointAgent

type CheckpointAgent interface {
	IsFresh(ctx context.Context, workerType frameModel.WorkerType, taskCfg *metadata.Task) (bool, error)
}

CheckpointAgent defines an interface for checkpoint.

type DDLCoordinator

type DDLCoordinator struct {
	// contains filtered or unexported fields
}

DDLCoordinator is a coordinator for ddl.

func NewDDLCoordinator

func NewDDLCoordinator(jobID string, kvClient metaModel.KVClient, tableAgent TableAgent, jobStore *metadata.JobStore, pLogger *zap.Logger) *DDLCoordinator

NewDDLCoordinator creates a new DDLCoordinator.

func (*DDLCoordinator) ClearMetadata

func (c *DDLCoordinator) ClearMetadata(ctx context.Context) error

ClearMetadata clears metadata.

func (*DDLCoordinator) Coordinate

Coordinate coordinates ddls.

func (*DDLCoordinator) Reset

func (c *DDLCoordinator) Reset(ctx context.Context) error

Reset resets the ddl coordinator.

func (*DDLCoordinator) ShowDDLLocks

func (c *DDLCoordinator) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse

ShowDDLLocks show ddl locks.

type DDLLock

type DDLLock struct {
	// source table -> [current table, pending table(conflict table)]
	ShardTables map[metadata.SourceTable]ShardTable
}

DDLLock represents ddl lock of a target table.

type JobMaster

type JobMaster struct {
	framework.BaseJobMaster
	// contains filtered or unexported fields
}

JobMaster defines job master of dm job

func (*JobMaster) Binlog

Binlog implements the api of binlog request.

func (*JobMaster) BinlogSchema

BinlogSchema implements the api of binlog schema request.

func (*JobMaster) BinlogSchemaTask

func (jm *JobMaster) BinlogSchemaTask(ctx context.Context, taskID string, req *dmpkg.BinlogSchemaTaskRequest) *dmpkg.CommonTaskResponse

BinlogSchemaTask implements the api of binlog schema task request.

func (*JobMaster) BinlogTask

func (jm *JobMaster) BinlogTask(ctx context.Context, taskID string, req *dmpkg.BinlogTaskRequest) *dmpkg.CommonTaskResponse

BinlogTask implements the api of binlog task request.

func (*JobMaster) CloseImpl

func (jm *JobMaster) CloseImpl(ctx context.Context)

CloseImpl implements JobMasterImpl.CloseImpl

func (*JobMaster) DMAPIDeleteBinlogOperator

func (jm *JobMaster) DMAPIDeleteBinlogOperator(c *gin.Context, taskName string, params openapi.DMAPIDeleteBinlogOperatorParams)

DMAPIDeleteBinlogOperator implements the api of delete binlog operator.

func (*JobMaster) DMAPIGetBinlogOperator

func (jm *JobMaster) DMAPIGetBinlogOperator(c *gin.Context, taskName string, params openapi.DMAPIGetBinlogOperatorParams)

DMAPIGetBinlogOperator implements the api of get binlog operator. TODO: pagination support if needed

func (*JobMaster) DMAPIGetJobConfig

func (jm *JobMaster) DMAPIGetJobConfig(c *gin.Context)

DMAPIGetJobConfig implements the api of get job config.

func (*JobMaster) DMAPIGetJobStatus

func (jm *JobMaster) DMAPIGetJobStatus(c *gin.Context, params openapi.DMAPIGetJobStatusParams)

DMAPIGetJobStatus implements the api of get job status.

func (*JobMaster) DMAPIGetSchema

func (jm *JobMaster) DMAPIGetSchema(c *gin.Context, taskname string, params openapi.DMAPIGetSchemaParams)

DMAPIGetSchema implements the api of get schema.

func (*JobMaster) DMAPIOperateJob

func (jm *JobMaster) DMAPIOperateJob(c *gin.Context)

DMAPIOperateJob implements the api of operate job.

func (*JobMaster) DMAPISetBinlogOperator

func (jm *JobMaster) DMAPISetBinlogOperator(c *gin.Context, taskName string)

DMAPISetBinlogOperator implements the api of set binlog operator.

func (*JobMaster) DMAPISetSchema

func (jm *JobMaster) DMAPISetSchema(c *gin.Context, taskName string)

DMAPISetSchema implements the api of set schema.

func (*JobMaster) DMAPIUpdateJobConfig

func (jm *JobMaster) DMAPIUpdateJobConfig(c *gin.Context)

DMAPIUpdateJobConfig implements the api of update job config.

func (*JobMaster) GetJobCfg

func (jm *JobMaster) GetJobCfg(ctx context.Context) (*config.JobCfg, error)

GetJobCfg gets job config.

func (*JobMaster) InitImpl

func (jm *JobMaster) InitImpl(ctx context.Context) error

InitImpl implements JobMasterImpl.InitImpl

func (*JobMaster) IsJobMasterImpl

func (jm *JobMaster) IsJobMasterImpl()

IsJobMasterImpl implements JobMasterImpl.IsJobMasterImpl

func (*JobMaster) OnCancel

func (jm *JobMaster) OnCancel(ctx context.Context) error

OnCancel implements JobMasterImpl.OnCancel

func (*JobMaster) OnJobManagerMessage

func (jm *JobMaster) OnJobManagerMessage(topic p2p.Topic, message interface{}) error

OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage

func (*JobMaster) OnMasterMessage

func (jm *JobMaster) OnMasterMessage(ctx context.Context, topic p2p.Topic, message interface{}) error

OnMasterMessage implements JobMasterImpl.OnMasterMessage

func (*JobMaster) OnMasterRecovered

func (jm *JobMaster) OnMasterRecovered(ctx context.Context) error

OnMasterRecovered implements JobMasterImpl.OnMasterRecovered When it is called, the jobCfg may not be in the metadata, and we should not report an error

func (*JobMaster) OnOpenAPIInitialized

func (jm *JobMaster) OnOpenAPIInitialized(router *gin.RouterGroup)

OnOpenAPIInitialized implements JobMasterImpl.OnOpenAPIInitialized.

func (*JobMaster) OnWorkerDispatched

func (jm *JobMaster) OnWorkerDispatched(worker framework.WorkerHandle, result error) error

OnWorkerDispatched implements JobMasterImpl.OnWorkerDispatched

func (*JobMaster) OnWorkerMessage

func (jm *JobMaster) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error

OnWorkerMessage implements JobMasterImpl.OnWorkerMessage

func (*JobMaster) OnWorkerOffline

func (jm *JobMaster) OnWorkerOffline(worker framework.WorkerHandle, reason error) error

OnWorkerOffline implements JobMasterImpl.OnWorkerOffline

func (*JobMaster) OnWorkerOnline

func (jm *JobMaster) OnWorkerOnline(worker framework.WorkerHandle) error

OnWorkerOnline implements JobMasterImpl.OnWorkerOnline

func (*JobMaster) OnWorkerStatusUpdated

func (jm *JobMaster) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error

OnWorkerStatusUpdated implements JobMasterImpl.OnWorkerStatusUpdated

func (*JobMaster) QueryJobStatus

func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobStatus, error)

QueryJobStatus is the api of query job status.

func (*JobMaster) QueryStatus

func (jm *JobMaster) QueryStatus(ctx context.Context, taskID string) *dmpkg.QueryStatusResponse

QueryStatus query status for a task

func (*JobMaster) ShowDDLLocks

func (jm *JobMaster) ShowDDLLocks(ctx context.Context) ShowDDLLocksResponse

ShowDDLLocks implements the api of show ddl locks request.

func (*JobMaster) StopImpl

func (jm *JobMaster) StopImpl(ctx context.Context)

StopImpl implements JobMasterImpl.StopImpl checkpoint is removed when job is stopped, this is different with OP DM where `--remove-meta` is specified at start-task.

func (*JobMaster) Tick

func (jm *JobMaster) Tick(ctx context.Context) error

Tick implements JobMasterImpl.Tick Do not do heavy work in Tick, it will block the message processing.

func (*JobMaster) UpdateJobCfg

func (jm *JobMaster) UpdateJobCfg(ctx context.Context, cfg *config.JobCfg) error

UpdateJobCfg updates job config.

type JobStatus

type JobStatus struct {
	JobID frameModel.MasterID `json:"job_id"`
	// taskID -> Status
	TaskStatus map[string]TaskStatus `json:"task_status"`
	// FinishedUnitStatus records the finished unit status of a task. This field
	// is not atomic with TaskStatus (current status).
	FinishedUnitStatus map[string][]*metadata.FinishedTaskStatus `json:"finished_unit_status,omitempty"`
}

JobStatus represents status of a job

type ShardTable

type ShardTable struct {
	Current string
	Next    string
}

ShardTable represents create table statements of a source table.

type ShowDDLLocksResponse

type ShowDDLLocksResponse struct {
	Locks map[metadata.TargetTable]DDLLock
}

ShowDDLLocksResponse represents response of show ddl locks.

type TableAgent

type TableAgent interface {
	FetchAllDoTables(ctx context.Context, cfg *config.JobCfg) (map[metadata.TargetTable][]metadata.SourceTable, error)
	FetchTableStmt(ctx context.Context, jobID string, cfg *config.JobCfg, sourceTable metadata.SourceTable) (string, error)
}

TableAgent defines an interface for checkpoint.

type TaskManager

type TaskManager struct {
	*ticker.DefaultTicker
	// contains filtered or unexported fields
}

TaskManager checks and operates task.

func NewTaskManager

func NewTaskManager(
	jobID string,
	initTaskStatus []runtime.TaskStatus,
	jobStore *metadata.JobStore,
	messageAgent dmpkg.MessageAgent,
	pLogger *zap.Logger,
	metricFactory promutil.Factory,
) *TaskManager

NewTaskManager creates a new TaskManager instance

func (*TaskManager) GetTaskStatus

func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool)

GetTaskStatus gets task status by taskID

func (*TaskManager) OperateTask

func (tm *TaskManager) OperateTask(ctx context.Context, op dmpkg.OperateType, jobCfg *config.JobCfg, tasks []string) (err error)

OperateTask updates the task status in metadata and triggers the task manager to check and operate task. called by user request.

func (*TaskManager) TaskStatus

func (tm *TaskManager) TaskStatus() map[string]runtime.TaskStatus

TaskStatus return the task status.

func (*TaskManager) TickImpl

func (tm *TaskManager) TickImpl(ctx context.Context) error

TickImpl removes tasks that are not in the job config. TickImpl checks and operates task if needed.

func (*TaskManager) UpdateTaskStatus

func (tm *TaskManager) UpdateTaskStatus(taskStatus runtime.TaskStatus)

UpdateTaskStatus is called when receive task status from worker.

type TaskStatus

type TaskStatus struct {
	ExpectedStage  metadata.TaskStage         `json:"expected_stage"`
	WorkerID       frameModel.WorkerID        `json:"worker_id"`
	ConfigOutdated bool                       `json:"config_outdated"`
	Status         *dmpkg.QueryStatusResponse `json:"status"`
	Duration       time.Duration              `json:"duration"`
}

TaskStatus represents status of a task

type WorkerAgent

type WorkerAgent interface {
	// for create worker
	CreateWorker(
		workerType framework.WorkerType,
		config framework.WorkerConfig,
		opts ...framework.CreateWorkerOpt,
	) (frameModel.WorkerID, error)
}

WorkerAgent defines an interface for create worker.

type WorkerManager

type WorkerManager struct {
	*ticker.DefaultTicker
	// contains filtered or unexported fields
}

WorkerManager checks and schedules workers.

func NewWorkerManager

func NewWorkerManager(
	jobID string,
	initWorkerStatus []runtime.WorkerStatus,
	jobStore *metadata.JobStore,
	unitStore *metadata.UnitStateStore,
	workerAgent WorkerAgent,
	messageAgent dmpkg.MessageAgent,
	checkpointAgent CheckpointAgent,
	pLogger *zap.Logger,
	storageType resModel.ResourceType,
) *WorkerManager

NewWorkerManager creates a new WorkerManager instance

func (*WorkerManager) TickImpl

func (wm *WorkerManager) TickImpl(ctx context.Context) error

TickImpl remove offline workers. TickImpl stop unneeded workers. TickImpl create new workers if needed.

func (*WorkerManager) UpdateWorkerStatus

func (wm *WorkerManager) UpdateWorkerStatus(workerStatus runtime.WorkerStatus)

UpdateWorkerStatus is called when receive worker status.

func (*WorkerManager) WorkerStatus

func (wm *WorkerManager) WorkerStatus() map[string]runtime.WorkerStatus

WorkerStatus return the worker status.

Directories

Path Synopsis
Package openapi provides primitives to interact with the openapi HTTP API.
Package openapi provides primitives to interact with the openapi HTTP API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL