ha

package
v0.0.0-...-19fc702 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LightningNotReady = "not-ready"
	LightningReady    = "ready"
	LightningFinished = "finished"
)

Simple status transition to make all lightning synchronized in physical import mode. When DM-master creates the task, it will write LightningNotReady for all subtasks. When DM-worker enters load unit, it will mark itself as LightningReady. When all DM-workers are ready, lightning can be started so its parallel import can work. When lightning is finished and its returned error is also resolved, it will mark itself as LightningFinished. When all DM-workers are finished, sync unit can be continued.

Variables

View Source
var (

	// KeepAliveUpdateCh is used to notify keepalive TTL changing, in order to let watcher not see a DELETE of old key.
	KeepAliveUpdateCh = make(chan int64, 10)
)

Functions

func ClearTestInfoOperation

func ClearTestInfoOperation(cli *clientv3.Client) error

ClearTestInfoOperation is used to clear all DM-HA relative etcd keys' information this function shouldn't be used in development environment.

func DelLoadTask

func DelLoadTask(cli *clientv3.Client, task, sourceID string) (int64, bool, error)

DelLoadTask dels the worker in load stage for the source of the subtask. k/v: (task, sourceID) -> worker.

func DelLoadTaskByTask

func DelLoadTaskByTask(cli *clientv3.Client, task string) (int64, bool, error)

DelLoadTaskByTask del the worker in load stage for the source by task.

func DeleteAllTaskCliArgs

func DeleteAllTaskCliArgs(cli *clientv3.Client, taskName string) error

DeleteAllTaskCliArgs deleted the command line arguments of this task.

func DeleteLightningStatusForTask

func DeleteLightningStatusForTask(cli *clientv3.Client, task string) (int64, error)

DeleteLightningStatusForTask deletes the status for all sources of the task.

func DeleteOpenAPITaskTemplate

func DeleteOpenAPITaskTemplate(cli *clientv3.Client, taskName string) error

DeleteOpenAPITaskTemplate deletes the openapi task config of task-name.

func DeleteRelayConfig

func DeleteRelayConfig(cli *clientv3.Client, workers ...string) (int64, error)

DeleteRelayConfig deletes the relay config for given workers.

func DeleteRelayStage

func DeleteRelayStage(cli *clientv3.Client, source string) (int64, error)

DeleteRelayStage deleted the relay stage of this source.

func DeleteSourceBound

func DeleteSourceBound(cli *clientv3.Client, workers ...string) (int64, error)

DeleteSourceBound deletes the bound relationship in etcd for the specified worker.

func DeleteSourceCfgRelayStageSourceBound

func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker string) (int64, error)

DeleteSourceCfgRelayStageSourceBound deletes the following data in one txn. - upstream source config. - relay stage. - source bound relationship.

func DeleteSubTaskCfgStage

func DeleteSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, validatorStages []Stage) (int64, error)

DeleteSubTaskCfgStage deletes the following data in one txn. - subtask config. - subtask stage. NOTE: golang can't use two `...` in the func, so use `[]` instead.

func DeleteSubTaskStage

func DeleteSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error)

DeleteSubTaskStage deletes the subtask stage.

func DeleteTaskCliArgs

func DeleteTaskCliArgs(cli *clientv3.Client, taskName string, sources []string) error

DeleteTaskCliArgs deleted the command line arguments of this task.

func DeleteWorkerInfoRelayConfig

func DeleteWorkerInfoRelayConfig(cli *clientv3.Client, worker string) (int64, error)

DeleteWorkerInfoRelayConfig deletes the specified DM-worker information and its relay config.

func GetAllLightningStatus

func GetAllLightningStatus(cli *clientv3.Client, task string) ([]string, error)

GetAllLightningStatus gets the status for all source of the task.

func GetAllLoadTask

func GetAllLoadTask(cli *clientv3.Client) (map[string]map[string]string, int64, error)

GetAllLoadTask gets all the worker which in load stage. k/v: (task, sourceID) -> worker-name.

func GetAllOpenAPITaskTemplate

func GetAllOpenAPITaskTemplate(cli *clientv3.Client) ([]*openapi.Task, error)

GetAllOpenAPITaskTemplate gets all openapi task config s.

func GetAllRelayConfig

func GetAllRelayConfig(cli *clientv3.Client) (map[string]map[string]struct{}, int64, error)

GetAllRelayConfig gets all source and its relay worker. k/v: source ID -> set(workers).

func GetAllRelayStage

func GetAllRelayStage(cli *clientv3.Client) (map[string]Stage, int64, error)

GetAllRelayStage gets all relay stages. k/v: source ID -> relay stage.

func GetAllSourceCfgBeforeV202

func GetAllSourceCfgBeforeV202(cli *clientv3.Client) (map[string]*config.SourceConfig, int64, error)

GetAllSourceCfgBeforeV202 gets all upstream source configs before v2.0.2. This func only use for config export command.

func GetAllSubTaskCfg

func GetAllSubTaskCfg(cli *clientv3.Client) (map[string]map[string]config.SubTaskConfig, int64, error)

GetAllSubTaskCfg gets all subtask configs. k/v: source ID -> task name -> subtask config.

func GetAllSubTaskStage

func GetAllSubTaskStage(cli *clientv3.Client) (map[string]map[string]Stage, int64, error)

GetAllSubTaskStage gets all subtask stages. k/v: source ID -> task name -> subtask stage.

func GetAllValidatorStage

func GetAllValidatorStage(cli *clientv3.Client) (map[string]map[string]Stage, int64, error)

func GetAllWorkerInfo

func GetAllWorkerInfo(cli *clientv3.Client) (map[string]WorkerInfo, int64, error)

GetAllWorkerInfo gets all DM-worker info in etcd currently. k/v: worker-name -> worker information.

func GetKeepAliveWorkers

func GetKeepAliveWorkers(cli *clientv3.Client) (map[string]WorkerEvent, int64, error)

GetKeepAliveWorkers gets current alive workers, and returns a map{workerName: WorkerEvent}, revision and error.

func GetLastSourceBounds

func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, error)

GetLastSourceBounds gets all last source bound relationship. Different with GetSourceBound, "last source bound" will not be deleted when worker offline.

func GetLoadTask

func GetLoadTask(cli *clientv3.Client, task, sourceID string) (string, int64, error)

GetLoadTask gets the worker which in load stage for the source of the subtask. k/v: (task, sourceID) -> worker-name.

func GetOpenAPITaskTemplate

func GetOpenAPITaskTemplate(cli *clientv3.Client, taskName string) (*openapi.Task, error)

GetOpenAPITaskTemplate gets the openapi task config of task-name.

func GetRelayConfig

func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig, int64, error)

GetRelayConfig returns the source config which the given worker need to pull relay log from etcd, with revision.

func GetSourceBound

func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound, int64, error)

GetSourceBound gets the source bound relationship for the specified DM-worker. if the bound relationship for the worker name not exist, return with `err == nil`. if the worker name is "", it will return all bound relationships as a map{worker-name: bound}. if the worker name is given, it will return a map{worker-name: bound} whose length is 1.

func GetSourceCfg

func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]*config.SourceConfig, int64, error)

GetSourceCfg gets upstream source configs. k/v: source ID -> source config. if the source config for the sourceID not exist, return with `err == nil`. if the source name is "", it will return all source configs as a map{sourceID: config}. if the source name is given, it will return a map{sourceID: config} whose length is 1.

func GetSubTaskCfg

func GetSubTaskCfg(cli *clientv3.Client, source, task string, rev int64) (map[string]config.SubTaskConfig, int64, error)

GetSubTaskCfg gets the subtask config of the specified source and task name. if the config for the source not exist, return with `err == nil` and `revision=0`. if task name is "", will return all the subtaskConfigs as a map{taskName: subtaskConfig} of the source if task name if given, will return a map{taskName: subtaskConfig} whose length is 1.

func GetSubTaskStage

func GetSubTaskStage(cli *clientv3.Client, source, task string) (map[string]Stage, int64, error)

GetSubTaskStage gets the subtask stage for the specified upstream source and task name. if the stage for the source and task name not exist, return with `err == nil` and `revision=0`. if task name is "", it will return all subtasks' stage as a map{task-name: stage} for the source. if task name is given, it will return a map{task-name: stage} whose length is 1.

func GetSubTaskStageConfig

func GetSubTaskStageConfig(cli *clientv3.Client, source string) (map[string]Stage, map[string]Stage, map[string]config.SubTaskConfig, int64, error)

GetSubTaskStageConfig gets source's subtask stages and configs at the same time source **must not be empty** return map{task name -> subtask stage}, map{task name -> validator stage}, map{task name -> subtask config}, revision, error.

func GetTaskCliArgs

func GetTaskCliArgs(cli *clientv3.Client, taskName, source string) (*config.TaskCliArgs, error)

GetTaskCliArgs gets the command line arguments for the specified task.

func GetValidatorStage

func GetValidatorStage(cli *clientv3.Client, source, task string, revision int64) (map[string]Stage, int64, error)

func KeepAlive

func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error

KeepAlive puts the join time of the workerName into etcd. this key will be kept in etcd until the worker is blocked or failed k/v: workerName -> join time.

func PutLightningNotReadyForAllSources

func PutLightningNotReadyForAllSources(cli *clientv3.Client, task string, sources []string) (int64, error)

PutLightningNotReadyForAllSources puts LightningNotReady for all sources of the subtask. This function should be called by DM-master.

func PutLightningStatus

func PutLightningStatus(cli *clientv3.Client, task, sourceID, status string) (int64, error)

PutLightningStatus puts the status for the source of the subtask. k/v: (task, sourceID) -> status. This function should be called by DM-worker.

func PutLoadTask

func PutLoadTask(cli *clientv3.Client, task, sourceID, worker string) (int64, error)

PutLoadTask puts the worker which load stage for the source of the subtask. k/v: (task, sourceID) -> worker. This function should often be called by DM-worker.

func PutOpenAPITaskTemplate

func PutOpenAPITaskTemplate(cli *clientv3.Client, task openapi.Task, overWrite bool) error

PutOpenAPITaskTemplate puts the openapi task config of task-name.

func PutRelayConfig

func PutRelayConfig(cli *clientv3.Client, source string, workers ...string) (int64, error)

PutRelayConfig puts the relay config for given workers. k/v: worker-name -> source-id. TODO: let caller wait until worker has enabled relay.

func PutRelayStage

func PutRelayStage(cli *clientv3.Client, stages ...Stage) (int64, error)

PutRelayStage puts the stage of the relay into etcd. k/v: sourceID -> the running stage of the relay.

func PutRelayStageRelayConfigSourceBound

func PutRelayStageRelayConfigSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error)

PutRelayStageRelayConfigSourceBound puts the following data in one txn. - relay stage. - relay config for a worker - source bound relationship.

func PutRelayStageSourceBound

func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error)

PutRelayStageSourceBound puts the following data in one txn. - relay stage. - source bound relationship.

func PutSourceBound

func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error)

PutSourceBound puts the bound relationship into etcd. k/v: worker-name -> bound relationship.

func PutSourceCfg

func PutSourceCfg(cli *clientv3.Client, cfg *config.SourceConfig) (int64, error)

PutSourceCfg puts the config of the upstream source into etcd. k/v: sourceID -> source config.

func PutSubTaskCfgStage

func PutSubTaskCfgStage(cli *clientv3.Client, cfgs []config.SubTaskConfig, stages []Stage, validatorStages []Stage) (int64, error)

PutSubTaskCfgStage puts the following data in one txn. - subtask config. - subtask stage. NOTE: golang can't use two `...` in the func, so use `[]` instead.

func PutSubTaskStage

func PutSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error)

PutSubTaskStage puts the stage of the subtask into etcd. k/v: sourceID, task -> the running stage of the subtask.

func PutTaskCliArgs

func PutTaskCliArgs(cli *clientv3.Client, taskName string, sources []string, args config.TaskCliArgs) error

PutTaskCliArgs puts TaskCliArgs into etcd.

func PutWorkerInfo

func PutWorkerInfo(cli *clientv3.Client, info WorkerInfo) (int64, error)

PutWorkerInfo puts the DM-worker info into etcd. k/v: worker-name -> worker information.

func ReplaceSourceBound

func ReplaceSourceBound(cli *clientv3.Client, source, oldWorker, newWorker string) (int64, error)

ReplaceSourceBound deletes an old bound and puts a new bound in one transaction, so a bound source will not become unbound because of failing halfway.

func UpdateOpenAPITaskTemplate

func UpdateOpenAPITaskTemplate(cli *clientv3.Client, task openapi.Task) error

UpdateOpenAPITaskTemplate updates the openapi task config by task-name.

func WatchLoadTask

func WatchLoadTask(ctx context.Context, cli *clientv3.Client, revision int64,
	outCh chan<- LoadTask, errCh chan<- error,
)

WatchLoadTask watches PUT & DELETE operations for worker in load stage. This function should often be called by DM-master.

func WatchRelayConfig

func WatchRelayConfig(ctx context.Context, cli *clientv3.Client,
	worker string, revision int64, outCh chan<- RelaySource, errCh chan<- error,
)

WatchRelayConfig watches PUT & DELETE operations for the relay relationship of the specified DM-worker. For the DELETE operations, it returns an nil source config.

func WatchRelayStage

func WatchRelayStage(ctx context.Context, cli *clientv3.Client,
	source string, revision int64, outCh chan<- Stage, errCh chan<- error,
)

WatchRelayStage watches PUT & DELETE operations for the relay stage. for the DELETE stage, it returns an empty stage.

func WatchSourceBound

func WatchSourceBound(ctx context.Context, cli *clientv3.Client, worker string, revision int64, outCh chan<- SourceBound, errCh chan<- error)

WatchSourceBound watches PUT & DELETE operations for the bound relationship of the specified DM-worker. For the DELETE operations, it returns an empty bound relationship. nolint:dupl

func WatchSubTaskStage

func WatchSubTaskStage(ctx context.Context, cli *clientv3.Client,
	source string, revision int64, outCh chan<- Stage, errCh chan<- error,
)

WatchSubTaskStage watches PUT & DELETE operations for the subtask stage. for the DELETE stage, it returns an empty stage.

func WatchValidatorStage

func WatchValidatorStage(ctx context.Context, cli *clientv3.Client,
	source string, rev int64, outCh chan<- Stage, errCh chan<- error,
)

func WatchWorkerEvent

func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outCh chan<- WorkerEvent, errCh chan<- error)

WatchWorkerEvent watches the online and offline of workers from etcd. this function will output the worker event to evCh, output the error to errCh.

Types

type LoadTask

type LoadTask struct {
	Task     string
	Source   string
	Worker   string
	IsDelete bool
}

LoadTask uses to watch load worker events.

type RelaySource

type RelaySource struct {
	Source string
	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the bound has been deleted in etcd.
	IsDeleted bool
	// record the etcd ModRevision of this bound
	Revision int64
}

RelaySource represents the bound relationship between the DM-worker instance and its upstream relay source.

type SourceBound

type SourceBound struct {
	Source string `json:"source"` // the source ID of the upstream.
	Worker string `json:"worker"` // the name of the bound DM-worker for the source.

	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the bound has been deleted in etcd.
	IsDeleted bool `json:"-"`
	// record the etcd Revision of this bound
	Revision int64 `json:"-"`
}

SourceBound represents the bound relationship between the DM-worker instance and the upstream MySQL source.

func GetSourceBoundConfig

func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error)

GetSourceBoundConfig gets the source bound relationship and relative source config at the same time for the specified DM-worker. The index worker **must not be empty**: if source bound is empty, will return an empty sourceBound and an empty source config if source bound is not empty but sourceConfig is empty, will return an error if the source bound is different for over retryNum times, will return an error.

func NewSourceBound

func NewSourceBound(source, worker string) SourceBound

NewSourceBound creates a new SourceBound instance.

func (SourceBound) IsEmpty

func (b SourceBound) IsEmpty() bool

IsEmpty returns true when this bound has no value.

func (SourceBound) String

func (b SourceBound) String() string

String implements Stringer interface.

type Stage

type Stage struct {
	Expect pb.Stage `json:"expect"`         // the expectant stage.
	Source string   `json:"source"`         // the source ID of the upstream.
	Task   string   `json:"task,omitempty"` // the task name for subtask; empty for relay.

	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the stage has been deleted in etcd.
	IsDeleted bool `json:"-"`
	// record the etcd Revision of this Stage
	Revision int64 `json:"-"`
}

Stage represents the running stage for a relay or subtask.

func GetRelayStage

func GetRelayStage(cli *clientv3.Client, source string) (Stage, int64, error)

GetRelayStage gets the relay stage for the specified upstream source. if the stage for the source not exist, return with `err == nil` and `revision=0`.

func NewRelayStage

func NewRelayStage(expect pb.Stage, source string) Stage

NewRelayStage creates a new Stage instance for relay.

func NewSubTaskStage

func NewSubTaskStage(expect pb.Stage, source, task string) Stage

NewSubTaskStage creates a new Stage instance for subtask.

func NewValidatorStage

func NewValidatorStage(expect pb.Stage, source, task string) Stage

func (Stage) IsEmpty

func (s Stage) IsEmpty() bool

IsEmpty returns true when this Stage has no value.

func (Stage) String

func (s Stage) String() string

String implements Stringer interface.

type WorkerEvent

type WorkerEvent struct {
	WorkerName string    `json:"worker-name"` // the worker name of the worker.
	JoinTime   time.Time `json:"join-time"`   // the time when worker start to keepalive with etcd

	// only used to report to the caller of the watcher, do not marsh it.
	// if it's true, it means the worker has been deleted in etcd.
	IsDeleted bool `json:"-"`
}

WorkerEvent represents the PUT/DELETE keepalive event of DM-worker.

func (WorkerEvent) String

func (w WorkerEvent) String() string

String implements Stringer interface.

type WorkerInfo

type WorkerInfo struct {
	Name string `json:"name"` // the name of the node.
	Addr string `json:"addr"` // the client address of the node to advertise.
}

WorkerInfo represents the node information of the DM-worker.

func NewWorkerInfo

func NewWorkerInfo(name, addr string) WorkerInfo

NewWorkerInfo creates a new WorkerInfo instance.

func (WorkerInfo) String

func (i WorkerInfo) String() string

String implements Stringer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL