Documentation ¶
Overview ¶
Package p2p implements the Ethereum consensus networking specification.
Canonical spec reference: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md
Prysm-specific implementation design docs:
- Networking Design Doc: https://docs.google.com/document/d/1VyhobQRkEjEkEPxmmdWvaHfKWn0j6dEae_wLZlrFtfU/view
This package heavily utilizes the libp2p Go implementation by Protocol Labs.
Package p2p defines the network protocol implementation for Ethereum consensus used by beacon nodes, including peer discovery using discv5, gossipsub using libp2p, and handling peer lifecycles and handshakes.
Index ¶
- Constants
- Variables
- func AllTopics() []string
- func ExtractGossipDigest(topic string) ([4]byte, error)
- func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message
- func MakePeer(addr string) (*peer.AddrInfo, error)
- func MsgID(genesisValidatorsRoot []byte, pmsg *pubsubpb.Message) string
- func MultiAddressBuilder(ipAddr string, port uint) (ma.Multiaddr, error)
- func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error)
- func SerializeENR(record *enr.Record) (string, error)
- func TopicDeconstructor(topic string) (string, string, string, error)
- func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error)
- func VerifyTopicMapping(topic string, msg interface{}) error
- type Broadcaster
- type Config
- type ConnectionHandler
- type EncodingProvider
- type Listener
- type MetadataProvider
- type P2P
- type PeerManager
- type PeersProvider
- type PubSubProvider
- type PubSubTopicUser
- type RPCTopic
- type Sender
- type SenderEncoder
- type Service
- func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error)
- func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error)
- func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
- func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error
- func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
- func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
- func (s *Service) CanSubscribe(topic string) bool
- func (s *Service) Connect(pi peer.AddrInfo) error
- func (s *Service) Disconnect(pid peer.ID) error
- func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
- func (s *Service) ENR() *enr.Record
- func (_ *Service) Encoding() encoder.NetworkEncoding
- func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error)
- func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, index uint64, threshold int) (bool, error)
- func (s *Service) Host() host.Host
- func (s *Service) InfoHandler(w http.ResponseWriter, _ *http.Request)
- func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool)
- func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool)
- func (_ *Service) InterceptPeerDial(_ peer.ID) (allow bool)
- func (_ *Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool)
- func (_ *Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason)
- func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error)
- func (s *Service) LeaveTopic(topic string) error
- func (s *Service) Metadata() metadata.Metadata
- func (s *Service) MetadataSeq() uint64
- func (s *Service) PeerID() peer.ID
- func (s *Service) Peers() *peers.Status
- func (s *Service) PubSub() *pubsub.PubSub
- func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error
- func (s *Service) RefreshENR()
- func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error)
- func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler)
- func (s *Service) Start()
- func (s *Service) Started() bool
- func (s *Service) Status() error
- func (s *Service) Stop() error
- func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error)
- type SetStreamHandler
Constants ¶
const (
	// V1 RPC Topics
	// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
	RPCStatusTopicV1 = protocolPrefix + StatusMessageName + SchemaVersionV1
	// RPCGoodByeTopicV1 defines the v1 topic for the goodbye rpc method.
	RPCGoodByeTopicV1 = protocolPrefix + GoodbyeMessageName + SchemaVersionV1
	// RPCBlocksByRangeTopicV1 defines the v1 topic for the blocks by range rpc method.
	RPCBlocksByRangeTopicV1 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV1
	// RPCBlocksByRootTopicV1 defines the v1 topic for the blocks by root rpc method.
	RPCBlocksByRootTopicV1 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV1
	// RPCPingTopicV1 defines the v1 topic for the ping rpc method.
	RPCPingTopicV1 = protocolPrefix + PingMessageName + SchemaVersionV1
	// RPCMetaDataTopicV1 defines the v1 topic for the metadata rpc method.
	RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1

	// V2 RPC Topics
	// RPCBlocksByRangeTopicV2 defines the v2 topic for the blocks by range rpc method.
	RPCBlocksByRangeTopicV2 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV2
	// RPCBlocksByRootTopicV2 defines the v2 topic for the blocks by root rpc method.
	RPCBlocksByRootTopicV2 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV2
	// RPCMetaDataTopicV2 defines the v2 topic for the metadata rpc method.
	RPCMetaDataTopicV2 = protocolPrefix + MetadataMessageName + SchemaVersionV2
)
const (
	// GossipProtocolAndDigest represents the protocol and fork digest prefix in a gossip topic.
	GossipProtocolAndDigest = "/eth2/%x/"

	// Message Types
	//
	// GossipAttestationMessage is the name for the attestation message type. It is
	// specially extracted so as to determine the correct message type from an attestation
	// subnet.
	GossipAttestationMessage = "beacon_attestation"
	// GossipSyncCommitteeMessage is the name for the sync committee message type. It is
	// specially extracted so as to determine the correct message type from a sync committee
	// subnet.
	GossipSyncCommitteeMessage = "sync_committee"
	// GossipBlockMessage is the name for the block message type.
	GossipBlockMessage = "beacon_block"
	// GossipExitMessage is the name for the voluntary exit message type.
	GossipExitMessage = "voluntary_exit"
	// GossipProposerSlashingMessage is the name for the proposer slashing message type.
	GossipProposerSlashingMessage = "proposer_slashing"
	// GossipAttesterSlashingMessage is the name for the attester slashing message type.
	GossipAttesterSlashingMessage = "attester_slashing"
	// GossipAggregateAndProofMessage is the name for the attestation aggregate and proof message type.
	GossipAggregateAndProofMessage = "beacon_aggregate_and_proof"
	// GossipContributionAndProofMessage is the name for the sync contribution and proof message type.
	GossipContributionAndProofMessage = "sync_committee_contribution_and_proof"
	// GossipBlsToExecutionChangeMessage is the name for the bls to execution change message type.
	GossipBlsToExecutionChangeMessage = "bls_to_execution_change"

	// Topic Formats
	//
	// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
	AttestationSubnetTopicFormat = GossipProtocolAndDigest + GossipAttestationMessage + "_%d"
	// SyncCommitteeSubnetTopicFormat is the topic format for the sync committee subnet.
	SyncCommitteeSubnetTopicFormat = GossipProtocolAndDigest + GossipSyncCommitteeMessage + "_%d"
	// BlockSubnetTopicFormat is the topic format for the block subnet.
	BlockSubnetTopicFormat = GossipProtocolAndDigest + GossipBlockMessage
	// ExitSubnetTopicFormat is the topic format for the voluntary exit subnet.
	ExitSubnetTopicFormat = GossipProtocolAndDigest + GossipExitMessage
	// ProposerSlashingSubnetTopicFormat is the topic format for the proposer slashing subnet.
	ProposerSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipProposerSlashingMessage
	// AttesterSlashingSubnetTopicFormat is the topic format for the attester slashing subnet.
	AttesterSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipAttesterSlashingMessage
	// AggregateAndProofSubnetTopicFormat is the topic format for the aggregate and proof subnet.
	AggregateAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipAggregateAndProofMessage
	// SyncContributionAndProofSubnetTopicFormat is the topic format for the sync aggregate and proof subnet.
	SyncContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage
	// BlsToExecutionChangeSubnetTopicFormat is the topic format for the bls to execution change subnet.
	BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
)
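The topic formats above are ordinary fmt format strings: `%x` takes the 4-byte fork digest and `%d` takes the subnet index. The following is a minimal sketch of how such a format might be filled in. The constant values are copied from the listing above; the helper names and the fork digest value are hypothetical and not part of this package.

```go
package main

import "fmt"

// Values copied from the constants listed above.
const (
	gossipProtocolAndDigest      = "/eth2/%x/"
	attestationSubnetTopicFormat = gossipProtocolAndDigest + "beacon_attestation" + "_%d"
	blockSubnetTopicFormat       = gossipProtocolAndDigest + "beacon_block"
)

// attestationTopic (hypothetical helper) fills in the fork digest and subnet index.
func attestationTopic(forkDigest [4]byte, subnet uint64) string {
	return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, subnet)
}

// blockTopic (hypothetical helper) fills in only the fork digest.
func blockTopic(forkDigest [4]byte) string {
	return fmt.Sprintf(blockSubnetTopicFormat, forkDigest)
}

func main() {
	digest := [4]byte{0xb5, 0x30, 0x3f, 0x2a} // made-up fork digest for illustration
	fmt.Println(attestationTopic(digest, 5))  // /eth2/b5303f2a/beacon_attestation_5
	fmt.Println(blockTopic(digest))           // /eth2/b5303f2a/beacon_block
}
```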
const BeaconBlocksByRangeMessageName = "/beacon_blocks_by_range"
BeaconBlocksByRangeMessageName specifies the name for the beacon blocks by range message topic.
const BeaconBlocksByRootsMessageName = "/beacon_blocks_by_root"
BeaconBlocksByRootsMessageName specifies the name for the beacon blocks by root message topic.
const GoodbyeMessageName = "/goodbye"
GoodbyeMessageName specifies the name for the goodbye message topic.
const MetadataMessageName = "/metadata"
MetadataMessageName specifies the name for the metadata message topic.
const PingMessageName = "/ping"
PingMessageName specifies the name for the ping message topic.
const SchemaVersionV1 = "/1"
SchemaVersionV1 specifies the schema version for our rpc protocol ID.
const SchemaVersionV2 = "/2"
SchemaVersionV2 specifies the next schema version for our rpc protocol ID.
const StatusMessageName = "/status"
StatusMessageName specifies the name for the status message topic.
Variables ¶
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
ErrMessageNotMapped occurs on a Broadcast attempt when a message has not been defined in the GossipTypeMapping.
GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message can be mapped to a protocol ID string.
var RPCTopicMappings = map[string]interface{}{
	RPCStatusTopicV1:        new(pb.Status),
	RPCGoodByeTopicV1:       new(primitives.SSZUint64),
	RPCBlocksByRangeTopicV1: new(pb.BeaconBlocksByRangeRequest),
	RPCBlocksByRangeTopicV2: new(pb.BeaconBlocksByRangeRequest),
	RPCBlocksByRootTopicV1:  new(p2ptypes.BeaconBlockByRootsReq),
	RPCBlocksByRootTopicV2:  new(p2ptypes.BeaconBlockByRootsReq),
	RPCPingTopicV1:          new(primitives.SSZUint64),
	RPCMetaDataTopicV1:      new(interface{}),
	RPCMetaDataTopicV2:      new(interface{}),
}
RPCTopicMappings maps the base message type to the rpc request.
Functions ¶
func AllTopics ¶
func AllTopics() []string
AllTopics returns all topics stored in our gossip mapping.
func ExtractGossipDigest ¶
ExtractGossipDigest extracts the relevant fork digest from the gossip topic. Topics are in the form of /eth2/{fork-digest}/{topic} and this method extracts the fork digest from the topic string to a 4 byte array.
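As an illustration of the parsing described above, here is a minimal sketch that pulls a hex-encoded 4-byte digest out of a `/eth2/{fork-digest}/{topic}` string. The function name `extractDigest` and its error messages are hypothetical; this is not the package's actual implementation.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// extractDigest is a hypothetical stand-in for ExtractGossipDigest.
func extractDigest(topic string) ([4]byte, error) {
	var digest [4]byte
	// A leading "/" yields an empty first element: ["", "eth2", digest, name, ...].
	parts := strings.Split(topic, "/")
	if len(parts) < 4 || parts[0] != "" || parts[1] != "eth2" {
		return digest, errors.New("topic is not of the form /eth2/{fork-digest}/{topic}")
	}
	raw, err := hex.DecodeString(parts[2])
	if err != nil {
		return digest, fmt.Errorf("invalid fork digest: %w", err)
	}
	if len(raw) != len(digest) {
		return digest, fmt.Errorf("fork digest is %d bytes, want %d", len(raw), len(digest))
	}
	copy(digest[:], raw)
	return digest, nil
}

func main() {
	d, err := extractDigest("/eth2/b5303f2a/beacon_block")
	fmt.Printf("%x %v\n", d, err)
}
```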
func GossipTopicMappings ¶
func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message
GossipTopicMappings returns the data type assigned to the given topic, versioned by epoch.
func MsgID ¶
MsgID is a content addressable ID function.
Ethereum Beacon Chain spec defines the message ID as:
The `message-id` of a gossipsub message MUST be the following 20 byte value computed from the message data: If `message.data` has a valid snappy decompression, set `message-id` to the first 20 bytes of the `SHA256` hash of the concatenation of `MESSAGE_DOMAIN_VALID_SNAPPY` with the snappy decompressed message data, i.e. `SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20]`. Otherwise, set `message-id` to the first 20 bytes of the `SHA256` hash of the concatenation of `MESSAGE_DOMAIN_INVALID_SNAPPY` with the raw message data, i.e. `SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + message.data)[:20]`.
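The rule quoted above can be sketched as follows. To keep the example standard-library only, snappy decompression is passed in as a function; the two domain values are assumptions taken from the consensus spec constants `MESSAGE_DOMAIN_INVALID_SNAPPY` and `MESSAGE_DOMAIN_VALID_SNAPPY`, and every name in the block is hypothetical rather than this package's API.

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// Assumed domain values; check them against the consensus spec before relying on them.
var (
	domainInvalidSnappy = [4]byte{0x00, 0x00, 0x00, 0x00}
	domainValidSnappy   = [4]byte{0x01, 0x00, 0x00, 0x00}
)

// decompressFunc abstracts snappy decompression (hypothetical type).
type decompressFunc func([]byte) ([]byte, error)

// messageID returns the first 20 bytes of SHA256(domain + payload), where the
// domain and payload depend on whether the data decompresses successfully.
func messageID(data []byte, decompress decompressFunc) [20]byte {
	domain, payload := domainValidSnappy, data
	if decoded, err := decompress(data); err != nil {
		domain = domainInvalidSnappy // hash the raw bytes under the invalid domain
	} else {
		payload = decoded
	}
	h := sha256.New()
	h.Write(domain[:])
	h.Write(payload)
	var id [20]byte
	copy(id[:], h.Sum(nil)) // copy truncates the 32-byte sum to 20 bytes
	return id
}

// Stand-ins for a real snappy decoder, used only for the demonstration.
func identityDecompress(b []byte) ([]byte, error) { return b, nil }
func failingDecompress([]byte) ([]byte, error)    { return nil, errors.New("not snappy") }

func main() {
	data := []byte("example payload")
	fmt.Printf("valid:   %x\n", messageID(data, identityDecompress))
	fmt.Printf("invalid: %x\n", messageID(data, failingDecompress))
}
```

A real caller would presumably pass a snappy decoder in place of the stand-ins.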
func MultiAddressBuilder ¶
MultiAddressBuilder takes an IP address string and a port and produces a multiaddr in the go-multiaddr format.
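For orientation, a multiaddr for an IP and port has a textual form such as `/ip4/192.168.0.1/tcp/13000`. The sketch below builds only that string using the standard library; it assumes a TCP transport, and the helper name `multiaddrString` is hypothetical. The real function returns a `ma.Multiaddr` value rather than a string.

```go
package main

import (
	"fmt"
	"net"
)

// multiaddrString (hypothetical helper) renders the textual multiaddr for an
// IP address and TCP port, choosing /ip4 or /ip6 from the parsed address.
func multiaddrString(ipAddr string, port uint) (string, error) {
	ip := net.ParseIP(ipAddr)
	if ip == nil {
		return "", fmt.Errorf("invalid ip address %q", ipAddr)
	}
	if v4 := ip.To4(); v4 != nil {
		return fmt.Sprintf("/ip4/%s/tcp/%d", v4.String(), port), nil
	}
	return fmt.Sprintf("/ip6/%s/tcp/%d", ip.String(), port), nil
}

func main() {
	fmt.Println(multiaddrString("192.168.0.1", 13000))
	fmt.Println(multiaddrString("::1", 13000))
	fmt.Println(multiaddrString("not-an-ip", 13000))
}
```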
func PeersFromStringAddrs ¶
PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
func SerializeENR ¶
SerializeENR takes the enr record in its key-value form and serializes it.
func TopicDeconstructor ¶
TopicDeconstructor splits the provided topic to its logical sub-sections. It is assumed all input topics will follow the specific schema: /protocol-prefix/message-name/schema-version/... For the purposes of deconstruction, only the first 3 components are relevant.
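A minimal sketch of this kind of deconstruction follows. It locates one of the message names from the constants above inside the topic, treats everything before it as the protocol prefix, and takes the next path component as the schema version. The function name, the matching strategy, and the example prefix `/eth2/beacon_chain/req` are assumptions for illustration; the package's own implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message names copied from the constants section above.
var messageNames = []string{
	"/status", "/goodbye", "/beacon_blocks_by_range",
	"/beacon_blocks_by_root", "/ping", "/metadata",
}

// deconstruct (hypothetical) returns the prefix, message name and schema version.
func deconstruct(topic string) (prefix, message, version string, err error) {
	for _, name := range messageNames {
		// Require a trailing "/" so a name only matches a whole path component.
		idx := strings.Index(topic, name+"/")
		if idx < 0 {
			continue
		}
		rest := topic[idx+len(name):] // begins with "/"
		comps := strings.SplitN(rest[1:], "/", 2)
		if comps[0] == "" {
			return "", "", "", errors.New("missing schema version")
		}
		return topic[:idx], name, "/" + comps[0], nil
	}
	return "", "", "", fmt.Errorf("unrecognized topic %q", topic)
}

func main() {
	// The prefix and the trailing encoding suffix here are assumed example values.
	fmt.Println(deconstruct("/eth2/beacon_chain/req/status/1/ssz_snappy"))
}
```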
func TopicFromMessage ¶
func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error)
TopicFromMessage constructs the rpc topic from the provided message type and epoch.
func VerifyTopicMapping ¶
VerifyTopicMapping verifies that the topic and its accompanying message type is correct.
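One way to check a message against a topic-to-type table such as RPCTopicMappings is to compare dynamic types with the reflect package. The sketch below assumes that approach; the table contents, the placeholder types, and the function name `verify` are all hypothetical and only mirror the shape of the real mapping.

```go
package main

import (
	"fmt"
	"reflect"
)

// Placeholder message types standing in for the real request types.
type status struct{}
type ping uint64

// topicTypes mirrors the shape of RPCTopicMappings with made-up entries.
var topicTypes = map[string]interface{}{
	"/status/1": new(status),
	"/ping/1":   new(ping),
}

// verify (hypothetical) reports an error when msg does not have the type
// registered for topic, or when the topic has no registration at all.
func verify(topic string, msg interface{}) error {
	want, ok := topicTypes[topic]
	if !ok {
		return fmt.Errorf("no mapping for topic %q", topic)
	}
	if reflect.TypeOf(msg) != reflect.TypeOf(want) {
		return fmt.Errorf("topic %q expects %T, got %T", topic, want, msg)
	}
	return nil
}

func main() {
	fmt.Println(verify("/status/1", new(status)))
	fmt.Println(verify("/status/1", new(ping)))
	fmt.Println(verify("/unknown/1", new(ping)))
}
```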
Types ¶
type Broadcaster ¶
type Broadcaster interface {
	Broadcast(context.Context, proto.Message) error
	BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
	BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
}
Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Config ¶
type Config struct {
	NoDiscovery         bool
	EnableUPnP          bool
	StaticPeerID        bool
	StaticPeers         []string
	BootstrapNodeAddr   []string
	Discv5BootStrapAddr []string
	RelayNodeAddr       string
	LocalIP             string
	HostAddress         string
	HostDNS             string
	PrivateKey          string
	DataDir             string
	MetaDataDir         string
	TCPPort             uint
	UDPPort             uint
	MaxPeers            uint
	AllowListCIDR       string
	DenyListCIDR        []string
	StateNotifier       statefeed.Notifier
	DB                  db.ReadOnlyDatabase
	ClockWaiter         startup.ClockWaiter
}
Config for the p2p service. These parameters are set from application level flags to initialize the p2p service.
type ConnectionHandler ¶
type ConnectionHandler interface {
	AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, j func(ctx context.Context, id peer.ID) error)
	AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error)
	connmgr.ConnectionGater
}
ConnectionHandler configures p2p to handle connections with a peer.
type EncodingProvider ¶
type EncodingProvider interface {
Encoding() encoder.NetworkEncoding
}
EncodingProvider provides p2p network encoding.
type Listener ¶
type Listener interface {
	Self() *enode.Node
	Close()
	Lookup(enode.ID) []*enode.Node
	Resolve(*enode.Node) *enode.Node
	RandomNodes() enode.Iterator
	Ping(*enode.Node) error
	RequestENR(*enode.Node) (*enode.Node, error)
	LocalNode() *enode.LocalNode
}
Listener defines the discovery V5 network interface that is used to communicate with other peers.
type MetadataProvider ¶
MetadataProvider returns the metadata related information for the local peer.
type P2P ¶
type P2P interface {
	Broadcaster
	SetStreamHandler
	PubSubProvider
	PubSubTopicUser
	SenderEncoder
	PeerManager
	ConnectionHandler
	PeersProvider
	MetadataProvider
}
P2P represents the full p2p interface composed of all of the sub-interfaces.
type PeerManager ¶
type PeerManager interface {
	Disconnect(peer.ID) error
	PeerID() peer.ID
	Host() host.Host
	ENR() *enr.Record
	DiscoveryAddresses() ([]multiaddr.Multiaddr, error)
	RefreshENR()
	FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error)
	AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}
PeerManager abstracts some peer management methods from libp2p.
type PeersProvider ¶
PeersProvider abstracts obtaining the status of our currently known peers.
type PubSubProvider ¶
PubSubProvider provides the p2p pubsub protocol.
type PubSubTopicUser ¶
type PubSubTopicUser interface {
	JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error)
	LeaveTopic(topic string) error
	PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error
	SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error)
}
PubSubTopicUser provides a way to join, use, and leave PubSub topics.
type RPCTopic ¶
type RPCTopic string
RPCTopic is a type used to denote and represent a req/resp topic.
func (RPCTopic) MessageType ¶
MessageType returns the message type of the rpc topic.
func (RPCTopic) ProtocolPrefix ¶
ProtocolPrefix returns the protocol prefix of the rpc topic.
type Sender ¶
type Sender interface {
Send(context.Context, interface{}, string, peer.ID) (network.Stream, error)
}
Sender abstracts the sending functionality from libp2p.
type SenderEncoder ¶
type SenderEncoder interface {
	EncodingProvider
	Sender
}
SenderEncoder allows sending functionality from libp2p as well as encoding for requests and responses.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service for managing peer to peer (p2p) networking.
func NewService ¶
NewService initializes a new p2p service compatible with shared.Service interface. No connections are made until the Start function is called during the service registry startup.
func (*Service) AddConnectionHandler ¶
func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Context, id peer.ID) error)
AddConnectionHandler adds a callback function which handles the connection with a newly added peer. It performs a handshake with that peer by sending a hello request and validating the response from the peer.
func (*Service) AddDisconnectionHandler ¶
AddDisconnectionHandler disconnects from peers. It handles updating the peer status. This also calls the handler responsible for maintaining other parts of the sync or p2p system.
func (*Service) AddPingMethod ¶
AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can be used to refresh ENR.
func (*Service) Broadcast ¶
Broadcast sends a message to the p2p network. The message is assumed to be broadcast on the current fork.
func (*Service) BroadcastAttestation ¶
func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
BroadcastAttestation broadcasts an attestation to the p2p network. The message is assumed to be broadcast on the current fork.
func (*Service) BroadcastSyncCommitteeMessage ¶
func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastSyncCommitteeMessage broadcasts a sync committee message to the p2p network. The message is assumed to be broadcast on the current fork.
func (*Service) CanSubscribe ¶
CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (*Service) Disconnect ¶
Disconnect from a peer.
func (*Service) DiscoveryAddresses ¶
DiscoveryAddresses represents our enr addresses as multiaddresses.
func (*Service) Encoding ¶
func (_ *Service) Encoding() encoder.NetworkEncoding
Encoding returns the configured networking encoding.
func (*Service) FilterIncomingSubscriptions ¶
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error)
FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications. This method returns only the topics of interest and may return an error if the subscription request contains too many topics.
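The filtering behaviour described above can be sketched with plain types. In this sketch the `subOpt` struct stands in for `pubsubpb.RPC_SubOpts`, the limit `maxSubscriptions` is a made-up number, and the interest check is passed in as a function (playing the role that CanSubscribe plays on the service); none of these names come from the package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// subOpt is a simplified, hypothetical stand-in for pubsubpb.RPC_SubOpts.
type subOpt struct {
	Topic     string
	Subscribe bool
}

// maxSubscriptions is an illustrative limit, not the package's real value.
const maxSubscriptions = 4

// filterSubscriptions rejects oversized requests and keeps only topics of interest.
func filterSubscriptions(subs []subOpt, canSubscribe func(string) bool) ([]subOpt, error) {
	if len(subs) > maxSubscriptions {
		return nil, errors.New("too many subscriptions in one RPC")
	}
	kept := make([]subOpt, 0, len(subs))
	for _, s := range subs {
		if canSubscribe(s.Topic) {
			kept = append(kept, s)
		}
	}
	return kept, nil
}

// isEth2Topic is a toy interest check used only for the demonstration.
func isEth2Topic(topic string) bool { return strings.HasPrefix(topic, "/eth2/") }

func main() {
	subs := []subOpt{
		{Topic: "/eth2/b5303f2a/beacon_block", Subscribe: true},
		{Topic: "/other/chat", Subscribe: true},
	}
	kept, err := filterSubscriptions(subs, isEth2Topic)
	fmt.Println(len(kept), err)
}
```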
func (*Service) FindPeersWithSubnet ¶
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, index uint64, threshold int) (bool, error)
FindPeersWithSubnet performs a network search for peers subscribed to a particular subnet and then tries to connect to them. This method blocks until the required number of peers is found; otherwise it exits only when the context times out.
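The blocking behaviour described for FindPeersWithSubnet follows a common Go pattern: poll until a threshold is met, and give up only when the context is done. The sketch below shows that pattern in isolation. The function names, the polling interval, and the simulated peer counter are all hypothetical; it makes no claim about how the service actually discovers peers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForPeers polls peerCount until it reaches threshold or ctx is done.
func waitForPeers(ctx context.Context, threshold int, peerCount func() int, interval time.Duration) (bool, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if peerCount() >= threshold {
			return true, nil
		}
		select {
		case <-ctx.Done():
			// The only other exit path: the caller's deadline or cancellation.
			return false, ctx.Err()
		case <-ticker.C:
		}
	}
}

// runDemo simulates discovery in which every poll "finds" one more peer.
func runDemo(threshold, timeoutMillis int) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()
	found := 0
	count := func() int { found++; return found }
	return waitForPeers(ctx, threshold, count, time.Millisecond)
}

func main() {
	fmt.Println(runDemo(3, 1000))       // threshold reached well before the timeout
	fmt.Println(runDemo(1_000_000, 20)) // threshold unreachable, context times out
}
```

Callers of a method with this shape would normally pass a context with a deadline, since without one the call may never return.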
func (*Service) InfoHandler ¶
func (s *Service) InfoHandler(w http.ResponseWriter, _ *http.Request)
InfoHandler is a handler that serves the /p2p page in metrics.
func (*Service) InterceptAccept ¶
func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool)
InterceptAccept checks whether the incipient inbound connection is allowed.
func (*Service) InterceptAddrDial ¶
InterceptAddrDial tests whether we're permitted to dial the specified multiaddr for the given peer.
func (*Service) InterceptPeerDial ¶
InterceptPeerDial tests whether we're permitted to dial the specified peer.
func (*Service) InterceptSecured ¶
func (_ *Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool)
InterceptSecured tests whether a given connection, now authenticated, is allowed.
func (*Service) InterceptUpgraded ¶
InterceptUpgraded tests whether a fully capable connection is allowed.
func (*Service) LeaveTopic ¶
LeaveTopic closes the topic and removes the corresponding handler from the list of joined topics. This method returns an error if there are outstanding event handlers or subscriptions.
func (*Service) MetadataSeq ¶
MetadataSeq returns the metadata sequence number.
func (*Service) PublishToTopic ¶
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error
PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
func (*Service) RefreshENR ¶
func (s *Service) RefreshENR()
RefreshENR uses an epoch to refresh the enr entry for our node with the tracked committee ids for the epoch, allowing our node to be dynamically discoverable by others given our tracked committee ids.
func (*Service) Send ¶
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error)
Send a message to a specific peer. The returned stream may be used for reading, but has been closed for writing.
When done, the caller must call Close or Reset on the stream.
func (*Service) SetStreamHandler ¶
func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler)
SetStreamHandler sets the protocol handler on the p2p host multiplexer. This method is a pass through to libp2pcore.Host.SetStreamHandler.
func (*Service) Status ¶
Status of the p2p service. Will return an error if the service is considered unhealthy to indicate that this node should not serve traffic until the issue has been resolved.
func (*Service) SubscribeToTopic ¶
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error)
SubscribeToTopic joins (if necessary) and subscribes to a PubSub topic.
type SetStreamHandler ¶
type SetStreamHandler interface {
SetStreamHandler(topic string, handler network.StreamHandler)
}
SetStreamHandler configures p2p to handle streams of a certain topic ID.
Source Files ¶
- addr_factory.go
- broadcaster.go
- config.go
- connection_gater.go
- dial_relay_node.go
- discovery.go
- doc.go
- fork.go
- fork_watcher.go
- gossip_scoring_params.go
- gossip_topic_mappings.go
- handshake.go
- info.go
- interfaces.go
- iterator.go
- log.go
- message_id.go
- monitoring.go
- options.go
- pubsub.go
- pubsub_filter.go
- pubsub_tracer.go
- rpc_topic_mappings.go
- sender.go
- service.go
- subnets.go
- topics.go
- utils.go
- watch_peers.go
Directories ¶
Path | Synopsis |
---|---|
encoder | Package encoder allows for registering custom data encoders for information sent as raw bytes over the wire via p2p to other nodes. |
peers | Package peers provides information about peers at the Ethereum consensus protocol level. |
types | Package types contains all the respective p2p types that are required for sync but cannot be represented as a protobuf schema. |