transport

package
v0.0.0-...-93a8726 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2022 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package transport implements the transport component used for exchanging Raft messages between NodeHosts.

This package is internally used by Dragonboat, applications are not expected to import this package.

Index

Constants

View Source
const (
	SnapshotChunkSize  uint64 = 2 * 1024 * 1024
	MaxConnectionCount uint64 = 64
)
View Source
const (
	StreamingChanLength = 4
)

Variables

View Source
var (
	// ErrStopped is the error returned to indicate that the connection has
	// already been stopped.
	ErrStopped = errors.New("connection stopped")
	// ErrStreamSnapshot is the error returned to indicate that snapshot
	// streaming failed.
	ErrStreamSnapshot = errors.New("stream snapshot failed")
)
View Source
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)

Functions

func SplitSnapshotMessage

func SplitSnapshotMessage(m pb.Message) ([]pb.Chunk, error)

Types

type Chunk

type Chunk struct {
	FS vfs.FS

	Validate bool
	// contains filtered or unexported fields
}

Chunk managed on the receiving side.

func NewChunk

func NewChunk(onReceive func(pb.MessageBatch),
	confirm func(uint64, uint64, uint64), dir server.SnapshotDirFunc,
	did uint64, fs vfs.FS) *Chunk

NewChunk creates and returns a new snapshot chunks instance.

func (*Chunk) Add

func (c *Chunk) Add(chunk pb.Chunk) bool

Add adds a received trunk to chunks.

func (*Chunk) AddLocked

func (c *Chunk) AddLocked(chunk pb.Chunk) bool

func (*Chunk) Close

func (c *Chunk) Close()

Close closes the chunks instance.

func (*Chunk) ShouldValidate

func (c *Chunk) ShouldValidate(chunk pb.Chunk) bool

func (*Chunk) Tick

func (c *Chunk) Tick()

Tick moves the internal logical clock forward.

func (*Chunk) ToMessage

func (c *Chunk) ToMessage(chunk pb.Chunk,
	files []*pb.SnapshotFile) pb.MessageBatch

type FailedSend

type FailedSend uint64
const (
	Success FailedSend = iota
	CircuitBreakerNotReady
	UnknownTarget
	RateLimited
	ChanIsFull
)

type IResolver

type IResolver interface {
	Resolve(uint64, uint64) (string, string, error)
	Add(uint64, uint64, string)
}

IResolver converts the (cluster id, node id( tuple to network address.

type Metrics

type Metrics interface {
	MessageConnectionFailure()
	SnapshotCnnectionFailure()
	ReceivedMessages(ss uint64, msg uint64, dropped uint64)
	MessageSendSuccess(count uint64)
	MessageSendFailure(count uint64)
	SnapshotSendSuccess()
	SnapshotSendFailure()
}

type NOOPMetrics

type NOOPMetrics struct {
}

func (*NOOPMetrics) MessageConnectionFailure

func (n *NOOPMetrics) MessageConnectionFailure()

func (*NOOPMetrics) MessageSendFailure

func (n *NOOPMetrics) MessageSendFailure(count uint64)

func (*NOOPMetrics) MessageSendSuccess

func (n *NOOPMetrics) MessageSendSuccess(count uint64)

func (*NOOPMetrics) ReceivedMessages

func (n *NOOPMetrics) ReceivedMessages(ss uint64, msg uint64, dropped uint64)

func (*NOOPMetrics) SnapshotCnnectionFailure

func (n *NOOPMetrics) SnapshotCnnectionFailure()

func (*NOOPMetrics) SnapshotSendFailure

func (n *NOOPMetrics) SnapshotSendFailure()

func (*NOOPMetrics) SnapshotSendSuccess

func (n *NOOPMetrics) SnapshotSendSuccess()

type SendMessageBatchFunc

type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)

SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.

type Sink

type Sink struct {
	J *SnapshotJob
}

Sink is the chunk sink for receiving generated snapshot chunk.

func (*Sink) Close

func (s *Sink) Close() error

Close closes the sink processing.

func (*Sink) ClusterID

func (s *Sink) ClusterID() uint64

ClusterID returns the cluster ID of the source node.

func (*Sink) Receive

func (s *Sink) Receive(chunk pb.Chunk) (bool, bool)

Receive receives a snapshot chunk.

func (*Sink) ToNodeID

func (s *Sink) ToNodeID() uint64

ToNodeID returns the node ID of the node intended to get and handle the received snapshot chunk.

type SnapshotJob

type SnapshotJob struct {
	Conn raftio.ISnapshotConnection

	Stream chan pb.Chunk
	// contains filtered or unexported fields
}

func NewJob

func NewJob(ctx context.Context,
	clusterID uint64, nodeID uint64,
	did uint64, streaming bool, sz int, transport raftio.ITransport,
	stopc chan struct{}, fs vfs.FS) *SnapshotJob

func (*SnapshotJob) AddChunk

func (j *SnapshotJob) AddChunk(chunk pb.Chunk) (bool, bool)

func (*SnapshotJob) AddSnapshot

func (j *SnapshotJob) AddSnapshot(chunks []pb.Chunk)

func (*SnapshotJob) Close

func (j *SnapshotJob) Close()

func (*SnapshotJob) Connect

func (j *SnapshotJob) Connect(addr string) error

func (*SnapshotJob) Process

func (j *SnapshotJob) Process() error

type StreamChunkSendFunc

type StreamChunkSendFunc func(pb.Chunk) (pb.Chunk, bool)

StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.

type Transport

type Transport[T raftio.ITransport] struct {
	// contains filtered or unexported fields
}

Transport is the transport layer for delivering raft messages and snapshots.

func Factory

func Factory[T raftio.ITransport](cfg config.NodeHostConfig, resolver IResolver, handler pb.IMessageHandler, event pb.ITransportEvent, dir func(cid uint64, nid uint64) string, transportFactory func(requestHandler raftio.MessageHandler, chunkHandler raftio.ChunkHandler) T) (*Transport[T], error)

Factory creates a new Transport object.

func (*Transport[T]) Close

func (t *Transport[T]) Close() error

Close closes the Transport object.

func (*Transport[T]) GetCircuitBreaker

func (t *Transport[T]) GetCircuitBreaker(key string) *circuit.Breaker

GetCircuitBreaker returns the circuit breaker used for the specified target node.

func (*Transport[T]) GetDeploymentID

func (t *Transport[T]) GetDeploymentID() uint64

func (*Transport[T]) GetSnapshotDirFunc

func (t *Transport[T]) GetSnapshotDirFunc() func(clusterID uint64, nodeID uint64) string

func (*Transport[T]) GetStreamSink

func (t *Transport[T]) GetStreamSink(clusterID uint64, nodeID uint64) pb.IChunkSink

GetStreamSink returns a connection used for streaming snapshot.

func (*Transport[T]) GetTrans

func (t *Transport[T]) GetTrans() T

GetTrans returns the transport instance.

func (*Transport[T]) HandleRequest

func (t *Transport[T]) HandleRequest(req pb.MessageBatch)

func (*Transport[T]) JobsCount

func (t *Transport[T]) JobsCount() uint64

func (*Transport[T]) Name

func (t *Transport[T]) Name() string

Name returns the type name of the transport module.

func (*Transport[T]) QueueSize

func (t *Transport[T]) QueueSize() int

func (*Transport[T]) Send

func (t *Transport[T]) Send(req pb.Message) bool

Send asynchronously sends raft messages to their target nodes.

The generic async SendResult Go pattern used in Send() is found in CockroachDB's codebase.

func (*Transport[T]) SendResult

func (t *Transport[T]) SendResult(req pb.Message) (bool, FailedSend)

func (*Transport[T]) SendSnapshot

func (t *Transport[T]) SendSnapshot(m pb.Message) bool

SendSnapshot asynchronously sends raft snapshot message to its target.

func (*Transport[T]) SetPreSendBatchHook

func (t *Transport[T]) SetPreSendBatchHook(h SendMessageBatchFunc)

SetPreSendBatchHook set the SendMessageBatch hook. This function is only expected to be used in monkey testing.

func (*Transport[T]) SetPreStreamChunkSendHook

func (t *Transport[T]) SetPreStreamChunkSendHook(h StreamChunkSendFunc)

SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.

func (*Transport[T]) SnapshotReceived

func (t *Transport[T]) SnapshotReceived(clusterID uint64,
	nodeID uint64, from uint64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL