transport

package
v2.1.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2019 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package transport implements the transport component used for exchanging Raft messages between NodeHosts.

This package is internally used by Dragonboat, applications are not expected to import this package.

Index

Constants

View Source
const (
	// TCPRaftRPCName is the name of the tcp RPC module.
	TCPRaftRPCName = "go-tcp-transport"
)
View Source
const (

	// UnmanagedDeploymentID is the special DeploymentID used when the system is
	// not managed by master servers.
	UnmanagedDeploymentID = uint64(1)
)

Variables

View Source
var (
	// NOOPRaftName is the module name for the NOOP transport module.
	NOOPRaftName = "noop-test-transport"
	// ErrRequestedToFail is the error used to indicate that the error is
	// requested.
	ErrRequestedToFail = errors.New("requested to returned error")
)
View Source
var (
	// ErrBadMessage is the error returned to indicate the incoming message is
	// corrupted.
	ErrBadMessage = errors.New("invalid message")
)
View Source
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)

Functions

func NewNOOPTransport

func NewNOOPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.RequestHandler,
	sinkFactory raftio.ChunkSinkFactory) raftio.IRaftRPC

NewNOOPTransport creates a new NOOPTransport instance.

func NewTCPTransport

func NewTCPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.RequestHandler,
	sinkFactory raftio.ChunkSinkFactory) raftio.IRaftRPC

NewTCPTransport creates and returns a new TCP transport module.

Types

type DeploymentID

type DeploymentID struct {
	// contains filtered or unexported fields
}

DeploymentID struct is the manager type used to manage the deployment id value.

func (*DeploymentID) SetDeploymentID

func (d *DeploymentID) SetDeploymentID(x uint64)

SetDeploymentID sets the deployment id to the specified value.

func (*DeploymentID) SetUnmanagedDeploymentID

func (d *DeploymentID) SetUnmanagedDeploymentID()

SetUnmanagedDeploymentID sets the deployment id to indicate that the user is not managed.

type INodeAddressResolver

type INodeAddressResolver interface {
	Resolve(uint64, uint64) (string, string, error)
	ReverseResolve(string) []raftio.NodeInfo
	AddRemoteAddress(uint64, uint64, string)
}

INodeAddressResolver converts the (cluster id, node id( tuple to network address

type INodeRegistry

type INodeRegistry interface {
	AddNode(clusterID uint64, nodeID uint64, url string)
	RemoveNode(clusterID uint64, nodeID uint64)
	RemoveCluster(clusterID uint64)
	Resolve(clusterID uint64, nodeID uint64) (string, string, error)
}

INodeRegistry is the local registry interface used to keep all known nodes in the multi raft system.

type IRaftMessageHandler

type IRaftMessageHandler interface {
	HandleMessageBatch(batch pb.MessageBatch)
	HandleUnreachable(clusterID uint64, nodeID uint64)
	HandleSnapshotStatus(clusterID uint64, nodeID uint64, rejected bool)
	HandleSnapshot(clusterID uint64, nodeID uint64, from uint64)
}

IRaftMessageHandler is the interface required to handle incoming raft requests.

type ITransport

type ITransport interface {
	Name() string
	SetUnmanagedDeploymentID()
	SetDeploymentID(uint64)
	SetMessageHandler(IRaftMessageHandler)
	RemoveMessageHandler()
	ASyncSend(pb.Message) bool
	ASyncSendSnapshot(pb.Message) bool
	Stop()
}

ITransport is the interface of the transport layer used for exchanging Raft messages.

type Marshaler

type Marshaler interface {
	MarshalTo([]byte) (int, error)
	Size() int
}

Marshaler is the interface for types that can be Marshaled.

type NOOPConnection

type NOOPConnection struct {
	// contains filtered or unexported fields
}

NOOPConnection is the connection used to exchange messages between node hosts.

func (*NOOPConnection) Close

func (c *NOOPConnection) Close()

Close closes the NOOPConnection instance.

func (*NOOPConnection) SendMessageBatch

func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error

SendMessageBatch return ErrRequestedToFail when requested.

type NOOPSnapshotConnection

type NOOPSnapshotConnection struct {
	// contains filtered or unexported fields
}

NOOPSnapshotConnection is the connection used to send snapshots.

func (*NOOPSnapshotConnection) Close

func (c *NOOPSnapshotConnection) Close()

Close closes the NOOPSnapshotConnection.

func (*NOOPSnapshotConnection) SendSnapshotChunk

func (c *NOOPSnapshotConnection) SendSnapshotChunk(chunk raftpb.SnapshotChunk) error

SendSnapshotChunk returns ErrRequestedToFail when requested.

type NOOPTransport

type NOOPTransport struct {
	// contains filtered or unexported fields
}

NOOPTransport is a transport module for testing purposes. It does not actually has the ability to exchange messages or snapshots between nodehosts.

func (*NOOPTransport) GetConnection

func (g *NOOPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a connection.

func (*NOOPTransport) GetSnapshotConnection

func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a snapshot connection.

func (*NOOPTransport) Name

func (g *NOOPTransport) Name() string

Name returns the module name.

func (*NOOPTransport) Start

func (g *NOOPTransport) Start() error

Start starts the NOOPTransport instance.

func (*NOOPTransport) Stop

func (g *NOOPTransport) Stop()

Stop stops the NOOPTransport instance.

type Nodes

type Nodes struct {
	// contains filtered or unexported fields
}

Nodes is used to manage all known node addresses in the multi raft system. The transport layer uses this address registry to locate nodes.

func NewNodes

func NewNodes(streamConnections uint64) *Nodes

NewNodes returns a new Nodes object.

func (*Nodes) AddNode

func (n *Nodes) AddNode(clusterID uint64, nodeID uint64, url string)

AddNode add a new node.

func (*Nodes) AddRemoteAddress

func (n *Nodes) AddRemoteAddress(clusterID uint64,
	nodeID uint64, address string)

AddRemoteAddress remembers the specified address obtained from the source of the incoming message.

func (*Nodes) RemoveAllPeers

func (n *Nodes) RemoveAllPeers()

RemoveAllPeers removes all remotes.

func (*Nodes) RemoveCluster

func (n *Nodes) RemoveCluster(clusterID uint64)

RemoveCluster removes all nodes info associated with the specified cluster

func (*Nodes) RemoveNode

func (n *Nodes) RemoveNode(clusterID uint64, nodeID uint64)

RemoveNode removes a remote from the node registry.

func (*Nodes) Resolve

func (n *Nodes) Resolve(clusterID uint64, nodeID uint64) (string, string, error)

Resolve looks up the Addr of the specified node.

func (*Nodes) ReverseResolve

func (n *Nodes) ReverseResolve(addr string) []raftio.NodeInfo

ReverseResolve does the reverse lookup for the specified address. A list of node raftio.NodeInfos are returned for nodes that match the specified address

type SendMessageBatchFunc

type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)

SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.

type StreamChunkSendFunc

type StreamChunkSendFunc func(pb.SnapshotChunk) (pb.SnapshotChunk, bool)

StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.

type TCPConnection

type TCPConnection struct {
	// contains filtered or unexported fields
}

TCPConnection is the connection used for sending raft messages to remote nodes.

func NewTCPConnection

func NewTCPConnection(conn net.Conn) *TCPConnection

NewTCPConnection creates and returns a new TCPConnection instance.

func (*TCPConnection) Close

func (c *TCPConnection) Close()

Close closes the TCPConnection instance.

func (*TCPConnection) SendMessageBatch

func (c *TCPConnection) SendMessageBatch(batch raftpb.MessageBatch) error

SendMessageBatch sends a raft message batch to remote node.

type TCPSnapshotConnection

type TCPSnapshotConnection struct {
	// contains filtered or unexported fields
}

TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.

func NewTCPSnapshotConnection

func NewTCPSnapshotConnection(conn net.Conn) *TCPSnapshotConnection

NewTCPSnapshotConnection creates and returns a new snapshot connection.

func (*TCPSnapshotConnection) Close

func (c *TCPSnapshotConnection) Close()

Close closes the snapshot connection.

func (*TCPSnapshotConnection) SendSnapshotChunk

func (c *TCPSnapshotConnection) SendSnapshotChunk(chunk raftpb.SnapshotChunk) error

SendSnapshotChunk sends the specified snapshot chunk to remote node.

type TCPTransport

type TCPTransport struct {
	// contains filtered or unexported fields
}

TCPTransport is a TCP based RPC module for exchanging raft messages and snapshots between NodeHost instances.

func (*TCPTransport) GetConnection

func (g *TCPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a new raftio.IConnection for sending raft messages.

func (*TCPTransport) GetSnapshotConnection

func (g *TCPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a new raftio.IConnection for sending raft snapshots.

func (*TCPTransport) Name

func (g *TCPTransport) Name() string

Name returns a human readable name of the TCP transport module.

func (*TCPTransport) Start

func (g *TCPTransport) Start() error

Start starts the TCP transport module.

func (*TCPTransport) Stop

func (g *TCPTransport) Stop()

Stop stops the TCP transport module.

type Transport

type Transport struct {
	DeploymentID
	// contains filtered or unexported fields
}

Transport is the transport layer for delivering raft messages and snapshots.

func NewTransport

func NewTransport(nhConfig config.NodeHostConfig, ctx *server.Context,
	resolver INodeAddressResolver, locator server.GetSnapshotDirFunc) *Transport

NewTransport creates a new Transport object.

func (*Transport) ASyncSend

func (t *Transport) ASyncSend(req pb.Message) bool

ASyncSend sends raft messages using RPC

The generic async send Go pattern used in ASyncSend is found in CockroachDB's codebase.

func (*Transport) ASyncSendSnapshot

func (t *Transport) ASyncSendSnapshot(m pb.Message) bool

ASyncSendSnapshot sends raft snapshot message to its target.

func (*Transport) GetCircuitBreaker

func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker

GetCircuitBreaker returns the circuit breaker used for the specified target node.

func (*Transport) GetRaftRPC

func (t *Transport) GetRaftRPC() raftio.IRaftRPC

GetRaftRPC returns the raft RPC instance.

func (*Transport) Name

func (t *Transport) Name() string

Name returns the type name of the transport module

func (*Transport) RemoveMessageHandler

func (t *Transport) RemoveMessageHandler()

RemoveMessageHandler removes the raft message handler.

func (*Transport) SetMessageHandler

func (t *Transport) SetMessageHandler(handler IRaftMessageHandler)

SetMessageHandler sets the raft message handler.

func (*Transport) SetPreSendMessageBatchHook

func (t *Transport) SetPreSendMessageBatchHook(h SendMessageBatchFunc)

SetPreSendMessageBatchHook set the SendMessageBatch hook. This function is only expected to be used in monkey testing.

func (*Transport) SetPreStreamChunkSendHook

func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)

SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.

func (*Transport) Stop

func (t *Transport) Stop()

Stop stops the Transport object.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL