p2p

package
v1.13.0
Published: Feb 18, 2021 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 4 more Imports: 26 Imported by: 39

README

Standalone Implementation of Factom's P2P Network

Summary

This package implements a partial gossip network, limited to peer connectivity and delivery of messages. The application is responsible for triggering fanout, rounds, and message repetition detection.

This is a complete rework of the code that aims to get rid of uncertainty of connections as well as polling strategies and also add support for handling multiple protocol versions. This will open up opportunities to improve network flow through generational increments without breaking backward compatibility.

Goals:

  • Fully configurable, independent, isolated instances
  • Add handshake process to connections to distinguish and identify nodes
  • Ability to handle multiple protocol schemes starting from version 9
  • Reduced network packet overhead in scheme version 10
  • Simple integration
  • Easier to read code with comprehensive documentation

Motivation

  • Peers are not identifiable beyond ip address. Multiple connections from the same node are not differentiated and assigned random numbers
  • Peers have inconsistent information (some data is only available after the first transmission)
  • Nodes depend on the seed file to join the network even across reboots; peers are not persisted
  • Connections are polled in a single loop to check for new data causing unnecessary delays
  • Connections have a complicated state machine and correlation with a Peer object
  • Complicated program flow with a lot of mixed channels, some of which are polled

Tackling some of these would require significant overhaul of the existing code to the point where it seemed like it would be easier to start from scratch.

Specification

Terminology

  • Peer/Node: A node is any application that uses the same p2p protocol to connect to the network. A peer is a node that is connected to us.
  • Endpoint: An internet location a node can connect to. Consists of an IP address and port.
  • Peer Hash: Each peer receives a unique peer hash using the format [ip]:[port] [hex nodeid], where [ip] is taken from the TCP connection itself, and [port] and [hex nodeid] are determined from the peer's handshake.
  • Parcel: A parcel is a container for network messages that has a type and a payload. Parcels of the application type are delivered to the application, others are used internally.
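The peer hash format described above can be illustrated with a small helper. This is only a sketch: the function name `peerHash` and the zero-padded, 8-digit lowercase hex rendering of the node ID are assumptions made for illustration, not necessarily how the package formats it.

```go
package main

import "fmt"

// peerHash builds an identifier in the "[ip]:[port] [hex nodeid]" layout.
// The ip would come from the TCP connection, port and nodeID from the
// peer's handshake. The %08x width is an assumption for this sketch.
func peerHash(ip, port string, nodeID uint32) string {
	return fmt.Sprintf("%s:%s %08x", ip, port, nodeID)
}
```

Because the node ID is part of the hash, two nodes behind the same NAT address remain distinguishable as long as their node IDs differ.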

Package Structure

The P2P package consists of these major components:

  1. Network is the publicly visible interface that applications use to interact with the p2p network. It's initialized with a Configuration object and other components all have a back reference to the network so they can interact with each other.
  2. Controller is the heart of the p2p package. It's split over several files, separated by area of responsibility. The controller handles accepting/creating new connections, peer management, and data routing. The controller has a PeerStore that holds all active peer connections.
  3. Peers are connections to another node. Each peer has an active TCP connection and a Protocol, which translates Parcels into a format the peer can understand.

Overview

[Figure: Quick Overview]

Lifecycles
Peer

The foundation of a peer is a TCP connection, which is created either through an incoming connection or a dial attempt. The peer object is initialized and the TCP connection is given to the handshake process (see below for more info). If the handshake process is unsuccessful, the TCP connection is torn down and the peer object is destroyed. If the handshake process is successful, the peer's read/send loops are started and it sends an online notification to the controller's status channel.

Upon receiving the online notification, the controller will include that peer in the PeerStore and make it available for routing. The read loop reads data from the connection and sends it to the controller. The send loop takes data from the controller and sends it to the connection. If an error occurs during the read/write process or the peer's Stop() function is called, the peer stops its internal loops and also sends an offline notification to the controller's status channel.

Upon receiving the offline notification, the controller will remove that peer from the PeerStore and destroy the object.

If the controller dials the same node, a new Peer object will be created rather than recycling the old one. If an error during read or write occurs, the Peer will call its own Stop() function.

Parcel (Application -> Remote Node)

The application creates a new parcel with the application type, a payload, and a target, which may either be a peer's hash, or one of the predefined flags: Broadcast, Full Broadcast, or Random Peer. The parcel is given to the ToNetwork channel.

The controller routes all parcels from the ToNetwork channel to individual Peer's send channels based on their target:

  1. Peer's Hash: parcel is given directly to that peer
  2. Random Peer: a random peer is given the parcel
  3. Broadcast: 16 peers (config: Fanout) are randomly selected from the list of non-special peers. Those 16 peers and all the special peers are given the parcel
  4. Full Broadcast: all peers are given the parcel

Each Peer monitors their send channel. If a parcel arrives, it is given to the Protocol. The Protocol reads the parcel and creates a corresponding protocol message, which is then written to the connection in a manner dictated by the protocol. For more information on the protocols, see below.

Parcel (Remote Node -> Application)

A Peer's Protocol reads a protocol message from the connection and turns it into a Parcel. The parcel is then given to the controller's peerData channel. The controller separates p2p parcels from application parcels. Application parcels are given to the FromNetwork channel without any further processing.

Protocol

CAT Peering

The CAT (Cyclic Auto Truncate) is a cyclic peering strategy to prevent a rigid network structure. It's based on rounds, during which random peers are indiscriminately dropped to make room for new connections. If a node is full it will reject incoming connections and provide a list of alternative nodes to try and peer with. There are three components, Rounds, Replenish, and Listen.

Round

A round runs once every 15 minutes (config: RoundTime) and does the following:

  1. Persist current peer endpoints and bans in the peer file (config: PeerCacheFile)
  2. If there are more than 30 peers (config: DropTo), it randomly selects non-special peers to drop to reach 30 peers.

Replenish

The goal of Replenish is to reach 32 (config: TargetPeers) active connections. If there are 32 or more connections, Replenish waits. Otherwise, it runs once a second.

Once on startup, if the peer file is less than an hour old (config: PeerCacheAge), Replenish will try to re-establish those connections first. This improves reconnection speeds after rebooting a node.

The first step is to pick a list of peers to connect to: If there are fewer than 10 (config: MinReseed) connections, replenish retrieves the peers from the seed file to connect to. Otherwise, it sends a Peer-Request message to a random peer in the connection pool. If it receives a response within 5 seconds, it will select one random peer from the provided list.

The second step is to dial the peers in the list. If a peer in the list rejects the connection with alternatives, the alternatives are added to the list. It dials to the list sequentially until either 32 connections are reached, the list is empty, or 4 connection attempts (working or failed) have been made.
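The dialing step can be sketched as a bounded loop. Everything named here (`dialResult`, `replenish`, the `dial` callback) is hypothetical and only mirrors the three stop conditions described above: target reached, list exhausted, or attempt limit hit.

```go
package main

// dialResult is a hypothetical outcome of one dial attempt: either the
// connection succeeded, or it was rejected, possibly with alternatives.
type dialResult struct {
	ok           bool
	alternatives []string
}

// replenish dials endpoints from list in order and returns the resulting
// connection count. It stops once target connections exist, the list is
// empty, or maxAttempts dials (working or failed) have been made.
func replenish(list []string, connected, target, maxAttempts int, dial func(string) dialResult) int {
	attempts := 0
	for len(list) > 0 && connected < target && attempts < maxAttempts {
		ep := list[0]
		list = list[1:]
		attempts++
		res := dial(ep)
		if res.ok {
			connected++
			continue
		}
		// A rejection with alternatives extends the candidate list.
		list = append(list, res.alternatives...)
	}
	return connected
}
```

With the defaults mentioned above, a call would use a target of 32 and a maximum of 4 attempts per run.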

Listen

When a new TCP connection arrives, the node checks if the IP is banned, if there are more than 36 (config: MaxIncoming) connections, or (if conf.PeerIPLimitIncoming > 0) there are more than conf.PeerIPLimitIncoming connections from that specific IP. If any of those are true, the connection is rejected. Otherwise, it continues with a Handshake.

Peers that are rejected are given a list of 3 (conf: PeerShareAmount) random peers the node is connected to in a Reject-Alternative message.

Handshake

The handshake follows establishing a TCP connection. The "outgoing" handshake is performed by the node dialing into another node. The format of the Handshake struct is protocol-dependent but it contains the following information:

Name | Type | Description
--- | --- | ---
Network | NetworkID | The network id of the network (e.g., MainNet = 0xfeedbeef) (conf: Network)
Version | uint16 | The version of the protocol we want to use (conf: ProtocolVersion)
Type | ParcelType | For V10 and up, this is either type "Handshake" (0x8) or "Reject with Alternatives" (0x9). For V9, this is "Peer Request" (0x3)
NodeID | uint32 | An application-defined value that can persist across restarts (conf: NodeID)
ListenPort | string | The port the node is defined to listen at (conf: ListenPort)
Loopback | uint64 | A unique nonce to detect loopback connections
Alternatives | slice of Endpoints | If the connection is rejected, a list of alternative endpoints to connect to

Outgoing Handshake

  1. Set a deadline of 10 seconds (conf: HandshakeTimeout)
  2. Select the desired protocol
  3. Encode the handshake data using the desired protocol
  4. Send the handshake
  5. Wait for a response
  6. Attempt to identify the encoding scheme and decode the first message as handshake
  7. Validate the handshake response to see if the network, loopback, and types are desired
  8. Use the protocol that matches the reply's encoding and version

If any step fails, the handshake is considered failed.

Incoming Handshake

  1. Set a deadline of 10 seconds (conf: HandshakeTimeout)
  2. Attempt to identify the encoding scheme and decode the first message as handshake
  3. Validate the handshake response to see if the network and types are desired
  4. (Optional) Propose an alternative protocol
  5. Create a handshake, copying the loopback value from 3.
  6. Send the handshake

If any step fails, the handshake is considered failed. In most cases, the node should use the same protocol that it received the handshake for. However, p2p1 nodes are unable to understand the newer protocol and will always send a message containing protocol version 9. In this case, nodes are expected to downgrade the protocol to V9.

9

Protocol 9 is the legacy (Factomd v6.6 and lower) protocol with the ability to split messages into parts disabled. V9 has the disadvantage of sending unwanted overhead with every message, namely Network, Version, Length, Address, Part info, NodeID, Address, Port. In the old p2p system this was used to post-load information but now has been shifted to the handshake.

Data is serialized via Golang's gob.

10

Protocol 10 is the slimmed down version of V9, containing only the Type, CRC32 of the payload, and the payload itself. Data is also serialized via Golang's gob. The handshake is encoded using V9's format.

11

Protocol 11 uses Protobuf (protocolV11.proto) to define the Handshake and message. To signal that the connection is used for V11, the 4-byte sequence 0x70627566 (ASCII for "pbuf") is transmitted first. Protobufs are transmitted by sending the size of the marshalled protobuf first, encoded as uint32 in Big Endian format, followed by the protobuf byte sequence itself.

V11 has a maximum parcel size of 128 Mebibytes.

Usage

Setting up a Network

In order to set up a network, you need two things: a network id, and a bootstrap file.

The network ID can be generated with p2p.NewNetworkID(string), with your preferred name as input. For example, "myNetwork" results in 0x29cb7175. There are also predefined networks, like p2p.MainNet that are used for Factom specific networks.

The bootstrap seed file contains the addresses of your seed nodes, the ones that every new node will attempt to connect to. It is a plaintext file with one ip:port address per line. An example is Factom's mainnet seed file:

52.17.183.121:8108
52.17.153.126:8108
52.19.117.149:8108
52.18.72.212:8108
52.19.44.249:8108
52.214.189.110:8108
34.249.228.82:8108
34.248.202.6:8108
52.19.181.120:8108
34.248.6.133:8108

Connecting to a Network

First, you need to create the configuration:

config := p2p.DefaultP2PConfiguration()
config.Network = p2p.NewNetworkID("myNetwork")
config.SeedURL = "http://url/of/seed/file.txt"
config.PeerCacheFile = "/path/to/peerfile.json" // field name as declared in the Configuration struct

The default values are derived from Factom's network and described in the Configuration file. The config.NodeID is a unique number tied to a node's ip and port. The same node should use the same NodeID between restarts, but two nodes running at the same time and using the same ip and listen port should have different NodeIDs. The latter is the case if you have multiple nodes behind a NAT connecting to a public network.

The config.PeerCacheFile setting can be left blank to not save peers and bans to disk. Setting it lets a node re-establish old connections and rejoin the network faster after a restart.

Starting the Network

Once you have the config, the rest is easy.

network, err := p2p.NewNetwork(config)
if err != nil {
    // handle err, typically related to the peer file or unable to bind to a listen port
}

network.Run() // nonblocking, starts its own goroutines

You can start reading and writing to the network immediately, though no peers may be connected at first. You can check how many connections are established via network.Total().

Reading and Writing

To send an application message to the network, you need to create a Parcel with a target and a payload:

parcel := p2p.NewParcel(p2p.Broadcast, byteSequence)
network.Send(parcel) // non-blocking; use network.BlockingSend to block when the queue is full

The target can be either a peer's hash, or one of the predefined flags of p2p.RandomPeer, p2p.Broadcast, or p2p.FullBroadcast. The functions of these are described in detail in the Lifecycle section "Parcel (Application -> Remote Node)". The p2p package is data agnostic and any interpretation of the byte sequence is left up to the application.

To read incoming Parcels:

for parcel := range network.Reader() {
    // parcel.Address is the sender's peer hash
    // parcel.Payload is the application data
}

If you want to return a message to the sender, use the parcel's Address as the target of a new parcel.

Documentation

Index

Constants

const (
	// Broadcast sends a parcel to multiple peers (randomly selected based on fanout and special peers)
	Broadcast = "<BROADCAST>"
	// FullBroadcast sends a parcel to all peers
	FullBroadcast = "<FULLBORADCAST>"
	// RandomPeer sends a parcel to one randomly selected peer
	RandomPeer = "<RANDOMPEER>"
)

const V11MaxParcelSize = 134217728

V11MaxParcelSize limits the amount of ram allocated for a parcel to 128 MiB

Variables

var (
	ErrInvalidLengthProtocolV11        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowProtocolV11          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupProtocolV11 = fmt.Errorf("proto: unexpected end of group")
)

var V11Signature = []byte{0x70, 0x62, 0x75, 0x66} // ascii for "pbuf"

V11Signature is the 4-byte sequence that indicates the remote connection wants to use V11

Functions

func IP2Location

func IP2Location(addr string) (uint32, error)

IP2Location converts an ip address to a uint32

If the address is a hostname, it attempts to resolve the address first

func IP2LocationQuick

func IP2LocationQuick(addr string) uint32

IP2LocationQuick converts an ip address to a uint32 without a hostname lookup

func StringToUint32

func StringToUint32(input string) uint32

StringToUint32 hashes the input to generate a deterministic number representation

func WebScanner

func WebScanner(url string, f func(line string)) error

WebScanner is a wrapper that applies the closure f to the response body

Types

type Configuration

type Configuration struct {
	// Network is the NetworkID of the network to use, eg. MainNet, TestNet, etc
	Network NetworkID

	// NodeID is this node's id
	NodeID uint32
	// NodeName is the internal name of the node
	NodeName string

	// === Peer Management Settings ===
	// PeerRequestInterval dictates how often neighbors should be asked for an
	// updated peer list
	PeerRequestInterval time.Duration
	// PeerReseedInterval dictates how often the seed file should be accessed
	// to check for changes
	PeerReseedInterval time.Duration
	// PeerIPLimit specifies the maximum amount of peers to accept from a single
	// ip address
	// 0 for unlimited
	PeerIPLimitIncoming uint
	PeerIPLimitOutgoing uint

	// Special is a list of special peers, separated by comma. If no port is specified, the entire
	// ip is considered special
	Special string

	// PeerCacheFile is the filepath to the file to save peers. It is persisted in every CAT round
	PeerCacheFile string
	// PeerCacheAge is the maximum age of the peer file to try and bootstrap peers from
	PeerCacheAge time.Duration

	// PeerShareAmount is the number of peers we share with other peers when they request a peer share
	PeerShareAmount uint

	// PeerShareTimeout is the maximum time to wait for an asynchronous reply to a peer share
	PeerShareTimeout time.Duration

	// CAT Settings
	// How often to do cat rounds
	RoundTime time.Duration
	// Desired amount of peers
	TargetPeers uint
	// Hard cap of connections
	MaxPeers uint
	// Amount of peers to drop down to
	DropTo uint
	// Reseed if there are fewer than this peers
	MinReseed   uint
	MaxIncoming uint // maximum inbound connections, 0 <= MaxIncoming <= MaxPeers

	// Fanout controls how many random peers are selected for propagating messages
	// Higher values increase fault tolerance but also increase network congestion
	Fanout uint

	// SeedURL is the URL of the remote seed file
	SeedURL string // URL to a source of peer info

	// BindIP is the ip address to bind to for listening and connecting
	//
	// leave blank to bind to all
	BindIP string
	// ListenPort is the port to listen to incoming tcp connections on
	ListenPort string
	// ListenLimit is the lockout period of accepting connections from a single
	// ip after having a successful connection from that ip
	ListenLimit time.Duration

	// PingInterval dictates the maximum amount of time a connection can be
	// silent (no writes) before sending a Ping.
	// Values under one second are normalized to one second.
	PingInterval time.Duration

	// RedialInterval dictates how long to wait between connection attempts
	RedialInterval time.Duration

	// ManualBan is the duration to ban an address for when banned manually
	ManualBan time.Duration

	// HandshakeTimeout is the maximum acceptable time for an incoming connection
	// to send the first parcel after connecting
	HandshakeTimeout time.Duration
	DialTimeout      time.Duration

	// ReadDeadline is the maximum acceptable time to read a single parcel
	// if a connection takes longer, it is disconnected
	ReadDeadline time.Duration

	// WriteDeadline is the maximum acceptable time to send a single parcel
	// if a connection takes longer, it is disconnected
	WriteDeadline time.Duration

	ProtocolVersion uint16
	// ProtocolVersionMinimum is the earliest version this package supports
	ProtocolVersionMinimum uint16

	// ChannelCapacity dictates how large each peer's send channel is.
	// Should be large enough to accommodate bursts of traffic.
	ChannelCapacity uint

	EnablePrometheus bool // Enable prometheus logging. Disable if you run multiple instances

	// PeerResend turns on tracking of application parcels to prevent sending the same
	// application parcel to peers who already sent it to us
	PeerResendFilter bool
	// PeerResendBuckets controls the number of buckets to keep. The coverage of Resend messages
	// equals Buckets * time.Duration
	PeerResendBuckets int
	// PeerResendInterval controls how wide each bucket is
	PeerResendInterval time.Duration
}

Configuration defines the behavior of the gossip network protocol

func DefaultP2PConfiguration

func DefaultP2PConfiguration() (c Configuration)

DefaultP2PConfiguration returns a network configuration with base values. These should be overwritten with command line and config parameters.

func (Configuration) Check

func (c Configuration) Check() error

Check will return an error if there is a configuration value set in a way that would prevent the normal functions of a node

func (*Configuration) Sanitize

func (c *Configuration) Sanitize()

Sanitize automatically adjusts some variables that are dependent on others

type Dialer

type Dialer struct {
	// contains filtered or unexported fields
}

Dialer is a construct to throttle dialing and limit by attempts

func NewDialer

func NewDialer(bindTo string, interval, timeout time.Duration) (*Dialer, error)

NewDialer creates a new Dialer

func (*Dialer) Bind

func (d *Dialer) Bind(to string) error

func (*Dialer) CanDial

func (d *Dialer) CanDial(ep Endpoint) bool

CanDial checks if the given ip can be dialed yet

func (*Dialer) Dial

func (d *Dialer) Dial(ep Endpoint) (net.Conn, error)

Dial an ip. Returns the active TCP connection or error if it failed to connect

type Endpoint

type Endpoint struct {
	IP   string `json:"ip"`
	Port string `json:"port"`
}

func NewEndpoint

func NewEndpoint(ip, port string) (Endpoint, error)

NewEndpoint creates an Endpoint struct from a given ip and port, throws error if ip could not be resolved

func ParseEndpoint

func ParseEndpoint(s string) (Endpoint, error)

ParseEndpoint takes input in the form of "ip:port" and returns the corresponding Endpoint

func (Endpoint) Equal

func (ep Endpoint) Equal(o Endpoint) bool

Equal returns true if both endpoints are the same

func (Endpoint) String

func (ep Endpoint) String() string

func (Endpoint) Valid

func (ep Endpoint) Valid() bool

Valid checks if the data is usable. Does not check if the remote address works

type Handshake

type Handshake struct {
	Network      NetworkID
	Version      uint16
	Type         ParcelType
	NodeID       uint32
	ListenPort   string
	Loopback     uint64
	Alternatives []Endpoint
}

Handshake is the protocol independent data that is required to authenticate a peer.

func (*Handshake) Valid

func (h *Handshake) Valid(conf *Configuration) error

Valid checks the Handshake's data against a configuration. Loopback is checked outside of this function.

type Info

type Info struct {
	Peers     int     // number of peers connected
	Receiving float64 // download rate in Messages/s
	Sending   float64 // upload rate in Messages/s
	Download  float64 // download rate in Bytes/s
	Upload    float64 // upload rate in Bytes/s
	Dropped   uint64  // number of parcels dropped due to low speed
}

Info holds the data that can be queried from the Network

type LimitedListener

type LimitedListener struct {
	// contains filtered or unexported fields
}

LimitedListener will block multiple connection attempts from a single ip within a specific timeframe

func NewLimitedListener

func NewLimitedListener(address string, limit time.Duration) (*LimitedListener, error)

NewLimitedListener initializes a new listener for the specified address (host:port) throttling incoming connections

func (*LimitedListener) Accept

func (ll *LimitedListener) Accept() (net.Conn, error)

Accept accepts a connection if no other connection attempt from that ip has been made in the specified time frame

func (*LimitedListener) Addr

func (ll *LimitedListener) Addr() net.Addr

Addr returns the address the listener is listening to

func (*LimitedListener) Close

func (ll *LimitedListener) Close()

Close closes the associated net.Listener

type MetricsReadWriter

type MetricsReadWriter struct {
	// contains filtered or unexported fields
}

MetricsReadWriter is a wrapper for net.Conn that allows the package to observe the actual amount of bytes passing through it

func NewMetricsReadWriter

func NewMetricsReadWriter(rw io.ReadWriter) *MetricsReadWriter

func (*MetricsReadWriter) Collect

func (sc *MetricsReadWriter) Collect() (mw uint64, mr uint64, bw uint64, br uint64)

func (*MetricsReadWriter) Read

func (sc *MetricsReadWriter) Read(p []byte) (int, error)

func (*MetricsReadWriter) Write

func (sc *MetricsReadWriter) Write(p []byte) (int, error)
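A byte-counting wrapper of this kind can be sketched as follows. The `countingRW` type is hypothetical and tracks only byte totals, whereas Collect above also returns message counts; `roundTrip` is a small demonstration helper, not part of the package.

```go
package main

import (
	"bytes"
	"io"
	"sync/atomic"
)

// countingRW wraps an io.ReadWriter and counts the bytes passing through.
type countingRW struct {
	bytesRead    uint64 // updated atomically
	bytesWritten uint64 // updated atomically
	rw           io.ReadWriter
}

func (c *countingRW) Read(p []byte) (int, error) {
	n, err := c.rw.Read(p)
	atomic.AddUint64(&c.bytesRead, uint64(n))
	return n, err
}

func (c *countingRW) Write(p []byte) (int, error) {
	n, err := c.rw.Write(p)
	atomic.AddUint64(&c.bytesWritten, uint64(n))
	return n, err
}

// totals returns the bytes written and read so far.
func (c *countingRW) totals() (written, read uint64) {
	return atomic.LoadUint64(&c.bytesWritten), atomic.LoadUint64(&c.bytesRead)
}

// roundTrip writes msg through the wrapper into an in-memory buffer,
// reads it back, and returns the counters afterwards.
func roundTrip(msg []byte) (written, read uint64, err error) {
	c := &countingRW{rw: new(bytes.Buffer)}
	if _, err = c.Write(msg); err != nil {
		return
	}
	if _, err = io.ReadAll(c); err != nil {
		return
	}
	written, read = c.totals()
	return
}
```

Counting at the io.ReadWriter level captures the actual bytes on the wire, including protocol overhead, rather than just payload sizes.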

type Network

type Network struct {
	// contains filtered or unexported fields
}

Network is the main access point for outside applications.

ToNetwork is the channel over which to send parcels to the network layer

FromNetwork is the channel that gets filled with parcels arriving from the network layer

func NewNetwork

func NewNetwork(conf Configuration) (*Network, error)

NewNetwork initializes a new network with the given configuration. The passed Configuration is copied and cannot be modified afterwards. Does not start the network automatically.

func (*Network) Ban

func (n *Network) Ban(hash string)

Ban removes a peer as well as any other peer from that address and prevents any connection being established for the amount of time set in the configuration (default one week)

func (*Network) BlockingSend

func (n *Network) BlockingSend(p *Parcel)

BlockingSend accepts a parcel and sends it to the appropriate parties. This function blocks after the queue fills up.

func (*Network) Disconnect

func (n *Network) Disconnect(hash string)

Disconnect severs connection for a specific peer. They are free to connect again afterward

func (*Network) GetInfo

func (n *Network) GetInfo() Info

func (*Network) GetPeerMetrics

func (n *Network) GetPeerMetrics() map[string]PeerMetrics

func (*Network) Reader

func (n *Network) Reader() <-chan *Parcel

Reader returns a read-only channel containing application parcels arriving from the network.

func (*Network) Rounds

func (n *Network) Rounds() int

Rounds returns the total number of CAT rounds that have occurred

func (*Network) Run

func (n *Network) Run() error

Run starts the network. Listens to incoming connections on the specified port and connects to other peers

func (*Network) Send

func (n *Network) Send(p *Parcel)

Send accepts a parcel and sends it to the appropriate parties. This function is non-blocking. If the network goes down, older messages are dropped first.

func (*Network) SetMetricsHook

func (n *Network) SetMetricsHook(f func(pm map[string]PeerMetrics))

SetMetricsHook allows you to read peer metrics. Gets called approximately once a second and transfers the metrics of all CONNECTED peers in the format "identifying hash" -> p2p.PeerMetrics

func (*Network) SetSpecial

func (n *Network) SetSpecial(raw string)

SetSpecial takes a set of ip addresses that should be treated as special. Network will always attempt to have a connection to a special peer. Format is a single line of ip addresses and ports, separated by semicolon, eg "127.0.0.1:8088;8.0.8.8:8088;192.168.0.1:8110"

func (*Network) Stop

func (n *Network) Stop() error

Stop shuts down the network. Note that the network object will become unusable after it is stopped.

func (*Network) Total

func (n *Network) Total() int

Total returns the number of active connections

type NetworkID

type NetworkID uint32

NetworkID represents the P2P network we are participating in (eg: test, main, etc.)

const (
	MainNet  NetworkID = 0xfeedbeef
	TestNet  NetworkID = 0xdeadbeef
	LocalNet NetworkID = 0xbeaded
)

NetworkID are specific uint32s to identify separate networks

The default identifiers are MainNet (the main production network), TestNet (for network=TESTNET) and LocalNet (for network=LOCAL).

Custom NetworkIDs (network=CUSTOM) are generated from the "customnet" command line flag

func NewNetworkID

func NewNetworkID(name string) NetworkID

NewNetworkID converts a string to a network id

func (NetworkID) String

func (n NetworkID) String() string

type Parcel

type Parcel struct {
	Address string // ? bytes - "" or nil for broadcast, otherwise the destination peer's hash.
	Payload []byte
	// contains filtered or unexported fields
}

Parcel is the raw data interface between the network, the p2p package, and the application.

Type indicates the network or application type. Messages routed to and from the application will only have application types

Address is a unique internal identifier for the origin or target of the parcel. For messages from the network to the application, the address will be the id of the sender. Messages intended to be returned to the sender should bear the same address.

There are three special address constants:

Broadcast: The message will be sent to multiple peers as specified by the fanout
FullBroadcast: The message will be sent to all peers
RandomPeer: The message will be sent to one peer picked at random

The payload is arbitrary data defined at application level

func NewParcel

func NewParcel(target string, payload []byte) *Parcel

NewParcel creates a new application message. The target should be either an identifier from a previous message, or one of the custom flags: Broadcast, FullBroadcast, RandomPeer

func (*Parcel) IsApplicationMessage

func (p *Parcel) IsApplicationMessage() bool

IsApplicationMessage checks if the message is intended for the application

func (*Parcel) String

func (p *Parcel) String() string

func (*Parcel) Valid

func (p *Parcel) Valid() error

Valid checks header for inconsistencies

type ParcelChannel

type ParcelChannel chan *Parcel

ParcelChannel is a channel that supports non-blocking sends

func (ParcelChannel) FillRatio

func (pc ParcelChannel) FillRatio() float64

FillRatio returns a percentage [0.0,1.0] of how full the channel is

func (ParcelChannel) Reader

func (pc ParcelChannel) Reader() <-chan *Parcel

Reader returns a read-only channel

func (ParcelChannel) Send

func (pc ParcelChannel) Send(parcel *Parcel) (bool, int)

Send a parcel along this channel. Non-blocking. If full, half of messages are dropped.
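The non-blocking send with a drop-half policy can be sketched on a plain buffered channel. The `parcelChan` type is hypothetical, and the meaning given to the two return values (whether the item was queued, and how many items were dropped) is an assumption; the package documentation above does not spell it out.

```go
package main

// parcelChan is a hypothetical stand-in for a buffered parcel channel.
type parcelChan chan []byte

// send never blocks. If the channel is full, it discards up to half of
// the queued items, oldest first, and then retries once.
func (pc parcelChan) send(p []byte) (bool, int) {
	select {
	case pc <- p:
		return true, 0
	default:
	}
	dropped := 0
	for i := 0; i < cap(pc)/2; i++ {
		select {
		case <-pc:
			dropped++
		default: // channel drained by a concurrent reader
		}
	}
	select {
	case pc <- p:
		return true, dropped
	default:
		return false, dropped
	}
}
```

Dropping a batch rather than a single item means a slow consumer triggers the cleanup less often, at the cost of losing more messages each time.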

type ParcelType

type ParcelType uint16

ParcelType is a list of parcel types that this node understands

const (
	// TypeHeartbeat is deprecated
	TypeHeartbeat ParcelType = iota
	// TypePing is sent if no other parcels have been sent in a while
	TypePing
	// TypePong is a response to a Ping
	TypePong
	// TypePeerRequest indicates a peer wants to be updated of endpoints
	TypePeerRequest
	// TypePeerResponse carries a payload with protocol specific endpoints
	TypePeerResponse
	// TypeAlert is deprecated
	TypeAlert
	// TypeMessage carries an application message in the payload
	TypeMessage
	// TypeMessagePart is a partial message. deprecated in p2p 2.0
	TypeMessagePart
	// TypeHandshake is the first parcel sent after making a connection
	TypeHandshake
	// TypeRejectAlternative is sent instead of a handshake if the server refuses connection
	TypeRejectAlternative
)

func (ParcelType) String

func (t ParcelType) String() string

type Peer

type Peer struct {

	// current state, read only "constants" after the handshake
	IsIncoming bool
	Endpoint   Endpoint
	Hash       string // This is more of a connection ID than hash right now.
	// contains filtered or unexported fields
}

Peer is an active connection to an endpoint in the network. Represents one lifetime of a connection and should not be restarted

func (*Peer) GetMetrics

func (p *Peer) GetMetrics() PeerMetrics

GetMetrics returns live metrics for this connection

func (*Peer) LastSendAge

func (p *Peer) LastSendAge() time.Duration

func (*Peer) Send

func (p *Peer) Send(parcel *Parcel)

func (*Peer) SendFillRatio

func (p *Peer) SendFillRatio() float64

SendFillRatio is a wrapper for the send channel's FillRatio

func (*Peer) Stop

func (p *Peer) Stop()

Stop disconnects the peer from its active connection

func (*Peer) String

func (p *Peer) String() string

type PeerCache

type PeerCache struct {
	Bans  map[string]time.Time `json:"bans"` // can be ip or ip:port
	Peers []Endpoint           `json:"peers"`
}

PeerCache is the object that gets json-marshalled and written to disk

func (*PeerCache) WriteToFile

func (pc *PeerCache) WriteToFile(path string) error

type PeerHashCache

type PeerHashCache struct {
	// contains filtered or unexported fields
}

PeerHashCache keeps a list of the last X hashes for a specified interval. In this package, it is used to keep track of which messages were sent by a specific peer. Each peer has their own instance of PeerHashCache.

func NewPeerHashCache

func NewPeerHashCache(buckets int, interval time.Duration) *PeerHashCache

NewPeerHashCache creates a new cache covering the time range of buckets * interval.

func (*PeerHashCache) Add

func (pr *PeerHashCache) Add(hash [sha1.Size]byte)

Add a hash to the cache.

func (*PeerHashCache) Has

func (pr *PeerHashCache) Has(hash [sha1.Size]byte) bool

Has returns true if the specified hash was seen.

func (*PeerHashCache) Stop

func (pr *PeerHashCache) Stop()

Stop the PeerHashCache. Panics if called twice.

type PeerMetrics

type PeerMetrics struct {
	Hash             string
	PeerAddress      string
	MomentConnected  time.Time
	PeerQuality      int32
	LastReceive      time.Time
	LastSend         time.Time
	MessagesSent     uint64
	BytesSent        uint64
	MessagesReceived uint64
	BytesReceived    uint64
	Incoming         bool
	PeerType         string
	ConnectionState  string
	MPSDown          float64
	MPSUp            float64
	BPSDown          float64
	BPSUp            float64
	SendFillRatio    float64
	Dropped          uint64
}

PeerMetrics is the data shared to the metrics hook

type PeerStore

type PeerStore struct {
	// contains filtered or unexported fields
}

PeerStore holds active Peers, managing them in a concurrency safe manner and providing lookup via various functions

func NewPeerStore

func NewPeerStore() *PeerStore

NewPeerStore initializes a new peer store

func (*PeerStore) Add

func (ps *PeerStore) Add(p *Peer) error

Add a peer to be managed. Returns an error if a peer with that hash is already tracked

func (*PeerStore) Connected

func (ps *PeerStore) Connected(ep Endpoint) bool

Connected reports whether there is a peer connected from that specific endpoint

func (*PeerStore) Connections

func (ps *PeerStore) Connections(addr string) int

Connections tests whether there is a peer connected from a specified ip address

func (*PeerStore) Count

func (ps *PeerStore) Count(addr string) int

Count returns the number of peers connected from a specified ip address

func (*PeerStore) Get

func (ps *PeerStore) Get(hash string) *Peer

Get retrieves a Peer with a specific hash, nil if it doesn't exist

func (*PeerStore) Incoming

func (ps *PeerStore) Incoming() int

Incoming is the number of incoming peers connected

func (*PeerStore) Outgoing

func (ps *PeerStore) Outgoing() int

Outgoing is the number of outgoing peers connected

func (*PeerStore) Remove

func (ps *PeerStore) Remove(p *Peer)

Remove a specific peer if it exists. This checks by pointer reference and not by hash. If you have two distinct peer instances (A and B) with the same hash and add A, removing B has no effect, even if they have the same values
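The difference between removing by pointer and removing by hash can be shown with a small stand-in store. This is a sketch only: the peer and store types below are hypothetical and much simpler than the package's own, but the add and remove rules follow the descriptions of Add and Remove above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// peer is a minimal stand-in carrying only the hash.
type peer struct{ Hash string }

// store tracks peers by hash behind a mutex.
type store struct {
	mtx   sync.RWMutex
	peers map[string]*peer
}

func newStore() *store { return &store{peers: make(map[string]*peer)} }

// add rejects a peer whose hash is already tracked.
func (s *store) add(p *peer) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if _, ok := s.peers[p.Hash]; ok {
		return errors.New("peer already exists")
	}
	s.peers[p.Hash] = p
	return nil
}

// remove deletes the entry only if it is the very same instance.
func (s *store) remove(p *peer) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if tracked, ok := s.peers[p.Hash]; ok && tracked == p {
		delete(s.peers, p.Hash)
	}
}

// get returns the tracked peer for a hash, or nil.
func (s *store) get(hash string) *peer {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.peers[hash]
}

func main() {
	s := newStore()
	a := &peer{Hash: "abc"}
	b := &peer{Hash: "abc"} // same values, different instance

	fmt.Println(s.add(a) == nil) // true
	fmt.Println(s.add(b) == nil) // false, hash already tracked
	s.remove(b)
	fmt.Println(s.get("abc") == a) // true, removing b had no effect
	s.remove(a)
	fmt.Println(s.get("abc") == nil) // true
}
```

Comparing pointers protects against a stale connection's cleanup removing a newer connection that happens to reuse the same hash.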

func (*PeerStore) Slice

func (ps *PeerStore) Slice() []*Peer

Slice returns a slice of the current peers that is considered concurrency safe for reading operations. The slice should not be modified. Peers are randomly ordered

func (*PeerStore) Total

func (ps *PeerStore) Total() int

Total number of peers connected

type Prometheus

type Prometheus struct {
	Connections      prometheus.Gauge
	Connecting       prometheus.Gauge
	Incoming         prometheus.Gauge
	Outgoing         prometheus.Gauge
	ToNetwork        prometheus.Gauge
	ToNetworkRatio   prometheus.Gauge
	FromNetwork      prometheus.Gauge
	FromNetworkRatio prometheus.Gauge
	CatRounds        prometheus.Counter

	SendRoutines    prometheus.Gauge
	ReceiveRoutines prometheus.Gauge

	ParcelsSent        prometheus.Counter
	ParcelsReceived    prometheus.Counter
	Invalid            prometheus.Counter
	AppSent            prometheus.Counter
	AppReceived        prometheus.Counter
	ByteRateDown       prometheus.Gauge
	ByteRateUp         prometheus.Gauge
	MessageRateDown    prometheus.Gauge
	MessageRateUp      prometheus.Gauge
	DroppedPeerSend    prometheus.Counter
	DroppedFromNetwork prometheus.Counter
	DroppedToNetwork   prometheus.Counter

	ParcelSize prometheus.Histogram
}

Prometheus holds all of the prometheus recording instruments

func (*Prometheus) Setup

func (p *Prometheus) Setup()

Setup registers all of the instruments with prometheus once

type Protocol

type Protocol interface {
	SendHandshake(*Handshake) error
	ReadHandshake() (*Handshake, error)
	Send(p *Parcel) error
	Receive() (*Parcel, error)
	MakePeerShare([]Endpoint) ([]byte, error)
	ParsePeerShare([]byte) ([]Endpoint, error)
	Version() uint16
	String() string
}

Protocol is the interface for reading and writing parcels to the underlying connection. The job of a protocol is to encode a Parcel and send it over TCP to another instance of the same Protocol on the other end

Send: Parcel => Protocol Encoder => Protocol Format => TCP

Receive: TCP => Protocol Format => Protocol Decoder => Parcel

Peer sharing creates the protocol-specific payload for a TypePeerResponse Parcel

type ProtocolV10

type ProtocolV10 struct {
	// contains filtered or unexported fields
}

ProtocolV10 is the protocol introduced by p2p 2.0. It is a slimmed down version of V9, reducing overhead

func (*ProtocolV10) MakePeerShare

func (v10 *ProtocolV10) MakePeerShare(share []Endpoint) ([]byte, error)

MakePeerShare serializes a list of ips via json

func (*ProtocolV10) ParsePeerShare

func (v10 *ProtocolV10) ParsePeerShare(payload []byte) ([]Endpoint, error)

ParsePeerShare parses a peer share payload

func (*ProtocolV10) ReadHandshake

func (v10 *ProtocolV10) ReadHandshake() (*Handshake, error)

ReadHandshake for v10 is using the identical format to V9 for backward compatibility. It can't be easily told apart without first decoding the message, so the code is only implemented in v9, then upgraded to V10 based on the values

func (*ProtocolV10) Receive

func (v10 *ProtocolV10) Receive() (*Parcel, error)

Receive converts a V10Msg back to a Parcel

func (*ProtocolV10) Send

func (v10 *ProtocolV10) Send(p *Parcel) error

Send encodes a Parcel as V10Msg, calculates the crc and encodes it as gob

func (*ProtocolV10) SendHandshake

func (v10 *ProtocolV10) SendHandshake(h *Handshake) error

func (*ProtocolV10) String

func (v10 *ProtocolV10) String() string

String 10

func (*ProtocolV10) Version

func (v10 *ProtocolV10) Version() uint16

Version returns 10

type ProtocolV11

type ProtocolV11 struct {
	// contains filtered or unexported fields
}

func (*ProtocolV11) MakePeerShare

func (v11 *ProtocolV11) MakePeerShare(ps []Endpoint) ([]byte, error)

func (*ProtocolV11) ParsePeerShare

func (v11 *ProtocolV11) ParsePeerShare(payload []byte) ([]Endpoint, error)

func (*ProtocolV11) ReadHandshake

func (v11 *ProtocolV11) ReadHandshake() (*Handshake, error)

func (*ProtocolV11) Receive

func (v11 *ProtocolV11) Receive() (*Parcel, error)

func (*ProtocolV11) Send

func (v11 *ProtocolV11) Send(p *Parcel) error

func (*ProtocolV11) SendHandshake

func (v11 *ProtocolV11) SendHandshake(hs *Handshake) error

func (*ProtocolV11) String

func (v11 *ProtocolV11) String() string

func (*ProtocolV11) Version

func (v11 *ProtocolV11) Version() uint16

type ProtocolV9

type ProtocolV9 struct {
	// contains filtered or unexported fields
}

ProtocolV9 is the legacy format of the old p2p package which sends Parcels over the wire using gob. The V9Msg struct is equivalent to the old package's "Parcel" and "ParcelHeader" structure

func (*ProtocolV9) MakePeerShare

func (v9 *ProtocolV9) MakePeerShare(ps []Endpoint) ([]byte, error)

MakePeerShare serializes the given endpoints to a V9Share encoded in json

func (*ProtocolV9) ParsePeerShare

func (v9 *ProtocolV9) ParsePeerShare(payload []byte) ([]Endpoint, error)

ParsePeerShare unserializes the json V9Share

func (*ProtocolV9) ReadHandshake

func (v9 *ProtocolV9) ReadHandshake() (*Handshake, error)

func (*ProtocolV9) Receive

func (v9 *ProtocolV9) Receive() (*Parcel, error)

Receive a parcel from the network. Blocking.

func (*ProtocolV9) Send

func (v9 *ProtocolV9) Send(p *Parcel) error

Send a parcel over the connection

func (*ProtocolV9) SendHandshake

func (v9 *ProtocolV9) SendHandshake(h *Handshake) error

SendHandshake sends out a v9-structured handshake, transforming the handshake into a peer request

func (*ProtocolV9) String

func (v9 *ProtocolV9) String() string

func (*ProtocolV9) Version

func (v9 *ProtocolV9) Version() uint16

Version of the protocol

type ReadWriteCollector

type ReadWriteCollector interface {
	io.Reader
	io.Writer
	StatsCollector
}

type StatsCollector

type StatsCollector interface {
	Collect() (mw uint64, mr uint64, bw uint64, br uint64)
}

type V10Msg

type V10Msg struct {
	Type    ParcelType
	Payload []byte
}

V10Msg is the barebone message

type V10Share

type V10Share Endpoint

V10Share is a defined type with Endpoint as its underlying type (a type definition, not a Go type alias)

type V11Endpoint

type V11Endpoint struct {
	Host                 string   `protobuf:"bytes,1,opt,name=Host,proto3" json:"Host,omitempty"`
	Port                 string   `protobuf:"bytes,2,opt,name=Port,proto3" json:"Port,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*V11Endpoint) Descriptor

func (*V11Endpoint) Descriptor() ([]byte, []int)

func (*V11Endpoint) GetHost

func (m *V11Endpoint) GetHost() string

func (*V11Endpoint) GetPort

func (m *V11Endpoint) GetPort() string

func (*V11Endpoint) Marshal

func (m *V11Endpoint) Marshal() (dAtA []byte, err error)

func (*V11Endpoint) MarshalTo

func (m *V11Endpoint) MarshalTo(dAtA []byte) (int, error)

func (*V11Endpoint) MarshalToSizedBuffer

func (m *V11Endpoint) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*V11Endpoint) ProtoMessage

func (*V11Endpoint) ProtoMessage()

func (*V11Endpoint) Reset

func (m *V11Endpoint) Reset()

func (*V11Endpoint) Size

func (m *V11Endpoint) Size() (n int)

func (*V11Endpoint) String

func (m *V11Endpoint) String() string

func (*V11Endpoint) Unmarshal

func (m *V11Endpoint) Unmarshal(dAtA []byte) error

func (*V11Endpoint) XXX_DiscardUnknown

func (m *V11Endpoint) XXX_DiscardUnknown()

func (*V11Endpoint) XXX_Marshal

func (m *V11Endpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*V11Endpoint) XXX_Merge

func (m *V11Endpoint) XXX_Merge(src proto.Message)

func (*V11Endpoint) XXX_Size

func (m *V11Endpoint) XXX_Size() int

func (*V11Endpoint) XXX_Unmarshal

func (m *V11Endpoint) XXX_Unmarshal(b []byte) error

type V11Handshake

type V11Handshake struct {
	Type                 uint32         `protobuf:"varint,1,opt,name=Type,proto3" json:"Type,omitempty"`
	Network              uint32         `protobuf:"varint,2,opt,name=Network,proto3" json:"Network,omitempty"`
	Version              uint32         `protobuf:"varint,3,opt,name=Version,proto3" json:"Version,omitempty"`
	NodeID               uint32         `protobuf:"varint,4,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
	ListenPort           string         `protobuf:"bytes,5,opt,name=ListenPort,proto3" json:"ListenPort,omitempty"`
	Loopback             uint64         `protobuf:"varint,6,opt,name=Loopback,proto3" json:"Loopback,omitempty"`
	Alternatives         []*V11Endpoint `protobuf:"bytes,7,rep,name=Alternatives,proto3" json:"Alternatives,omitempty"`
	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
	XXX_unrecognized     []byte         `json:"-"`
	XXX_sizecache        int32          `json:"-"`
}

func (*V11Handshake) Descriptor

func (*V11Handshake) Descriptor() ([]byte, []int)

func (*V11Handshake) GetAlternatives

func (m *V11Handshake) GetAlternatives() []*V11Endpoint

func (*V11Handshake) GetListenPort

func (m *V11Handshake) GetListenPort() string

func (*V11Handshake) GetLoopback

func (m *V11Handshake) GetLoopback() uint64

func (*V11Handshake) GetNetwork

func (m *V11Handshake) GetNetwork() uint32

func (*V11Handshake) GetNodeID

func (m *V11Handshake) GetNodeID() uint32

func (*V11Handshake) GetType

func (m *V11Handshake) GetType() uint32

func (*V11Handshake) GetVersion

func (m *V11Handshake) GetVersion() uint32

func (*V11Handshake) Marshal

func (m *V11Handshake) Marshal() (dAtA []byte, err error)

func (*V11Handshake) MarshalTo

func (m *V11Handshake) MarshalTo(dAtA []byte) (int, error)

func (*V11Handshake) MarshalToSizedBuffer

func (m *V11Handshake) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*V11Handshake) ProtoMessage

func (*V11Handshake) ProtoMessage()

func (*V11Handshake) Reset

func (m *V11Handshake) Reset()

func (*V11Handshake) Size

func (m *V11Handshake) Size() (n int)

func (*V11Handshake) String

func (m *V11Handshake) String() string

func (*V11Handshake) Unmarshal

func (m *V11Handshake) Unmarshal(dAtA []byte) error

func (*V11Handshake) XXX_DiscardUnknown

func (m *V11Handshake) XXX_DiscardUnknown()

func (*V11Handshake) XXX_Marshal

func (m *V11Handshake) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*V11Handshake) XXX_Merge

func (m *V11Handshake) XXX_Merge(src proto.Message)

func (*V11Handshake) XXX_Size

func (m *V11Handshake) XXX_Size() int

func (*V11Handshake) XXX_Unmarshal

func (m *V11Handshake) XXX_Unmarshal(b []byte) error

type V11Msg

type V11Msg struct {
	Type                 uint32   `protobuf:"varint,1,opt,name=Type,proto3" json:"Type,omitempty"`
	Payload              []byte   `protobuf:"bytes,2,opt,name=Payload,proto3" json:"Payload,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*V11Msg) Descriptor

func (*V11Msg) Descriptor() ([]byte, []int)

func (*V11Msg) GetPayload

func (m *V11Msg) GetPayload() []byte

func (*V11Msg) GetType

func (m *V11Msg) GetType() uint32

func (*V11Msg) Marshal

func (m *V11Msg) Marshal() (dAtA []byte, err error)

func (*V11Msg) MarshalTo

func (m *V11Msg) MarshalTo(dAtA []byte) (int, error)

func (*V11Msg) MarshalToSizedBuffer

func (m *V11Msg) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*V11Msg) ProtoMessage

func (*V11Msg) ProtoMessage()

func (*V11Msg) Reset

func (m *V11Msg) Reset()

func (*V11Msg) Size

func (m *V11Msg) Size() (n int)

func (*V11Msg) String

func (m *V11Msg) String() string

func (*V11Msg) Unmarshal

func (m *V11Msg) Unmarshal(dAtA []byte) error

func (*V11Msg) XXX_DiscardUnknown

func (m *V11Msg) XXX_DiscardUnknown()

func (*V11Msg) XXX_Marshal

func (m *V11Msg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*V11Msg) XXX_Merge

func (m *V11Msg) XXX_Merge(src proto.Message)

func (*V11Msg) XXX_Size

func (m *V11Msg) XXX_Size() int

func (*V11Msg) XXX_Unmarshal

func (m *V11Msg) XXX_Unmarshal(b []byte) error

type V11Share

type V11Share struct {
	Share                []*V11Endpoint `protobuf:"bytes,1,rep,name=Share,proto3" json:"Share,omitempty"`
	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
	XXX_unrecognized     []byte         `json:"-"`
	XXX_sizecache        int32          `json:"-"`
}

func (*V11Share) Descriptor

func (*V11Share) Descriptor() ([]byte, []int)

func (*V11Share) GetShare

func (m *V11Share) GetShare() []*V11Endpoint

func (*V11Share) Marshal

func (m *V11Share) Marshal() (dAtA []byte, err error)

func (*V11Share) MarshalTo

func (m *V11Share) MarshalTo(dAtA []byte) (int, error)

func (*V11Share) MarshalToSizedBuffer

func (m *V11Share) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*V11Share) ProtoMessage

func (*V11Share) ProtoMessage()

func (*V11Share) Reset

func (m *V11Share) Reset()

func (*V11Share) Size

func (m *V11Share) Size() (n int)

func (*V11Share) String

func (m *V11Share) String() string

func (*V11Share) Unmarshal

func (m *V11Share) Unmarshal(dAtA []byte) error

func (*V11Share) XXX_DiscardUnknown

func (m *V11Share) XXX_DiscardUnknown()

func (*V11Share) XXX_Marshal

func (m *V11Share) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*V11Share) XXX_Merge

func (m *V11Share) XXX_Merge(src proto.Message)

func (*V11Share) XXX_Size

func (m *V11Share) XXX_Size() int

func (*V11Share) XXX_Unmarshal

func (m *V11Share) XXX_Unmarshal(b []byte) error

type V9Header

type V9Header struct {
	Network     NetworkID
	Version     uint16
	Type        ParcelType
	Length      uint32
	TargetPeer  string
	Crc32       uint32
	PartNo      uint16
	PartsTotal  uint16
	NodeID      uint64
	PeerAddress string
	PeerPort    string
	AppHash     string
	AppType     string
}

V9Header carries meta information about the parcel

type V9Msg

type V9Msg struct {
	Header  V9Header
	Payload []byte
}

V9Msg is the legacy format of protocol 9

func (V9Msg) Valid

func (msg V9Msg) Valid() error

Valid checks header for inconsistencies
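Which inconsistencies Valid looks for is not spelled out here. Given that V9Header has Length and Crc32 fields, two natural checks are that the payload length matches the header and that the checksum matches the payload. The sketch below assumes those two checks and the IEEE CRC-32 polynomial; both are guesses for illustration, and the types are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"hash/crc32"
)

// header keeps only the two fields used by this sketch.
type header struct {
	Length uint32
	Crc32  uint32
}

// msg pairs a header with its payload, like V9Msg.
type msg struct {
	Header  header
	Payload []byte
}

// newMsg fills in a header that is consistent with the payload.
func newMsg(payload []byte) msg {
	return msg{
		Header: header{
			Length: uint32(len(payload)),
			Crc32:  crc32.ChecksumIEEE(payload),
		},
		Payload: payload,
	}
}

// valid returns nil if the header agrees with the payload.
func (m msg) valid() error {
	if uint32(len(m.Payload)) != m.Header.Length {
		return errors.New("header length does not match payload")
	}
	if crc32.ChecksumIEEE(m.Payload) != m.Header.Crc32 {
		return errors.New("header checksum does not match payload")
	}
	return nil
}

func main() {
	m := newMsg([]byte("payload"))
	fmt.Println(m.valid() == nil) // true

	m.Payload[0] ^= 0xff // flip the bits of one byte in transit
	fmt.Println(m.valid() == nil) // false
}
```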

type V9Share

type V9Share struct {
	QualityScore int32
	Address      string
	Port         string
	NodeID       uint64
	Hash         string
	Location     uint32
	Network      NetworkID
	Type         uint8
	Connections  int
	LastContact  time.Time
	Source       map[string]time.Time
}

V9Share is the legacy code's "Peer" struct. Resets QualityScore and Source list when decoding, filters out wrong Networks
