Documentation ¶
Overview ¶
Package transport implements the transport component used for exchanging Raft messages between NodeHosts.
This package is used internally by Dragonboat; applications are not expected to import it.
Index ¶
- Constants
- Variables
- func SplitSnapshotMessage(m pb.Message) ([]pb.Chunk, error)
- type Chunk
- type FailedSend
- type IResolver
- type Metrics
- type NOOPMetrics
- func (n *NOOPMetrics) MessageConnectionFailure()
- func (n *NOOPMetrics) MessageSendFailure(count uint64)
- func (n *NOOPMetrics) MessageSendSuccess(count uint64)
- func (n *NOOPMetrics) ReceivedMessages(ss uint64, msg uint64, dropped uint64)
- func (n *NOOPMetrics) SnapshotCnnectionFailure()
- func (n *NOOPMetrics) SnapshotSendFailure()
- func (n *NOOPMetrics) SnapshotSendSuccess()
- type SendMessageBatchFunc
- type Sink
- type SnapshotJob
- type StreamChunkSendFunc
- type Transport
- func (t *Transport[T]) Close() error
- func (t *Transport[T]) GetCircuitBreaker(key string) *circuit.Breaker
- func (t *Transport[T]) GetDeploymentID() uint64
- func (t *Transport[T]) GetSnapshotDirFunc() func(clusterID uint64, nodeID uint64) string
- func (t *Transport[T]) GetStreamSink(clusterID uint64, nodeID uint64) pb.IChunkSink
- func (t *Transport[T]) GetTrans() T
- func (t *Transport[T]) HandleRequest(req pb.MessageBatch)
- func (t *Transport[T]) JobsCount() uint64
- func (t *Transport[T]) Name() string
- func (t *Transport[T]) QueueSize() int
- func (t *Transport[T]) Send(req pb.Message) bool
- func (t *Transport[T]) SendResult(req pb.Message) (bool, FailedSend)
- func (t *Transport[T]) SendSnapshot(m pb.Message) bool
- func (t *Transport[T]) SetPreSendBatchHook(h SendMessageBatchFunc)
- func (t *Transport[T]) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
- func (t *Transport[T]) SnapshotReceived(clusterID uint64, nodeID uint64, from uint64)
Constants ¶
const ( SnapshotChunkSize uint64 = 2 * 1024 * 1024 MaxConnectionCount uint64 = 64 )
const (
StreamingChanLength = 4
)
Variables ¶
var (
	// ErrStopped is the error returned to indicate that the connection has
	// already been stopped.
	ErrStopped = errors.New("connection stopped")
	// ErrStreamSnapshot is the error returned to indicate that snapshot
	// streaming failed.
	ErrStreamSnapshot = errors.New("stream snapshot failed")
)
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)
Functions ¶
Types ¶
type Chunk ¶
Chunk managed on the receiving side.
func NewChunk ¶
func NewChunk(onReceive func(pb.MessageBatch), confirm func(uint64, uint64, uint64), dir server.SnapshotDirFunc, did uint64, fs vfs.FS) *Chunk
NewChunk creates and returns a new snapshot chunks instance.
func (*Chunk) ToMessage ¶
func (c *Chunk) ToMessage(chunk pb.Chunk, files []*pb.SnapshotFile) pb.MessageBatch
type FailedSend ¶
type FailedSend uint64
const ( Success FailedSend = iota CircuitBreakerNotReady UnknownTarget RateLimited ChanIsFull )
type IResolver ¶
type IResolver interface { Resolve(uint64, uint64) (string, string, error) Add(uint64, uint64, string) }
IResolver converts the (cluster id, node id) tuple to a network address.
type NOOPMetrics ¶
type NOOPMetrics struct { }
func (*NOOPMetrics) MessageConnectionFailure ¶
func (n *NOOPMetrics) MessageConnectionFailure()
func (*NOOPMetrics) MessageSendFailure ¶
func (n *NOOPMetrics) MessageSendFailure(count uint64)
func (*NOOPMetrics) MessageSendSuccess ¶
func (n *NOOPMetrics) MessageSendSuccess(count uint64)
func (*NOOPMetrics) ReceivedMessages ¶
func (n *NOOPMetrics) ReceivedMessages(ss uint64, msg uint64, dropped uint64)
func (*NOOPMetrics) SnapshotCnnectionFailure ¶
func (n *NOOPMetrics) SnapshotCnnectionFailure()
func (*NOOPMetrics) SnapshotSendFailure ¶
func (n *NOOPMetrics) SnapshotSendFailure()
func (*NOOPMetrics) SnapshotSendSuccess ¶
func (n *NOOPMetrics) SnapshotSendSuccess()
type SendMessageBatchFunc ¶
type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)
SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in tests only.
type Sink ¶
type Sink struct {
J *SnapshotJob
}
Sink is the chunk sink for receiving generated snapshot chunks.
type SnapshotJob ¶
type SnapshotJob struct {
	Conn   raftio.ISnapshotConnection
	Stream chan pb.Chunk
	// contains filtered or unexported fields
}
func (*SnapshotJob) AddSnapshot ¶
func (j *SnapshotJob) AddSnapshot(chunks []pb.Chunk)
func (*SnapshotJob) Close ¶
func (j *SnapshotJob) Close()
func (*SnapshotJob) Connect ¶
func (j *SnapshotJob) Connect(addr string) error
func (*SnapshotJob) Process ¶
func (j *SnapshotJob) Process() error
type StreamChunkSendFunc ¶
StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in tests only.
type Transport ¶
type Transport[T raftio.ITransport] struct {
	// contains filtered or unexported fields
}
Transport is the transport layer for delivering raft messages and snapshots.
func Factory ¶
func Factory[T raftio.ITransport](cfg config.NodeHostConfig, resolver IResolver, handler pb.IMessageHandler, event pb.ITransportEvent, dir func(cid uint64, nid uint64) string, transportFactory func(requestHandler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) T) (*Transport[T], error)
Factory creates a new Transport object.
func (*Transport[T]) GetCircuitBreaker ¶
GetCircuitBreaker returns the circuit breaker used for the specified target node.
func (*Transport[T]) GetDeploymentID ¶
func (*Transport[T]) GetSnapshotDirFunc ¶
func (*Transport[T]) GetStreamSink ¶
func (t *Transport[T]) GetStreamSink(clusterID uint64, nodeID uint64) pb.IChunkSink
GetStreamSink returns a connection used for streaming snapshot.
func (*Transport[T]) GetTrans ¶
func (t *Transport[T]) GetTrans() T
GetTrans returns the transport instance.
func (*Transport[T]) HandleRequest ¶
func (t *Transport[T]) HandleRequest(req pb.MessageBatch)
func (*Transport[T]) Send ¶
Send asynchronously sends raft messages to their target nodes.
The generic async SendResult Go pattern used in Send() is found in CockroachDB's codebase.
func (*Transport[T]) SendResult ¶
func (t *Transport[T]) SendResult(req pb.Message) (bool, FailedSend)
func (*Transport[T]) SendSnapshot ¶
SendSnapshot asynchronously sends a raft snapshot message to its target.
func (*Transport[T]) SetPreSendBatchHook ¶
func (t *Transport[T]) SetPreSendBatchHook(h SendMessageBatchFunc)
SetPreSendBatchHook sets the SendMessageBatch hook. This function is only expected to be used in monkey testing.
func (*Transport[T]) SetPreStreamChunkSendHook ¶
func (t *Transport[T]) SetPreStreamChunkSendHook(h StreamChunkSendFunc)
SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.