Documentation
¶
Index ¶
- Variables
- func ClearHandledPointer(tctx *tcontext.Context, h dbOperator) error
- func ClearOperationLog(tctx *tcontext.Context, h dbOperator) error
- func ClearTaskMeta(tctx *tcontext.Context, h dbOperator) error
- func CloneTaskLog(log *pb.TaskLog) *pb.TaskLog
- func CloneTaskMeta(task *pb.TaskMeta) *pb.TaskMeta
- func DecodeTaskLogKey(key []byte) (int64, error)
- func DecodeTaskMetaKey(key []byte) string
- func DeleteTaskMeta(h dbOperator, name string) error
- func EncodeTaskLogKey(id int64) []byte
- func EncodeTaskMetaKey(name string) []byte
- func GetTaskMeta(h dbOperator, name string) (*pb.TaskMeta, error)
- func InitConditionHub(w *Worker)
- func InitStatus(lis net.Listener)
- func LoadTaskMetas(h dbOperator) (map[string]*pb.TaskMeta, error)
- func SetTaskMeta(h dbOperator, task *pb.TaskMeta) error
- func VerifyTaskMeta(task *pb.TaskMeta) error
- type CheckerConfig
- type ConditionHub
- type Config
- type KVConfig
- type Logger
- func (logger *Logger) Append(h dbOperator, opLog *pb.TaskLog) error
- func (logger *Logger) ForwardTo(h dbOperator, ID int64) error
- func (logger *Logger) GC(ctx context.Context, h dbOperator)
- func (logger *Logger) GetTaskLog(h dbOperator, id int64) (*pb.TaskLog, error)
- func (logger *Logger) Initial(h dbOperator) ([]*pb.TaskLog, error)
- func (logger *Logger) MarkAndForwardLog(h dbOperator, opLog *pb.TaskLog) error
- type Meta
- type Metadata
- func (meta *Metadata) AppendOperation(subTask *pb.TaskMeta) (int64, error)
- func (meta *Metadata) Close()
- func (meta *Metadata) GetTask(name string) (task *pb.TaskMeta)
- func (meta *Metadata) GetTaskLog(opLogID int64) (*pb.TaskLog, error)
- func (meta *Metadata) LoadTaskMeta() map[string]*pb.TaskMeta
- func (meta *Metadata) MarkOperation(log *pb.TaskLog) error
- func (meta *Metadata) PeekLog() (log *pb.TaskLog)
- type Pointer
- type RelayHolder
- type ResumeStrategy
- type Server
- func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) Close()
- func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error
- func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)
- func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
- func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)
- func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)
- func (s *Server) QueryTaskOperation(ctx context.Context, req *pb.QueryTaskOperationRequest) (*pb.QueryTaskOperationResponse, error)
- func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)
- func (s *Server) Start() error
- func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.OperateSubTaskResponse, error)
- func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
- type SubTask
- func (st *SubTask) CheckUnit() bool
- func (st *SubTask) ClearDDLInfo()
- func (st *SubTask) ClearDDLLockInfo()
- func (st *SubTask) Close()
- func (st *SubTask) CurrUnit() unit.Unit
- func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo
- func (st *SubTask) Error() interface{}
- func (st *SubTask) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error
- func (st *SubTask) GetDDLInfo() *pb.DDLInfo
- func (st *SubTask) Init() error
- func (st *SubTask) Pause() error
- func (st *SubTask) PrevUnit() unit.Unit
- func (st *SubTask) Result() *pb.ProcessResult
- func (st *SubTask) Resume() error
- func (st *SubTask) Run()
- func (st *SubTask) SaveDDLInfo(info *pb.DDLInfo) error
- func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error
- func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
- func (st *SubTask) Stage() pb.Stage
- func (st *SubTask) Status() interface{}
- func (st *SubTask) StatusJSON() string
- func (st *SubTask) Update(cfg *config.SubTaskConfig) error
- func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error
- type TaskStatusChecker
- type Worker
- func (w *Worker) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) error
- func (w *Worker) Close()
- func (w *Worker) Error(stName string) []*pb.SubTaskError
- func (w *Worker) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error
- func (w *Worker) FetchDDLInfo(ctx context.Context) *pb.DDLInfo
- func (w *Worker) ForbidPurge() (bool, string)
- func (w *Worker) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
- func (w *Worker) MigrateRelay(ctx context.Context, binlogName string, binlogPos uint32) error
- func (w *Worker) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) error
- func (w *Worker) OperateSubTask(name string, op pb.TaskOp) (int64, error)
- func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error
- func (w *Worker) QueryConfig(ctx context.Context) (*Config, error)
- func (w *Worker) QueryError(name string) []*pb.SubTaskError
- func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus
- func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error
- func (w *Worker) Start()
- func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) (int64, error)
- func (w *Worker) Status(stName string) []*pb.SubTaskStatus
- func (w *Worker) StatusJSON(stName string) string
- func (w *Worker) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
- func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error
- func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) (int64, error)
Constants ¶
This section is empty.
Variables ¶
var ( // GCBatchSize is batch size for gc process GCBatchSize = 1024 // GCInterval is the interval to gc GCInterval = time.Hour )
var ( DefaultCheckInterval = 5 * time.Second DefaultBackoffRollback = 5 * time.Minute DefaultBackoffMin = 1 * time.Second DefaultBackoffMax = 5 * time.Minute DefaultBackoffJitter = true DefaultBackoffFactor float64 = 2 )
Backoff related constants
var HandledPointerKey = []byte("!DM!handledPointer")
HandledPointerKey is the key of HandledPointer, which points to the last handled log
var NewRelayHolder = NewRealRelayHolder
NewRelayHolder is the relay holder initializer; it can be used for testing
var NewTaskStatusChecker = NewRealTaskStatusChecker
NewTaskStatusChecker is a TaskStatusChecker initializer
var SampleConfigFile string
SampleConfigFile is the sample config file of dm-worker. Later we can read it from dm/worker/dm-worker.toml and assign it to SampleConfigFile while we build dm-worker
var ( // TaskLogPrefix is prefix of task log key TaskLogPrefix = []byte("!DM!TaskLog") )
var TaskMetaPrefix = []byte("!DM!TaskMeta")
TaskMetaPrefix is prefix of task meta key
Functions ¶
func ClearHandledPointer ¶
ClearHandledPointer clears the handled pointer in kv DB.
func ClearOperationLog ¶
ClearOperationLog clears the task operation log.
func ClearTaskMeta ¶
ClearTaskMeta clears all task meta in kv DB.
func CloneTaskLog ¶
CloneTaskLog returns a task log copy
func CloneTaskMeta ¶
CloneTaskMeta returns a task meta copy
func DecodeTaskLogKey ¶
DecodeTaskLogKey decodes task log key and returns its log ID
func DecodeTaskMetaKey ¶
DecodeTaskMetaKey decodes task meta key and returns task name
func DeleteTaskMeta ¶
DeleteTaskMeta deletes task meta from kv DB
func EncodeTaskLogKey ¶
EncodeTaskLogKey encodes log ID into a task log key
func EncodeTaskMetaKey ¶
EncodeTaskMetaKey encodes task name into a task meta key
func GetTaskMeta ¶
GetTaskMeta returns task meta by given name
func InitConditionHub ¶
func InitConditionHub(w *Worker)
InitConditionHub inits the singleton instance of ConditionHub
func LoadTaskMetas ¶
LoadTaskMetas loads all task metas from kv db
func SetTaskMeta ¶
SetTaskMeta saves task meta into kv db
func VerifyTaskMeta ¶
VerifyTaskMeta verifies legality of task meta
Types ¶
type CheckerConfig ¶
type CheckerConfig struct { CheckEnable bool `toml:"check-enable" json:"check-enable"` BackoffRollback duration `toml:"backoff-rollback" json:"backoff-rollback"` BackoffMax duration `toml:"backoff-max" json:"backoff-max"` // unexpose config CheckInterval duration `json:"-"` BackoffMin duration `json:"-"` BackoffJitter bool `json:"-"` BackoffFactor float64 `json:"-"` }
CheckerConfig is configuration used for TaskStatusChecker
type ConditionHub ¶
type ConditionHub struct {
// contains filtered or unexported fields
}
ConditionHub holds a DM-worker and it is used for wait condition detection
func GetConditionHub ¶
func GetConditionHub() *ConditionHub
GetConditionHub returns singleton instance of ConditionHub
type Config ¶
type Config struct { LogLevel string `toml:"log-level" json:"log-level"` LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` WorkerAddr string `toml:"worker-addr" json:"worker-addr"` EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` RelayDir string `toml:"relay-dir" json:"relay-dir"` MetaDir string `toml:"meta-dir" json:"meta-dir"` ServerID uint32 `toml:"server-id" json:"server-id"` Flavor string `toml:"flavor" json:"flavor"` Charset string `toml:"charset" json:"charset"` // relay synchronous starting point (if specified) RelayBinLogName string `toml:"relay-binlog-name" json:"relay-binlog-name"` RelayBinlogGTID string `toml:"relay-binlog-gtid" json:"relay-binlog-gtid"` SourceID string `toml:"source-id" json:"source-id"` From config.DBConfig `toml:"from" json:"from"` // config items for purger Purge purger.Config `toml:"purge" json:"purge"` // config items for task status checker Checker CheckerConfig `toml:"checker" json:"checker"` // config items for tracer Tracer tracing.Config `toml:"tracer" json:"tracer"` ConfigFile string `json:"config-file"` // contains filtered or unexported fields }
Config is the configuration.
func (*Config) DecryptPassword ¶
DecryptPassword returns a decrypted config replica in config
func (*Config) UpdateConfigFile ¶
UpdateConfigFile writes the config to a local file
type KVConfig ¶
type KVConfig struct { BlockCacheCapacity int `toml:"block-cache-capacity" json:"block-cache-capacity"` BlockRestartInterval int `toml:"block-restart-interval" json:"block-restart-interval"` BlockSize int `toml:"block-size" json:"block-size"` CompactionL0Trigger int `toml:"compaction-L0-trigger" json:"compaction-L0-trigger"` CompactionTableSize int `toml:"compaction-table-size" json:"compaction-table-size"` CompactionTotalSize int `toml:"compaction-total-size" json:"compaction-total-size"` CompactionTotalSizeMultiplier float64 `toml:"compaction-total-size-multiplier" json:"compaction-total-size-multiplier"` WriteBuffer int `toml:"write-buffer" json:"write-buffer"` WriteL0PauseTrigger int `toml:"write-L0-pause-trigger" json:"write-L0-pause-trigger"` WriteL0SlowdownTrigger int `toml:"write-L0-slowdown-trigger" json:"write-L0-slowdown-trigger"` }
KVConfig is the configuration of goleveldb
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger manages task operation logs
func (*Logger) ForwardTo ¶
ForwardTo forwards the handled pointer to the specified ID location. It is not thread safe.
func (*Logger) GetTaskLog ¶
GetTaskLog returns task log by given log ID
type Meta ¶
type Meta struct {
SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks" toml:"sub-tasks"`
}
Meta information contains (deprecated, instead of proto.WorkMeta) * sub-task
func (*Meta) DecodeFile ¶
DecodeFile loads and decodes config from file
type Metadata ¶
type Metadata struct { sync.RWMutex // we need to ensure only a thread can access to `metaDB` at a time // contains filtered or unexported fields }
Metadata stores the metadata and log of tasks. It also provides logger features: * append log * forward to specified log location
func NewMetadata ¶
NewMetadata returns a metadata object
func (*Metadata) AppendOperation ¶
AppendOperation appends operation into task log
func (*Metadata) GetTaskLog ¶
GetTaskLog returns task log by given log ID
func (*Metadata) LoadTaskMeta ¶
LoadTaskMeta returns meta of all tasks
func (*Metadata) MarkOperation ¶
MarkOperation marks operation result
type Pointer ¶
type Pointer struct {
Location int64
}
Pointer is a logical pointer that points to a location in the log
func LoadHandledPointer ¶
LoadHandledPointer loads handled pointer value from kv DB
func (*Pointer) MarshalBinary ¶
MarshalBinary never returns a non-nil error for now
func (*Pointer) UnmarshalBinary ¶
UnmarshalBinary implements encoding.BinaryUnmarshaler
type RelayHolder ¶
type RelayHolder interface { // Init initializes the holder Init(interceptors []purger.PurgeInterceptor) (purger.Purger, error) // Start starts run the relay Start() // Close closes the holder Close() // Status returns relay unit's status Status() *pb.RelayStatus // Stage returns the stage of the relay Stage() pb.Stage // Error returns relay unit's status Error() *pb.RelayError // SwitchMaster requests relay unit to switch master server SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error // Operate operates relay unit Operate(ctx context.Context, req *pb.OperateRelayRequest) error // Result returns the result of the relay Result() *pb.ProcessResult // Update updates relay config online Update(ctx context.Context, cfg *Config) error // Migrate resets binlog name and binlog position for relay unit Migrate(ctx context.Context, binlogName string, binlogPos uint32) error }
RelayHolder for relay unit
func NewDummyRelayHolder ¶
func NewDummyRelayHolder(cfg *Config) RelayHolder
NewDummyRelayHolder creates a new RelayHolder
func NewDummyRelayHolderWithInitError ¶
func NewDummyRelayHolderWithInitError(cfg *Config) RelayHolder
NewDummyRelayHolderWithInitError creates a new RelayHolder with init error
func NewRealRelayHolder ¶
func NewRealRelayHolder(cfg *Config) RelayHolder
NewRealRelayHolder creates a new RelayHolder
type ResumeStrategy ¶
type ResumeStrategy int
ResumeStrategy represents what we can do when we meet a paused task in task status checker
const ( // When a task is not in paused state, or paused by manually, or we can't get enough information from worker // to determine whether this task is paused because of some error, we will apply ResumeIgnore strategy, and // do nothing with this task in this check round. ResumeIgnore ResumeStrategy = iota + 1 // When checker detects a paused task can recover synchronization by resume, but its last auto resume // duration is less than backoff waiting time, we will apply ResumeSkip strategy, and skip auto resume // for this task in this check round. ResumeSkip // When checker detects a task is paused because of some un-resumable error, such as paused because of // executing incompatible DDL to downstream, we will apply ResumeNoSense strategy ResumeNoSense // ResumeDispatch means we will dispatch an auto resume operation in this check round for the paused task ResumeDispatch )
Resume strategies: in each round of `check`, the checker will apply one of the following strategies to a given task based on its `state`, `result` from `SubTaskStatus` and backoff information recorded in task status checker. Operation of different strategies: ResumeIgnore:
- check duration since latestPausedTime, if larger than backoff rollback, rollback backoff once
ResumeNoSense:
- update latestPausedTime
- update latestBlockTime
ResumeSkip:
- update latestPausedTime
ResumeDispatch:
- update latestPausedTime
- dispatch auto resume task
- if step2 succeeds, update latestResumeTime, forward backoff
func (ResumeStrategy) String ¶
func (bs ResumeStrategy) String() string
String implements fmt.Stringer interface
type Server ¶
Server accepts RPC requests, dispatches requests to worker, and sends responses to RPC client
func (*Server) BreakDDLLock ¶
func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)
BreakDDLLock implements WorkerServer.BreakDDLLock
func (*Server) Close ¶
func (s *Server) Close()
Close closes the RPC server; this function can be called multiple times
func (*Server) ExecuteDDL ¶
func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)
ExecuteDDL implements WorkerServer.ExecuteDDL
func (*Server) FetchDDLInfo ¶
func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error
FetchDDLInfo implements WorkerServer.FetchDDLInfo. We do ping-pong send-receive on stream for DDL (lock) info. If an error occurs in Send / Recv, just retry in client.
func (*Server) HandleSQLs ¶
func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) (*pb.CommonWorkerResponse, error)
HandleSQLs implements WorkerServer.HandleSQLs
func (*Server) MigrateRelay ¶
func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)
MigrateRelay migrates relay to original binlog pos
func (*Server) OperateRelay ¶
func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)
OperateRelay implements WorkerServer.OperateRelay
func (*Server) OperateSubTask ¶
func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
OperateSubTask implements WorkerServer.OperateSubTask
func (*Server) PurgeRelay ¶
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)
PurgeRelay implements WorkerServer.PurgeRelay
func (*Server) QueryError ¶
func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)
QueryError implements WorkerServer.QueryError
func (*Server) QueryStatus ¶
func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)
QueryStatus implements WorkerServer.QueryStatus
func (*Server) QueryTaskOperation ¶
func (s *Server) QueryTaskOperation(ctx context.Context, req *pb.QueryTaskOperationRequest) (*pb.QueryTaskOperationResponse, error)
QueryTaskOperation implements WorkerServer.QueryTaskOperation
func (*Server) QueryWorkerConfig ¶
func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)
QueryWorkerConfig returns worker config. Worker config is defined in worker directory now; to avoid circular import, we only return db config.
func (*Server) StartSubTask ¶
func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.OperateSubTaskResponse, error)
StartSubTask implements WorkerServer.StartSubTask
func (*Server) SwitchRelayMaster ¶
func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)
SwitchRelayMaster implements WorkerServer.SwitchRelayMaster
func (*Server) UpdateRelayConfig ¶
func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)
UpdateRelayConfig updates config for relay and (dm-worker)
func (*Server) UpdateSubTask ¶
func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
UpdateSubTask implements WorkerServer.UpdateSubTask
type SubTask ¶
type SubTask struct { sync.RWMutex // only support sync one DDL lock one time, refine if needed DDLInfo chan *pb.DDLInfo // DDL info pending to sync // contains filtered or unexported fields }
SubTask represents a sub task of data migration
func NewSubTask ¶
func NewSubTask(cfg *config.SubTaskConfig) *SubTask
NewSubTask creates a new SubTask
func NewSubTaskWithStage ¶
func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage) *SubTask
NewSubTaskWithStage creates a new SubTask with stage
func (*SubTask) ClearDDLInfo ¶
func (st *SubTask) ClearDDLInfo()
ClearDDLInfo clears current CacheDDLInfo.
func (*SubTask) ClearDDLLockInfo ¶
func (st *SubTask) ClearDDLLockInfo()
ClearDDLLockInfo clears current DDLLockInfo
func (*SubTask) DDLLockInfo ¶
func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo
DDLLockInfo returns current DDLLockInfo, maybe nil
func (*SubTask) Error ¶
func (st *SubTask) Error() interface{}
Error returns the error of the current sub task
func (*SubTask) ExecuteDDL ¶
ExecuteDDL requests current unit to execute a DDL
func (*SubTask) GetDDLInfo ¶
GetDDLInfo returns current CacheDDLInfo.
func (*SubTask) Result ¶
func (st *SubTask) Result() *pb.ProcessResult
Result returns the result of the sub task
func (*SubTask) SaveDDLInfo ¶
SaveDDLInfo saves a CacheDDLInfo.
func (*SubTask) SaveDDLLockInfo ¶
func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error
SaveDDLLockInfo saves a DDLLockInfo
func (*SubTask) SetSyncerSQLOperator ¶
func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
SetSyncerSQLOperator sets an operator to syncer.
func (*SubTask) Status ¶
func (st *SubTask) Status() interface{}
Status returns the status of the current sub task
func (*SubTask) StatusJSON ¶
StatusJSON returns the status of the current sub task as json string
func (*SubTask) Update ¶
func (st *SubTask) Update(cfg *config.SubTaskConfig) error
Update updates the sub task's config
func (*SubTask) UpdateFromConfig ¶
func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error
UpdateFromConfig updates config for `From`
type TaskStatusChecker ¶
type TaskStatusChecker interface { // Init initializes the checker Init() error // Start starts the checker Start() // Close closes the checker Close() }
TaskStatusChecker is an interface that defines how we manage task status
func NewRealTaskStatusChecker ¶
func NewRealTaskStatusChecker(cfg CheckerConfig, w *Worker) TaskStatusChecker
NewRealTaskStatusChecker creates a new realTaskStatusChecker instance
type Worker ¶
type Worker struct { // ensure no other operation can be done when closing (we can use `WatGroup`/`Context` to archive this) sync.RWMutex // contains filtered or unexported fields }
Worker manages sub tasks and process units for data migration
func (*Worker) BreakDDLLock ¶
BreakDDLLock breaks current blocking DDL lock and/or remove current DDLLockInfo
func (*Worker) Error ¶
func (w *Worker) Error(stName string) []*pb.SubTaskError
Error returns the error information of the worker (and sub tasks). If stName is empty, all sub tasks' error information will be returned.
func (*Worker) ExecuteDDL ¶
ExecuteDDL executes (or ignores) DDL (in sharding DDL lock, requested by dm-master)
func (*Worker) FetchDDLInfo ¶
FetchDDLInfo fetches all sub tasks' DDL info which is pending to sync
func (*Worker) ForbidPurge ¶
ForbidPurge implements PurgeInterceptor.ForbidPurge
func (*Worker) HandleSQLs ¶
HandleSQLs implements Handler.HandleSQLs.
func (*Worker) MigrateRelay ¶
MigrateRelay migrates relay unit
func (*Worker) OperateRelay ¶
OperateRelay operates relay unit
func (*Worker) OperateSubTask ¶
OperateSubTask stop/resume/pause sub task
func (*Worker) PurgeRelay ¶
PurgeRelay purges relay log files
func (*Worker) QueryConfig ¶
QueryConfig returns worker's config
func (*Worker) QueryError ¶
func (w *Worker) QueryError(name string) []*pb.SubTaskError
QueryError queries worker's sub tasks' error
func (*Worker) QueryStatus ¶
func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus
QueryStatus queries worker's sub tasks' status
func (*Worker) RecordDDLLockInfo ¶
func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error
RecordDDLLockInfo records the current DDL lock info which is pending to sync
func (*Worker) StartSubTask ¶
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) (int64, error)
StartSubTask creates a sub task and runs it
func (*Worker) Status ¶
func (w *Worker) Status(stName string) []*pb.SubTaskStatus
Status returns the status of the worker (and sub tasks). If stName is empty, all sub tasks' status will be returned.
func (*Worker) StatusJSON ¶
StatusJSON returns the status of the worker as json string
func (*Worker) SwitchRelayMaster ¶
SwitchRelayMaster switches relay unit's master server
func (*Worker) UpdateRelayConfig ¶
UpdateRelayConfig updates subTask and relay unit config online
func (*Worker) UpdateSubTask ¶
func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) (int64, error)
UpdateSubTask updates config for a sub task