Documentation ¶
Overview ¶
Package transport implements the transport component used for exchanging Raft messages between NodeHosts.
This package is used internally by Dragonboat; applications are not expected to import it.
Index ¶
- Constants
- Variables
- func NewNOOPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.MessageHandler, ...) raftio.ITransport
- func NewTCPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.MessageHandler, ...) raftio.ITransport
- type Chunk
- type DefaultTransportFactory
- type IMessageHandler
- type INodeRegistry
- type IResolver
- type ITransport
- type ITransportEvent
- type NOOPConnection
- type NOOPSnapshotConnection
- type NOOPTransport
- func (g *NOOPTransport) Close() error
- func (g *NOOPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
- func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
- func (g *NOOPTransport) Name() string
- func (g *NOOPTransport) Start() error
- type NOOPTransportFactory
- type NodeHostIDRegistry
- func (n *NodeHostIDRegistry) Add(clusterID uint64, nodeID uint64, target string)
- func (n *NodeHostIDRegistry) AdvertiseAddress() string
- func (n *NodeHostIDRegistry) Close() error
- func (n *NodeHostIDRegistry) NumMembers() int
- func (n *NodeHostIDRegistry) Remove(clusterID uint64, nodeID uint64)
- func (n *NodeHostIDRegistry) RemoveCluster(clusterID uint64)
- func (n *NodeHostIDRegistry) Resolve(clusterID uint64, nodeID uint64) (string, string, error)
- type Registry
- type SendMessageBatchFunc
- type Sink
- type StreamChunkSendFunc
- type TCP
- type TCPConnection
- type TCPSnapshotConnection
- type Transport
- func (t *Transport) Close() error
- func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker
- func (t *Transport) GetStreamSink(clusterID uint64, nodeID uint64) *Sink
- func (t *Transport) GetTrans() raftio.ITransport
- func (t *Transport) Name() string
- func (t *Transport) Send(req pb.Message) bool
- func (t *Transport) SendSnapshot(m pb.Message) bool
- func (t *Transport) SetPreSendBatchHook(h SendMessageBatchFunc)
- func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
Constants ¶
const (
// TCPTransportName is the name of the tcp transport module.
TCPTransportName = "go-tcp-transport"
)
Variables ¶
var ( // ErrStopped is the error returned to indicate that the connection has // already been stopped. ErrStopped = errors.New("connection stopped") // ErrStreamSnapshot is the error returned to indicate that snapshot // streaming failed. ErrStreamSnapshot = errors.New("stream snapshot failed") )
var ( // NOOPRaftName is the module name for the NOOP transport module. NOOPRaftName = "noop-test-transport" // ErrRequestedToFail is the error used to indicate that the error is // requested. ErrRequestedToFail = errors.New("requested to returned error") )
var ( // ErrBadMessage is the error returned to indicate the incoming message is // corrupted. ErrBadMessage = errors.New("invalid message") )
var ( // ErrSnapshotOutOfDate is returned when the snapshot being received is // considered as out of date. ErrSnapshotOutOfDate = errors.New("snapshot is out of date") )
var ( // ErrUnknownTarget is the error returned when the target address of the node // is unknown. ErrUnknownTarget = errors.New("target address unknown") )
Functions ¶
func NewNOOPTransport ¶
func NewNOOPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) raftio.ITransport
NewNOOPTransport creates a new NOOPTransport instance.
func NewTCPTransport ¶
func NewTCPTransport(nhConfig config.NodeHostConfig, requestHandler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) raftio.ITransport
NewTCPTransport creates and returns a new TCP transport module.
Types ¶
type Chunk ¶
type Chunk struct {
// contains filtered or unexported fields
}
Chunk managed on the receiving side
type DefaultTransportFactory ¶
type DefaultTransportFactory struct{}
DefaultTransportFactory is the default transport module used.
func (*DefaultTransportFactory) Create ¶
func (dtm *DefaultTransportFactory) Create(nhConfig config.NodeHostConfig, handler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) raftio.ITransport
Create creates a default transport instance.
func (*DefaultTransportFactory) Validate ¶
func (dtm *DefaultTransportFactory) Validate(addr string) bool
Validate returns a boolean value indicating whether the specified address is valid.
type IMessageHandler ¶
type IMessageHandler interface { HandleMessageBatch(batch pb.MessageBatch) (uint64, uint64) HandleUnreachable(clusterID uint64, nodeID uint64) HandleSnapshotStatus(clusterID uint64, nodeID uint64, rejected bool) HandleSnapshot(clusterID uint64, nodeID uint64, from uint64) }
IMessageHandler is the interface required to handle incoming raft requests.
type INodeRegistry ¶
type INodeRegistry interface { Close() error Add(clusterID uint64, nodeID uint64, url string) Remove(clusterID uint64, nodeID uint64) RemoveCluster(clusterID uint64) Resolve(clusterID uint64, nodeID uint64) (string, string, error) }
INodeRegistry is the local registry interface used to keep all known nodes in the system.
func NewNodeHostIDRegistry ¶
func NewNodeHostIDRegistry(nhid string, nhConfig config.NodeHostConfig, streamConnections uint64, v config.TargetValidator) (INodeRegistry, error)
NewNodeHostIDRegistry creates a new NodeHostIDRegistry instance.
type IResolver ¶
type IResolver interface { Resolve(uint64, uint64) (string, string, error) Add(uint64, uint64, string) }
IResolver converts the (cluster id, node id) tuple to a network address.
type ITransport ¶
type ITransport interface { Name() string Send(pb.Message) bool SendSnapshot(pb.Message) bool GetStreamSink(clusterID uint64, nodeID uint64) *Sink Close() error }
ITransport is the interface of the transport layer used for exchanging Raft messages.
type ITransportEvent ¶
type ITransportEvent interface { ConnectionEstablished(string, bool) ConnectionFailed(string, bool) }
ITransportEvent is the interface for notifying connection status changes.
type NOOPConnection ¶
type NOOPConnection struct {
// contains filtered or unexported fields
}
NOOPConnection is the connection used to exchange messages between node hosts.
func (*NOOPConnection) Close ¶
func (c *NOOPConnection) Close()
Close closes the NOOPConnection instance.
func (*NOOPConnection) SendMessageBatch ¶
func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error
SendMessageBatch returns ErrRequestedToFail when requested.
type NOOPSnapshotConnection ¶
type NOOPSnapshotConnection struct {
// contains filtered or unexported fields
}
NOOPSnapshotConnection is the connection used to send snapshots.
func (*NOOPSnapshotConnection) Close ¶
func (c *NOOPSnapshotConnection) Close()
Close closes the NOOPSnapshotConnection.
type NOOPTransport ¶
type NOOPTransport struct {
// contains filtered or unexported fields
}
NOOPTransport is a transport module for testing purposes. It does not actually have the ability to exchange messages or snapshots between nodehosts.
func (*NOOPTransport) Close ¶
func (g *NOOPTransport) Close() error
Close closes the NOOPTransport instance.
func (*NOOPTransport) GetConnection ¶
func (g *NOOPTransport) GetConnection(ctx context.Context, target string) (raftio.IConnection, error)
GetConnection returns a connection.
func (*NOOPTransport) GetSnapshotConnection ¶
func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
GetSnapshotConnection returns a snapshot connection.
func (*NOOPTransport) Start ¶
func (g *NOOPTransport) Start() error
Start starts the NOOPTransport instance.
type NOOPTransportFactory ¶
type NOOPTransportFactory struct{}
NOOPTransportFactory is a NOOP transport module used in testing.
func (*NOOPTransportFactory) Create ¶
func (n *NOOPTransportFactory) Create(nhConfig config.NodeHostConfig, handler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) raftio.ITransport
Create creates a noop transport instance.
func (*NOOPTransportFactory) Validate ¶
func (n *NOOPTransportFactory) Validate(addr string) bool
Validate returns a boolean value indicating whether the input address is valid.
type NodeHostIDRegistry ¶
type NodeHostIDRegistry struct {
// contains filtered or unexported fields
}
NodeHostIDRegistry is a node registry backed by gossip. It is capable of supporting NodeHosts with dynamic RaftAddress values.
func (*NodeHostIDRegistry) Add ¶
func (n *NodeHostIDRegistry) Add(clusterID uint64, nodeID uint64, target string)
Add adds a new node with its known NodeHostID to the registry.
func (*NodeHostIDRegistry) AdvertiseAddress ¶
func (n *NodeHostIDRegistry) AdvertiseAddress() string
AdvertiseAddress returns the advertise address of the gossip service.
func (*NodeHostIDRegistry) Close ¶
func (n *NodeHostIDRegistry) Close() error
Close closes the NodeHostIDRegistry instance.
func (*NodeHostIDRegistry) NumMembers ¶
func (n *NodeHostIDRegistry) NumMembers() int
NumMembers returns the number of live nodes known by the gossip service.
func (*NodeHostIDRegistry) Remove ¶
func (n *NodeHostIDRegistry) Remove(clusterID uint64, nodeID uint64)
Remove removes the specified node from the registry.
func (*NodeHostIDRegistry) RemoveCluster ¶
func (n *NodeHostIDRegistry) RemoveCluster(clusterID uint64)
RemoveCluster removes all nodes associated with the specified cluster from the registry.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is used to manage all known node addresses in the multi raft system. The transport layer uses this address registry to locate nodes.
func NewNodeRegistry ¶
func NewNodeRegistry(streamConnections uint64, v config.TargetValidator) *Registry
NewNodeRegistry returns a new Registry object.
func (*Registry) RemoveCluster ¶
RemoveCluster removes all nodes info associated with the specified cluster
type SendMessageBatchFunc ¶
type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)
SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink is the chunk sink for receiving generated snapshot chunks.
type StreamChunkSendFunc ¶
StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.
type TCP ¶
type TCP struct {
// contains filtered or unexported fields
}
TCP is a TCP based transport module for exchanging raft messages and snapshots between NodeHost instances.
func (*TCP) GetConnection ¶
GetConnection returns a new raftio.IConnection for sending raft messages.
func (*TCP) GetSnapshotConnection ¶
func (t *TCP) GetSnapshotConnection(ctx context.Context, target string) (raftio.ISnapshotConnection, error)
GetSnapshotConnection returns a new raftio.ISnapshotConnection for sending raft snapshots.
type TCPConnection ¶
type TCPConnection struct {
// contains filtered or unexported fields
}
TCPConnection is the connection used for sending raft messages to remote nodes.
func NewTCPConnection ¶
func NewTCPConnection(conn net.Conn, rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPConnection
NewTCPConnection creates and returns a new TCPConnection instance.
func (*TCPConnection) Close ¶
func (c *TCPConnection) Close()
Close closes the TCPConnection instance.
func (*TCPConnection) SendMessageBatch ¶
func (c *TCPConnection) SendMessageBatch(batch pb.MessageBatch) error
SendMessageBatch sends a raft message batch to remote node.
type TCPSnapshotConnection ¶
type TCPSnapshotConnection struct {
// contains filtered or unexported fields
}
TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.
func NewTCPSnapshotConnection ¶
func NewTCPSnapshotConnection(conn net.Conn, rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPSnapshotConnection
NewTCPSnapshotConnection creates and returns a new snapshot connection.
func (*TCPSnapshotConnection) Close ¶
func (c *TCPSnapshotConnection) Close()
Close closes the snapshot connection.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is the transport layer for delivering raft messages and snapshots.
func NewTransport ¶
func NewTransport(nhConfig config.NodeHostConfig, handler IMessageHandler, env *server.Env, resolver IResolver, dir server.SnapshotDirFunc, sysEvents ITransportEvent, fs vfs.IFS) (*Transport, error)
NewTransport creates a new Transport object.
func (*Transport) GetCircuitBreaker ¶
GetCircuitBreaker returns the circuit breaker used for the specified target node.
func (*Transport) GetStreamSink ¶
GetStreamSink returns a connection used for streaming snapshot.
func (*Transport) GetTrans ¶
func (t *Transport) GetTrans() raftio.ITransport
GetTrans returns the transport instance.
func (*Transport) Send ¶
Send asynchronously sends raft messages to their target nodes.
The generic async send Go pattern used in Send() is found in CockroachDB's codebase.
func (*Transport) SendSnapshot ¶
SendSnapshot asynchronously sends raft snapshot message to its target.
func (*Transport) SetPreSendBatchHook ¶
func (t *Transport) SetPreSendBatchHook(h SendMessageBatchFunc)
SetPreSendBatchHook sets the SendMessageBatch hook. This function is only expected to be used in monkey testing.
func (*Transport) SetPreStreamChunkSendHook ¶
func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.