transport

package
v4.0.0-...-1d6e2d7
Warning

This package is not in the latest version of its module.
Published: Dec 22, 2023 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package transport implements the transport component used for exchanging Raft messages between NodeHosts.

This package is used internally by Dragonboat; applications are not expected to import it.

Constants

const (
	// TCPTransportName is the name of the tcp transport module.
	TCPTransportName = "go-tcp-transport"
)

Variables

var (
	// ErrStopped is the error returned to indicate that the connection has
	// already been stopped.
	ErrStopped = errors.New("connection stopped")
	// ErrStreamSnapshot is the error returned to indicate that snapshot
	// streaming failed.
	ErrStreamSnapshot = errors.New("stream snapshot failed")
)
var (
	// NOOPRaftName is the module name for the NOOP transport module.
	NOOPRaftName = "noop-test-transport"
	// ErrRequestedToFail is the error used to indicate that the error is
	// requested.
	ErrRequestedToFail = errors.New("requested to returned error")
)
var (
	// ErrBadMessage is the error returned to indicate the incoming message is
	// corrupted.
	ErrBadMessage = errors.New("invalid message")
)
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)
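
Because the variables above are sentinel errors created with errors.New, code that receives one of them, possibly wrapped with extra context, can match it with errors.Is. The sketch below illustrates that idiom only. errStopped is a local stand-in that mirrors ErrStopped so the example compiles without importing Dragonboat, and sendOnClosed and isStopped are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped is a local stand-in mirroring transport.ErrStopped. It is
// declared here only so the sketch compiles on its own.
var errStopped = errors.New("connection stopped")

// sendOnClosed is a hypothetical helper that fails with the sentinel,
// wrapped with the target address for context.
func sendOnClosed(target string) error {
	return fmt.Errorf("send to %s: %w", target, errStopped)
}

// isStopped reports whether err is, or wraps, the stopped sentinel.
func isStopped(err error) bool {
	return errors.Is(err, errStopped)
}

func main() {
	err := sendOnClosed("host-a:63001")
	fmt.Println(isStopped(err), err)
}
```

Comparing with errors.Is rather than == keeps the check working when intermediate layers add context with %w.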

Functions

func NewNOOPTransport

func NewNOOPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.MessageHandler,
	chunkHandler raftio.ChunkHandler) raftio.ITransport

NewNOOPTransport creates a new NOOPTransport instance.

func NewTCPTransport

func NewTCPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.MessageHandler,
	chunkHandler raftio.ChunkHandler) raftio.ITransport

NewTCPTransport creates and returns a new TCP transport module.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Chunk manages snapshot chunks on the receiving side.

func NewChunk

func NewChunk(onReceive func(pb.MessageBatch),
	confirm func(uint64, uint64, uint64), dir server.SnapshotDirFunc,
	did uint64, fs vfs.IFS) *Chunk

NewChunk creates and returns a new snapshot chunks instance.

func (*Chunk) Add

func (c *Chunk) Add(chunk pb.Chunk) bool

Add adds a received chunk to chunks.

func (*Chunk) Close

func (c *Chunk) Close()

Close closes the chunks instance.

func (*Chunk) Tick

func (c *Chunk) Tick()

Tick moves the internal logical clock forward.
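
A logical clock of this kind is commonly used to expire work that has stopped making progress without reading wall-clock time. The sketch below shows that general idea under stated assumptions: tracker, Touch, and the timeout policy are hypothetical and are not taken from the Chunk implementation, whose internals are unexported.

```go
package main

import "fmt"

// tracker is a hypothetical receiver-side record of in-flight transfers.
type tracker struct {
	tick     uint64            // logical clock, advanced only by Tick
	timeout  uint64            // ticks of inactivity before a transfer expires
	lastSeen map[string]uint64 // tick at which each transfer last progressed
}

func newTracker(timeout uint64) *tracker {
	return &tracker{timeout: timeout, lastSeen: make(map[string]uint64)}
}

// Touch records progress for the transfer identified by key.
func (t *tracker) Touch(key string) {
	t.lastSeen[key] = t.tick
}

// Tick advances the logical clock by one and returns the keys of transfers
// that have been idle for at least timeout ticks, removing them.
func (t *tracker) Tick() []string {
	t.tick++
	var expired []string
	for key, seen := range t.lastSeen {
		if t.tick-seen >= t.timeout {
			expired = append(expired, key)
			delete(t.lastSeen, key)
		}
	}
	return expired
}

func main() {
	tr := newTracker(2)
	tr.Touch("snapshot-1")
	fmt.Println(len(tr.Tick())) // idle for one tick, still tracked
	fmt.Println(tr.Tick())      // idle for two ticks, expired
}
```

Driving expiry from ticks keeps the behaviour deterministic in tests, since the caller decides how fast time moves.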

type DefaultTransportFactory

type DefaultTransportFactory struct{}

DefaultTransportFactory is the default transport module used.

func (*DefaultTransportFactory) Create

Create creates a default transport instance.

func (*DefaultTransportFactory) Validate

func (dtm *DefaultTransportFactory) Validate(addr string) bool

Validate returns a boolean value indicating whether the specified address is valid.
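
The rules Validate applies are not spelled out here. As an illustration of what a host:port check can look like, the sketch below uses only the standard library. validAddress is a hypothetical function, and its rules (non-empty host, port in 1..65535) are assumptions rather than the behaviour of DefaultTransportFactory.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// validAddress is a hypothetical check that accepts "host:port" strings
// with a non-empty host and a port in the range 1..65535.
func validAddress(addr string) bool {
	host, port, err := net.SplitHostPort(addr)
	if err != nil || host == "" {
		return false
	}
	// A bit size of 16 rejects anything above 65535.
	p, err := strconv.ParseUint(port, 10, 16)
	return err == nil && p != 0
}

func main() {
	for _, addr := range []string{"localhost:63001", "localhost", ":63001"} {
		fmt.Printf("%q valid: %v\n", addr, validAddress(addr))
	}
}
```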

type IMessageHandler

type IMessageHandler interface {
	HandleMessageBatch(batch pb.MessageBatch) (uint64, uint64)
	HandleUnreachable(shardID uint64, replicaID uint64)
	HandleSnapshotStatus(shardID uint64, replicaID uint64, rejected bool)
	HandleSnapshot(shardID uint64, replicaID uint64, from uint64)
}

IMessageHandler is the interface required to handle incoming raft requests.

type ITransport

type ITransport interface {
	Name() string
	Send(pb.Message) bool
	SendSnapshot(pb.Message) bool
	GetStreamSink(shardID uint64, replicaID uint64) *Sink
	Close() error
}

ITransport is the interface of the transport layer used for exchanging Raft messages.

type ITransportEvent

type ITransportEvent interface {
	ConnectionEstablished(string, bool)
	ConnectionFailed(string, bool)
}
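
Since both methods take only a string and a bool, the interface can be satisfied by a very small type. The sketch below restates the method set locally so it compiles without importing Dragonboat. eventCounter is a hypothetical listener, and because the meaning of the boolean argument is not documented here, the sketch ignores it.

```go
package main

import (
	"fmt"
	"sync"
)

// transportEvent restates the ITransportEvent method set locally.
type transportEvent interface {
	ConnectionEstablished(string, bool)
	ConnectionFailed(string, bool)
}

// eventCounter is a hypothetical listener that counts events per address.
type eventCounter struct {
	mu          sync.Mutex
	established map[string]int
	failed      map[string]int
}

func newEventCounter() *eventCounter {
	return &eventCounter{
		established: make(map[string]int),
		failed:      make(map[string]int),
	}
}

// ConnectionEstablished counts a successful connection to addr.
// The bool argument is ignored because its meaning is not documented here.
func (e *eventCounter) ConnectionEstablished(addr string, _ bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.established[addr]++
}

// ConnectionFailed counts a failed connection to addr.
func (e *eventCounter) ConnectionFailed(addr string, _ bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.failed[addr]++
}

// Compile-time check that eventCounter satisfies the interface.
var _ transportEvent = (*eventCounter)(nil)

func main() {
	c := newEventCounter()
	c.ConnectionEstablished("host-a:63001", false)
	c.ConnectionFailed("host-b:63001", false)
	fmt.Println(c.established, c.failed)
}
```

The mutex is there on the assumption that notifications may arrive from more than one goroutine.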

ITransportEvent is the interface for notifying connection status changes.

type NOOPConnection

type NOOPConnection struct {
	// contains filtered or unexported fields
}

NOOPConnection is the connection used to exchange messages between node hosts.

func (*NOOPConnection) Close

func (c *NOOPConnection) Close()

Close closes the NOOPConnection instance.

func (*NOOPConnection) SendMessageBatch

func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error

SendMessageBatch returns ErrRequestedToFail when requested.

type NOOPSnapshotConnection

type NOOPSnapshotConnection struct {
	// contains filtered or unexported fields
}

NOOPSnapshotConnection is the connection used to send snapshots.

func (*NOOPSnapshotConnection) Close

func (c *NOOPSnapshotConnection) Close()

Close closes the NOOPSnapshotConnection.

func (*NOOPSnapshotConnection) SendChunk

func (c *NOOPSnapshotConnection) SendChunk(chunk raftpb.Chunk) error

SendChunk returns ErrRequestedToFail when requested.

type NOOPTransport

type NOOPTransport struct {
	// contains filtered or unexported fields
}

NOOPTransport is a transport module for testing purposes. It cannot actually exchange messages or snapshots between NodeHosts.

func (*NOOPTransport) Close

func (g *NOOPTransport) Close() error

Close closes the NOOPTransport instance.

func (*NOOPTransport) GetConnection

func (g *NOOPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a connection.

func (*NOOPTransport) GetSnapshotConnection

func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a snapshot connection.

func (*NOOPTransport) Name

func (g *NOOPTransport) Name() string

Name returns the module name.

func (*NOOPTransport) Start

func (g *NOOPTransport) Start() error

Start starts the NOOPTransport instance.

type NOOPTransportFactory

type NOOPTransportFactory struct{}

NOOPTransportFactory is a NOOP transport module used in testing.

func (*NOOPTransportFactory) Create

Create creates a noop transport instance.

func (*NOOPTransportFactory) Validate

func (n *NOOPTransportFactory) Validate(addr string) bool

Validate returns a boolean value indicating whether the input address is valid.

type SendMessageBatchFunc

type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)

SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.
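
A test hook of this shape can simulate message loss by rejecting some batches. The sketch below uses a simplified stand-in for pb.MessageBatch so that it compiles on its own. messageBatch, its fields, and dropEvery are hypothetical, and treating a true result as "send this batch" is an assumption based on the description above.

```go
package main

import "fmt"

// messageBatch is a simplified stand-in for pb.MessageBatch.
type messageBatch struct {
	SourceAddress string
	Requests      []string
}

// sendMessageBatchFunc mirrors the shape of SendMessageBatchFunc.
type sendMessageBatchFunc func(messageBatch) (messageBatch, bool)

// dropEvery returns a hook that rejects every nth batch and passes the
// rest through unchanged. A value of n below 1 never drops anything.
// The returned hook keeps a counter and is not safe for concurrent use.
func dropEvery(n int) sendMessageBatchFunc {
	count := 0
	return func(b messageBatch) (messageBatch, bool) {
		if n < 1 {
			return b, true
		}
		count++
		return b, count%n != 0
	}
}

func main() {
	hook := dropEvery(3)
	for i := 1; i <= 4; i++ {
		_, ok := hook(messageBatch{SourceAddress: "host-a:63001"})
		fmt.Printf("batch %d send=%v\n", i, ok)
	}
}
```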

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink is the chunk sink for receiving generated snapshot chunks.

func (*Sink) Close

func (s *Sink) Close() error

Close closes the sink processing.

func (*Sink) Receive

func (s *Sink) Receive(chunk pb.Chunk) (bool, bool)

Receive receives a snapshot chunk.

func (*Sink) ShardID

func (s *Sink) ShardID() uint64

ShardID returns the shard ID of the source node.

func (*Sink) ToReplicaID

func (s *Sink) ToReplicaID() uint64

ToReplicaID returns the replica ID of the node intended to get and handle the received snapshot chunk.

type StreamChunkSendFunc

type StreamChunkSendFunc func(pb.Chunk) (pb.Chunk, bool)

StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.

type TCP

type TCP struct {
	// contains filtered or unexported fields
}

TCP is a TCP based transport module for exchanging raft messages and snapshots between NodeHost instances.

func (*TCP) Close

func (t *TCP) Close() error

Close closes the TCP transport module.

func (*TCP) GetConnection

func (t *TCP) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a new raftio.IConnection for sending raft messages.

func (*TCP) GetSnapshotConnection

func (t *TCP) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a new raftio.ISnapshotConnection for sending raft snapshots.

func (*TCP) Name

func (t *TCP) Name() string

Name returns a human readable name of the TCP transport module.

func (*TCP) Start

func (t *TCP) Start() error

Start starts the TCP transport module.

type TCPConnection

type TCPConnection struct {
	// contains filtered or unexported fields
}

TCPConnection is the connection used for sending raft messages to remote nodes.

func NewTCPConnection

func NewTCPConnection(conn net.Conn, encrypted bool) *TCPConnection

NewTCPConnection creates and returns a new TCPConnection instance.

func (*TCPConnection) Close

func (c *TCPConnection) Close()

Close closes the TCPConnection instance.

func (*TCPConnection) SendMessageBatch

func (c *TCPConnection) SendMessageBatch(batch pb.MessageBatch) error

SendMessageBatch sends a raft message batch to the remote node.

type TCPSnapshotConnection

type TCPSnapshotConnection struct {
	// contains filtered or unexported fields
}

TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.

func NewTCPSnapshotConnection

func NewTCPSnapshotConnection(conn net.Conn,
	encrypted bool) *TCPSnapshotConnection

NewTCPSnapshotConnection creates and returns a new snapshot connection.

func (*TCPSnapshotConnection) Close

func (c *TCPSnapshotConnection) Close()

Close closes the snapshot connection.

func (*TCPSnapshotConnection) SendChunk

func (c *TCPSnapshotConnection) SendChunk(chunk pb.Chunk) error

SendChunk sends the specified snapshot chunk to the remote node.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the transport layer for delivering raft messages and snapshots.

func NewTransport

func NewTransport(nhConfig config.NodeHostConfig,
	handler IMessageHandler, env *server.Env, resolver registry.IResolver,
	dir server.SnapshotDirFunc, sysEvents ITransportEvent,
	fs vfs.IFS) (*Transport, error)

NewTransport creates a new Transport object.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the Transport object.

func (*Transport) GetCircuitBreaker

func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker

GetCircuitBreaker returns the circuit breaker used for the specified target node.

func (*Transport) GetStreamSink

func (t *Transport) GetStreamSink(shardID uint64, replicaID uint64) *Sink

GetStreamSink returns a Sink used for streaming snapshots.

func (*Transport) GetTrans

func (t *Transport) GetTrans() raftio.ITransport

GetTrans returns the transport instance.

func (*Transport) Name

func (t *Transport) Name() string

Name returns the type name of the transport module.

func (*Transport) Send

func (t *Transport) Send(req pb.Message) bool

Send asynchronously sends raft messages to their target nodes.

The generic async send Go pattern used in Send() is found in CockroachDB's codebase.
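
One common way to build an asynchronous send that reports success as a bool is a bounded per-target queue drained by a worker goroutine, with a non-blocking enqueue that fails when the queue is full. The sketch below shows that general pattern only. It is not the code from Dragonboat or CockroachDB, and message, asyncSender, and the deliver callback are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for a raft message.
type message struct {
	To      string
	Payload string
}

// asyncSender keeps one bounded queue and one worker per target.
type asyncSender struct {
	mu      sync.Mutex
	closed  bool
	size    int
	queues  map[string]chan message
	deliver func(message) // called by workers; stands in for the network write
	wg      sync.WaitGroup
}

func newAsyncSender(size int, deliver func(message)) *asyncSender {
	return &asyncSender{size: size, deliver: deliver, queues: make(map[string]chan message)}
}

// Send enqueues m without blocking. It returns false when the sender is
// closed or the target's queue is full, so the caller never stalls.
func (s *asyncSender) Send(m message) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	q, ok := s.queues[m.To]
	if !ok {
		q = make(chan message, s.size)
		s.queues[m.To] = q
		s.wg.Add(1)
		go s.drain(q)
	}
	select {
	case q <- m:
		return true
	default:
		return false
	}
}

// drain delivers queued messages in order until the queue is closed.
func (s *asyncSender) drain(q chan message) {
	defer s.wg.Done()
	for m := range q {
		s.deliver(m)
	}
}

// Close stops accepting messages and waits for queued ones to be delivered.
func (s *asyncSender) Close() {
	s.mu.Lock()
	if !s.closed {
		s.closed = true
		for _, q := range s.queues {
			close(q)
		}
	}
	s.mu.Unlock()
	s.wg.Wait()
}

func main() {
	s := newAsyncSender(16, func(m message) { fmt.Println("delivered to", m.To, m.Payload) })
	fmt.Println("queued:", s.Send(message{To: "host-a", Payload: "heartbeat"}))
	s.Close()
}
```

Dropping on a full queue instead of blocking fits Raft reasonably well, because the protocol already tolerates lost messages and retries them.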

func (*Transport) SendSnapshot

func (t *Transport) SendSnapshot(m pb.Message) bool

SendSnapshot asynchronously sends a raft snapshot message to its target.

func (*Transport) SetPreSendBatchHook

func (t *Transport) SetPreSendBatchHook(h SendMessageBatchFunc)

SetPreSendBatchHook sets the SendMessageBatch hook. This function is only expected to be used in monkey testing.

func (*Transport) SetPreStreamChunkSendHook

func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)

SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.
