p2p

package
v0.0.0-...-8830dc8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: Apache-2.0 Imports: 33 Imported by: 9

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics initializes metrics used by pkg/p2p

func NewGrpcMessageClient

func NewGrpcMessageClient(senderID NodeID, config *MessageClientConfig) *grpcMessageClient

NewGrpcMessageClient creates a new MessageClient senderID is an identifier for the local node.

func NewMessageRouter

func NewMessageRouter(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl

NewMessageRouter creates a new MessageRouter

func NewMessageRouterWithLocalClient

func NewMessageRouterWithLocalClient(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl

NewMessageRouterWithLocalClient creates a new MessageRouter with a local client.

Types

type MessageClient

type MessageClient interface {
	// Run should be executed in a dedicated goroutine and it would block unless an irrecoverable error has been encountered.
	Run(ctx context.Context, network string, addr string, receiverID NodeID, credential *security.Credential) (ret error)

	// SendMessage sends a message of a given topic. It would block if the inner channel is congested.
	SendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error)

	// TrySendMessage tries to send a message of a given topic. It will return an error if the inner channel is congested.
	TrySendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error)

	// CurrentAck is used to query the latest sequence number for a topic that is acknowledged by the server.
	// Note: currently only used for test.
	CurrentAck(topic Topic) (Seq, bool)
}

MessageClient is an interface for sending messages to a remote peer.

type MessageClientConfig

type MessageClientConfig struct {
	// The size of the sending channel used to buffer
	// messages before they go to gRPC.
	SendChannelSize int
	// The maximum duration for which messages wait to be batched.
	BatchSendInterval time.Duration
	// The maximum size in bytes of a batch.
	MaxBatchBytes int
	// The maximum number of messages in a batch.
	MaxBatchCount int
	// The limit of the rate at which the connection to the server is retried.
	RetryRateLimitPerSecond float64
	// The dial timeout for the gRPC client
	DialTimeout time.Duration
	// The advertised address of this node. Used for logging and monitoring purposes.
	AdvertisedAddr string
	// The version of the client for compatibility check.
	// It should be in semver format. Empty string means no check.
	ClientVersion string
	// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
	MaxRecvMsgSize int
}

MessageClientConfig is used to configure MessageClient

type MessageClientStream

type MessageClientStream = proto.CDCPeerToPeer_SendMessageClient

MessageClientStream is an alias for the protobuf-generated interface for the message service.

type MessageEntry

type MessageEntry = *proto.MessageEntry

MessageEntry is an alias for the protobuf-generated type for a message.

type MessageRouter

type MessageRouter interface {
	// AddPeer should be invoked when a new peer is discovered.
	AddPeer(id NodeID, addr string)
	// RemovePeer should be invoked when a peer is determined to
	// be permanently unavailable.
	RemovePeer(id NodeID)
	// GetClient returns a MessageClient for `target`. It returns
	// nil if the target peer does not exist. The returned client
	// is canceled if RemovePeer is called on `target`.
	GetClient(target NodeID) MessageClient
	// GetLocalChannel returns a channel that can be used for intra-node communication.
	GetLocalChannel() <-chan RawMessageEntry
	// Close cancels all clients maintained internally and waits for all clients to exit.
	Close()
	// Err returns a channel to receive errors from.
	Err() <-chan error
}

MessageRouter is used to maintain clients to all the peers in the cluster that the local node needs to communicate with.

type MessageServer

type MessageServer struct {
	// contains filtered or unexported fields
}

MessageServer is an implementation of the gRPC server for the peer-to-peer system

func NewMessageServer

func NewMessageServer(serverID NodeID, config *MessageServerConfig) *MessageServer

NewMessageServer creates a new MessageServer

func (*MessageServer) AddHandler

func (m *MessageServer) AddHandler(
	ctx context.Context,
	topic string,
	tpi typeInformation,
	fn func(string, interface{}) error,
) (chan struct{}, <-chan error, error)

AddHandler registers a handler for messages in a given topic.

func (*MessageServer) RemoveHandler

func (m *MessageServer) RemoveHandler(ctx context.Context, topic string) (chan struct{}, error)

RemoveHandler removes the registered handler for the given topic.

func (*MessageServer) Run

func (m *MessageServer) Run(ctx context.Context, localCh <-chan RawMessageEntry) error

Run starts the MessageServer's worker goroutines. It must be running to provide the gRPC service.

func (*MessageServer) ScheduleDeregisterPeerTask

func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error

ScheduleDeregisterPeerTask schedules a task to deregister a peer.

func (*MessageServer) SendMessage

func (m *MessageServer) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error

SendMessage implements the gRPC call SendMessage.

func (*MessageServer) SyncAddHandler

func (m *MessageServer) SyncAddHandler(
	ctx context.Context,
	topic string,
	tpi typeInformation,
	fn func(string, interface{}) error,
) (<-chan error, error)

SyncAddHandler registers a handler for messages in a given topic and waits for the operation to complete.

func (*MessageServer) SyncRemoveHandler

func (m *MessageServer) SyncRemoveHandler(ctx context.Context, topic string) error

SyncRemoveHandler removes the registered handler for the given topic and wait for the operation to complete.

type MessageServerConfig

type MessageServerConfig struct {
	// The maximum number of entries to be cached for topics with no handler registered
	MaxPendingMessageCountPerTopic int
	// The maximum number of unhandled internal tasks for the main thread.
	MaxPendingTaskCount int
	// The size of the channel for pending messages before sending them to gRPC.
	SendChannelSize int
	// The interval between ACKs.
	AckInterval time.Duration
	// The size of the goroutine pool for running the handlers.
	WorkerPoolSize int
	// The maximum send rate per stream (per peer).
	SendRateLimitPerStream float64
	// The maximum number of peers acceptable by this server
	MaxPeerCount int
	// Semver of the server. Empty string means no version check.
	ServerVersion string
	// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
	MaxRecvMsgSize int

	// After a duration of this time if the server doesn't see any activity it
	// pings the client to see if the transport is still alive.
	KeepAliveTime time.Duration

	// After having pinged for keepalive check, the server waits for a duration
	// of Timeout and if no activity is seen even after that the connection is
	// closed.
	KeepAliveTimeout time.Duration

	// The maximum time duration to wait before forcefully removing a handler.
	//
	// waitUnregisterHandleTimeout specifies how long to wait for
	// the topic handler to consume all pending messages before
	// forcefully unregister the handler.
	// For a correct implementation of the handler, the time it needs
	// to consume these messages is minimal, as the handler is not
	// expected to block on channels, etc.
	WaitUnregisterHandleTimeoutThreshold time.Duration
}

MessageServerConfig stores configurations for the MessageServer

type MessageServerStream

type MessageServerStream = proto.CDCPeerToPeer_SendMessageServer

MessageServerStream is an alias for the protobuf-generated interface for the message service.

type MockCluster

type MockCluster struct {
	Nodes map[NodeID]*MockNode
}

MockCluster mocks the whole peer-messaging cluster.

func NewMockCluster

func NewMockCluster(t *testing.T, nodeCount int) *MockCluster

NewMockCluster creates a mock cluster.

func (*MockCluster) Close

func (c *MockCluster) Close()

Close closes the mock cluster.

type MockNode

type MockNode struct {
	Addr string
	ID   NodeID

	Server *MessageServer
	Router MessageRouter
	// contains filtered or unexported fields
}

MockNode represents one mock node.

func (*MockNode) Close

func (n *MockNode) Close()

Close closes the mock node.

type NodeID

type NodeID = string

NodeID represents the identifier of a sender node. Using IP address is not enough because of possible restarts.

type RawMessageEntry

type RawMessageEntry struct {
	// contains filtered or unexported fields
}

RawMessageEntry is an alias for the protobuf-generated type for a message.

type Seq

type Seq = int64

Seq represents the serial number of a message for a given topic.

type Serializable

type Serializable interface {
	Marshal() ([]byte, error)
	Unmarshal(data []byte) error
}

Serializable is an interface for defining custom serialization methods for peer messages.

type ServerWrapper

type ServerWrapper struct {
	// contains filtered or unexported fields
}

ServerWrapper implements a CDCPeerToPeerServer, and it maintains an inner CDCPeerToPeerServer instance that can be replaced as needed.

func NewServerWrapper

func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper

NewServerWrapper creates a new ServerWrapper

func (*ServerWrapper) Reset

func (s *ServerWrapper) Reset(inner p2p.CDCPeerToPeerServer)

Reset resets the inner server object in the ServerWrapper

func (*ServerWrapper) SendMessage

func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error

SendMessage implements p2p.CDCPeerToPeerServer

func (*ServerWrapper) ServerOptions

func (s *ServerWrapper) ServerOptions() []grpc.ServerOption

ServerOptions returns server option for creating grpc servers.

type Topic

type Topic = string

Topic represents the topic for a peer-to-peer message

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL