dm

package
v0.0.0-...-686f8ea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OperateTask  p2p.Topic = "OperateTask"
	QueryStatus  p2p.Topic = "QueryStatus"
	StopWorker   p2p.Topic = "StopWorker"
	GetJobCfg    p2p.Topic = "GetJobCfg"
	Binlog       p2p.Topic = "Binlog"
	BinlogSchema p2p.Topic = "BinlogSchema"

	// internal
	BinlogTask       p2p.Topic = "BinlogTask"
	BinlogSchemaTask p2p.Topic = "BinlogSchemaTask"
	CoordinateDDL    p2p.Topic = "CoordinateDDL"
)

The message topics are defined here.

Variables

View Source
var (

	// NewMessageAgent creates a new MessageAgent instance.
	NewMessageAgent = NewMessageAgentImpl
)

Functions

func GenerateResponse

func GenerateResponse(id messageID, command string, msg interface{}) interface{}

GenerateResponse generates a mock response message.

func GenerateTopic

func GenerateTopic(senderID, receiverID string) string

GenerateTopic generates a mock message topic.

Types

type BinlogRequest

type BinlogRequest pb.HandleErrorRequest

BinlogRequest is binlog request

type BinlogResponse

type BinlogResponse struct {
	ErrorMsg string
	// taskID -> task response
	Results map[string]*CommonTaskResponse
}

BinlogResponse is binlog response

type BinlogSchemaRequest

type BinlogSchemaRequest pb.OperateSchemaRequest

BinlogSchemaRequest is binlog schema request

type BinlogSchemaResponse

type BinlogSchemaResponse struct {
	ErrorMsg string
	// taskID -> task response
	Results map[string]*CommonTaskResponse
}

BinlogSchemaResponse is binlog schema response

type BinlogSchemaTaskRequest

type BinlogSchemaTaskRequest pb.OperateWorkerSchemaRequest

BinlogSchemaTaskRequest is binlog schema task request

type BinlogTaskRequest

type BinlogTaskRequest pb.HandleWorkerErrorRequest

BinlogTaskRequest is binlog task request

type CommonTaskResponse

type CommonTaskResponse struct {
	ErrorMsg string
	Msg      string
}

CommonTaskResponse is common task response

type CoordinateDDLRequest

type CoordinateDDLRequest metadata.DDLItem

CoordinateDDLRequest is coordinate DDL request

type CoordinateDDLResponse

type CoordinateDDLResponse struct {
	ErrorMsg      string
	DDLs          []string
	ConflictStage optimism.ConflictStage
}

CoordinateDDLResponse is coordinate DDL response

type MessageAgent

type MessageAgent interface {
	Tick(ctx context.Context) error
	Close(ctx context.Context) error
	// UpdateClient updates the client status.
	// When the client is online, the caller should call this method with a non-nil client.
	// When the client is temporarily offline, the caller should call this method with a nil client.
	UpdateClient(clientID string, client client) error
	// RemoveClient is used when client is offline permanently, or the new client
	// with this clientID should be treated as a different client.
	RemoveClient(clientID string) error
	SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error
	SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)
}

MessageAgent defines interface for message communication.

func NewMessageAgentImpl

func NewMessageAgentImpl(id string, commandHandler interface{}, messageHandlerManager p2p.MessageHandlerManager, pLogger *zap.Logger) MessageAgent

NewMessageAgentImpl creates a new MessageAgent instance. The message agent will automatically call the method of commandHandler that matches the command name. Each such method of commandHandler should have one of the following types:

    MessageFuncType:    func(ctx context.Context, msg *interface{}) error {}
    RequestFuncType(1): func(ctx context.Context, req *interface{}) (resp *interface{}, err error) {}
    RequestFuncType(2): func(ctx context.Context, req *interface{}) (resp *interface{}) {}

type MessageAgentImpl

type MessageAgentImpl struct {
	// contains filtered or unexported fields
}

MessageAgentImpl implements the message processing mechanism.

func (*MessageAgentImpl) Close

func (agent *MessageAgentImpl) Close(ctx context.Context) error

Close closes message agent.

func (*MessageAgentImpl) RemoveClient

func (agent *MessageAgentImpl) RemoveClient(clientID string) error

RemoveClient implements MessageAgent.RemoveClient.

func (*MessageAgentImpl) SendMessage

func (agent *MessageAgentImpl) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error

SendMessage sends a message asynchronously.

func (*MessageAgentImpl) SendRequest

func (agent *MessageAgentImpl) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)

SendRequest sends a request synchronously. The caller should add its own retry mechanism if needed. The caller should persist the request itself if needed.

func (*MessageAgentImpl) Tick

func (agent *MessageAgentImpl) Tick(ctx context.Context) error

Tick implements MessageAgent.Tick

func (*MessageAgentImpl) UpdateClient

func (agent *MessageAgentImpl) UpdateClient(clientID string, client client) error

UpdateClient implements MessageAgent.UpdateClient.

type MockMessageAgent

type MockMessageAgent struct {
	sync.Mutex
	mock.Mock
}

MockMessageAgent implements MessageAgent.

func (*MockMessageAgent) Close

func (m *MockMessageAgent) Close(ctx context.Context) error

Close implements MessageAgent.Close.

func (*MockMessageAgent) RemoveClient

func (m *MockMessageAgent) RemoveClient(clientID string) error

RemoveClient implements MessageAgent.RemoveClient.

func (*MockMessageAgent) SendMessage

func (m *MockMessageAgent) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error

SendMessage implements MessageAgent.SendMessage.

func (*MockMessageAgent) SendRequest

func (m *MockMessageAgent) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)

SendRequest implements MessageAgent.SendRequest.

func (*MockMessageAgent) Tick

func (m *MockMessageAgent) Tick(ctx context.Context) error

Tick implements MessageAgent.Tick.

func (*MockMessageAgent) UpdateClient

func (m *MockMessageAgent) UpdateClient(clientID string, client client) error

UpdateClient implements MessageAgent.UpdateClient.

type OperateTaskMessage

type OperateTaskMessage struct {
	Task string
	Op   OperateType
}

OperateTaskMessage is operate task message

type OperateType

type OperateType int

OperateType represents the internal operate type in DM. TODO: use OperateType in lib or move OperateType to lib.

const (
	None OperateType = iota
	Create
	Pause
	Resume
	Update
	Delete
	// internal
	Deleting
)

These ops may be updated in a later PR.

func (OperateType) MarshalJSON

func (op OperateType) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted JSON string.

func (OperateType) String

func (op OperateType) String() string

String implements fmt.Stringer interface

func (*OperateType) UnmarshalJSON

func (op *OperateType) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a quoted JSON string to the enum value.

type ProcessError

type ProcessError struct {
	ErrCode    int32  `json:"error_code,omitempty"`
	ErrClass   string `json:"error_class,omitempty"`
	ErrScope   string `json:"error_scope,omitempty"`
	ErrLevel   string `json:"error_level,omitempty"`
	Message    string `json:"message,omitempty"`
	RawCause   string `json:"raw_cause,omitempty"`
	Workaround string `json:"workaround,omitempty"`
}

ProcessError copies pb.ProcessError except for the JSON tags.

type ProcessResult

type ProcessResult struct {
	IsCanceled bool            `protobuf:"varint,1,opt,name=isCanceled,proto3" json:"is_canceled,omitempty"`
	Errors     []*ProcessError `protobuf:"bytes,2,rep,name=errors,proto3" json:"errors,omitempty"`
	Detail     []byte          `protobuf:"bytes,3,opt,name=detail,proto3" json:"detail,omitempty"`
}

ProcessResult copies pb.ProcessResult except for the JSON tags.

func NewProcessResultFromPB

func NewProcessResultFromPB(result *pb.ProcessResult) *ProcessResult

NewProcessResultFromPB converts ProcessResult from pb.ProcessResult.

type QueryStatusRequest

type QueryStatusRequest struct {
	Task string
}

QueryStatusRequest is query status request

type QueryStatusResponse

type QueryStatusResponse struct {
	ErrorMsg         string                `json:"error_message"`
	Unit             frameModel.WorkerType `json:"unit"`
	Stage            metadata.TaskStage    `json:"stage"`
	Result           *ProcessResult        `json:"result"`
	Status           json.RawMessage       `json:"status"`
	IoTotalBytes     uint64                `json:"io_total_bytes"`
	DumpIoTotalBytes uint64                `json:"dump_io_total_bytes"`
}

QueryStatusResponse is query status response

type StopWorkerMessage

type StopWorkerMessage struct {
	Task string
}

StopWorkerMessage is stop worker message

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL