Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateResponse(id messageID, command string, msg interface{}) interface{}
- func GenerateTopic(senderID, receiverID string) string
- type BinlogRequest
- type BinlogResponse
- type BinlogSchemaRequest
- type BinlogSchemaResponse
- type BinlogSchemaTaskRequest
- type BinlogTaskRequest
- type CommonTaskResponse
- type CoordinateDDLRequest
- type CoordinateDDLResponse
- type MessageAgent
- type MessageAgentImpl
- func (agent *MessageAgentImpl) Close(ctx context.Context) error
- func (agent *MessageAgentImpl) RemoveClient(clientID string) error
- func (agent *MessageAgentImpl) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error
- func (agent *MessageAgentImpl) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)
- func (agent *MessageAgentImpl) Tick(ctx context.Context) error
- func (agent *MessageAgentImpl) UpdateClient(clientID string, client client) error
- type MockMessageAgent
- func (m *MockMessageAgent) Close(ctx context.Context) error
- func (m *MockMessageAgent) RemoveClient(clientID string) error
- func (m *MockMessageAgent) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error
- func (m *MockMessageAgent) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)
- func (m *MockMessageAgent) Tick(ctx context.Context) error
- func (m *MockMessageAgent) UpdateClient(clientID string, client client) error
- type OperateTaskMessage
- type OperateType
- type ProcessError
- type ProcessResult
- type QueryStatusRequest
- type QueryStatusResponse
- type StopWorkerMessage
Constants ¶
const ( OperateTask p2p.Topic = "OperateTask" QueryStatus p2p.Topic = "QueryStatus" StopWorker p2p.Topic = "StopWorker" GetJobCfg p2p.Topic = "GetJobCfg" Binlog p2p.Topic = "Binlog" BinlogSchema p2p.Topic = "BinlogSchema" // internal BinlogTask p2p.Topic = "BinlogTask" BinlogSchemaTask p2p.Topic = "BinlogSchemaTask" CoordinateDDL p2p.Topic = "CoordinateDDL" )
Defines topics here
Variables ¶
var ( // NewMessageAgent creates a new MessageAgent instance. NewMessageAgent = NewMessageAgentImpl )
Functions ¶
func GenerateResponse ¶
func GenerateResponse(id messageID, command string, msg interface{}) interface{}
GenerateResponse generate mock response message.
func GenerateTopic ¶
GenerateTopic generate mock message topic.
Types ¶
type BinlogResponse ¶
type BinlogResponse struct { ErrorMsg string // taskID -> task response Results map[string]*CommonTaskResponse }
BinlogResponse is binlog response
type BinlogSchemaRequest ¶
type BinlogSchemaRequest pb.OperateSchemaRequest
BinlogSchemaRequest is binlog schema request
type BinlogSchemaResponse ¶
type BinlogSchemaResponse struct { ErrorMsg string // taskID -> task response Results map[string]*CommonTaskResponse }
BinlogSchemaResponse is binlog schema response
type BinlogSchemaTaskRequest ¶
type BinlogSchemaTaskRequest pb.OperateWorkerSchemaRequest
BinlogSchemaTaskRequest is binlog schema task request
type BinlogTaskRequest ¶
type BinlogTaskRequest pb.HandleWorkerErrorRequest
BinlogTaskRequest is binlog task request
type CommonTaskResponse ¶
CommonTaskResponse is common task response
type CoordinateDDLRequest ¶
CoordinateDDLRequest is coordinate DDL request
type CoordinateDDLResponse ¶
type CoordinateDDLResponse struct { ErrorMsg string DDLs []string ConflictStage optimism.ConflictStage }
CoordinateDDLResponse is coordinate DDL response
type MessageAgent ¶
type MessageAgent interface { Tick(ctx context.Context) error Close(ctx context.Context) error // UpdateClient updates the client status. // When client online, caller should use this method to with not nil client. // When client offline temporary, caller should use this method with nil client. UpdateClient(clientID string, client client) error // RemoveClient is used when client is offline permanently, or the new client // with this clientID should be treated as a different client. RemoveClient(clientID string) error SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error) }
MessageAgent defines interface for message communication.
func NewMessageAgentImpl ¶
func NewMessageAgentImpl(id string, commandHandler interface{}, messageHandlerManager p2p.MessageHandlerManager, pLogger *zap.Logger) MessageAgent
NewMessageAgentImpl creates a new MessageAgent instance. message agent will call the method of commandHandler by command name automatically. The type of method of commandHandler should follow one of below: MessageFuncType: func(ctx context.Context, msg *interface{}) error {} RequestFuncType(1): func(ctx context.Context, req *interface{}) (resp *interface{}, err error) {} RequestFuncType(2): func(ctx context.Context, req *interface{}) (resp *interface{}) {}
type MessageAgentImpl ¶
type MessageAgentImpl struct {
// contains filtered or unexported fields
}
MessageAgentImpl implements the message processing mechanism.
func (*MessageAgentImpl) Close ¶
func (agent *MessageAgentImpl) Close(ctx context.Context) error
Close closes message agent.
func (*MessageAgentImpl) RemoveClient ¶
func (agent *MessageAgentImpl) RemoveClient(clientID string) error
RemoveClient implements MessageAgent.RemoveClient.
func (*MessageAgentImpl) SendMessage ¶
func (agent *MessageAgentImpl) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error
SendMessage send message asynchronously.
func (*MessageAgentImpl) SendRequest ¶
func (agent *MessageAgentImpl) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)
SendRequest send request synchronously. caller should add its own retry mechanism if needed. caller should persist the request itself if needed.
func (*MessageAgentImpl) Tick ¶
func (agent *MessageAgentImpl) Tick(ctx context.Context) error
Tick implements MessageAgent.Tick
func (*MessageAgentImpl) UpdateClient ¶
func (agent *MessageAgentImpl) UpdateClient(clientID string, client client) error
UpdateClient implements MessageAgent.UpdateClient.
type MockMessageAgent ¶
MockMessageAgent implement MessageAgent
func (*MockMessageAgent) Close ¶
func (m *MockMessageAgent) Close(ctx context.Context) error
Close implement MessageAgent.Close.
func (*MockMessageAgent) RemoveClient ¶
func (m *MockMessageAgent) RemoveClient(clientID string) error
RemoveClient implement MessageAgent.RemoveClient.
func (*MockMessageAgent) SendMessage ¶
func (m *MockMessageAgent) SendMessage(ctx context.Context, clientID string, command string, msg interface{}) error
SendMessage implement MessageAgent.SendMessage.
func (*MockMessageAgent) SendRequest ¶
func (m *MockMessageAgent) SendRequest(ctx context.Context, clientID string, command string, req interface{}) (interface{}, error)
SendRequest implement MessageAgent.SendRequest.
func (*MockMessageAgent) Tick ¶
func (m *MockMessageAgent) Tick(ctx context.Context) error
Tick implement MessageAgent.Tick.
func (*MockMessageAgent) UpdateClient ¶
func (m *MockMessageAgent) UpdateClient(clientID string, client client) error
UpdateClient implement MessageAgent.UpdateClient.
type OperateTaskMessage ¶
type OperateTaskMessage struct { Task string Op OperateType }
OperateTaskMessage is operate task message
type OperateType ¶
type OperateType int
OperateType represents internal operate type in DM TODO: use OperateType in lib or move OperateType to lib.
const ( None OperateType = iota Create Pause Resume Update Delete // internal Deleting )
These op may updated in later pr.
func (OperateType) MarshalJSON ¶
func (op OperateType) MarshalJSON() ([]byte, error)
MarshalJSON marshals the enum as a quoted json string
func (OperateType) String ¶
func (op OperateType) String() string
String implements fmt.Stringer interface
func (*OperateType) UnmarshalJSON ¶
func (op *OperateType) UnmarshalJSON(b []byte) error
UnmarshalJSON unmashals a quoted json string to the enum value
type ProcessError ¶
type ProcessError struct { ErrCode int32 `json:"error_code,omitempty"` ErrClass string `json:"error_class,omitempty"` ErrScope string `json:"error_scope,omitempty"` ErrLevel string `json:"error_level,omitempty"` Message string `json:"message,omitempty"` RawCause string `json:"raw_cause,omitempty"` Workaround string `json:"workaround,omitempty"` }
ProcessError copies pb.ProcessError expect for JSON tag.
type ProcessResult ¶
type ProcessResult struct { IsCanceled bool `protobuf:"varint,1,opt,name=isCanceled,proto3" json:"is_canceled,omitempty"` Errors []*ProcessError `protobuf:"bytes,2,rep,name=errors,proto3" json:"errors,omitempty"` Detail []byte `protobuf:"bytes,3,opt,name=detail,proto3" json:"detail,omitempty"` }
ProcessResult copies pb.ProcessResult expect for JSON tag.
func NewProcessResultFromPB ¶
func NewProcessResultFromPB(result *pb.ProcessResult) *ProcessResult
NewProcessResultFromPB converts ProcessResult from pb.ProcessResult.
type QueryStatusRequest ¶
type QueryStatusRequest struct {
Task string
}
QueryStatusRequest is query status request
type QueryStatusResponse ¶
type QueryStatusResponse struct { ErrorMsg string `json:"error_message"` Unit frameModel.WorkerType `json:"unit"` Stage metadata.TaskStage `json:"stage"` Result *ProcessResult `json:"result"` Status json.RawMessage `json:"status"` IoTotalBytes uint64 `json:"io_total_bytes"` DumpIoTotalBytes uint64 `json:"dump_io_total_bytes"` }
QueryStatusResponse is query status response
type StopWorkerMessage ¶
type StopWorkerMessage struct {
Task string
}
StopWorkerMessage is stop worker message