Documentation ¶
Index ¶
- func NewMessageServer(selfID NodeID, _credential *security.Credential, opts ...MessageServerOpt) *p2pImpl.MessageServer
- type Config
- type HandlerFunc
- type MessageHandlerManager
- type MessageRPCService
- func NewDependentMessageRPCService(selfID NodeID, _credential *security.Credential, grpcSvr *grpc.Server, ...) (*MessageRPCService, error)
- func NewMessageRPCService(selfID NodeID, _credential *security.Credential, opts ...MessageServerOpt) (*MessageRPCService, error)
- func NewMessageRPCServiceWithRPCServer(selfID NodeID, _credential *security.Credential, grpcSvr *grpc.Server, ...) *MessageRPCService
- type MessageRouter
- type MessageSender
- type MessageServerOpt
- type MessageValue
- type MockMessageHandlerManager
- func (m *MockMessageHandlerManager) AssertHasHandler(t *testing.T, topic Topic, tpi TypeInformation)
- func (m *MockMessageHandlerManager) AssertNoHandler(t *testing.T, topic Topic)
- func (m *MockMessageHandlerManager) CheckError(ctx context.Context) error
- func (m *MockMessageHandlerManager) Clean(ctx context.Context) error
- func (m *MockMessageHandlerManager) InjectError(err error)
- func (m *MockMessageHandlerManager) InvokeHandler(t *testing.T, topic Topic, senderID NodeID, message interface{}) error
- func (m *MockMessageHandlerManager) RegisterHandler(ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc) (bool, error)
- func (m *MockMessageHandlerManager) SetTimeout(timeout time.Duration)
- func (m *MockMessageHandlerManager) UnregisterHandler(ctx context.Context, topic Topic) (bool, error)
- type MockMessageSender
- func (m *MockMessageSender) InjectError(err error)
- func (m *MockMessageSender) MarkNodeOffline(nodeID NodeID)
- func (m *MockMessageSender) MarkNodeOnline(nodeID NodeID)
- func (m *MockMessageSender) SendToNode(_ context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error)
- func (m *MockMessageSender) SendToNodeB(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) error
- func (m *MockMessageSender) SetBlocked(isBlocked bool)
- func (m *MockMessageSender) TryPop(targetNodeID NodeID, topic Topic) (interface{}, bool)
- type NodeID
- type Topic
- type TypeInformation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMessageServer ¶
func NewMessageServer( selfID NodeID, _credential *security.Credential, opts ...MessageServerOpt, ) *p2pImpl.MessageServer
NewMessageServer creates a new message server from given configs
Types ¶
type HandlerFunc ¶
type HandlerFunc = func(sender NodeID, value MessageValue) error
HandlerFunc alias to message handler function
type MessageHandlerManager ¶
type MessageHandlerManager interface { RegisterHandler(ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc) (bool, error) UnregisterHandler(ctx context.Context, topic Topic) (bool, error) CheckError(ctx context.Context) error // Clean unregisters all existing handlers. Clean(ctx context.Context) error // SetTimeout sets the timeout for handler operations. // A timeout is needed because the underlying handler operations are // asynchronous in the MessageServer. SetTimeout(timeout time.Duration) }
MessageHandlerManager is for managing message topic handlers. NOTE: for each topic, only one handler is allowed.
type MessageRPCService ¶
type MessageRPCService struct {
// contains filtered or unexported fields
}
MessageRPCService is a background service wrapping a MessageServer instance.
func NewDependentMessageRPCService ¶
func NewDependentMessageRPCService( selfID NodeID, _credential *security.Credential, grpcSvr *grpc.Server, opts ...MessageServerOpt, ) (*MessageRPCService, error)
NewDependentMessageRPCService creates a new MessageRPCService that DOES NOT own a `grpc.Server`. TODO refactor the design.
func NewMessageRPCService ¶
func NewMessageRPCService( selfID NodeID, _credential *security.Credential, opts ...MessageServerOpt, ) (*MessageRPCService, error)
NewMessageRPCService creates a new MessageRPCService. Note: TLS is not supported for now.
func NewMessageRPCServiceWithRPCServer ¶
func NewMessageRPCServiceWithRPCServer( selfID NodeID, _credential *security.Credential, grpcSvr *grpc.Server, opts ...MessageServerOpt, ) *MessageRPCService
NewMessageRPCServiceWithRPCServer creates a new MessageRPCService with an existing gRPC server.
func (*MessageRPCService) GetMessageServer ¶
func (s *MessageRPCService) GetMessageServer() *p2pImpl.MessageServer
GetMessageServer returns the internal message server
func (*MessageRPCService) MakeHandlerManager ¶
func (s *MessageRPCService) MakeHandlerManager() MessageHandlerManager
MakeHandlerManager returns a MessageHandlerManager
type MessageRouter ¶
type MessageRouter = p2p.MessageRouter
MessageRouter alias to p2p.MessageRouter
func NewMessageRouter ¶
func NewMessageRouter(nodeID NodeID, advertisedAddr string) MessageRouter
NewMessageRouter creates a new MessageRouter instance via tiflow p2p API
type MessageSender ¶
type MessageSender interface { // SendToNode sends a message to a given node. Returns whether it is successful and a possible error. // A `would-block` error will not be returned. (false, nil) would be returned instead. SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error) // SendToNodeB sends a message to a given node in a blocking way SendToNodeB(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) error }
MessageSender is used to send a message of a given topic to a given node.
func NewMessageSender ¶
func NewMessageSender(router MessageRouter) MessageSender
NewMessageSender returns a new message sender.
type MessageServerOpt ¶
type MessageServerOpt = func(*Config)
MessageServerOpt alias to the option setter function
type MockMessageHandlerManager ¶
type MockMessageHandlerManager struct {
// contains filtered or unexported fields
}
MockMessageHandlerManager is used in unit tests only; it simulates a message handler manager.
func NewMockMessageHandlerManager ¶
func NewMockMessageHandlerManager() *MockMessageHandlerManager
NewMockMessageHandlerManager creates a new MockMessageHandlerManager instance
func (*MockMessageHandlerManager) AssertHasHandler ¶
func (m *MockMessageHandlerManager) AssertHasHandler(t *testing.T, topic Topic, tpi TypeInformation)
AssertHasHandler checks that the given topic is registered
func (*MockMessageHandlerManager) AssertNoHandler ¶
func (m *MockMessageHandlerManager) AssertNoHandler(t *testing.T, topic Topic)
AssertNoHandler checks that the given topic is not registered
func (*MockMessageHandlerManager) CheckError ¶
func (m *MockMessageHandlerManager) CheckError(ctx context.Context) error
CheckError implements MessageHandlerManager.CheckError
func (*MockMessageHandlerManager) Clean ¶
func (m *MockMessageHandlerManager) Clean(ctx context.Context) error
Clean implements MessageHandlerManager.Clean
func (*MockMessageHandlerManager) InjectError ¶
func (m *MockMessageHandlerManager) InjectError(err error)
InjectError injects an error into the mock message handler
func (*MockMessageHandlerManager) InvokeHandler ¶
func (m *MockMessageHandlerManager) InvokeHandler(t *testing.T, topic Topic, senderID NodeID, message interface{}) error
InvokeHandler gets the handler of the given topic and invokes it to simulate a message being sent from the given sender
func (*MockMessageHandlerManager) RegisterHandler ¶
func (m *MockMessageHandlerManager) RegisterHandler( ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc, ) (bool, error)
RegisterHandler implements MessageHandlerManager.RegisterHandler
func (*MockMessageHandlerManager) SetTimeout ¶
func (m *MockMessageHandlerManager) SetTimeout(timeout time.Duration)
SetTimeout implements MessageHandlerManager.SetTimeout
func (*MockMessageHandlerManager) UnregisterHandler ¶
func (m *MockMessageHandlerManager) UnregisterHandler(ctx context.Context, topic Topic) (bool, error)
UnregisterHandler implements MessageHandlerManager.UnregisterHandler
type MockMessageSender ¶
type MockMessageSender struct {
// contains filtered or unexported fields
}
MockMessageSender defines a mock message sender
func NewMockMessageSender ¶
func NewMockMessageSender() *MockMessageSender
NewMockMessageSender creates a new MockMessageSender instance
func (*MockMessageSender) InjectError ¶
func (m *MockMessageSender) InjectError(err error)
InjectError injects an error to simulate an error scenario
func (*MockMessageSender) MarkNodeOffline ¶
func (m *MockMessageSender) MarkNodeOffline(nodeID NodeID)
MarkNodeOffline marks a node as offline.
func (*MockMessageSender) MarkNodeOnline ¶
func (m *MockMessageSender) MarkNodeOnline(nodeID NodeID)
MarkNodeOnline marks a node as online. Note that by default all nodes are treated as online to facilitate testing.
func (*MockMessageSender) SendToNode ¶
func (m *MockMessageSender) SendToNode( _ context.Context, targetNodeID NodeID, topic Topic, message interface{}, ) (bool, error)
SendToNode implements pkg/p2p.MessageSender.SendToNode
func (*MockMessageSender) SendToNodeB ¶
func (m *MockMessageSender) SendToNodeB( ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}, ) error
SendToNodeB implements pkg/p2p.MessageSender.SendToNodeB
func (*MockMessageSender) SetBlocked ¶
func (m *MockMessageSender) SetBlocked(isBlocked bool)
SetBlocked sets whether message sending is blocked