Documentation ¶
Overview ¶
Package transport implements the transport component used for exchanging Raft messages between NodeHosts.
This package is internally used by Dragonboat, applications are not expected to import this package.
Index ¶
- Constants
- Variables
- func NewNOOPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.RequestHandler, ...) raftio.IRaftRPC
- func NewTCPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.RequestHandler, ...) raftio.IRaftRPC
- type ChunkFile
- type Chunks
- type DeploymentID
- type INodeAddressResolver
- type INodeRegistry
- type IRaftMessageHandler
- type ITransport
- type ITransportEvent
- type Marshaler
- type NOOPConnection
- type NOOPSnapshotConnection
- type NOOPTransport
- func (g *NOOPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
- func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
- func (g *NOOPTransport) Name() string
- func (g *NOOPTransport) Start() error
- func (g *NOOPTransport) Stop()
- type Nodes
- func (n *Nodes) AddNode(clusterID uint64, nodeID uint64, url string)
- func (n *Nodes) AddRemoteAddress(clusterID uint64, nodeID uint64, address string)
- func (n *Nodes) RemoveAllPeers()
- func (n *Nodes) RemoveCluster(clusterID uint64)
- func (n *Nodes) RemoveNode(clusterID uint64, nodeID uint64)
- func (n *Nodes) Resolve(clusterID uint64, nodeID uint64) (string, string, error)
- func (n *Nodes) ReverseResolve(addr string) []raftio.NodeInfo
- type SendMessageBatchFunc
- type Sink
- type StreamChunkSendFunc
- type TCPConnection
- type TCPSnapshotConnection
- type TCPTransport
- func (g *TCPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
- func (g *TCPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
- func (g *TCPTransport) Name() string
- func (g *TCPTransport) Start() error
- func (g *TCPTransport) Stop()
- type Transport
- func (t *Transport) ASyncSend(req pb.Message) bool
- func (t *Transport) ASyncSendSnapshot(m pb.Message) bool
- func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker
- func (t *Transport) GetRaftRPC() raftio.IRaftRPC
- func (t *Transport) GetStreamConnection(clusterID uint64, nodeID uint64) *Sink
- func (t *Transport) Name() string
- func (t *Transport) SetMessageHandler(handler IRaftMessageHandler)
- func (t *Transport) SetPreSendMessageBatchHook(h SendMessageBatchFunc)
- func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
- func (t *Transport) Stop()
Constants ¶
const (
// TCPRaftRPCName is the name of the tcp RPC module.
TCPRaftRPCName = "go-tcp-transport"
)
const ( // UnmanagedDeploymentID is the special DeploymentID used when the system is // not managed by master servers. UnmanagedDeploymentID = uint64(1) )
Variables ¶
var ( // ErrStopped is the error returned to indicate that the connection has // already been stopped. ErrStopped = errors.New("connection stopped") // ErrStreamSnapshot is the error returned to indicate that snapshot // streaming failed. ErrStreamSnapshot = errors.New("stream snapshot failed") )
var ( // NOOPRaftName is the module name for the NOOP transport module. NOOPRaftName = "noop-test-transport" // ErrRequestedToFail is the error used to indicate that the error is // requested. ErrRequestedToFail = errors.New("requested to returned error") )
var ( // ErrBadMessage is the error returned to indicate the incoming message is // corrupted. ErrBadMessage = errors.New("invalid message") )
var ( // ErrSnapshotOutOfDate is returned when the snapshot being received is // considered as out of date. ErrSnapshotOutOfDate = errors.New("snapshot is out of date") )
Functions ¶
func NewNOOPTransport ¶
func NewNOOPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.RequestHandler, chunkHandler raftio.IChunkHandler) raftio.IRaftRPC
NewNOOPTransport creates a new NOOPTransport instance.
func NewTCPTransport ¶
func NewTCPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.RequestHandler, chunkHandler raftio.IChunkHandler) raftio.IRaftRPC
NewTCPTransport creates and returns a new TCP transport module.
Types ¶
type ChunkFile ¶
type ChunkFile struct {
// contains filtered or unexported fields
}
ChunkFile is the snapshot chunk file being transferred.
func CreateChunkFile ¶
CreateChunkFile creates a new chunk file.
func OpenChunkFileForAppend ¶
OpenChunkFileForAppend opens the chunk file at fp for appending.
func OpenChunkFileForRead ¶
OpenChunkFileForRead opens for the chunk file for read-only operation.
type Chunks ¶
type Chunks struct {
// contains filtered or unexported fields
}
Chunks managed on the receiving side
func NewChunks ¶
func NewChunks(onReceive func(pb.MessageBatch), confirm func(uint64, uint64, uint64), getDeploymentID func() uint64, folder server.GetSnapshotDirFunc, fs vfs.IFS) *Chunks
NewChunks creates and returns a new snapshot chunks instance.
type DeploymentID ¶
type DeploymentID struct {
// contains filtered or unexported fields
}
DeploymentID struct is the manager type used to manage the deployment id value.
func (*DeploymentID) SetDeploymentID ¶
func (d *DeploymentID) SetDeploymentID(x uint64)
SetDeploymentID sets the deployment id to the specified value.
func (*DeploymentID) SetUnmanagedDeploymentID ¶
func (d *DeploymentID) SetUnmanagedDeploymentID()
SetUnmanagedDeploymentID sets the deployment id to indicate that the user is not managed.
type INodeAddressResolver ¶
type INodeAddressResolver interface { Resolve(uint64, uint64) (string, string, error) ReverseResolve(string) []raftio.NodeInfo AddRemoteAddress(uint64, uint64, string) }
INodeAddressResolver converts the (cluster id, node id( tuple to network address
type INodeRegistry ¶
type INodeRegistry interface { AddNode(clusterID uint64, nodeID uint64, url string) RemoveNode(clusterID uint64, nodeID uint64) RemoveCluster(clusterID uint64) Resolve(clusterID uint64, nodeID uint64) (string, string, error) }
INodeRegistry is the local registry interface used to keep all known nodes in the system..
type IRaftMessageHandler ¶
type IRaftMessageHandler interface { HandleMessageBatch(batch pb.MessageBatch) (uint64, uint64) HandleUnreachable(clusterID uint64, nodeID uint64) HandleSnapshotStatus(clusterID uint64, nodeID uint64, rejected bool) HandleSnapshot(clusterID uint64, nodeID uint64, from uint64) }
IRaftMessageHandler is the interface required to handle incoming raft requests.
type ITransport ¶
type ITransport interface { Name() string SetUnmanagedDeploymentID() SetDeploymentID(uint64) SetMessageHandler(IRaftMessageHandler) ASyncSend(pb.Message) bool ASyncSendSnapshot(pb.Message) bool GetStreamConnection(clusterID uint64, nodeID uint64) *Sink Stop() }
ITransport is the interface of the transport layer used for exchanging Raft messages.
type ITransportEvent ¶
type ITransportEvent interface { ConnectionEstablished(string, bool) ConnectionFailed(string, bool) }
ITransportEvent is the interface for notifying connection status changes.
type NOOPConnection ¶
type NOOPConnection struct {
// contains filtered or unexported fields
}
NOOPConnection is the connection used to exchange messages between node hosts.
func (*NOOPConnection) Close ¶
func (c *NOOPConnection) Close()
Close closes the NOOPConnection instance.
func (*NOOPConnection) SendMessageBatch ¶
func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error
SendMessageBatch return ErrRequestedToFail when requested.
type NOOPSnapshotConnection ¶
type NOOPSnapshotConnection struct {
// contains filtered or unexported fields
}
NOOPSnapshotConnection is the connection used to send snapshots.
func (*NOOPSnapshotConnection) Close ¶
func (c *NOOPSnapshotConnection) Close()
Close closes the NOOPSnapshotConnection.
type NOOPTransport ¶
type NOOPTransport struct {
// contains filtered or unexported fields
}
NOOPTransport is a transport module for testing purposes. It does not actually has the ability to exchange messages or snapshots between nodehosts.
func (*NOOPTransport) GetConnection ¶
func (g *NOOPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
GetConnection returns a connection.
func (*NOOPTransport) GetSnapshotConnection ¶
func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
GetSnapshotConnection returns a snapshot connection.
func (*NOOPTransport) Start ¶
func (g *NOOPTransport) Start() error
Start starts the NOOPTransport instance.
type Nodes ¶
type Nodes struct {
// contains filtered or unexported fields
}
Nodes is used to manage all known node addresses in the multi raft system. The transport layer uses this address registry to locate nodes.
func (*Nodes) AddRemoteAddress ¶
AddRemoteAddress remembers the specified address obtained from the source of the incoming message.
func (*Nodes) RemoveCluster ¶
RemoveCluster removes all nodes info associated with the specified cluster
func (*Nodes) RemoveNode ¶
RemoveNode removes a remote from the node registry.
type SendMessageBatchFunc ¶
type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)
SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink is the chunk sink for receiving generated snapshot chunk.
type StreamChunkSendFunc ¶
StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.
type TCPConnection ¶
type TCPConnection struct {
// contains filtered or unexported fields
}
TCPConnection is the connection used for sending raft messages to remote nodes.
func NewTCPConnection ¶
func NewTCPConnection(conn net.Conn, rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPConnection
NewTCPConnection creates and returns a new TCPConnection instance.
func (*TCPConnection) Close ¶
func (c *TCPConnection) Close()
Close closes the TCPConnection instance.
func (*TCPConnection) SendMessageBatch ¶
func (c *TCPConnection) SendMessageBatch(batch pb.MessageBatch) error
SendMessageBatch sends a raft message batch to remote node.
type TCPSnapshotConnection ¶
type TCPSnapshotConnection struct {
// contains filtered or unexported fields
}
TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.
func NewTCPSnapshotConnection ¶
func NewTCPSnapshotConnection(conn net.Conn, rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPSnapshotConnection
NewTCPSnapshotConnection creates and returns a new snapshot connection.
func (*TCPSnapshotConnection) Close ¶
func (c *TCPSnapshotConnection) Close()
Close closes the snapshot connection.
type TCPTransport ¶
type TCPTransport struct {
// contains filtered or unexported fields
}
TCPTransport is a TCP based RPC module for exchanging raft messages and snapshots between NodeHost instances.
func (*TCPTransport) GetConnection ¶
func (g *TCPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
GetConnection returns a new raftio.IConnection for sending raft messages.
func (*TCPTransport) GetSnapshotConnection ¶
func (g *TCPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
GetSnapshotConnection returns a new raftio.IConnection for sending raft snapshots.
func (*TCPTransport) Name ¶
func (g *TCPTransport) Name() string
Name returns a human readable name of the TCP transport module.
func (*TCPTransport) Start ¶
func (g *TCPTransport) Start() error
Start starts the TCP transport module.
type Transport ¶
type Transport struct { DeploymentID // contains filtered or unexported fields }
Transport is the transport layer for delivering raft messages and snapshots.
func NewTransport ¶
func NewTransport(nhConfig config.NodeHostConfig, ctx *server.Context, resolver INodeAddressResolver, folder server.GetSnapshotDirFunc, sysEvents ITransportEvent, fs vfs.IFS) (*Transport, error)
NewTransport creates a new Transport object.
func (*Transport) ASyncSend ¶
ASyncSend sends raft messages using RPC
The generic async send Go pattern used in ASyncSend is found in CockroachDB's codebase.
func (*Transport) ASyncSendSnapshot ¶
ASyncSendSnapshot sends raft snapshot message to its target.
func (*Transport) GetCircuitBreaker ¶
GetCircuitBreaker returns the circuit breaker used for the specified target node.
func (*Transport) GetRaftRPC ¶
GetRaftRPC returns the raft RPC instance.
func (*Transport) GetStreamConnection ¶
GetStreamConnection returns a connection used for streaming snapshot.
func (*Transport) SetMessageHandler ¶
func (t *Transport) SetMessageHandler(handler IRaftMessageHandler)
SetMessageHandler sets the raft message handler.
func (*Transport) SetPreSendMessageBatchHook ¶
func (t *Transport) SetPreSendMessageBatchHook(h SendMessageBatchFunc)
SetPreSendMessageBatchHook set the SendMessageBatch hook. This function is only expected to be used in monkey testing.
func (*Transport) SetPreStreamChunkSendHook ¶
func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.