master

package
v0.0.0-...-beee317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 72 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultRate      = float64(10)
	DefaultBurst     = 40
	ErrorNoEmitToken = "fail to get emit opportunity for %s"
)

rate limit related constant value.

Variables

View Source
var (

	// CheckAndAdjustSourceConfigFunc is exposed to dataflow engine.
	// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
	// compatibility compromise.
	// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
	CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
)
View Source
var SampleConfig string

SampleConfig is sample config of dm-master.

Functions

func AdjustTargetDBSessionCfg

func AdjustTargetDBSessionCfg(ctx context.Context, dbConfig *dbconfig.DBConfig) error

func GetLatestMeta

func GetLatestMeta(ctx context.Context, flavor string, dbConfig *dbconfig.DBConfig) (*config.Meta, error)

GetLatestMeta gets newest meta(binlog name, pos, gtid) from upstream.

Types

type Agent

type Agent struct {
	ID int
}

Agent communicate with dm-workers.

type AgentPool

type AgentPool struct {
	// contains filtered or unexported fields
}

AgentPool is a pool to control communication with dm-workers It provides rate limit control for agent acquire, including dispatch rate r and permits bursts of at most b tokens. caller shouldn't to hold agent to avoid deadlock.

func NewAgentPool

func NewAgentPool(cfg *RateLimitConfig) *AgentPool

NewAgentPool returns a agent pool.

func (*AgentPool) Apply

func (ap *AgentPool) Apply(ctx context.Context, id int) *Agent

Apply applies for a agent if ctx is canceled before we get an agent, returns nil.

func (*AgentPool) Emit

func (ap *AgentPool) Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{})

Emit applies for an agent to communicates with dm-worker.

func (*AgentPool) Start

func (ap *AgentPool) Start(ctx context.Context)

Start starts AgentPool background dispatcher.

type Config

type Config struct {
	LogLevel  string `toml:"log-level" json:"log-level"`
	LogFile   string `toml:"log-file" json:"log-file"`
	LogFormat string `toml:"log-format" json:"log-format"`
	LogRotate string `toml:"log-rotate" json:"log-rotate"`

	RPCTimeoutStr string        `toml:"rpc-timeout" json:"rpc-timeout"`
	RPCTimeout    time.Duration `toml:"-" json:"-"`
	RPCRateLimit  float64       `toml:"rpc-rate-limit" json:"rpc-rate-limit"`
	RPCRateBurst  int           `toml:"rpc-rate-burst" json:"rpc-rate-burst"`

	MasterAddr    string `toml:"master-addr" json:"master-addr"`
	AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

	ConfigFile string `toml:"config-file" json:"config-file"`

	// etcd relative config items
	// NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls`
	// NOTE: more items will be add when adding leader election
	Name                    string `toml:"name" json:"name"`
	DataDir                 string `toml:"data-dir" json:"data-dir"`
	PeerUrls                string `toml:"peer-urls" json:"peer-urls"`
	AdvertisePeerUrls       string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
	InitialCluster          string `toml:"initial-cluster" json:"initial-cluster"`
	InitialClusterState     string `toml:"initial-cluster-state" json:"initial-cluster-state"`
	Join                    string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
	MaxTxnOps               uint   `toml:"max-txn-ops" json:"max-txn-ops"`
	MaxRequestBytes         uint   `toml:"max-request-bytes" json:"max-request-bytes"`
	AutoCompactionMode      string `toml:"auto-compaction-mode" json:"auto-compaction-mode"`
	AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention"`
	QuotaBackendBytes       int64  `toml:"quota-backend-bytes" json:"quota-backend-bytes"`
	OpenAPI                 bool   `toml:"openapi" json:"openapi"`

	// directory path used to store source config files when upgrading from v1.0.x.
	// if this path set, DM-master leader will try to upgrade from v1.0.x to the current version.
	V1SourcesPath string `toml:"v1-sources-path" json:"v1-sources-path"`

	// tls config
	security.Security
	SecretKeyPath string `toml:"secret-key-path" json:"secret-key-path"  yaml:"secret-key-path"`
	SecretKey     []byte `toml:"-" json:"-" yaml:"-"`

	ExperimentalFeatures ExperimentalFeatures `toml:"experimental"`
	// contains filtered or unexported fields
}

Config is the configuration for dm-master.

func NewConfig

func NewConfig() *Config

NewConfig creates a config for dm-master.

func (*Config) FromContent

func (c *Config) FromContent(content string) error

FromContent loads config from TOML format content.

func (*Config) Parse

func (c *Config) Parse(arguments []string) error

Parse parses flag definitions from the argument list.

func (*Config) Reload

func (c *Config) Reload() error

Reload load config from local file.

func (*Config) String

func (c *Config) String() string

func (*Config) Toml

func (c *Config) Toml() (string, error)

Toml returns TOML format representation of config.

type ExperimentalFeatures

type ExperimentalFeatures struct {
	OpenAPI bool `toml:"openapi,omitempty"` // OpenAPI is available in v5.4 as default.
}

type RateLimitConfig

type RateLimitConfig struct {
	// contains filtered or unexported fields
}

RateLimitConfig holds rate limit config.

type Server

type Server struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Server handles RPC requests for dm-master.

func NewServer

func NewServer(cfg *Config) *Server

NewServer creates a new Server.

func (*Server) CheckTask

func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.CheckTaskResponse, error)

CheckTask checks legality of task configuration.

func (*Server) Close

func (s *Server) Close()

Close close the RPC server, this function can be called multiple times.

func (*Server) ClusterID

func (s *Server) ClusterID() uint64

ClusterID return correct cluster id when as leader.

func (*Server) DMAPIConvertTask

func (s *Server) DMAPIConvertTask(c *gin.Context)

DMAPIConvertTask turns task into the format of a configuration file or vice versa url is: (POST /api/v1/tasks/,).

func (*Server) DMAPICreateSource

func (s *Server) DMAPICreateSource(c *gin.Context)

DMAPICreateSource url is:(POST /api/v1/sources).

func (*Server) DMAPICreateTask

func (s *Server) DMAPICreateTask(c *gin.Context)

DMAPICreateTask url is:(POST /api/v1/tasks).

func (*Server) DMAPICreateTaskTemplate

func (s *Server) DMAPICreateTaskTemplate(c *gin.Context)

DMAPICreateTaskTemplate create task_config_template url is: (POST /api/tasks/templates).

func (*Server) DMAPIDeleteSource

func (s *Server) DMAPIDeleteSource(c *gin.Context, sourceName string, params openapi.DMAPIDeleteSourceParams)

DMAPIDeleteSource url is:(DELETE /api/v1/sources/{source-name}).

func (*Server) DMAPIDeleteTableStructure

func (s *Server) DMAPIDeleteTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string)

DMAPIDeleteTableStructure delete task source table structure url is: (DELETE /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).

func (*Server) DMAPIDeleteTask

func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi.DMAPIDeleteTaskParams)

DMAPIDeleteTask url is:(DELETE /api/v1/tasks).

func (*Server) DMAPIDeleteTaskTemplate

func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string)

DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/tasks/templates/{task-name}).

func (*Server) DMAPIDisableRelay

func (s *Server) DMAPIDisableRelay(c *gin.Context, sourceName string)

DMAPIEnableRelay url is:(POST /api/v1/relay/disable).

func (*Server) DMAPIDisableSource

func (s *Server) DMAPIDisableSource(c *gin.Context, sourceName string)

DMAPIDisableSource url is:(POST /api/v1/sources/{source-name}/disable).

func (*Server) DMAPIEnableRelay

func (s *Server) DMAPIEnableRelay(c *gin.Context, sourceName string)

DMAPIEnableRelay url is:(POST /api/v1/relay/enable).

func (*Server) DMAPIEnableSource

func (s *Server) DMAPIEnableSource(c *gin.Context, sourceName string)

DMAPIEnableSource url is:(POST /api/v1/sources/{source-name}/enable).

func (*Server) DMAPIGetClusterInfo

func (s *Server) DMAPIGetClusterInfo(c *gin.Context)

DMAPIGetClusterInfo return cluster id of dm cluster url is: (GET /api/v1/cluster/info).

func (*Server) DMAPIGetClusterMasterList

func (s *Server) DMAPIGetClusterMasterList(c *gin.Context)

DMAPIGetClusterMasterList get cluster master node list url is:(GET /api/v1/cluster/masters).

func (*Server) DMAPIGetClusterWorkerList

func (s *Server) DMAPIGetClusterWorkerList(c *gin.Context)

DMAPIGetClusterWorkerList get cluster worker node list url is: (GET /api/v1/cluster/workers).

func (*Server) DMAPIGetSchemaListByTaskAndSource

func (s *Server) DMAPIGetSchemaListByTaskAndSource(c *gin.Context, taskName string, sourceName string)

DMAPIGetSchemaListByTaskAndSource get task source schema list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas).

func (*Server) DMAPIGetSource

func (s *Server) DMAPIGetSource(c *gin.Context, sourceName string, params openapi.DMAPIGetSourceParams)

DMAPIGetSource url is:(GET /api/v1/sources/{source-name}).

func (*Server) DMAPIGetSourceList

func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourceListParams)

DMAPIGetSourceList url is:(GET /api/v1/sources).

func (*Server) DMAPIGetSourceSchemaList

func (s *Server) DMAPIGetSourceSchemaList(c *gin.Context, sourceName string)

DMAPIGetSourceSchemaList get source schema list url is: (GET /api/v1/sources/{source-name}/schemas).

func (*Server) DMAPIGetSourceStatus

func (s *Server) DMAPIGetSourceStatus(c *gin.Context, sourceName string)

DMAPIGetSourceStatus url is: (GET /api/v1/sources/{source-id}/status).

func (*Server) DMAPIGetSourceTableList

func (s *Server) DMAPIGetSourceTableList(c *gin.Context, sourceName string, schemaName string)

DMAPIGetSourceTableList get source table list url is: (GET /api/v1/sources/{source-name}/schemas/{schema-name}).

func (*Server) DMAPIGetTableListByTaskAndSource

func (s *Server) DMAPIGetTableListByTaskAndSource(c *gin.Context, taskName string, sourceName string, schemaName string)

DMAPIGetTableListByTaskAndSource get task source table list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}).

func (*Server) DMAPIGetTableStructure

func (s *Server) DMAPIGetTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string)

DMAPIGetTableStructure get task source table structure url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).

func (*Server) DMAPIGetTask

func (s *Server) DMAPIGetTask(c *gin.Context, taskName string, params openapi.DMAPIGetTaskParams)

DMAPIGetTask url is:(GET /api/v1/tasks/{task-name}).

func (*Server) DMAPIGetTaskList

func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskListParams)

DMAPIGetTaskList url is:(GET /api/v1/tasks).

func (*Server) DMAPIGetTaskMigrateTargets

func (s *Server) DMAPIGetTaskMigrateTargets(c *gin.Context, taskName string, sourceName string, params openapi.DMAPIGetTaskMigrateTargetsParams)

DMAPIGetTaskMigrateTargets get task migrate targets list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/migrate_targets).

func (*Server) DMAPIGetTaskStatus

func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params openapi.DMAPIGetTaskStatusParams)

DMAPIGetTaskStatus url is:(GET /api/v1/tasks/{task-name}/status).

func (*Server) DMAPIGetTaskTemplate

func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string)

DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/tasks/templates/{task-name}).

func (*Server) DMAPIGetTaskTemplateList

func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context)

DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/tasks/templates).

func (*Server) DMAPIImportTaskTemplate

func (s *Server) DMAPIImportTaskTemplate(c *gin.Context)

DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/tasks/templates/import).

func (*Server) DMAPIOfflineMasterNode

func (s *Server) DMAPIOfflineMasterNode(c *gin.Context, masterName string)

DMAPIOfflineMasterNode offline master node url is: (DELETE /api/v1/cluster/masters/{master-name}).

func (*Server) DMAPIOfflineWorkerNode

func (s *Server) DMAPIOfflineWorkerNode(c *gin.Context, workerName string)

DMAPIOfflineWorkerNode offline worker node url is: (DELETE /api/v1/cluster/workers/{worker-name}).

func (*Server) DMAPIOperateTableStructure

func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string)

DMAPIOperateTableStructure operate task source table structure url is: (PUT /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}).

func (*Server) DMAPIPurgeRelay

func (s *Server) DMAPIPurgeRelay(c *gin.Context, sourceName string)

DMAPIPurgeRelay url is:(POST /api/v1/relay/purge).

func (*Server) DMAPIStartTask

func (s *Server) DMAPIStartTask(c *gin.Context, taskName string)

DMAPIStartTask url is: (POST /api/v1/tasks/{task-name}/start).

func (*Server) DMAPIStopTask

func (s *Server) DMAPIStopTask(c *gin.Context, taskName string)

DMAPIStopTask url is: (POST /api/v1/tasks/{task-name}/stop).

func (*Server) DMAPITransferSource

func (s *Server) DMAPITransferSource(c *gin.Context, sourceName string)

DMAPITransferSource transfer source to another free worker url is: (POST /api/v1/sources/{source-name}/transfer).

func (*Server) DMAPIUpdateClusterInfo

func (s *Server) DMAPIUpdateClusterInfo(c *gin.Context)

DMAPIGetClusterInfo return cluster id of dm cluster url is: (PUT /api/v1/cluster/info).

func (*Server) DMAPIUpdateSource

func (s *Server) DMAPIUpdateSource(c *gin.Context, sourceName string)

DMAPIUpdateSource url is:(PUT /api/v1/sources/{source-name}).

func (*Server) DMAPIUpdateTask

func (s *Server) DMAPIUpdateTask(c *gin.Context, taskName string)

DMAPIUpdateTask url is: (PUT /api/v1/tasks/{task-name}).

func (*Server) DMAPUpdateTaskTemplate

func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string)

DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/tasks/templates/{task-name}).

func (*Server) Encrypt

func (s *Server) Encrypt(ctx context.Context, req *pb.EncryptRequest) (*pb.EncryptResponse, error)

func (*Server) GetCfg

func (s *Server) GetCfg(ctx context.Context, req *pb.GetCfgRequest) (*pb.GetCfgResponse, error)

GetCfg implements MasterServer.GetCfg.

func (*Server) GetDocHTML

func (s *Server) GetDocHTML(c *gin.Context)

GetDocHTML url is:(GET /api/v1/docs).

func (*Server) GetDocJSON

func (s *Server) GetDocJSON(c *gin.Context)

GetDocJSON url is:(GET /api/v1/dm.json).

func (*Server) GetMasterCfg

func (s *Server) GetMasterCfg(ctx context.Context, req *pb.GetMasterCfgRequest) (*pb.GetMasterCfgResponse, error)

GetMasterCfg implements MasterServer.GetMasterCfg.

func (*Server) GetSubTaskCfg

func (s *Server) GetSubTaskCfg(ctx context.Context, req *pb.GetSubTaskCfgRequest) (*pb.GetSubTaskCfgResponse, error)

GetSubTaskCfg implements MasterServer.GetSubTaskCfg.

func (*Server) GetValidationError

func (*Server) GetValidationStatus

func (*Server) HandleError

func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (*pb.HandleErrorResponse, error)

HandleError implements MasterServer.HandleError.

func (*Server) InitOpenAPIHandles

func (s *Server) InitOpenAPIHandles(tlsCfg *tls.Config) error

InitOpenAPIHandles init openapi handlers.

func (*Server) ListMember

func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb.ListMemberResponse, error)

ListMember list member information.

func (*Server) ListSourceConfigs

func (s *Server) ListSourceConfigs(ctx context.Context, req *emptypb.Empty) (*pb.ListSourceConfigsResponse, error)

func (*Server) ListTaskConfigs

func (s *Server) ListTaskConfigs(ctx context.Context, req *emptypb.Empty) (*pb.ListTaskConfigsResponse, error)

func (*Server) OfflineMember

func (s *Server) OfflineMember(ctx context.Context, req *pb.OfflineMemberRequest) (*pb.OfflineMemberResponse, error)

OfflineMember removes info of the master/worker which has been Closed all the masters are store in etcd member list all the workers are store in the path: key: /dm-worker/r value: WorkerInfo

func (*Server) OperateLeader

func (s *Server) OperateLeader(ctx context.Context, req *pb.OperateLeaderRequest) (*pb.OperateLeaderResponse, error)

OperateLeader implements MasterServer.OperateLeader Note: this request doesn't need to forward to leader.

func (*Server) OperateRelay

func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)

OperateRelay implements MasterServer.OperateRelay.

func (*Server) OperateSchema

func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest) (*pb.OperateSchemaResponse, error)

OperateSchema operates schema of an upstream table.

func (*Server) OperateSource

func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest) (*pb.OperateSourceResponse, error)

OperateSource will create or update an upstream source.

func (*Server) OperateTask

func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*pb.OperateTaskResponse, error)

OperateTask implements MasterServer.OperateTask.

func (*Server) OperateWorkerRelayTask

func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWorkerRelayRequest) (*pb.OperateWorkerRelayResponse, error)

OperateWorkerRelayTask implements MasterServer.OperateWorkerRelayTask.

func (*Server) PurgeWorkerRelay

func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayRequest) (*pb.PurgeWorkerRelayResponse, error)

PurgeWorkerRelay implements MasterServer.PurgeWorkerRelay.

func (*Server) QueryStatus

QueryStatus implements MasterServer.QueryStatus.

func (*Server) RegisterWorker

func (s *Server) RegisterWorker(ctx context.Context, req *pb.RegisterWorkerRequest) (*pb.RegisterWorkerResponse, error)

RegisterWorker registers the worker to the master, and all the worker will be store in the path: key: /dm-worker/r/name value: workerInfo

func (*Server) ShowDDLLocks

func (s *Server) ShowDDLLocks(ctx context.Context, req *pb.ShowDDLLocksRequest) (*pb.ShowDDLLocksResponse, error)

ShowDDLLocks implements MasterServer.ShowDDLLocks.

func (*Server) Start

func (s *Server) Start(ctx context.Context) (err error)

Start starts to serving.

func (*Server) StartTask

func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.StartTaskResponse, error)

StartTask implements MasterServer.StartTask.

func (*Server) StartValidation

func (s *Server) StartValidation(ctx context.Context, req *pb.StartValidationRequest) (*pb.StartValidationResponse, error)

func (*Server) StopValidation

func (s *Server) StopValidation(ctx context.Context, req *pb.StopValidationRequest) (*pb.StopValidationResponse, error)

func (*Server) TransferSource

func (s *Server) TransferSource(ctx context.Context, req *pb.TransferSourceRequest) (*pb.TransferSourceResponse, error)

TransferSource implements MasterServer.TransferSource.

func (*Server) UnlockDDLLock

func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest) (*pb.UnlockDDLLockResponse, error)

UnlockDDLLock implements MasterServer.UnlockDDLLock TODO(csuzhangxc): implement this later.

func (*Server) UpdateTask

func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb.UpdateTaskResponse, error)

UpdateTask implements MasterServer.UpdateTask TODO: support update task later.

func (*Server) UpdateValidation

func (s *Server) UpdateValidation(ctx context.Context, req *pb.UpdateValidationRequest) (*pb.UpdateValidationResponse, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL