Documentation ¶
Index ¶
- func InitMetrics(registry *prometheus.Registry)
- func NewGrpcMessageClient(senderID NodeID, config *MessageClientConfig) *grpcMessageClient
- func NewMessageRouter(selfID NodeID, credentials *security.Credential, ...) *messageRouterImpl
- func NewMessageRouterWithLocalClient(selfID NodeID, credentials *security.Credential, ...) *messageRouterImpl
- type MessageClient
- type MessageClientConfig
- type MessageClientStream
- type MessageEntry
- type MessageRouter
- type MessageServer
- func (m *MessageServer) AddHandler(ctx context.Context, topic string, tpi typeInformation, ...) (chan struct{}, <-chan error, error)
- func (m *MessageServer) RemoveHandler(ctx context.Context, topic string) (chan struct{}, error)
- func (m *MessageServer) Run(ctx context.Context, localCh <-chan RawMessageEntry) error
- func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error
- func (m *MessageServer) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
- func (m *MessageServer) SyncAddHandler(ctx context.Context, topic string, tpi typeInformation, ...) (<-chan error, error)
- func (m *MessageServer) SyncRemoveHandler(ctx context.Context, topic string) error
- type MessageServerConfig
- type MessageServerStream
- type MockCluster
- type MockNode
- type NodeID
- type RawMessageEntry
- type Seq
- type Serializable
- type ServerWrapper
- type Topic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics initializes metrics used by pkg/p2p
func NewGrpcMessageClient ¶
func NewGrpcMessageClient(senderID NodeID, config *MessageClientConfig) *grpcMessageClient
NewGrpcMessageClient creates a new MessageClient. senderID is an identifier for the local node.
func NewMessageRouter ¶
func NewMessageRouter(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl
NewMessageRouter creates a new MessageRouter
func NewMessageRouterWithLocalClient ¶
func NewMessageRouterWithLocalClient(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl
NewMessageRouterWithLocalClient creates a new MessageRouter with a local client.
Types ¶
type MessageClient ¶
type MessageClient interface { // Run should be executed in a dedicated goroutine and it would block unless an irrecoverable error has been encountered. Run(ctx context.Context, network string, addr string, receiverID NodeID, credential *security.Credential) (ret error) // SendMessage sends a message of a given topic. It would block if the inner channel is congested. SendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error) // TrySendMessage tries to send a message of a given topic. It will return an error if the inner channel is congested. TrySendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error) // CurrentAck is used to query the latest sequence number for a topic that is acknowledged by the server. // Note: currently only used for test. CurrentAck(topic Topic) (Seq, bool) }
MessageClient is an interface for sending messages to a remote peer.
type MessageClientConfig ¶
type MessageClientConfig struct { // The size of the sending channel used to buffer // messages before they go to gRPC. SendChannelSize int // The maximum duration for which messages wait to be batched. BatchSendInterval time.Duration // The maximum size in bytes of a batch. MaxBatchBytes int // The maximum number of messages in a batch. MaxBatchCount int // The limit of the rate at which the connection to the server is retried. RetryRateLimitPerSecond float64 // The dial timeout for the gRPC client DialTimeout time.Duration // The advertised address of this node. Used for logging and monitoring purposes. AdvertisedAddr string // The version of the client for compatibility check. // It should be in semver format. Empty string means no check. ClientVersion string // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. MaxRecvMsgSize int }
MessageClientConfig is used to configure MessageClient
type MessageClientStream ¶
type MessageClientStream = proto.CDCPeerToPeer_SendMessageClient
MessageClientStream is an alias for the protobuf-generated interface for the message service.
type MessageEntry ¶
type MessageEntry = *proto.MessageEntry
MessageEntry is an alias for the protobuf-generated type for a message.
type MessageRouter ¶
type MessageRouter interface { // AddPeer should be invoked when a new peer is discovered. AddPeer(id NodeID, addr string) // RemovePeer should be invoked when a peer is determined to // be permanently unavailable. RemovePeer(id NodeID) // GetClient returns a MessageClient for `target`. It returns // nil if the target peer does not exist. The returned client // is canceled if RemovePeer is called on `target`. GetClient(target NodeID) MessageClient // GetLocalChannel returns a channel that can be used for intra-node communication. GetLocalChannel() <-chan RawMessageEntry // Close cancels all clients maintained internally and waits for all clients to exit. Close() // Err returns a channel to receive errors from. Err() <-chan error }
MessageRouter is used to maintain clients to all the peers in the cluster that the local node needs to communicate with.
type MessageServer ¶
type MessageServer struct {
// contains filtered or unexported fields
}
MessageServer is an implementation of the gRPC server for the peer-to-peer system
func NewMessageServer ¶
func NewMessageServer(serverID NodeID, config *MessageServerConfig) *MessageServer
NewMessageServer creates a new MessageServer
func (*MessageServer) AddHandler ¶
func (m *MessageServer) AddHandler( ctx context.Context, topic string, tpi typeInformation, fn func(string, interface{}) error, ) (chan struct{}, <-chan error, error)
AddHandler registers a handler for messages in a given topic.
func (*MessageServer) RemoveHandler ¶
func (m *MessageServer) RemoveHandler(ctx context.Context, topic string) (chan struct{}, error)
RemoveHandler removes the registered handler for the given topic.
func (*MessageServer) Run ¶
func (m *MessageServer) Run(ctx context.Context, localCh <-chan RawMessageEntry) error
Run starts the MessageServer's worker goroutines. It must be running to provide the gRPC service.
func (*MessageServer) ScheduleDeregisterPeerTask ¶
func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error
ScheduleDeregisterPeerTask schedules a task to deregister a peer.
func (*MessageServer) SendMessage ¶
func (m *MessageServer) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
SendMessage implements the gRPC call SendMessage.
func (*MessageServer) SyncAddHandler ¶
func (m *MessageServer) SyncAddHandler( ctx context.Context, topic string, tpi typeInformation, fn func(string, interface{}) error, ) (<-chan error, error)
SyncAddHandler registers a handler for messages in a given topic and waits for the operation to complete.
func (*MessageServer) SyncRemoveHandler ¶
func (m *MessageServer) SyncRemoveHandler(ctx context.Context, topic string) error
SyncRemoveHandler removes the registered handler for the given topic and waits for the operation to complete.
type MessageServerConfig ¶
type MessageServerConfig struct { // The maximum number of entries to be cached for topics with no handler registered MaxPendingMessageCountPerTopic int // The maximum number of unhandled internal tasks for the main thread. MaxPendingTaskCount int // The size of the channel for pending messages before sending them to gRPC. SendChannelSize int // The interval between ACKs. AckInterval time.Duration // The size of the goroutine pool for running the handlers. WorkerPoolSize int // The maximum send rate per stream (per peer). SendRateLimitPerStream float64 // The maximum number of peers acceptable by this server MaxPeerCount int // Semver of the server. Empty string means no version check. ServerVersion string // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. MaxRecvMsgSize int // After a duration of this time if the server doesn't see any activity it // pings the client to see if the transport is still alive. KeepAliveTime time.Duration // After having pinged for keepalive check, the server waits for a duration // of Timeout and if no activity is seen even after that the connection is // closed. KeepAliveTimeout time.Duration // The maximum time duration to wait before forcefully removing a handler. // // WaitUnregisterHandleTimeoutThreshold specifies how long to wait for // the topic handler to consume all pending messages before // forcefully unregistering the handler. // For a correct implementation of the handler, the time it needs // to consume these messages is minimal, as the handler is not // expected to block on channels, etc. WaitUnregisterHandleTimeoutThreshold time.Duration }
MessageServerConfig stores configurations for the MessageServer
type MessageServerStream ¶
type MessageServerStream = proto.CDCPeerToPeer_SendMessageServer
MessageServerStream is an alias for the protobuf-generated interface for the message service.
type MockCluster ¶
MockCluster mocks the whole peer-messaging cluster.
func NewMockCluster ¶
func NewMockCluster(t *testing.T, nodeCount int) *MockCluster
NewMockCluster creates a mock cluster.
type MockNode ¶
type MockNode struct { Addr string ID NodeID Server *MessageServer Router MessageRouter // contains filtered or unexported fields }
MockNode represents one mock node.
type NodeID ¶
type NodeID = string
NodeID represents the identifier of a sender node. Using IP address is not enough because of possible restarts.
type RawMessageEntry ¶
type RawMessageEntry struct {
// contains filtered or unexported fields
}
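RawMessageEntry represents a raw message entry. Unlike MessageEntry, it is a distinct struct type with unexported fields rather than an alias for a protobuf-generated type. Values of this type are delivered over the channel returned by MessageRouter.GetLocalChannel and consumed by MessageServer.Run.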
type Serializable ¶
Serializable is an interface for defining custom serialization methods for peer messages.
type ServerWrapper ¶
type ServerWrapper struct {
// contains filtered or unexported fields
}
ServerWrapper implements a CDCPeerToPeerServer, and it maintains an inner CDCPeerToPeerServer instance that can be replaced as needed.
func NewServerWrapper ¶
func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper
NewServerWrapper creates a new ServerWrapper
func (*ServerWrapper) Reset ¶
func (s *ServerWrapper) Reset(inner p2p.CDCPeerToPeerServer)
Reset resets the inner server object in the ServerWrapper
func (*ServerWrapper) SendMessage ¶
func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
SendMessage implements p2p.CDCPeerToPeerServer
func (*ServerWrapper) ServerOptions ¶
func (s *ServerWrapper) ServerOptions() []grpc.ServerOption
ServerOptions returns server option for creating grpc servers.