p2p

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 29, 2024 License: Apache-2.0 Imports: 44 Imported by: 0

README

p2p

The p2p package provides an abstraction around peer-to-peer communication.

Docs:

  • Connection for details on how connections and multiplexing work
  • Peer for details on peer ID, handshakes, and peer exchange
  • Node for details about different types of nodes and how they should work
  • Pex for details on peer discovery and exchange
  • Config for details on some config options

Documentation

Index

Constants

const (
	//
	// Consensus channels
	//
	ConsensusStateChannel = ChannelID(0x20)
	ConsensusDataChannel  = ChannelID(0x21)
	ConsensusVoteChannel  = ChannelID(0x22)
	VoteSetBitsChannel    = ChannelID(0x23)

	ErrorChannel = ChannelID(0x10)
	// BlockSyncChannel is a channel for blocks and status updates
	BlockSyncChannel = ChannelID(0x40)
	// SnapshotChannel exchanges snapshot metadata
	SnapshotChannel = ChannelID(0x60)
	// ChunkChannel exchanges chunk contents
	ChunkChannel = ChannelID(0x61)
	// LightBlockChannel exchanges light blocks
	LightBlockChannel = ChannelID(0x62)
	// ParamsChannel exchanges consensus params
	ParamsChannel = ChannelID(0x63)

	MempoolChannel = ChannelID(0x30)
)
const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "p2p"
)

Variables

var (
	ErrRecvRateLimitExceeded = errors.New("receive rate limit exceeded")
)

Functions

func ChannelDescriptors

func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor

ChannelDescriptors returns a map of all supported descriptors

func ConsensusChannelDescriptors added in v1.0.0

func ConsensusChannelDescriptors() map[ChannelID]*ChannelDescriptor

ConsensusChannelDescriptors produces a map of descriptors for the channels required by consensus.

func SetProTxHashToPeerInfo

func SetProTxHashToPeerInfo(proTxHash types.ProTxHash) func(info *peerInfo)

SetProTxHashToPeerInfo sets a proTxHash in peerInfo.proTxHash to keep this value in a store

func StatesyncChannelDescriptors added in v1.0.0

func StatesyncChannelDescriptors() map[ChannelID]*ChannelDescriptor

StatesyncChannelDescriptors returns a map of all supported statesync channel descriptors.

Types

type Channel

type Channel interface {
	fmt.Stringer

	Err() error

	Send(context.Context, Envelope) error
	SendError(context.Context, PeerError) error
	Receive(context.Context) ChannelIterator
}

func NewChannel

func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel

NewChannel creates a new channel. It is primarily for internal and test use; reactors should use Router.OpenChannel().

type ChannelCreator

type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error)

ChannelCreator allows routers to construct their own channels, either by receiving a reference to Router.OpenChannel or by using some kind of shim for testing purposes.

type ChannelDescriptor

type ChannelDescriptor = conn.ChannelDescriptor

type ChannelID

type ChannelID = conn.ChannelID

func ResolveChannelID

func ResolveChannelID(msg proto.Message) ChannelID

ResolveChannelID returns the channel ID that corresponds to the message type. Currently only the blocksync channel ID is supported; the remaining channel IDs should be added as they become necessary.

type ChannelIDSet

type ChannelIDSet map[ChannelID]struct{}

func (ChannelIDSet) Contains

func (cs ChannelIDSet) Contains(id ChannelID) bool
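
A minimal sketch of how a set of this shape is typically queried. The types below are local stand-ins that mirror the documented declarations (the real ChannelID is an alias for conn.ChannelID, assumed here to be a small unsigned integer), and the body of Contains is an assumption based on its signature, not the package's source.

```go
package main

import "fmt"

// ChannelID is a local stand-in; the underlying integer type is an assumption.
type ChannelID uint16

// ChannelIDSet mirrors the documented map-based set type.
type ChannelIDSet map[ChannelID]struct{}

// Contains reports whether id is a member of the set (assumed behavior).
// Looking up a key in a nil map is safe in Go and yields "not present".
func (cs ChannelIDSet) Contains(id ChannelID) bool {
	_, ok := cs[id]
	return ok
}

func main() {
	// Channels a hypothetical peer advertises: consensus state and blocksync.
	peerChannels := ChannelIDSet{0x20: {}, 0x40: {}}

	fmt.Println(peerChannels.Contains(0x40))
	fmt.Println(peerChannels.Contains(0x30))
}
```

A reactor could use such a check to skip peers that do not advertise the channel it needs.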

type ChannelIterator

type ChannelIterator interface {
	// Next returns true when the Envelope value has advanced, and false
	// when the context is canceled or iteration should stop. If an iterator has returned false,
	// it will never return true again.
	// in general, use Next, as in:
	//
	//	for iter.Next(ctx) {
	//	     envelope := iter.Envelope()
	//	     // ... do things ...
	//	}
	Next(ctx context.Context) bool
	Envelope() *Envelope
}

ChannelIterator is an iterator for receiving messages from a Channel.
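
The Next/Envelope contract described in the interface comment can be sketched with an iterator backed by a plain Go channel. Everything here is a hypothetical stand-in (the trimmed Envelope, chanIterator, and the collect helper are not part of the package); it only illustrates the documented rule that once Next has returned false it never returns true again.

```go
package main

import (
	"context"
	"fmt"
)

// Envelope is a trimmed-down stand-in for the package's Envelope type.
type Envelope struct {
	From    string
	Message string
}

// chanIterator is a hypothetical iterator backed by a Go channel.
type chanIterator struct {
	pipe    <-chan Envelope
	current *Envelope
	done    bool
}

// Next blocks until an envelope arrives, the pipe is closed, or ctx is
// canceled. After the first false result it keeps returning false.
func (it *chanIterator) Next(ctx context.Context) bool {
	if it.done {
		return false
	}
	select {
	case <-ctx.Done():
		it.stop()
		return false
	case env, ok := <-it.pipe:
		if !ok {
			it.stop()
			return false
		}
		it.current = &env
		return true
	}
}

// stop marks the iterator as exhausted.
func (it *chanIterator) stop() {
	it.done = true
	it.current = nil
}

// Envelope returns the envelope fetched by the last successful Next call.
func (it *chanIterator) Envelope() *Envelope { return it.current }

// collect feeds envs through an iterator and returns the message payloads
// in the order they were received.
func collect(envs ...Envelope) []string {
	pipe := make(chan Envelope, len(envs))
	for _, env := range envs {
		pipe <- env
	}
	close(pipe)

	it := &chanIterator{pipe: pipe}
	var out []string
	for it.Next(context.Background()) {
		out = append(out, it.Envelope().Message)
	}
	return out
}

func main() {
	fmt.Println(collect(
		Envelope{From: "peer-a", Message: "hello"},
		Envelope{From: "peer-b", Message: "world"},
	))
}
```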

func MergedChannelIterator

func MergedChannelIterator(ctx context.Context, chs ...Channel) ChannelIterator

MergedChannelIterator produces an iterator that merges the messages from the given channels in arbitrary order.

This allows the caller to consume messages from multiple channels without needing to manage the concurrency separately.
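
Merging in arbitrary order is commonly built as a fan-in: one goroutine per input forwards into a shared output. The sketch below shows that general pattern over plain string channels; merge and mergeAll are hypothetical helpers, and this is not claimed to be how MergedChannelIterator is implemented.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// merge forwards every value from the inputs into one output channel.
// The output is closed once all inputs are drained or ctx is canceled.
func merge(ctx context.Context, inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-in:
					if !ok {
						return
					}
					select {
					case out <- msg:
					case <-ctx.Done():
						return
					}
				}
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// mergeAll runs merge over pre-filled channels and returns the merged
// values sorted, since the arrival order is not deterministic.
func mergeAll(groups ...[]string) []string {
	inputs := make([]<-chan string, 0, len(groups))
	for _, group := range groups {
		ch := make(chan string, len(group))
		for _, msg := range group {
			ch <- msg
		}
		close(ch)
		inputs = append(inputs, ch)
	}
	var got []string
	for msg := range merge(context.Background(), inputs...) {
		got = append(got, msg)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(mergeAll([]string{"b", "d"}, []string{"a", "c"}))
}
```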

func NewChannelIterator

func NewChannelIterator(pipe chan Envelope) ChannelIterator

NewChannelIterator returns a new instance of channelIterator

func ThrottledChannelIterator added in v1.0.0

func ThrottledChannelIterator(_ context.Context, limiter *rate.Limiter, innerIterator ChannelIterator,
	reportError bool, innerChannel Channel, logger log.Logger) (ChannelIterator, error)

ThrottledChannelIterator wraps an existing channel iterator with a rate limiter.

Arguments:

  • ctx: the context in which the iterator will run
  • limiter: the rate limiter to use
  • innerIterator: the underlying iterator to use
  • reportError: if true, errors will be sent to the channel whenever the rate limit is exceeded; otherwise the messages will be dropped without error
  • innerChannel: the related channel; errors will be sent to this channel, which is also used for logging
  • logger: the logger to use
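
The effect of reportError can be illustrated with a standard-library-only sketch. It replaces the real rate limiter with a hypothetical allow function, and throttle and burstLimiter are invented names; the only behavior taken from the text above is that over-limit messages are dropped, with an error reported only when reportError is true.

```go
package main

import (
	"errors"
	"fmt"
)

// errRateLimit plays the role of the package's ErrRecvRateLimitExceeded.
var errRateLimit = errors.New("receive rate limit exceeded")

// throttle passes messages through while allow() returns true. Messages over
// the limit are dropped; an error is recorded for them only if reportError
// is set.
func throttle(msgs []string, allow func() bool, reportError bool) (passed []string, errs []error) {
	for _, msg := range msgs {
		if allow() {
			passed = append(passed, msg)
			continue
		}
		if reportError {
			errs = append(errs, fmt.Errorf("message %q: %w", msg, errRateLimit))
		}
	}
	return passed, errs
}

// burstLimiter admits the first n calls and refuses the rest. It is a
// deterministic stand-in for a token bucket that never refills.
func burstLimiter(n int) func() bool {
	return func() bool {
		if n > 0 {
			n--
			return true
		}
		return false
	}
}

func main() {
	passed, errs := throttle([]string{"m1", "m2", "m3"}, burstLimiter(2), true)
	fmt.Println(passed, len(errs))
}
```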

type Connection

type Connection interface {
	// Handshake executes a node handshake with the remote peer. It must be
	// called immediately after the connection is established, and returns the
	// remote peer's node info and public key. The caller is responsible for
	// validation.
	//
	// FIXME: The handshake should really be the Router's responsibility, but
	// that requires the connection interface to be byte-oriented rather than
	// message-oriented (see comment above).
	Handshake(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error)

	// ReceiveMessage returns the next message received on the connection,
	// blocking until one is available. Returns io.EOF if closed.
	ReceiveMessage(context.Context) (ChannelID, []byte, error)

	// SendMessage sends a message on the connection. Returns io.EOF if closed.
	SendMessage(context.Context, ChannelID, []byte) error

	// LocalEndpoint returns the local endpoint for the connection.
	LocalEndpoint() Endpoint

	// RemoteEndpoint returns the remote endpoint for the connection.
	RemoteEndpoint() Endpoint

	// Close closes the connection.
	Close() error

	// Stringer is used to display the connection, e.g. in logs.
	//
	// Without this, the logger may use reflection to access and display
	// internal fields. These can be written to concurrently, which can trigger
	// the race detector or even cause a panic.
	fmt.Stringer
}

Connection represents an established connection between two endpoints.

FIXME: This is a temporary interface for backwards-compatibility with the current MConnection-protocol, which is message-oriented. It should be migrated to a byte-oriented multi-stream interface instead, which would allow e.g. adopting QUIC and making message framing, traffic scheduling, and node handshakes a Router concern shared across all transports. However, this requires MConnection protocol changes or a shim. For details, see: https://github.com/tendermint/spec/pull/227

FIXME: The interface is currently very broad in order to accommodate MConnection behavior that the legacy P2P stack relies on. It should be cleaned up when the legacy stack is removed.

type DashDialer

type DashDialer interface {
	NodeIDResolver
	// ConnectAsync schedules asynchronous job to establish connection with provided node.
	ConnectAsync(NodeAddress) error
	// IsDialingOrConnected determines whether node with provided node ID is already connected,
	// or there is a pending connection attempt.
	IsDialingOrConnected(types.NodeID) bool
	// DisconnectAsync schedules asynchronous job to disconnect from the provided node.
	DisconnectAsync(types.NodeID) error
}

DashDialer defines a service that can be used to establish and manage peer connections

func NewRouterDashDialer

func NewRouterDashDialer(peerManager *PeerManager, logger log.Logger) DashDialer

type Endpoint

type Endpoint struct {
	// Protocol specifies the transport protocol.
	Protocol Protocol

	// IP is an IP address (v4 or v6) to connect to. If set, this defines the
	// endpoint as a networked endpoint.
	IP net.IP

	// Port is a network port (either TCP or UDP). If 0, a default port may be
	// used depending on the protocol.
	Port uint16

	// Path is an optional transport-specific path or identifier.
	Path string
}

Endpoint represents a transport connection endpoint, either local or remote.

Endpoints are not necessarily networked (see e.g. MemoryTransport) but all networked endpoints must use IP as the underlying transport protocol to allow e.g. IP address filtering. Either IP or Path (or both) must be set.
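
The rule that either IP or Path (or both) must be set can be sketched as follows. The Endpoint below is a local copy with Protocol simplified to a string, and this Validate covers only that one documented rule plus a basic IP sanity check; the package's own Validate may check more.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// Endpoint is a local stand-in mirroring the documented fields.
type Endpoint struct {
	Protocol string
	IP       net.IP
	Port     uint16
	Path     string
}

// Validate enforces the documented rule that IP or Path must be set.
func (e Endpoint) Validate() error {
	if len(e.IP) == 0 && e.Path == "" {
		return errors.New("endpoint has neither IP nor path")
	}
	// To16 returns nil for byte slices that are not a valid IPv4 or IPv6 address.
	if len(e.IP) > 0 && e.IP.To16() == nil {
		return fmt.Errorf("invalid IP address %v", e.IP)
	}
	return nil
}

// isValid builds an endpoint from strings and reports whether it validates.
// An unparsable or empty ip string leaves the IP field unset.
func isValid(ip, path string) bool {
	e := Endpoint{Protocol: "mconn", IP: net.ParseIP(ip), Port: 26656, Path: path}
	return e.Validate() == nil
}

func main() {
	fmt.Println(isValid("192.0.2.1", "")) // networked endpoint
	fmt.Println(isValid("", "node-a"))    // path-only endpoint
	fmt.Println(isValid("", ""))          // neither set
}
```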

func NewEndpoint

func NewEndpoint(addr string) (*Endpoint, error)

NewEndpoint constructs an Endpoint from an address string.

func (Endpoint) NodeAddress

func (e Endpoint) NodeAddress(nodeID types.NodeID) NodeAddress

NodeAddress converts the endpoint into a NodeAddress for the given node ID.

func (Endpoint) String

func (e Endpoint) String() string

String formats the endpoint as a URL string.

func (Endpoint) Validate

func (e Endpoint) Validate() error

Validate validates the endpoint.

type Envelope

type Envelope struct {
	From       types.NodeID  // sender (empty if outbound)
	To         types.NodeID  // receiver (empty if inbound)
	Broadcast  bool          // send to all connected peers (ignores To)
	Message    proto.Message // message payload
	ChannelID  ChannelID
	Attributes map[string]string
}

Envelope contains a message with sender/receiver routing info.
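
The field comments above imply a simple routing rule for outbound envelopes: Broadcast targets every connected peer and ignores To, otherwise the envelope goes to To alone. A hedged sketch of that rule, using a trimmed stand-in struct and a hypothetical recipients helper (the Router's actual logic is not shown in this documentation):

```go
package main

import "fmt"

// Envelope is a trimmed-down stand-in with only the routing fields.
type Envelope struct {
	From      string // sender (empty if outbound)
	To        string // receiver (empty if inbound)
	Broadcast bool   // send to all connected peers (ignores To)
}

// recipients returns the peers an outbound envelope should be delivered to.
func recipients(e Envelope, connected []string) []string {
	if e.Broadcast {
		// Copy so the caller's slice is not shared with the result.
		return append([]string(nil), connected...)
	}
	if e.To == "" {
		return nil
	}
	return []string{e.To}
}

func main() {
	peers := []string{"peer-a", "peer-b", "peer-c"}
	fmt.Println(recipients(Envelope{To: "peer-b"}, peers))
	fmt.Println(recipients(Envelope{To: "peer-b", Broadcast: true}, peers))
}
```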

func EnvelopeFromProto

func EnvelopeFromProto(proto p2p.Envelope) (Envelope, error)

EnvelopeFromProto creates a domain Envelope from p2p representation

func (*Envelope) AddAttribute

func (e *Envelope) AddAttribute(key, val string)

AddAttribute adds an attribute to the attributes bag

func (*Envelope) IsZero

func (e *Envelope) IsZero() bool

func (*Envelope) MarshalZerologObject

func (e *Envelope) MarshalZerologObject(event *zerolog.Event)

func (*Envelope) ToProto

func (e *Envelope) ToProto() (*p2p.Envelope, error)

ToProto converts domain Envelope into p2p representation

type ErrCurrentlyDialingOrExistingAddress

type ErrCurrentlyDialingOrExistingAddress struct {
	Addr string
}

ErrCurrentlyDialingOrExistingAddress indicates that we're currently dialing this address or it belongs to an existing peer.

func (ErrCurrentlyDialingOrExistingAddress) Error

type ErrFilterTimeout

type ErrFilterTimeout struct{}

ErrFilterTimeout indicates that a filter operation timed out.

func (ErrFilterTimeout) Error

func (e ErrFilterTimeout) Error() string

type ErrNetAddressInvalid

type ErrNetAddressInvalid struct {
	Addr string
	Err  error
}

func (ErrNetAddressInvalid) Error

func (e ErrNetAddressInvalid) Error() string

type ErrNetAddressNoID

type ErrNetAddressNoID struct {
	Addr string
}

func (ErrNetAddressNoID) Error

func (e ErrNetAddressNoID) Error() string

type ErrRejected

type ErrRejected struct {
	// contains filtered or unexported fields
}

ErrRejected indicates that a Peer was rejected carrying additional information as to the reason.

func (ErrRejected) Addr

func (e ErrRejected) Addr() NodeAddress

Addr returns the NodeAddress for the rejected Peer.

func (ErrRejected) Error

func (e ErrRejected) Error() string

func (ErrRejected) IsAuthFailure

func (e ErrRejected) IsAuthFailure() bool

IsAuthFailure reports whether Peer authentication was unsuccessful.

func (ErrRejected) IsDuplicate

func (e ErrRejected) IsDuplicate() bool

IsDuplicate reports whether the Peer ID or IP is already present.

func (ErrRejected) IsFiltered

func (e ErrRejected) IsFiltered() bool

IsFiltered reports whether the Peer ID or IP was filtered.

func (ErrRejected) IsIncompatible

func (e ErrRejected) IsIncompatible() bool

IsIncompatible reports whether the Peer NodeInfo is incompatible with our own.

func (ErrRejected) IsNodeInfoInvalid

func (e ErrRejected) IsNodeInfoInvalid() bool

IsNodeInfoInvalid reports whether the sent NodeInfo is invalid.

func (ErrRejected) IsSelf

func (e ErrRejected) IsSelf() bool

IsSelf reports whether the Peer is our own node.

type ErrSwitchAuthenticationFailure

type ErrSwitchAuthenticationFailure struct {
	Dialed *NodeAddress
	Got    types.NodeID
}

func (ErrSwitchAuthenticationFailure) Error

type ErrSwitchConnectToSelf

type ErrSwitchConnectToSelf struct {
	Addr *NodeAddress
}

ErrSwitchConnectToSelf to be raised when trying to connect to itself.

func (ErrSwitchConnectToSelf) Error

func (e ErrSwitchConnectToSelf) Error() string

type ErrSwitchDuplicatePeerID

type ErrSwitchDuplicatePeerID struct {
	ID types.NodeID
}

ErrSwitchDuplicatePeerID to be raised when a peer is connecting with a known ID.

func (ErrSwitchDuplicatePeerID) Error

func (e ErrSwitchDuplicatePeerID) Error() string

type ErrSwitchDuplicatePeerIP

type ErrSwitchDuplicatePeerIP struct {
	IP net.IP
}

ErrSwitchDuplicatePeerIP to be raised when a peer is connecting with a known IP.

func (ErrSwitchDuplicatePeerIP) Error

func (e ErrSwitchDuplicatePeerIP) Error() string

type ErrTransportClosed

type ErrTransportClosed struct{}

ErrTransportClosed is raised when the Transport has been closed.

func (ErrTransportClosed) Error

func (e ErrTransportClosed) Error() string

type MConnTransport

type MConnTransport struct {
	// contains filtered or unexported fields
}

MConnTransport is a Transport implementation using the current multiplexed Tendermint protocol ("MConn").

func NewMConnTransport

func NewMConnTransport(
	logger log.Logger,
	mConnConfig conn.MConnConfig,
	channelDescs []*ChannelDescriptor,
	options MConnTransportOptions,
) *MConnTransport

NewMConnTransport sets up a new MConnection transport. This uses the proprietary Tendermint MConnection protocol, which is implemented as conn.MConnection.

func (*MConnTransport) Accept

func (m *MConnTransport) Accept(ctx context.Context) (Connection, error)

Accept implements Transport.

func (*MConnTransport) AddChannelDescriptors

func (m *MConnTransport) AddChannelDescriptors(channelDesc []*ChannelDescriptor)

AddChannelDescriptors adds the channel descriptors to be used when establishing a connection.

FIXME: To be removed when the legacy p2p stack is removed. Channel descriptors should be managed by the router. The underlying transport and connections should be agnostic to everything but the channel IDs, which are initialized in the handshake.

func (*MConnTransport) Close

func (m *MConnTransport) Close() error

Close implements Transport.

func (*MConnTransport) Dial

func (m *MConnTransport) Dial(ctx context.Context, endpoint *Endpoint) (Connection, error)

Dial implements Transport.

func (*MConnTransport) Endpoint

func (m *MConnTransport) Endpoint() (*Endpoint, error)

Endpoint implements Transport.

func (*MConnTransport) Listen

func (m *MConnTransport) Listen(endpoint *Endpoint) error

Listen asynchronously listens for inbound connections on the given endpoint. It must be called exactly once before calling Accept(), and the caller must call Close() to shut down the listener.

FIXME: Listen currently only supports listening on a single endpoint. It might be useful to support listening on multiple addresses (e.g. IPv4 and IPv6, or a private and public address) via multiple Listen() calls.

func (*MConnTransport) Protocols

func (m *MConnTransport) Protocols() []Protocol

Protocols implements Transport. We support tcp for backwards-compatibility.

func (*MConnTransport) String

func (m *MConnTransport) String() string

String implements Transport.

type MConnTransportOptions

type MConnTransportOptions struct {
	// MaxAcceptedConnections is the maximum number of simultaneous accepted
	// (incoming) connections. Beyond this, new connections will block until
	// a slot is free. 0 means unlimited.
	//
	// FIXME: We may want to replace this with connection accounting in the
	// Router, since it will need to do e.g. rate limiting and such as well.
	// But it might also make sense to have per-transport limits.
	MaxAcceptedConnections uint32
}

MConnTransportOptions sets options for MConnTransport.
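
The MaxAcceptedConnections semantics (block until a slot is free, 0 means unlimited) match a counting semaphore. The sketch below shows one common way to build that with a buffered channel; acceptLimiter and its methods are hypothetical names, and this is not claimed to be how MConnTransport enforces the limit.

```go
package main

import (
	"context"
	"fmt"
)

// acceptLimiter is a hypothetical counting semaphore for accepted connections.
// A nil slots channel means "unlimited".
type acceptLimiter struct {
	slots chan struct{}
}

// newAcceptLimiter creates a limiter; max == 0 disables the limit.
func newAcceptLimiter(max uint32) *acceptLimiter {
	if max == 0 {
		return &acceptLimiter{}
	}
	return &acceptLimiter{slots: make(chan struct{}, max)}
}

// acquire blocks until a slot is free or ctx is canceled.
func (l *acceptLimiter) acquire(ctx context.Context) error {
	if l.slots == nil {
		return nil
	}
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
func (l *acceptLimiter) tryAcquire() bool {
	if l.slots == nil {
		return true
	}
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot; call it once per successful acquire, typically when
// the accepted connection is closed.
func (l *acceptLimiter) release() {
	if l.slots != nil {
		<-l.slots
	}
}

func main() {
	l := newAcceptLimiter(1)
	fmt.Println(l.acquire(context.Background()))
	fmt.Println(l.tryAcquire())
	l.release()
	fmt.Println(l.tryAcquire())
}
```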

type MemoryConnection

type MemoryConnection struct {
	// contains filtered or unexported fields
}

MemoryConnection is an in-memory connection between two transport endpoints.

func (*MemoryConnection) Close

func (c *MemoryConnection) Close() error

Close implements Connection.

func (*MemoryConnection) Handshake

func (c *MemoryConnection) Handshake(
	ctx context.Context,
	timeout time.Duration,
	nodeInfo types.NodeInfo,
	privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error)

Handshake implements Connection.

func (*MemoryConnection) LocalEndpoint

func (c *MemoryConnection) LocalEndpoint() Endpoint

LocalEndpoint implements Connection.

func (*MemoryConnection) ReceiveMessage

func (c *MemoryConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byte, error)

ReceiveMessage implements Connection.

func (*MemoryConnection) RemoteEndpoint

func (c *MemoryConnection) RemoteEndpoint() Endpoint

RemoteEndpoint implements Connection.

func (*MemoryConnection) SendMessage

func (c *MemoryConnection) SendMessage(ctx context.Context, chID ChannelID, msg []byte) error

SendMessage implements Connection.

func (*MemoryConnection) String

func (c *MemoryConnection) String() string

String implements Connection.

type MemoryNetwork

type MemoryNetwork struct {
	// contains filtered or unexported fields
}

MemoryNetwork is an in-memory "network" that uses buffered Go channels to communicate between endpoints. It is primarily meant for testing.

Network endpoints are allocated via CreateTransport(), which takes a node ID, and the endpoint is then immediately accessible via the URL "memory:<nodeID>".

func NewMemoryNetwork

func NewMemoryNetwork(logger log.Logger, bufferSize int) *MemoryNetwork

NewMemoryNetwork creates a new in-memory network.

func (*MemoryNetwork) CreateTransport

func (n *MemoryNetwork) CreateTransport(nodeID types.NodeID) *MemoryTransport

CreateTransport creates a new memory transport endpoint with the given node ID and immediately begins listening on the address "memory:<id>". It panics if the node ID is already in use (which is fine, since this is for tests).

func (*MemoryNetwork) GetTransport

func (n *MemoryNetwork) GetTransport(id types.NodeID) *MemoryTransport

GetTransport looks up a transport in the network, returning nil if not found.

func (*MemoryNetwork) RemoveTransport

func (n *MemoryNetwork) RemoveTransport(id types.NodeID)

RemoveTransport removes a transport from the network and closes it.

func (*MemoryNetwork) Size

func (n *MemoryNetwork) Size() int

Size returns the number of transports in the network.

type MemoryTransport

type MemoryTransport struct {
	// contains filtered or unexported fields
}

MemoryTransport is an in-memory transport that uses buffered Go channels to communicate between endpoints. It is primarily meant for testing.

New transports are allocated with MemoryNetwork.CreateTransport(). To contact a different endpoint, both transports must be in the same MemoryNetwork.

func (*MemoryTransport) Accept

func (t *MemoryTransport) Accept(ctx context.Context) (Connection, error)

Accept implements Transport.

func (*MemoryTransport) AddChannelDescriptors

func (t *MemoryTransport) AddChannelDescriptors([]*ChannelDescriptor)

func (*MemoryTransport) Close

func (t *MemoryTransport) Close() error

Close implements Transport.

func (*MemoryTransport) Dial

func (t *MemoryTransport) Dial(ctx context.Context, endpoint *Endpoint) (Connection, error)

Dial implements Transport.

func (*MemoryTransport) Endpoint

func (t *MemoryTransport) Endpoint() (*Endpoint, error)

Endpoint implements Transport.

func (*MemoryTransport) Listen

func (*MemoryTransport) Listen(*Endpoint) error

func (*MemoryTransport) Protocols

func (t *MemoryTransport) Protocols() []Protocol

Protocols implements Transport.

func (*MemoryTransport) String

func (t *MemoryTransport) String() string

String implements Transport.

type Metrics

type Metrics struct {
	// Number of peers connected.
	PeersConnected metrics.Gauge
	// Number of peers in the peer store database.
	PeersStored metrics.Gauge
	// Number of inactive peers stored.
	PeersInactivated metrics.Gauge
	// Number of bytes per channel received from a given peer.
	PeerReceiveBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"`
	// Number of bytes per channel sent to a given peer.
	PeerSendBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"`
	// Number of bytes pending being sent to a given peer.
	PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"`

	// Number of successful connection attempts
	PeersConnectedSuccess metrics.Counter
	// Number of failed connection attempts
	PeersConnectedFailure metrics.Counter

	// Number of peers connected as a result of the peer dialing
	// this node.
	PeersConnectedIncoming metrics.Gauge
	// Number of peers connected as a result of dialing the
	// peer.
	PeersConnectedOutgoing metrics.Gauge

	// Number of peers evicted by this node.
	PeersEvicted metrics.Counter

	// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
	// before sending on the connection.
	//metrics:The time taken to read off of a peer's queue before sending on the connection.
	RouterPeerQueueRecv metrics.Histogram

	// RouterPeerQueueSend defines the time taken to send on a peer's queue which
	// will later be read and sent on the connection (see RouterPeerQueueRecv).
	//metrics:The time taken to send on a peer's queue which will later be read and sent on the connection.
	RouterPeerQueueSend metrics.Histogram

	// RouterChannelQueueSend defines the time taken to send on a p2p channel's
	// queue which will later be consumed by the corresponding reactor/service.
	//metrics:The time taken to send on a p2p channel's queue which will later be consumed by the corresponding reactor/service.
	RouterChannelQueueSend metrics.Histogram

	// PeerQueueDroppedMsgs defines the number of messages dropped from a peer's
	// queue for a specific flow (i.e. Channel).
	//metrics:The number of messages dropped from a peer's queue for a specific p2p Channel.
	PeerQueueDroppedMsgs metrics.Counter `metrics_labels:"ch_id" metrics_name:"router_channel_queue_dropped_msgs"`

	// PeerQueueMsgSize defines the average size of messages sent over a peer's
	// queue for a specific flow (i.e. Channel).
	//metrics:The size of messages sent over a peer's queue for a specific p2p Channel.
	PeerQueueMsgSize metrics.Gauge `metrics_labels:"ch_id" metric_name:"router_channel_queue_msg_size"`
}

Metrics contains metrics exposed by this package.

func NopMetrics

func NopMetrics() *Metrics

func PrometheusMetrics

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics

type NodeAddress

type NodeAddress struct {
	NodeID   types.NodeID
	Protocol Protocol
	Hostname string
	Port     uint16
	Path     string
}

NodeAddress is a node address URL. It differs from a transport Endpoint in that it contains the node's ID, and that the address hostname may be resolved into multiple IP addresses (and thus multiple endpoints).

If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part is expected to contain a node ID.
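
The two URL shapes can be told apart with the standard net/url package, which exposes the opaque part of a "scheme:opaque" URL as URL.Opaque. The sketch below is a simplified stand-in for ParseNodeAddress: parseNodeAddress and nodeAddress are hypothetical, and reading the node ID from the user-info part of a non-opaque URL (as in "mconn://<nodeID>@host:port") is an assumption not stated in this documentation.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// nodeAddress is a local stand-in mirroring the documented NodeAddress fields.
type nodeAddress struct {
	NodeID   string
	Protocol string
	Hostname string
	Port     uint16
	Path     string
}

// parseNodeAddress splits a node address URL into its parts.
func parseNodeAddress(raw string) (nodeAddress, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nodeAddress{}, err
	}
	addr := nodeAddress{Protocol: u.Scheme}

	// Opaque form "scheme:opaque": the opaque part carries the node ID.
	if u.Opaque != "" {
		addr.NodeID = u.Opaque
		return addr, nil
	}

	// Assumption: otherwise the node ID sits in the user-info part.
	if u.User != nil {
		addr.NodeID = u.User.Username()
	}
	addr.Hostname = u.Hostname()
	addr.Path = u.Path
	if p := u.Port(); p != "" {
		n, err := strconv.ParseUint(p, 10, 16)
		if err != nil {
			return nodeAddress{}, fmt.Errorf("invalid port %q: %w", p, err)
		}
		addr.Port = uint16(n)
	}
	return addr, nil
}

func main() {
	for _, raw := range []string{"memory:abc123", "mconn://abc123@example.com:26656"} {
		addr, err := parseNodeAddress(raw)
		fmt.Printf("%+v %v\n", addr, err)
	}
}
```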

func ParseNodeAddress

func ParseNodeAddress(urlString string) (NodeAddress, error)

ParseNodeAddress parses a node address URL into a NodeAddress, normalizing and validating it.

func (NodeAddress) Resolve

func (a NodeAddress) Resolve(ctx context.Context) ([]*Endpoint, error)

Resolve resolves a NodeAddress into a set of Endpoints, by expanding out a DNS hostname to IP addresses.

func (NodeAddress) String

func (a NodeAddress) String() string

String formats the address as a URL string.

func (NodeAddress) Validate

func (a NodeAddress) Validate() error

Validate validates a NodeAddress.

type NodeIDResolver

type NodeIDResolver interface {
	// Resolve determines real node address, including node ID, based on the provided
	// validator address.
	Resolve(types.ValidatorAddress) (NodeAddress, error)
}

NodeIDResolver determines a node ID based on validator address

type PeerError

type PeerError struct {
	NodeID types.NodeID
	Err    error
	Fatal  bool
}

PeerError is a peer error reported via Channel.SendError.

FIXME: This currently just disconnects the peer, which is too simplistic. For example, some errors should be logged, some should cause disconnects, and some should ban the peer.

FIXME: This should probably be replaced by a more general PeerBehavior concept that can mark good and bad behavior and contributes to peer scoring. It should possibly also allow reactors to request explicit actions, e.g. disconnection or banning, in addition to doing this based on aggregates.

func (PeerError) Error

func (pe PeerError) Error() string

func (PeerError) Unwrap

func (pe PeerError) Unwrap() error
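
Because PeerError exposes Unwrap, the standard errors.Is and errors.As helpers can see through it to the wrapped Err. A minimal sketch with a local copy of the struct; the Error() format, the sentinel errors, and the isBadMessage helper are invented for illustration, and returning Err from Unwrap is an assumption based on the signature.

```go
package main

import (
	"errors"
	"fmt"
)

// PeerError is a local stand-in mirroring the documented fields.
type PeerError struct {
	NodeID string
	Err    error
	Fatal  bool
}

// Error formats the error; the exact format here is illustrative only.
func (pe PeerError) Error() string {
	return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error())
}

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (pe PeerError) Unwrap() error { return pe.Err }

// Hypothetical sentinel errors a reactor might report.
var (
	errBadMessage = errors.New("bad message")
	errTimeout    = errors.New("timeout")
)

// isBadMessage reports whether err is, or wraps, errBadMessage.
func isBadMessage(err error) bool {
	return errors.Is(err, errBadMessage)
}

func main() {
	err := PeerError{NodeID: "abc123", Err: errBadMessage, Fatal: true}
	fmt.Println(err)
	fmt.Println(isBadMessage(err))
	fmt.Println(isBadMessage(PeerError{NodeID: "abc123", Err: errTimeout}))
}
```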

type PeerEventSubscriber

type PeerEventSubscriber func(context.Context, string) *PeerUpdates

PeerEventSubscriber describes the type of the subscription method, to assist in isolating reactor-specific construction and lifecycle from the peer manager.

type PeerManager

type PeerManager struct {
	// contains filtered or unexported fields
}

PeerManager manages peer lifecycle information, using a peerStore for underlying storage. Its primary purpose is to determine which peer to connect to next (including retry timers), make sure a peer only has a single active connection (either inbound or outbound), and evict peers to make room for higher-scored peers. It does not manage actual connections (this is handled by the Router), only the peer lifecycle state.

For an outbound connection, the flow is as follows:

  • DialNext: return a peer address to dial, mark peer as dialing.
  • DialFailed: report a dial failure, unmark as dialing.
  • Dialed: report a dial success, unmark as dialing and mark as connected (errors if already connected, e.g. by Accepted).
  • Ready: report routing is ready, mark as ready and broadcast PeerStatusUp.
  • Disconnected: report peer disconnect, unmark as connected and broadcast PeerStatusDown.

For an inbound connection, the flow is as follows:

  • Accepted: report inbound connection success, mark as connected (errors if already connected, e.g. by Dialed).
  • Ready: report routing is ready, mark as ready and broadcast PeerStatusUp.
  • Disconnected: report peer disconnect, unmark as connected and broadcast PeerStatusDown.
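
The outbound and inbound flows above amount to a small state machine over three sets: dialing, connected, and ready. The sketch below models just those transitions. The lifecycle type is hypothetical, DialNext is simplified to take the peer ID instead of choosing one, and status broadcasts are reduced to returned strings; retry timers, scoring, and storage are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle tracks per-peer state as three sets keyed by peer ID.
type lifecycle struct {
	dialing   map[string]bool
	connected map[string]bool
	ready     map[string]bool
}

func newLifecycle() *lifecycle {
	return &lifecycle{
		dialing:   map[string]bool{},
		connected: map[string]bool{},
		ready:     map[string]bool{},
	}
}

// DialNext marks the peer as dialing.
func (l *lifecycle) DialNext(id string) error {
	if l.dialing[id] || l.connected[id] {
		return errors.New("peer is already dialing or connected")
	}
	l.dialing[id] = true
	return nil
}

// DialFailed unmarks the peer as dialing.
func (l *lifecycle) DialFailed(id string) { delete(l.dialing, id) }

// Dialed unmarks dialing and marks connected; it errors if the peer is
// already connected, for example through Accepted.
func (l *lifecycle) Dialed(id string) error {
	delete(l.dialing, id)
	return l.connect(id)
}

// Accepted marks an inbound peer as connected; it errors if the peer is
// already connected, for example through Dialed.
func (l *lifecycle) Accepted(id string) error { return l.connect(id) }

func (l *lifecycle) connect(id string) error {
	if l.connected[id] {
		return errors.New("peer is already connected")
	}
	l.connected[id] = true
	return nil
}

// Ready marks a connected peer as ready and returns the status to broadcast.
func (l *lifecycle) Ready(id string) (string, error) {
	if !l.connected[id] {
		return "", errors.New("peer is not connected")
	}
	l.ready[id] = true
	return "PeerStatusUp", nil
}

// Disconnected clears connected and ready, returning the status to broadcast.
func (l *lifecycle) Disconnected(id string) string {
	delete(l.connected, id)
	delete(l.ready, id)
	return "PeerStatusDown"
}

func main() {
	l := newLifecycle()
	fmt.Println(l.DialNext("peer-a"))
	fmt.Println(l.Dialed("peer-a"))
	fmt.Println(l.Ready("peer-a"))
	fmt.Println(l.Disconnected("peer-a"))
}
```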

When evicting peers, either because peers are explicitly scheduled for eviction or we are connected to too many peers, the flow is as follows:

  • EvictNext: if marked evict and connected, unmark evict and mark evicting. If beyond MaxConnected, pick lowest-scored peer and mark evicting.
  • Disconnected: unmark connected, evicting, evict, and broadcast a PeerStatusDown peer update.
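
The EvictNext selection rule can be sketched as a pure function: explicitly scheduled peers go first, and otherwise the lowest-scored peer is picked only while the connection count exceeds the limit. evictNext below is a hypothetical helper over plain maps; the sorted iteration and the tie-break by peer ID are choices made here for determinism, not documented behavior, and unlike the real method it returns immediately instead of blocking.

```go
package main

import (
	"fmt"
	"sort"
)

// evictNext picks the next peer to evict. scores holds the connected peers
// and their scores; evict holds peers explicitly scheduled for eviction.
func evictNext(scores map[string]int, evict map[string]bool, maxConnected int) (string, bool) {
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic order for the sketch

	// Peers that are marked evict and connected are handled first.
	for _, id := range ids {
		if evict[id] {
			delete(evict, id) // unmark evict; the caller marks it evicting
			return id, true
		}
	}

	// Within the limit there is nothing to evict.
	if len(ids) == 0 || len(ids) <= maxConnected {
		return "", false
	}

	// Beyond the limit, pick the lowest-scored peer.
	lowest := ids[0]
	for _, id := range ids[1:] {
		if scores[id] < scores[lowest] {
			lowest = id
		}
	}
	return lowest, true
}

func main() {
	scores := map[string]int{"peer-a": 10, "peer-b": 3, "peer-c": 7}
	fmt.Println(evictNext(scores, map[string]bool{}, 2))
	fmt.Println(evictNext(scores, map[string]bool{"peer-a": true}, 3))
	fmt.Println(evictNext(scores, map[string]bool{}, 3))
}
```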

If all connection slots are full (at MaxConnected), we can use up to MaxConnectedUpgrade additional connections to probe any higher-scored unconnected peers, and if we reach them (or they reach us) we allow the connection and evict a lower-scored peer. We mark the lower-scored peer as upgrading[from]=to to make sure no other higher-scored peers can claim the same one for an upgrade. The flow is as follows:

  • Accepted: if upgrade is possible, mark connected and add lower-scored to evict.
  • DialNext: if upgrade is possible, mark upgrading[from]=to and dialing.
  • DialFailed: unmark upgrading[from]=to and dialing.
  • Dialed: unmark upgrading[from]=to and dialing, mark as connected, add lower-scored to evict.
  • EvictNext: pick peer from evict, mark as evicting.
  • Disconnected: unmark connected, upgrading[from]=to, evict, evicting.

func NewPeerManager

func NewPeerManager(_ctx context.Context, selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptions) (*PeerManager, error)

NewPeerManager creates a new peer manager.

func (*PeerManager) Accepted

func (m *PeerManager) Accepted(peerID types.NodeID, peerOpts ...func(*peerInfo)) error

Accepted marks an incoming peer connection successfully accepted. If the peer is already connected or we don't allow additional connections then this will return an error.

If full but MaxConnectedUpgrade is non-zero and the incoming peer is better-scored than any existing peers, then we accept it and evict a lower-scored peer.

NOTE: We can't take an address here, since e.g. TCP uses a different port number for outbound traffic than inbound traffic, so the peer's endpoint wouldn't necessarily be an appropriate address to dial.

FIXME: When we accept a connection from a peer, we should register that peer's address in the peer store so that we can dial it later. In order to do that, we'll need to get the remote address after all, but as noted above that can't be the remote endpoint since that will usually have the wrong port number.

func (*PeerManager) Add

func (m *PeerManager) Add(address NodeAddress) (bool, error)

Add adds a peer to the manager, given as an address. If the peer already exists, the address is added to it if it isn't already present. This will push low scoring peers out of the address book if it exceeds the maximum size.

func (*PeerManager) Addresses

func (m *PeerManager) Addresses(peerID types.NodeID) []NodeAddress

Addresses returns all known addresses for a peer, primarily for testing. The order is arbitrary.

func (*PeerManager) Advertise

func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress

Advertise returns a list of peer addresses to advertise to a peer.

It sorts all peers in the peer store and assembles a list that is most likely to include the highest-priority peers.

func (*PeerManager) Close

func (m *PeerManager) Close() error

Close closes the peer manager and frees up all resources.

func (*PeerManager) DialFailed

func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error

DialFailed reports a failed dial attempt. This will make the peer available for dialing again when appropriate (possibly after a retry timeout).

func (*PeerManager) DialNext

func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error)

DialNext finds an appropriate peer address to dial, and marks it as dialing. If no peer is found, or all connection slots are full, it blocks until one becomes available. The caller must call Dialed() or DialFailed() for the returned peer.

func (*PeerManager) Dialed

func (m *PeerManager) Dialed(address NodeAddress, peerOpts ...func(*peerInfo)) error

Dialed marks a peer as successfully dialed. Any further connections will be rejected, and once disconnected the peer may be dialed again.

func (*PeerManager) Disconnected

func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID)

Disconnected unmarks a peer as connected, allowing it to be dialed or accepted again as appropriate.

func (*PeerManager) Errored

func (m *PeerManager) Errored(peerID types.NodeID, err error)

Errored reports a peer error, causing the peer to be evicted if it's currently connected.

FIXME: This should probably be replaced with a peer behavior API, see PeerError comments for more details.

FIXME: This will cause the peer manager to immediately try to reconnect to the peer, which is probably not always what we want.

func (*PeerManager) EvictNext

func (m *PeerManager) EvictNext(ctx context.Context) (types.NodeID, error)

EvictNext returns the next peer to evict (i.e. disconnect). If no evictable peers are found, the call will block until one becomes available.

func (*PeerManager) EvictPeer

func (m *PeerManager) EvictPeer(nodeID types.NodeID)

EvictPeer evicts the peer with the given node ID.

func (*PeerManager) HasDialedMaxPeers

func (m *PeerManager) HasDialedMaxPeers() bool

func (*PeerManager) HasMaxPeerCapacity

func (m *PeerManager) HasMaxPeerCapacity() bool

func (*PeerManager) Inactivate

func (m *PeerManager) Inactivate(peerID types.NodeID) error

Inactivate marks a peer as inactive which means we won't attempt to dial this peer again. A peer can be reactivated by successfully dialing and connecting to the node.

func (*PeerManager) IsDialingOrConnected

func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool

IsDialingOrConnected returns true if the peer is currently being dialed or is already connected, and false otherwise.

func (*PeerManager) PeerRatio

func (m *PeerManager) PeerRatio() float64

PeerRatio returns the ratio of peer addresses stored to the maximum size.

func (*PeerManager) Peers

func (m *PeerManager) Peers() []types.NodeID

Peers returns all known peers, primarily for testing. The order is arbitrary.

func (*PeerManager) Ready

func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID, channels ChannelIDSet)

Ready marks a peer as ready, broadcasting status updates to subscribers. The peer must already be marked as connected. This is separate from Dialed() and Accepted() to allow the router to set up its internal queues before reactors start sending messages. The channels set here are passed in the peer update broadcast to reactors, which can then mediate their own behavior based on the capability of the peers.

func (*PeerManager) Register

func (m *PeerManager) Register(ctx context.Context, peerUpdates *PeerUpdates)

Register allows you to inject a custom PeerUpdates instance into the PeerManager, rather than relying on the instance constructed by the Subscribe method, which wraps the functionality of the Register method.

The caller must consume the peer updates from this PeerUpdates instance in a timely fashion and close the subscription when done, otherwise the PeerManager will halt.

func (*PeerManager) Scores

func (m *PeerManager) Scores() map[types.NodeID]PeerScore

Scores returns the peer scores for all known peers, primarily for testing.

func (*PeerManager) SetLogger

func (m *PeerManager) SetLogger(logger log.Logger)

SetLogger sets a logger for the PeerManager

func (*PeerManager) Status

func (m *PeerManager) Status(id types.NodeID) PeerStatus

Status returns the status for a peer, primarily for testing.

func (*PeerManager) Subscribe

func (m *PeerManager) Subscribe(ctx context.Context, subscriberName string) *PeerUpdates

Subscribe subscribes to peer updates. The caller must consume the peer updates in a timely fashion and close the subscription when done, otherwise the PeerManager will halt.
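A subscriber typically drains the updates channel in its own goroutine until the context is cancelled. The sketch below shows that pattern with a simplified stand-in type; `peerUpdate`, `consume` and `demoConsume` are hypothetical names, and the status strings mirror the PeerStatus values "up" and "down".

```go
package main

import (
	"context"
	"fmt"
)

// peerUpdate is a simplified stand-in for p2p.PeerUpdate.
type peerUpdate struct {
	NodeID string
	Status string // "up" or "down"
}

// consume drains updates until the channel is closed or ctx is cancelled,
// and returns the set of peers that were up at that point.
func consume(ctx context.Context, updates <-chan peerUpdate) map[string]bool {
	up := make(map[string]bool)
	for {
		select {
		case <-ctx.Done():
			return up
		case u, ok := <-updates:
			if !ok {
				return up
			}
			switch u.Status {
			case "up":
				up[u.NodeID] = true
			case "down":
				delete(up, u.NodeID)
			}
		}
	}
}

// demoConsume feeds three updates through consume and returns the result.
func demoConsume() map[string]bool {
	ch := make(chan peerUpdate, 3)
	ch <- peerUpdate{NodeID: "a", Status: "up"}
	ch <- peerUpdate{NodeID: "b", Status: "up"}
	ch <- peerUpdate{NodeID: "a", Status: "down"}
	close(ch)
	return consume(context.Background(), ch)
}

func main() {
	fmt.Println(demoConsume())
}
```

Keeping the loop body cheap matters here: a slow consumer is exactly the case the warning above describes.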

func (*PeerManager) TryDialNext

func (m *PeerManager) TryDialNext() NodeAddress

TryDialNext is equivalent to DialNext(), but immediately returns an empty address if no peers or connection slots are available.
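The difference between a blocking call and its Try variant can be illustrated with a channel receive. This is only an analogy for the calling contract, not the PeerManager's internal mechanism; `next` and `tryNext` are hypothetical helpers.

```go
package main

import "fmt"

// next blocks until a value is available, like DialNext or EvictNext.
func next(ch <-chan string) string {
	return <-ch
}

// tryNext returns immediately with the zero value when nothing is
// available, like TryDialNext or TryEvictNext.
func tryNext(ch <-chan string) string {
	select {
	case v := <-ch:
		return v
	default:
		return ""
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Printf("%q\n", tryNext(ch)) // nothing queued yet
	ch <- "peer-a"
	fmt.Printf("%q\n", next(ch))
}
```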

func (*PeerManager) TryEvictNext

func (m *PeerManager) TryEvictNext() (types.NodeID, error)

TryEvictNext is equivalent to EvictNext, but immediately returns an empty node ID if no evictable peers are found.

func (*PeerManager) Unsubscribe

func (m *PeerManager) Unsubscribe(sub *PeerUpdates)

Unsubscribe removes the subscription from the subscription list.

func (*PeerManager) UpdatePeerInfo

func (m *PeerManager) UpdatePeerInfo(nodeID types.NodeID, modifier func(peerInfo peerInfo) peerInfo) error

UpdatePeerInfo modifies a peer's info using the given modifier function. The operation is transactional.

type PeerManagerOptions

type PeerManagerOptions struct {
	// PersistentPeers are peers that we want to maintain persistent connections
	// to. These will be scored higher than other peers, and if
	// MaxConnectedUpgrade is non-zero any lower-scored peers will be evicted if
	// necessary to make room for these.
	PersistentPeers []types.NodeID

	// MaxPeers is the maximum number of peers to track information about, i.e.
	// store in the peer store. When exceeded, the lowest-scored unconnected peers
	// will be deleted. 0 means no limit.
	MaxPeers uint16

	// MaxConnected is the maximum number of connected peers (inbound and
	// outbound). 0 means no limit.
	MaxConnected uint16

	// MaxOutgoingConnections specifies how many outgoing
	// connections a node will maintain. It must be lower than MaxConnected. If it is
	// 0, then all connections can be outgoing. Once this limit is
	// reached, the node will not dial peers, allowing the
	// remaining peer connections to be used by incoming connections.
	MaxOutgoingConnections uint16

	// MaxConnectedUpgrade is the maximum number of additional connections to
	// use for probing any better-scored peers to upgrade to when all connection
	// slots are full. 0 disables peer upgrading.
	//
	// For example, if we are already connected to MaxConnected peers, but we
	// know or learn about better-scored peers (e.g. configured persistent
	// peers) that we are not connected to, then we can probe these peers by
	// using up to MaxConnectedUpgrade connections, and once connected evict the
	// lowest-scored connected peers. This also works for inbound connections,
	// i.e. if a higher-scored peer attempts to connect to us, we can accept
	// the connection and evict a lower-scored peer.
	MaxConnectedUpgrade uint16

	// MaxIncomingConnectionTime is the maximum duration an incoming peer may stay connected before it is evicted.
	// Defaults to 0 which disables this mechanism.
	// Used on seed nodes to evict peers and make space for others.
	MaxIncomingConnectionTime time.Duration

	// MinRetryTime is the minimum time to wait between retries. Retry times
	// double for each retry, up to MaxRetryTime. 0 disables retries.
	MinRetryTime time.Duration

	// MaxRetryTime is the maximum time to wait between retries. 0 means
	// no maximum, in which case the retry time will keep doubling.
	MaxRetryTime time.Duration

	// MaxRetryTimePersistent is the maximum time to wait between retries for
	// peers listed in PersistentPeers. 0 uses MaxRetryTime instead.
	MaxRetryTimePersistent time.Duration

	// RetryTimeJitter is the upper bound of a random interval added to
	// retry times, to avoid thundering herds. 0 disables jitter.
	RetryTimeJitter time.Duration

	// DisconnectCooldownPeriod is the amount of time after we
	// disconnect from a peer before we'll consider dialing a new peer
	DisconnectCooldownPeriod time.Duration

	// PeerScores sets fixed scores for specific peers. It is mainly used
	// for testing. A score of 0 is ignored.
	PeerScores map[types.NodeID]PeerScore

	// PrivatePeers defines a set of NodeID objects which the PEX reactor will
	// consider private and never gossip.
	PrivatePeers map[types.NodeID]struct{}

	// SelfAddress is the address that will be advertised to peers for them to dial back to us.
	// If Hostname and Port are unset, Advertise() will include no self-announcement.
	SelfAddress NodeAddress

	// Peer Metrics
	Metrics *Metrics
	// contains filtered or unexported fields
}

PeerManagerOptions specifies options for a PeerManager.
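The retry fields above describe an exponential backoff with an optional cap and jitter. The following sketch computes a delay from those comments alone; `retryDelay` and `demoDelays` are hypothetical, and the peer manager's actual formula may differ in detail.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryDelay returns the wait before the next dial after the given number
// of failed attempts. The second result is false when retries are disabled
// (minRetry == 0). A maxRetry of 0 means no cap.
func retryDelay(failures int, minRetry, maxRetry, jitter time.Duration, rng *rand.Rand) (time.Duration, bool) {
	if minRetry <= 0 || failures < 1 {
		return 0, false
	}
	delay := minRetry
	for i := 1; i < failures; i++ {
		if delay > math.MaxInt64/2 {
			delay = math.MaxInt64 // avoid overflow when there is no cap
			break
		}
		delay *= 2
		if maxRetry > 0 && delay >= maxRetry {
			break
		}
	}
	if maxRetry > 0 && delay > maxRetry {
		delay = maxRetry
	}
	// Add a random interval in [0, jitter) to spread out reconnects.
	if jitter > 0 && rng != nil && delay <= math.MaxInt64-jitter {
		delay += time.Duration(rng.Int63n(int64(jitter)))
	}
	return delay, true
}

// demoDelays returns the delays in whole seconds for 1..n failures with
// MinRetryTime = 1s, MaxRetryTime = 10s and no jitter.
func demoDelays(n int) []int {
	out := make([]int, 0, n)
	for f := 1; f <= n; f++ {
		d, _ := retryDelay(f, time.Second, 10*time.Second, 0, nil)
		out = append(out, int(d/time.Second))
	}
	return out
}

func main() {
	fmt.Println(demoDelays(6))
	rng := rand.New(rand.NewSource(1))
	d, ok := retryDelay(3, time.Second, 10*time.Second, 500*time.Millisecond, rng)
	fmt.Println(d, ok)
}
```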

func (*PeerManagerOptions) Validate

func (o *PeerManagerOptions) Validate() error

Validate validates the options.

type PeerScore

type PeerScore int16

PeerScore is a numeric score assigned to a peer (higher is better).

const (
	PeerScorePersistent       PeerScore = math.MaxInt16 // persistent peers
	MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1
)
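These constants imply that persistent peers always outrank every other peer. A small sketch of that ordering follows; `peerScore`, `clampScore` and `rankPeers` are hypothetical local names, and the clamping of non-persistent scores is an assumption drawn from the constant names rather than confirmed behavior.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

type peerScore int16

const (
	scorePersistent       peerScore = math.MaxInt16
	maxScoreNotPersistent peerScore = scorePersistent - 1
)

// clampScore maps a raw score to a peerScore. Persistent peers get the top
// score; everyone else is capped one below it (assumed behavior).
func clampScore(raw int, persistent bool) peerScore {
	if persistent {
		return scorePersistent
	}
	if raw > int(maxScoreNotPersistent) {
		return maxScoreNotPersistent
	}
	if raw < math.MinInt16 {
		return math.MinInt16
	}
	return peerScore(raw)
}

// rankPeers returns peer IDs ordered best first, breaking ties by ID so the
// result is deterministic.
func rankPeers(scores map[string]peerScore) []string {
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if scores[ids[i]] != scores[ids[j]] {
			return scores[ids[i]] > scores[ids[j]]
		}
		return ids[i] < ids[j]
	})
	return ids
}

func main() {
	scores := map[string]peerScore{
		"seed":       clampScore(100000, false),
		"persistent": clampScore(0, true),
		"ordinary":   clampScore(10, false),
	}
	fmt.Println(rankPeers(scores))
}
```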

type PeerStatus

type PeerStatus string

PeerStatus is a peer status.

The peer manager has many more internal states for a peer (e.g. dialing, connected, evicting, and so on), which are tracked separately. PeerStatus is for external use outside of the peer manager.

const (
	PeerStatusUp   PeerStatus = "up"   // connected and ready
	PeerStatusDown PeerStatus = "down" // disconnected
	PeerStatusGood PeerStatus = "good" // peer observed as good
	PeerStatusBad  PeerStatus = "bad"  // peer observed as bad
)

type PeerUpdate

type PeerUpdate struct {
	NodeID   types.NodeID
	Status   PeerStatus
	Channels ChannelIDSet
	// ProTxHash is accessible only for validators.
	ProTxHash types.ProTxHash
}

PeerUpdate is a peer update event sent via PeerUpdates.

func (*PeerUpdate) SetProTxHash

func (pu *PeerUpdate) SetProTxHash(proTxHash types.ProTxHash)

SetProTxHash copies `proTxHash` into `PeerUpdate.ProTxHash`

type PeerUpdates

type PeerUpdates struct {
	// contains filtered or unexported fields
}

PeerUpdates is a peer update subscription with notifications about peer events (currently just status changes).

func NewPeerUpdates

func NewPeerUpdates(updatesCh chan PeerUpdate, routerUpdatesBufSize int, subscriberName string) *PeerUpdates

NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for internal use; callers should typically use PeerManager.Subscribe(). The subscriber must call Close() when done.

func (*PeerUpdates) SendUpdate

func (pu *PeerUpdates) SendUpdate(ctx context.Context, update PeerUpdate)

SendUpdate pushes information about a peer into the routing layer, presumably from a peer.

func (*PeerUpdates) Updates

func (pu *PeerUpdates) Updates() <-chan PeerUpdate

Updates returns a channel for consuming peer updates.

type Protocol

type Protocol string

Protocol identifies a transport protocol.

const (
	MConnProtocol Protocol = "mconn"
	TCPProtocol   Protocol = "tcp"
)
const (
	MemoryProtocol Protocol = "memory"
)

type Router

type Router struct {
	*service.BaseService
	// contains filtered or unexported fields
}

Router manages peer connections and routes messages between peers and reactor channels. It takes a PeerManager for peer lifecycle management (e.g. which peers to dial and when) and a set of Transports for connecting and communicating with peers.

On startup, three main goroutines are spawned to maintain peer connections:

dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
address to dial and spawns a goroutine that dials the peer, handshakes
with it, and begins to route messages if successful.

acceptPeers(): in a loop, waits for an inbound connection via
Transport.Accept() and spawns a goroutine that handshakes with it and
begins to route messages if successful.

evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
peer to evict, and disconnects it by closing its message queue.

When a peer is connected, an outbound peer message queue is registered in peerQueues, and routePeer() is called to spawn off two additional goroutines:

sendPeer(): waits for an outbound message from the peerQueues queue,
marshals it, and passes it to the peer transport which delivers it.

receivePeer(): waits for an inbound message from the peer transport,
unmarshals it, and passes it to the appropriate inbound channel queue
in channelQueues.

When a reactor opens a channel via OpenChannel, an inbound channel message queue is registered in channelQueues, and a channel goroutine is spawned:

routeChannel(): waits for an outbound message from the channel, looks
up the recipient peer's outbound message queue in peerQueues, and submits
the message to it.

All channel sends in the router are blocking. It is the responsibility of the queue interface in peerQueues and channelQueues to prioritize and drop messages as appropriate during contention to prevent stalls and ensure good quality of service.
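The routeChannel step and the queue's responsibility to drop under contention can be sketched as follows. This is a simplified model, not the Router's queue implementation: `envelope`, `dropQueue` and `route` are hypothetical, the queue drops the newest message when full, and the drop counter is not safe for concurrent use.

```go
package main

import "fmt"

// envelope is a simplified outbound message addressed to a peer.
type envelope struct {
	To      string
	Payload string
}

// dropQueue is a bounded FIFO that drops new messages when full instead of
// blocking the sender.
type dropQueue struct {
	ch      chan envelope
	dropped int
}

func newDropQueue(size int) *dropQueue {
	return &dropQueue{ch: make(chan envelope, size)}
}

// enqueue reports whether the message was accepted.
func (q *dropQueue) enqueue(e envelope) bool {
	select {
	case q.ch <- e:
		return true
	default:
		q.dropped++
		return false
	}
}

// route looks up the recipient's queue, as routeChannel does with
// peerQueues, and submits the message to it.
func route(peerQueues map[string]*dropQueue, e envelope) bool {
	q, ok := peerQueues[e.To]
	if !ok {
		return false // unknown or disconnected peer
	}
	return q.enqueue(e)
}

func main() {
	queues := map[string]*dropQueue{"a": newDropQueue(2)}
	for i := 0; i < 3; i++ {
		ok := route(queues, envelope{To: "a", Payload: fmt.Sprint("msg ", i)})
		fmt.Println("accepted:", ok)
	}
	fmt.Println("dropped:", queues["a"].dropped)
}
```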

func NewRouter

func NewRouter(
	logger log.Logger,
	metrics *Metrics,
	privKey crypto.PrivKey,
	peerManager *PeerManager,
	nodeInfoProducer func() *types.NodeInfo,
	transport Transport,
	endpoint *Endpoint,
	options RouterOptions,
) (*Router, error)

NewRouter creates a new Router. The given Transport must already be listening on an appropriate interface, and will be closed by the Router when it stops.

func (*Router) OnStart

func (r *Router) OnStart(ctx context.Context) error

OnStart implements service.Service.

func (*Router) OnStop

func (r *Router) OnStop()

OnStop implements service.Service.

All channels must be closed by OpenChannel() callers before stopping the router, to prevent blocked channel sends in reactors. Channels are not closed here, since that would cause any reactor senders to panic, so it is the sender's responsibility.

func (*Router) OpenChannel

func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error)

OpenChannel opens a new channel for the given message type. The caller must close the channel when done, before stopping the Router. messageType is the type of message passed through the channel (used for unmarshaling), which can implement Wrapper to automatically (un)wrap multiple message types in a wrapper message. The caller may provide a size to make the channel buffered, which internally makes the inbound, outbound, and error channel buffered.

type RouterOptions

type RouterOptions struct {
	// ResolveTimeout is the timeout for resolving NodeAddress URLs.
	// 0 means no timeout.
	ResolveTimeout time.Duration

	// DialTimeout is the timeout for dialing a peer. 0 means no timeout.
	DialTimeout time.Duration

	// HandshakeTimeout is the timeout for handshaking with a peer. 0 means
	// no timeout.
	HandshakeTimeout time.Duration

	// QueueType must be "priority" or "fifo". Defaults to
	// "fifo".
	QueueType string

	// MaxIncomingConnectionAttempts rate limits the number of incoming connection
	// attempts per IP address. Defaults to 100.
	MaxIncomingConnectionAttempts uint

	// IncomingConnectionWindow describes how often an IP address
	// can attempt to create a new connection. Defaults to 10
	// milliseconds, and cannot be less than 1 millisecond.
	IncomingConnectionWindow time.Duration

	// FilterPeerByIP is used by the router to inject filtering
	// behavior for new incoming connections. The router passes
	// the remote IP of the incoming connection and the port number as
	// arguments. Functions should return an error to reject the
	// peer.
	FilterPeerByIP func(context.Context, net.IP, uint16) error

	// FilterPeerByID is used by the router to inject filtering
	// behavior for new incoming connections. The router passes
	// the NodeID of the node before completing the connection,
	// but this occurs after the handshake is complete. Filter by
	// IP address to filter before the handshake. Functions should
	// return an error to reject the peer.
	FilterPeerByID func(context.Context, types.NodeID) error

	// NumConcurrentDials controls how many parallel goroutines
	// are used to dial peers. This defaults to the value of
	// runtime.NumCPU.
	NumConcurrentDials func() int
}

RouterOptions specifies options for a Router.
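As an illustration of FilterPeerByIP, the sketch below builds a function with the same signature that rejects peers from blocked CIDR ranges. `newCIDRFilter` and `demoAllowed` are hypothetical helpers, and the ranges and port are arbitrary examples.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// newCIDRFilter returns a function matching the FilterPeerByIP signature
// that returns an error for any IP inside one of the blocked ranges.
func newCIDRFilter(blocked ...string) (func(context.Context, net.IP, uint16) error, error) {
	nets := make([]*net.IPNet, 0, len(blocked))
	for _, cidr := range blocked {
		_, n, err := net.ParseCIDR(cidr)
		if err != nil {
			return nil, fmt.Errorf("parse %q: %w", cidr, err)
		}
		nets = append(nets, n)
	}
	return func(_ context.Context, ip net.IP, _ uint16) error {
		for _, n := range nets {
			if n.Contains(ip) {
				return fmt.Errorf("peer IP %s is in blocked range %s", ip, n)
			}
		}
		return nil
	}, nil
}

// demoAllowed reports whether addr passes a filter blocking 10.0.0.0/8.
func demoAllowed(addr string) bool {
	filter, err := newCIDRFilter("10.0.0.0/8")
	if err != nil {
		return false
	}
	ip := net.ParseIP(addr)
	if ip == nil {
		return false
	}
	return filter(context.Background(), ip, 26656) == nil
}

func main() {
	fmt.Println(demoAllowed("10.1.2.3"))
	fmt.Println(demoAllowed("192.0.2.1"))
}
```

The returned function could be assigned to the FilterPeerByIP field; because it runs before the handshake, it is the cheaper of the two filters.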

func (*RouterOptions) Validate

func (o *RouterOptions) Validate() error

Validate validates router options.
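A validation routine consistent with the field comments above might look like the following. It is a sketch built only from the documented defaults and limits; `routerOptions` and `validate` are hypothetical local names, and filling in defaults during validation is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// routerOptions mirrors three of the RouterOptions fields.
type routerOptions struct {
	QueueType                     string
	MaxIncomingConnectionAttempts uint
	IncomingConnectionWindow      time.Duration
}

// validate applies the documented defaults and rejects unsupported values.
func (o *routerOptions) validate() error {
	switch o.QueueType {
	case "":
		o.QueueType = "fifo"
	case "fifo", "priority":
		// supported
	default:
		return fmt.Errorf("queue type %q is not supported", o.QueueType)
	}

	switch {
	case o.IncomingConnectionWindow == 0:
		o.IncomingConnectionWindow = 10 * time.Millisecond
	case o.IncomingConnectionWindow < time.Millisecond:
		return fmt.Errorf("incoming connection window %s is below 1ms", o.IncomingConnectionWindow)
	}

	if o.MaxIncomingConnectionAttempts == 0 {
		o.MaxIncomingConnectionAttempts = 100
	}
	return nil
}

func main() {
	o := routerOptions{}
	fmt.Println(o.validate(), o.QueueType, o.IncomingConnectionWindow, o.MaxIncomingConnectionAttempts)

	bad := routerOptions{QueueType: "lifo"}
	fmt.Println(bad.validate())
}
```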

type Transport

type Transport interface {
	// Listen starts the transport on the specified endpoint.
	Listen(*Endpoint) error

	// Protocols returns the protocols supported by the transport. The Router
	// uses this to pick a transport for an Endpoint.
	Protocols() []Protocol

	// Endpoint returns the local endpoint the transport is listening on, if any.
	//
	// How to listen is transport-dependent, e.g. MConnTransport uses Listen() while
	// MemoryTransport starts listening via MemoryNetwork.CreateTransport().
	Endpoint() (*Endpoint, error)

	// Accept waits for the next inbound connection on a listening endpoint, blocking
	// until either a connection is available or the transport is closed. On closure,
	// io.EOF is returned and further Accept calls are futile.
	Accept(context.Context) (Connection, error)

	// Dial creates an outbound connection to an endpoint.
	Dial(context.Context, *Endpoint) (Connection, error)

	// Close stops accepting new connections, but does not close active connections.
	Close() error

	// AddChannelDescriptors is only part of this interface
	// temporarily
	AddChannelDescriptors([]*ChannelDescriptor)

	// Stringer is used to display the transport, e.g. in logs.
	//
	// Without this, the logger may use reflection to access and display
	// internal fields. These can be written to concurrently, which can trigger
	// the race detector or even cause a panic.
	fmt.Stringer
}

Transport is a connection-oriented mechanism for exchanging data with a peer.
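The Accept contract (block until a connection arrives, return io.EOF once the transport is closed) suggests an accept loop like the one below. `acceptor`, `fakeTransport`, `acceptLoop` and `demoAccept` are hypothetical, and connections are modeled as plain strings.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// acceptor is a hypothetical one-method subset of Transport.
type acceptor interface {
	Accept(ctx context.Context) (string, error)
}

// fakeTransport yields a fixed list of connections and then reports closure.
type fakeTransport struct {
	conns []string
}

func (t *fakeTransport) Accept(ctx context.Context) (string, error) {
	if len(t.conns) == 0 {
		return "", io.EOF
	}
	c := t.conns[0]
	t.conns = t.conns[1:]
	return c, nil
}

// acceptLoop handles inbound connections until the transport is closed.
// io.EOF is treated as a clean shutdown; any other error is returned.
func acceptLoop(ctx context.Context, t acceptor, handle func(string)) error {
	for {
		conn, err := t.Accept(ctx)
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		handle(conn)
	}
}

// demoAccept returns the number of connections handled and the loop error.
func demoAccept() (int, error) {
	t := &fakeTransport{conns: []string{"peer-a", "peer-b"}}
	handled := 0
	err := acceptLoop(context.Background(), t, func(string) { handled++ })
	return handled, err
}

func main() {
	fmt.Println(demoAccept())
}
```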

type Wrapper

type Wrapper interface {
	proto.Message

	// Wrap will take a message and wrap it in this one if possible.
	Wrap(proto.Message) error

	// Unwrap will unwrap the inner message contained in this message.
	Unwrap() (proto.Message, error)
}

Wrapper is a Protobuf message that can contain a variety of inner messages (e.g. via oneof fields). If a Channel's message type implements Wrapper, the Router will automatically wrap outbound messages and unwrap inbound messages, such that reactors do not have to do this themselves.
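The wrap and unwrap behavior can be modeled without Protobuf. In the sketch below a local `message` interface stands in for proto.Message, and `envelope`, `ping`, `pong` and `other` are hypothetical types; a real implementation would set and read a generated oneof field instead.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a stand-in for proto.Message so the sketch has no dependencies.
type message interface {
	Name() string
}

type ping struct{}

func (*ping) Name() string { return "ping" }

type pong struct{}

func (*pong) Name() string { return "pong" }

// other is a message type the envelope does not know about.
type other struct{}

func (*other) Name() string { return "other" }

// envelope emulates a wrapper message with a oneof of ping and pong.
type envelope struct {
	inner message
}

func (e *envelope) Name() string { return "envelope" }

// Wrap stores m in the envelope if it is one of the supported types.
func (e *envelope) Wrap(m message) error {
	switch m.(type) {
	case *ping, *pong:
		e.inner = m
		return nil
	default:
		return fmt.Errorf("unknown message type %T", m)
	}
}

// Unwrap returns the inner message, or an error if the envelope is empty.
func (e *envelope) Unwrap() (message, error) {
	if e.inner == nil {
		return nil, errors.New("empty envelope")
	}
	return e.inner, nil
}

func main() {
	e := &envelope{}
	if err := e.Wrap(&ping{}); err != nil {
		fmt.Println("wrap failed:", err)
		return
	}
	m, err := e.Unwrap()
	fmt.Println(m.Name(), err)
	fmt.Println(e.Wrap(&other{}))
}
```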

Directories

Path Synopsis
Package PEX (Peer exchange) handles all the logic necessary for nodes to share information about their peers to other nodes.
