swarm

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2020 License: MIT Imports: 27 Imported by: 0

README

go-libp2p-swarm

Coverage Status Travis CI Discourse posts

The libp2p swarm manages groups of connections to peers, and handles incoming and outgoing streams.

The libp2p swarm is the 'low level' interface for working with a given libp2p network. It gives you more fine grained control over various aspects of the system. Most applications don't need this level of access, so the Swarm is generally wrapped in a Host abstraction that provides a more friendly interface. See the host interface for more info on that.

Table of Contents

Install

make install

Usage

Creating a swarm

To construct a swarm, you'll be calling NewSwarm. That function looks like this:

swarm, err := NewSwarm(ctx, laddrs, pid, pstore, bwc)

It takes five items to fully construct a swarm, the first is a go context.Context. This controls the lifetime of the swarm, and all swarm processes have their lifespan derived from the given context. You can just use context.Background() if you're not concerned with that.

The next argument is an array of multiaddrs that the swarm will open up listeners for. Once started, the swarm will start accepting and handling incoming connections on every given address. This argument is optional, you can pass nil and the swarm will not listen for any incoming connections (but will still be able to dial out to other peers).

After that, you'll need to give the swarm an identity in the form of a peer.ID. If you're not wanting to enable secio (libp2p's transport layer encryption), then you can pick any string for this value. For example peer.ID("FooBar123") would work. Note that passing a random string ID will result in your node not being able to communicate with other peers that have correctly generated IDs. To see how to generate a proper ID, see the below section on "Identity Generation".

The fourth argument is a peerstore. This is essentially a database that the swarm will use to store peer IDs, addresses, public keys, protocol preferences and more. You can construct one by importing github.com/libp2p/go-libp2p-peerstore and calling peerstore.NewPeerstore().

The final argument is a bandwidth metrics collector, This is used to track incoming and outgoing bandwidth on connections managed by this swarm. It is optional, and passing nil will simply result in no metrics for connections being available.

Identity Generation

A proper libp2p identity is PKI based. We currently have support for RSA and ed25519 keys. To create a 'correct' ID, you'll need to either load or generate a new keypair. Here is an example of doing so:

import (
	"fmt"
	"crypto/rand"

	ci "github.com/libp2p/go-libp2p-crypto"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	peer "github.com/libp2p/go-libp2p-peer"
)

func demo() {
	// First, select a source of entropy. We're using the stdlib's crypto reader here
	src := rand.Reader

	// Now create a 2048 bit RSA key using that
	priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 2048, src)
	if err != nil {
		panic(err) // oh no!
	}

	// Now that we have a keypair, lets create our identity from it
	pid, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		panic(err)
	}

	// Woo! Identity acquired!
	fmt.Println("I am ", pid)

	// Now, for the purposes of building a swarm, lets add this all to a peerstore.
	ps := pstore.NewPeerstore()
	ps.AddPubKey(pid, pub)
	ps.AddPrivKey(pid, priv)

	// Once you've got all that, creating a basic swarm can be as easy as
	ctx := context.Background()
	swarm, err := NewSwarm(ctx, nil, pid, ps, nil)

	// voila! A functioning swarm!
}
Streams

The swarm is designed around using multiplexed streams to communicate with other peers. When working with a swarm, you will want to set a function to handle incoming streams from your peers:

swrm.SetStreamHandler(func(s inet.Stream) {
	defer s.Close()
	fmt.Println("Got a stream from: ", s.SwarmConn().RemotePeer())
	fmt.Fprintln(s, "Hello Friend!")
})

Tip: Always make sure to close streams when you're done with them.

Opening streams is also pretty simple:

s, err := swrm.NewStreamWithPeer(ctx, rpid)
if err != nil {
	panic(err)
}
defer s.Close()

io.Copy(os.Stdout, s) // pipe the stream to stdout

Just pass a context and the ID of the peer you want a stream to, and you'll get back a stream to read and write on.

Contribute

PRs are welcome!

Small note: If editing the Readme, please conform to the standard-readme specification.

License

MIT © Jeromy Johnson


The last gx published version of this module was: 3.0.35: QmQVoMEL1CxrVusTSUdYsiJXVBnvSqNUpBsGybkwSfksEF

Documentation

Index

Constants

View Source
const ConcurrentFdDials = 160

ConcurrentFdDials is the number of concurrent outbound dials over transports that consume file descriptors

View Source
const DefaultPerPeerRateLimit = 8

DefaultPerPeerRateLimit is the number of concurrent outbound dials to make per peer

View Source
const DialAttempts = 1

DialAttempts governs how many times a goroutine will try to dial a given peer. Note: this is down to one, as we have _too many dials_ atm. To add back in, add loop back in Dial(.)

Variables

View Source
var (
	// ErrDialBackoff is returned by the backoff code when a given peer has
	// been dialed too frequently
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialToSelf is returned if we attempt to dial our own peer
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when we don't know a transport for the
	// given multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")

	// ErrAllDialsFailed is returned when connecting to a peer has ultimately failed
	ErrAllDialsFailed = errors.New("all dials failed")

	// ErrNoAddresses is returned when we fail to find any addresses for a
	// peer we're trying to dial.
	ErrNoAddresses = errors.New("no addresses")

	// ErrNoGoodAddresses is returned when we find addresses for a peer but
	// can't use any of them.
	ErrNoGoodAddresses = errors.New("no good addresses")

	// ErrGaterDisallowedConnection is returned when the gater prevents us from
	// forming a connection with a peer.
	ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer")
)
View Source
var BackoffBase = time.Second * 5

BackoffBase is the base amount of time to backoff (default: 5s).

View Source
var BackoffCoef = time.Second

BackoffCoef is the backoff coefficient (default: 1s).

View Source
var BackoffMax = time.Minute * 5

BackoffMax is the maximum backoff time (default: 5m).

View Source
var DialTimeoutLocal = 5 * time.Second

DialTimeoutLocal is the maximum duration a Dial to local network address is allowed to take. This includes the time between dialing the raw network connection, protocol selection as well the handshake, if applicable.

View Source
var ErrAddrFiltered = errors.New("address filtered")

ErrAddrFiltered is returned when trying to register a connection to a filtered address. You shouldn't see this error unless some underlying transport is misbehaving.

View Source
var ErrConnClosed = errors.New("connection closed")

ErrConnClosed is returned when operating on a closed connection.

View Source
var ErrDialTimeout = errors.New("dial timed out")

ErrDialTimeout is returned when one a dial times out due to the global timeout

View Source
var ErrSwarmClosed = errors.New("swarm closed")

ErrSwarmClosed is returned when one attempts to operate on a closed swarm.

Functions

This section is empty.

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn is the connection type used by swarm. In general, you won't use this type directly.

func (*Conn) Close

func (c *Conn) Close() error

Close closes this connection.

Note: This method won't wait for the close notifications to finish as that would create a deadlock when called from an open notification (because all open notifications must finish before we can fire off the close notifications).

func (*Conn) GetStreams

func (c *Conn) GetStreams() []network.Stream

GetStreams returns the streams associated with this connection.

func (*Conn) ID

func (c *Conn) ID() string

func (*Conn) LocalMultiaddr

func (c *Conn) LocalMultiaddr() ma.Multiaddr

LocalMultiaddr is the Multiaddr on this side

func (*Conn) LocalPeer

func (c *Conn) LocalPeer() peer.ID

LocalPeer is the Peer on our side of the connection

func (*Conn) LocalPrivateKey

func (c *Conn) LocalPrivateKey() ic.PrivKey

LocalPrivateKey is the public key of the peer on this side

func (*Conn) NewStream

func (c *Conn) NewStream() (network.Stream, error)

NewStream returns a new Stream from this connection

func (*Conn) RemoteMultiaddr

func (c *Conn) RemoteMultiaddr() ma.Multiaddr

RemoteMultiaddr is the Multiaddr on the remote side

func (*Conn) RemotePeer

func (c *Conn) RemotePeer() peer.ID

RemotePeer is the Peer on the remote side

func (*Conn) RemotePublicKey

func (c *Conn) RemotePublicKey() ic.PubKey

RemotePublicKey is the public key of the peer on the remote side

func (*Conn) Stat

func (c *Conn) Stat() network.Stat

Stat returns metadata pertaining to this connection

func (*Conn) String

func (c *Conn) String() string

type DialBackoff

type DialBackoff struct {
	// contains filtered or unexported fields
}

DialBackoff is a type for tracking peer dial backoffs.

* It's safe to use its zero value. * It's thread-safe. * It's *not* safe to move this type after using.

func (*DialBackoff) AddBackoff

func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr)

AddBackoff lets other nodes know that we've entered backoff with peer p, so dialers should not wait unnecessarily. We still will attempt to dial with one goroutine, in case we get through.

Backoff is not exponential, it's quadratic and computed according to the following formula:

BackoffBase + BakoffCoef * PriorBackoffs^2

Where PriorBackoffs is the number of previous backoffs.

func (*DialBackoff) Backoff

func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool)

Backoff returns whether the client should backoff from dialing peer p at address addr

func (*DialBackoff) Clear

func (db *DialBackoff) Clear(p peer.ID)

Clear removes a backoff record. Clients should call this after a successful Dial.

type DialError

type DialError struct {
	Peer       peer.ID
	DialErrors []TransportError
	Cause      error
	Skipped    int
}

DialError is the error type returned when dialing.

func (*DialError) Error

func (e *DialError) Error() string

func (*DialError) Timeout

func (e *DialError) Timeout() bool

func (*DialError) Unwrap

func (e *DialError) Unwrap() error

Unwrap implements https://godoc.org/golang.org/x/xerrors#Wrapper.

type DialFunc

type DialFunc func(context.Context, peer.ID) (*Conn, error)

DialFunc is the type of function expected by DialSync.

type DialSync

type DialSync struct {
	// contains filtered or unexported fields
}

DialSync is a dial synchronization helper that ensures that at most one dial to any given peer is active at any given time.

func NewDialSync

func NewDialSync(dfn DialFunc) *DialSync

NewDialSync constructs a new DialSync

func (*DialSync) CancelDial

func (ds *DialSync) CancelDial(p peer.ID)

CancelDial cancels all in-progress dials to the given peer.

func (*DialSync) DialLock

func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error)

DialLock initiates a dial to the given peer if there are none in progress then waits for the dial to that peer to complete.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is the stream type used by swarm. In general, you won't use this type directly.

func (*Stream) Close

func (s *Stream) Close() error

Close closes the stream, indicating this side is finished with the stream.

func (*Stream) Conn

func (s *Stream) Conn() network.Conn

Conn returns the Conn associated with this stream, as an network.Conn

func (*Stream) ID

func (s *Stream) ID() string

func (*Stream) Protocol

func (s *Stream) Protocol() protocol.ID

Protocol returns the protocol negotiated on this stream (if set).

func (*Stream) Read

func (s *Stream) Read(p []byte) (int, error)

Read reads bytes from a stream.

func (*Stream) Reset

func (s *Stream) Reset() error

Reset resets the stream, closing both ends.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines for this stream.

func (*Stream) SetProtocol

func (s *Stream) SetProtocol(p protocol.ID)

SetProtocol sets the protocol for this stream.

This doesn't actually *do* anything other than record the fact that we're speaking the given protocol over this stream. It's still up to the user to negotiate the protocol. This is usually done by the Host.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline for this stream.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline for this stream.

func (*Stream) Stat

func (s *Stream) Stat() network.Stat

Stat returns metadata information for this stream.

func (*Stream) String

func (s *Stream) String() string

func (*Stream) Write

func (s *Stream) Write(p []byte) (int, error)

Write writes bytes to a stream, flushing for each call.

type Swarm

type Swarm struct {
	// contains filtered or unexported fields
}

Swarm is a connection muxer, allowing connections to other peers to be opened and closed, while still using the same Chan for all communication. The Chan sends/receives Messages, which note the destination or source Peer.

func NewSwarm

func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm

NewSwarm constructs a Swarm.

NOTE: go-libp2p will be moving to dependency injection soon. The variadic `extra` interface{} parameter facilitates the future migration. Supported elements are:

  • connmgr.ConnectionGater

func (*Swarm) AddListenAddr

func (s *Swarm) AddListenAddr(a ma.Multiaddr) error

AddListenAddr tells the swarm to listen on a single address. Unlike Listen, this method does not attempt to filter out bad addresses.

func (*Swarm) AddTransport

func (s *Swarm) AddTransport(t transport.Transport) error

AddTransport adds a transport to this swarm.

Satisfies the Network interface from go-libp2p-transport.

func (*Swarm) Backoff

func (s *Swarm) Backoff() *DialBackoff

Backoff returns the DialBackoff object for this swarm.

func (*Swarm) Close

func (s *Swarm) Close() error

Close stops the Swarm.

func (*Swarm) ClosePeer

func (s *Swarm) ClosePeer(p peer.ID) error

ClosePeer closes all connections to the given peer.

func (*Swarm) ConnHandler

func (s *Swarm) ConnHandler() network.ConnHandler

ConnHandler gets the handler for new connections.

func (*Swarm) Connectedness

func (s *Swarm) Connectedness(p peer.ID) network.Connectedness

Connectedness returns our "connectedness" state with the given peer.

To check if we have an open connection, use `s.Connectedness(p) == network.Connected`.

func (*Swarm) Conns

func (s *Swarm) Conns() []network.Conn

Conns returns a slice of all connections.

func (*Swarm) ConnsToPeer

func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn

ConnsToPeer returns all the live connections to peer.

func (*Swarm) Context

func (s *Swarm) Context() context.Context

Context returns the context of the swarm

func (*Swarm) DialPeer

func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error)

DialPeer connects to a peer.

The idea is that the client of Swarm does not need to know what network the connection will happen over. Swarm can use whichever it choses. This allows us to use various transport protocols, do NAT traversal/relay, etc. to achieve connection.

func (*Swarm) InterfaceListenAddresses

func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error)

InterfaceListenAddresses returns a list of addresses at which this swarm listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to use the known local interfaces.

func (*Swarm) IsFdConsumingAddr

func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool

TODO We should have a `IsFdConsuming() bool` method on the `Transport` interface in go-libp2p-core/transport. This function checks if any of the transport protocols in the address requires a file descriptor. For now: A Non-circuit address which has the TCP/UNIX protocol is deemed FD consuming. For a circuit-relay address, we look at the address of the relay server/proxy and use the same logic as above to decide.

func (*Swarm) Listen

func (s *Swarm) Listen(addrs ...ma.Multiaddr) error

Listen sets up listeners for all of the given addresses. It returns as long as we successfully listen on at least *one* address.

func (*Swarm) ListenAddresses

func (s *Swarm) ListenAddresses() []ma.Multiaddr

ListenAddresses returns a list of addresses at which this swarm listens.

func (*Swarm) LocalPeer

func (s *Swarm) LocalPeer() peer.ID

LocalPeer returns the local peer swarm is associated to.

func (*Swarm) NewStream

func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error)

NewStream creates a new stream on any available connection to peer, dialing if necessary.

func (*Swarm) Notify

func (s *Swarm) Notify(f network.Notifiee)

Notify signs up Notifiee to receive signals when events happen

func (*Swarm) Peers

func (s *Swarm) Peers() []peer.ID

Peers returns a copy of the set of peers swarm is connected to.

func (*Swarm) Peerstore

func (s *Swarm) Peerstore() peerstore.Peerstore

Peerstore returns this swarms internal Peerstore.

func (*Swarm) Process

func (s *Swarm) Process() goprocess.Process

Process returns the Process of the swarm

func (*Swarm) SetConnHandler

func (s *Swarm) SetConnHandler(handler network.ConnHandler)

SetConnHandler assigns the handler for new connections. You will rarely use this. See SetStreamHandler

func (*Swarm) SetStreamHandler

func (s *Swarm) SetStreamHandler(handler network.StreamHandler)

SetStreamHandler assigns the handler for new streams.

func (*Swarm) StopNotify

func (s *Swarm) StopNotify(f network.Notifiee)

StopNotify unregisters Notifiee fromr receiving signals

func (*Swarm) StreamHandler

func (s *Swarm) StreamHandler() network.StreamHandler

StreamHandler gets the handler for new streams.

func (*Swarm) String

func (s *Swarm) String() string

String returns a string representation of Network.

func (*Swarm) TransportForDialing

func (s *Swarm) TransportForDialing(a ma.Multiaddr) transport.Transport

TransportForDialing retrieves the appropriate transport for dialing the given multiaddr.

func (*Swarm) TransportForListening

func (s *Swarm) TransportForListening(a ma.Multiaddr) transport.Transport

TransportForListening retrieves the appropriate transport for listening on the given multiaddr.

type TransportError

type TransportError struct {
	Address ma.Multiaddr
	Cause   error
}

TransportError is the error returned when dialing a specific address.

func (*TransportError) Error

func (e *TransportError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL