worker

package
v0.0.0-...-8439122 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 58 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewRelayHolder = NewRealRelayHolder

NewRelayHolder is the relay holder initializer. It can be used for testing.

View Source
var NewSubTask = NewRealSubTask

NewSubTask is the subtask initializer. It can be used for testing.

View Source
var NewTaskStatusChecker = NewRealTaskStatusChecker

NewTaskStatusChecker is a TaskStatusChecker initializer.

View Source
var SampleConfigFile string

SampleConfigFile is sample config file of dm-worker.

Functions

func GetJoinURLs

func GetJoinURLs(addrs string) []string

GetJoinURLs gets the endpoints from the join address.

func InitConditionHub

func InitConditionHub(w *SourceWorker)

InitConditionHub inits the singleton instance of ConditionHub.

func InitStatus

func InitStatus(lis net.Listener)

InitStatus initializes the HTTP status server.

func RegistryMetrics

func RegistryMetrics()

RegistryMetrics registers metrics for the worker.

Types

type AutoResumeInfo

// AutoResumeInfo groups the backoff state and the timestamps that the
// auto-resume logic consults when deciding what to do with a paused subtask.
type AutoResumeInfo struct {
	// Backoff is the backoff state used by auto resume.
	Backoff          *backoff.Backoff
	// LatestPausedTime is presumably the last time the subtask was seen paused
	// (NOTE(review): confirm against CheckResumeSubtask).
	LatestPausedTime time.Time
	// LatestBlockTime is presumably the last time the subtask was found paused
	// by an un-resumable error (NOTE(review): confirm against CheckResumeSubtask).
	LatestBlockTime  time.Time
	// LatestResumeTime is the time of the latest resume; the caller is expected
	// to update it after a dispatched resume succeeds.
	LatestResumeTime time.Time
}

AutoResumeInfo contains some Time and Backoff that are related to auto resume. This structure is exposed for DM as library.

func (*AutoResumeInfo) CheckResumeSubtask

func (i *AutoResumeInfo) CheckResumeSubtask(
	stStatus *pb.SubTaskStatus,
	backoffRollback time.Duration,
) (strategy ResumeStrategy)

CheckResumeSubtask updates info and returns ResumeStrategy for a subtask. When ResumeDispatch and the subtask is successfully resumed at caller, caller should update LatestResumeTime and backoff. This function is exposed for DM as library.

type ConditionHub

// ConditionHub holds a DM-worker and is used for wait condition detection.
// All of its fields are unexported.
type ConditionHub struct {
	// contains filtered or unexported fields
}

ConditionHub holds a DM-worker and it is used for wait condition detection.

func GetConditionHub

func GetConditionHub() *ConditionHub

GetConditionHub returns singleton instance of ConditionHub.

type Config

// Config is the configuration of a dm-worker. Every exported field can be set
// from TOML and is serialized to JSON under the same key.
type Config struct {
	// Name is the value of the `name` configuration key.
	Name string `toml:"name" json:"name"`

	// Logging options.
	LogLevel  string `toml:"log-level" json:"log-level"`
	LogFile   string `toml:"log-file" json:"log-file"`
	LogFormat string `toml:"log-format" json:"log-format"`
	LogRotate string `toml:"log-rotate" json:"log-rotate"`

	// RedactInfoLog indicates whether to enable log redaction. It can be one of the following values:
	// - false: disable log redaction.
	// - true: enable log redaction, which replaces sensitive information with "?".
	RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"`

	// Join is the join address (see GetJoinURLs for how endpoints are derived from it).
	Join          string `toml:"join" json:"join" `
	// WorkerAddr and AdvertiseAddr are the values of the `worker-addr` and
	// `advertise-addr` configuration keys.
	WorkerAddr    string `toml:"worker-addr" json:"worker-addr"`
	AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

	// ConfigFile is the value of the `config-file` configuration key.
	ConfigFile string `toml:"config-file" json:"config-file"`
	// TODO: in the future, dm-workers should share the same TTL from dm-master.
	// NOTE(review): the unit of these TTL values is not visible here — confirm.
	KeepAliveTTL      int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
	RelayKeepAliveTTL int64 `toml:"relay-keepalive-ttl" json:"relay-keepalive-ttl"`

	// RelayDir is the value of the `relay-dir` configuration key.
	RelayDir string `toml:"relay-dir" json:"relay-dir"`

	// TLS config (embedded, so its fields are promoted onto Config).
	security.Security
	// contains filtered or unexported fields
}

Config is the configuration.

func NewConfig

func NewConfig() *Config

NewConfig creates a new base config for worker.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone clones a config.

func (*Config) Parse

func (c *Config) Parse(arguments []string) error

Parse parses flag definitions from the argument list.

func (*Config) String

func (c *Config) String() string

func (*Config) Toml

func (c *Config) Toml() (string, error)

Toml returns TOML format representation of config.

type RelayHolder

// RelayHolder is the interface through which the worker drives a relay unit:
// lifecycle (Init/Start/Close), inspection (Status/Stage/Error/Result), and
// online operations (Operate/Update).
type RelayHolder interface {
	// Init initializes the holder with the given purge interceptors and
	// returns a relay purger.
	Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error)
	// Start starts running the relay.
	Start()
	// Close closes the holder.
	Close()
	// Status returns the relay unit's status.
	Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus
	// Stage returns the stage of the relay.
	Stage() pb.Stage
	// Error returns the relay unit's error.
	Error() *pb.RelayError
	// Operate applies the given operation to the relay unit.
	Operate(ctx context.Context, op pb.RelayOp) error
	// Result returns the result of the relay.
	Result() *pb.ProcessResult
	// Update updates the relay config online.
	Update(ctx context.Context, cfg *config.SourceConfig) error
	// Relay returns the relay object.
	Relay() relay.Process
}

RelayHolder for relay unit.

func NewDummyRelayHolder

func NewDummyRelayHolder(cfg *config.SourceConfig) RelayHolder

NewDummyRelayHolder creates a new RelayHolder.

func NewDummyRelayHolderWithInitError

func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder

NewDummyRelayHolderWithInitError creates a new RelayHolder with init error.

func NewDummyRelayHolderWithRelayBinlog

func NewDummyRelayHolderWithRelayBinlog(cfg *config.SourceConfig, relayBinlog string) RelayHolder

NewDummyRelayHolderWithRelayBinlog creates a new RelayHolder with relayBinlog in relayStatus.

func NewRealRelayHolder

func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder

NewRealRelayHolder creates a new RelayHolder.

type ResumeStrategy

type ResumeStrategy int

ResumeStrategy represents what we can do when we meet a paused task in task status checker.

// Resume strategies. Values start at 1 (iota + 1), so the zero value of
// ResumeStrategy is not one of the named strategies.
const (
	// When a task is not in the paused state, or was paused manually, or we can't get enough information from the
	// worker to determine whether the task was paused because of some error, we apply the ResumeIgnore strategy
	// and do nothing with this task in this check round.
	ResumeIgnore ResumeStrategy = iota + 1
	// When the checker detects that a paused task can recover synchronization by resuming, but the time since its
	// last auto resume is less than the backoff waiting time, we apply the ResumeSkip strategy and skip auto
	// resume for this task in this check round.
	ResumeSkip
	// When the checker detects that a task is paused because of some un-resumable error, such as executing an
	// incompatible DDL to downstream, we apply the ResumeNoSense strategy.
	ResumeNoSense
	// ResumeDispatch means we will dispatch an auto resume operation in this check round for the paused task.
	ResumeDispatch
)

Resume strategies: in each round of `check`, the checker applies one of the following strategies to a given task, based on the `state` and `result` from its `SubTaskStatus` and the backoff information recorded in the task status checker. Operations of the different strategies: ResumeIgnore:

  1. check duration since latestPausedTime, if larger than backoff rollback, rollback backoff once

ResumeNoSense:

  1. update latestPausedTime
  2. update latestBlockTime

ResumeSkip:

  1. update latestPausedTime

ResumeDispatch:

  1. update latestPausedTime
  2. dispatch auto resume task
  3. if step 2 succeeds, update latestResumeTime, forward backoff

func (ResumeStrategy) String

func (bs ResumeStrategy) String() string

String implements fmt.Stringer interface.

type Server

// Server accepts RPC requests, dispatches them to the worker, and sends
// responses to the RPC client. All of its fields are unexported.
type Server struct {
	// contains filtered or unexported fields
}

Server accepts RPC requests, dispatches requests to the worker, and sends responses to the RPC client.

func NewServer

func NewServer(cfg *Config) *Server

NewServer creates a new Server.

func (*Server) CheckSubtasksCanUpdate

CheckSubtasksCanUpdate checks whether the input subtask config can be updated.

func (*Server) Close

func (s *Server) Close()

Close closes the RPC server, this function can be called multiple times.

func (*Server) GetValidatorError

func (*Server) GetWorkerCfg

func (s *Server) GetWorkerCfg(ctx context.Context, req *pb.GetWorkerCfgRequest) (*pb.GetWorkerCfgResponse, error)

GetWorkerCfg gets the worker config.

func (*Server) GetWorkerValidatorStatus

func (s *Server) GetWorkerValidatorStatus(ctx context.Context, req *pb.GetValidationStatusRequest) (*pb.GetValidationStatusResponse, error)

func (*Server) HandleError

HandleError handles an error.

func (*Server) JoinMaster

func (s *Server) JoinMaster(endpoints []string) error

JoinMaster lets dm-worker join the cluster with the specified master endpoints.

func (*Server) KeepAlive

func (s *Server) KeepAlive()

KeepAlive attempts to keep the lease of the server alive forever.

func (*Server) OperateSchema

OperateSchema operates schema for an upstream table.

func (*Server) OperateV1Meta

func (s *Server) OperateV1Meta(ctx context.Context, req *pb.OperateV1MetaRequest) (*pb.OperateV1MetaResponse, error)

OperateV1Meta implements WorkerServer.OperateV1Meta.

func (*Server) PurgeRelay

func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)

PurgeRelay implements WorkerServer.PurgeRelay.

func (*Server) QueryStatus

func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)

QueryStatus implements WorkerServer.QueryStatus.

func (*Server) Start

func (s *Server) Start() error

Start starts serving. This function should only exit when it can't dial DM-master; for other errors it should not exit.

func (*Server) UpdateKeepAliveTTL

func (s *Server) UpdateKeepAliveTTL(newTTL int64)

UpdateKeepAliveTTL updates the keepalive key with a new lease TTL in place, so that watchers do not observe a DELETE event.

func (*Server) UpdateValidator

type SourceWorker

// SourceWorker manages a source (upstream), mainly its subtasks and relay.
type SourceWorker struct {
	// Ensures no other operation can be done while closing (we could use `WaitGroup`/`Context` to achieve this).
	// TODO: check what it guards. Currently it is used to guard relayHolder and relayPurger (maybe subTaskHolder?),
	// since query-status may access them while closing/disabling functionalities.
	// This lock also guards the source worker's source config and subtask holder (subtask configs).
	sync.RWMutex
	// contains filtered or unexported fields
}

SourceWorker manages a source(upstream) which is mainly related to subtasks and relay.

func NewSourceWorker

func NewSourceWorker(
	cfg *config.SourceConfig,
	etcdClient *clientv3.Client,
	name string,
	relayDir string,
) (w *SourceWorker, err error)

NewSourceWorker creates a new SourceWorker. The functionality of relay and subtask is disabled by default; call EnableRelay and EnableHandleSubtasks later to enable them.

func (*SourceWorker) CheckCfgCanUpdated

func (w *SourceWorker) CheckCfgCanUpdated(cfg *config.SubTaskConfig) error

CheckCfgCanUpdated checks whether the current subtask config can be updated.

func (*SourceWorker) DisableHandleSubtasks

func (w *SourceWorker) DisableHandleSubtasks()

DisableHandleSubtasks disables the functionality of start/watch/handle subtasks.

func (*SourceWorker) DisableRelay

func (w *SourceWorker) DisableRelay()

DisableRelay disables the functionality of start/watch/handle relay. A source worker disables relay when the reason it called EnableRelay is no longer valid. The paths to DisableRelay are: - source config `enable-relay: true` no longer valid

  • when DM-worker Server watches a SourceBound change, which is to notify that source config has changed, and the worker has started relay by that bound
  • when the source worker is unbound and has started relay by that bound

- UpstreamRelayWorkerKeyAdapter no longer valid

  • when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change
  • when DM-worker Server fails watching and recovers from a snapshot

func (*SourceWorker) EnableHandleSubtasks

func (w *SourceWorker) EnableHandleSubtasks() error

EnableHandleSubtasks enables the functionality of start/watch/handle subtasks.

func (*SourceWorker) EnableRelay

func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error)

EnableRelay enables the functionality of start/watch/handle relay. According to relay schedule of DM-master, a source worker will enable relay in two scenarios: its bound source has `enable-relay: true` in config, or it has a UpstreamRelayWorkerKeyAdapter etcd KV. The paths to EnableRelay are: - source config `enable-relay: true`, which is checked in enableHandleSubtasks

  • when DM-worker Server.Start
  • when DM-worker Server watches a SourceBound change, which is to turn a free source worker to bound or notify a bound worker that source config has changed
  • when DM-worker Server fails watching and recovers from a snapshot

- UpstreamRelayWorkerKeyAdapter

  • when DM-worker Server.Start
  • when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change
  • when DM-worker Server fails watching and recovers from a snapshot

func (*SourceWorker) ForbidPurge

func (w *SourceWorker) ForbidPurge() (bool, string)

ForbidPurge implements PurgeInterceptor.ForbidPurge.

func (*SourceWorker) GetUnitAndSourceStatusJSON

func (w *SourceWorker) GetUnitAndSourceStatusJSON(stName string, sourceStatus *binlog.SourceStatus) string

GetUnitAndSourceStatusJSON returns the status of the worker and its unit as json string. This function will also cause every unit to print its status to log.

func (*SourceWorker) GetValidatorStatus

func (w *SourceWorker) GetValidatorStatus(taskName string) (*pb.ValidationStatus, error)

func (*SourceWorker) GetValidatorTableStatus

func (w *SourceWorker) GetValidatorTableStatus(taskName string, filterStatus pb.Stage) ([]*pb.ValidationTableStatus, error)

func (*SourceWorker) GetWorkerValidatorErr

func (w *SourceWorker) GetWorkerValidatorErr(taskName string, errState pb.ValidateErrorState) ([]*pb.ValidationError, error)

func (*SourceWorker) HandleError

func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (string, error)

HandleError handles a worker error.

func (*SourceWorker) OperateSchema

func (w *SourceWorker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (schema string, err error)

OperateSchema operates schema for an upstream table.

func (*SourceWorker) OperateSubTask

func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error

OperateSubTask stops, resumes, or pauses a sub task.

func (*SourceWorker) OperateWorkerValidatorErr

func (w *SourceWorker) OperateWorkerValidatorErr(taskName string, op pb.ValidationErrOp, errID uint64, isAll bool) error

func (*SourceWorker) PurgeRelay

func (w *SourceWorker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error

PurgeRelay purges relay log files.

func (*SourceWorker) QueryStatus

func (w *SourceWorker) QueryStatus(ctx context.Context, name string) ([]*pb.SubTaskStatus, *pb.RelayStatus, error)

QueryStatus queries the status of the worker's sub tasks. If relay is enabled, it also returns the source status.

func (*SourceWorker) Start

func (w *SourceWorker) Start()

Start starts working, but the functionalities should be turned on separately.

func (*SourceWorker) StartSubTask

func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage, validatorStage pb.Stage, needLock bool) error

StartSubTask creates a subtask and runs it. TODO(ehco) rename this func.

func (*SourceWorker) Status

func (w *SourceWorker) Status(stName string, sourceStatus *binlog.SourceStatus) []*pb.SubTaskStatus

Status returns the status of the worker (and sub tasks). If stName is empty, the status of all sub tasks is returned.

func (*SourceWorker) Stop

func (w *SourceWorker) Stop(graceful bool)

Stop stops working and releases resources.

func (*SourceWorker) UpdateSubTask

func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskConfig, needLock bool) error

UpdateSubTask updates the config of a sub task.

func (*SourceWorker) UpdateWorkerValidator

func (w *SourceWorker) UpdateWorkerValidator(req *pb.UpdateValidationWorkerRequest) error

type SubTask

// SubTask represents a sub task of data migration.
type SubTask struct {
	// Embedded lock, so Lock/RLock and friends are promoted onto SubTask.
	// NOTE(review): which fields it guards is not visible here — confirm.
	sync.RWMutex
	// contains filtered or unexported fields
}

SubTask represents a sub task of data migration.

func NewRealSubTask

func NewRealSubTask(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) *SubTask

NewRealSubTask creates a new SubTask.

func NewSubTaskWithStage

func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *clientv3.Client, workerName string) *SubTask

NewSubTaskWithStage creates a new SubTask with stage.

func (*SubTask) CheckUnit

func (st *SubTask) CheckUnit() bool

CheckUnit checks whether current unit is sync unit.

func (*SubTask) CheckUnitCfgCanUpdate

func (st *SubTask) CheckUnitCfgCanUpdate(cfg *config.SubTaskConfig) error

CheckUnitCfgCanUpdate checks whether this unit's config can be updated.

func (*SubTask) Close

func (st *SubTask) Close()

Close stops the sub task.

func (*SubTask) CurrUnit

func (st *SubTask) CurrUnit() unit.Unit

CurrUnit returns current dm unit.

func (*SubTask) GetValidatorError

func (st *SubTask) GetValidatorError(errState pb.ValidateErrorState) ([]*pb.ValidationError, error)

func (*SubTask) GetValidatorStatus

func (st *SubTask) GetValidatorStatus() (*pb.ValidationStatus, error)

func (*SubTask) GetValidatorTableStatus

func (st *SubTask) GetValidatorTableStatus(filterStatus pb.Stage) ([]*pb.ValidationTableStatus, error)

func (*SubTask) HandleError

func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest, relay relay.Process) (string, error)

HandleError handle error for syncer unit.

func (*SubTask) Kill

func (st *SubTask) Kill()

Kill kills the running unit and stops the sub task.

func (*SubTask) OperateSchema

func (st *SubTask) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (schema string, err error)

OperateSchema operates schema for an upstream table.

func (*SubTask) OperateValidatorError

func (st *SubTask) OperateValidatorError(op pb.ValidationErrOp, errID uint64, isAll bool) error

func (*SubTask) Pause

func (st *SubTask) Pause() error

Pause pauses a running subtask or a subtask paused by error.

func (*SubTask) PrevUnit

func (st *SubTask) PrevUnit() unit.Unit

PrevUnit returns the previous dm unit.

func (*SubTask) Result

func (st *SubTask) Result() *pb.ProcessResult

Result returns the result of the sub task.

func (*SubTask) Resume

func (st *SubTask) Resume(relay relay.Process) error

Resume resumes the paused sub task. TODO: similar to Run, refactor later.

func (*SubTask) Run

func (st *SubTask) Run(expectStage pb.Stage, expectValidatorStage pb.Stage, relay relay.Process)

Run runs the sub task. TODO: check concurrent problems.

func (*SubTask) SetCfg

func (st *SubTask) SetCfg(subTaskConfig config.SubTaskConfig)

func (*SubTask) ShardDDLOperation

func (st *SubTask) ShardDDLOperation() *pessimism.Operation

ShardDDLOperation returns the current shard DDL lock operation.

func (*SubTask) Stage

func (st *SubTask) Stage() pb.Stage

Stage returns the stage of the sub task.

func (*SubTask) StartValidator

func (st *SubTask) StartValidator(expect pb.Stage, startWithSubtask bool)

func (*SubTask) Status

func (st *SubTask) Status() interface{}

Status returns the status of the current sub task.

func (*SubTask) StatusJSON

func (st *SubTask) StatusJSON() string

StatusJSON returns the status of the current sub task as json string.

func (*SubTask) StopValidator

func (st *SubTask) StopValidator()

func (*SubTask) Update

func (st *SubTask) Update(ctx context.Context, cfg *config.SubTaskConfig) error

Update updates the sub task's config.

func (*SubTask) UpdateValidator

func (st *SubTask) UpdateValidator(req *pb.UpdateValidationWorkerRequest) error

func (*SubTask) UpdateValidatorCfg

func (st *SubTask) UpdateValidatorCfg(validatorCfg config.ValidatorConfig)

type TaskStatusChecker

// TaskStatusChecker is an interface that defines how task status is managed.
// It exposes only lifecycle methods: Init, Start, and Close.
type TaskStatusChecker interface {
	// Init initializes the checker and reports any initialization error.
	Init() error
	// Start starts the checker.
	Start()
	// Close closes the checker.
	Close()
}

TaskStatusChecker is an interface that defines how we manage task status.

func NewRealTaskStatusChecker

func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker

NewRealTaskStatusChecker creates a new realTaskStatusChecker instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL