libp2p

package
v0.5.0
Published: Dec 30, 2024 License: Apache-2.0 Imports: 67 Imported by: 0

README

libp2p

Table of Contents

  1. Description
  2. Structure and Organisation
  3. Class Diagram
  4. Functionality
  5. Data Types
  6. Testing
  7. Proposed Functionality/Requirements
  8. References

Specification

Description

This package implements the Network interface defined in the root-level network directory.

Proposed Requirements

proposed: @sam

Peer discovery and handshake

Instead of keeping track of all peers, a peer should only stay in touch with peers of its own kind in terms of network latency, resources, or uptime.

The reason is that if a low-performing peer is grouped with high-performing peers and a job is distributed among them, it can slow down the other peers as well.

Max number of handshake peers

Different nodes will have different requirements regarding the number of peers they should keep handshaking with. For example, a small node on a mobile network does not need to maintain a large list of peers, but a node acting as a network load balancer in a data center might.

Filter list

We can have filters that ensure that the only peers handshaked with are ones that meet certain criteria. The following list is not exhaustive:

  1. Network latency. Keep a set of the fastest peers.
  2. Resources. Relates to job types.
  3. Uptime. Connect to peers that have been online for a certain period of time.
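
As a rough sketch of how such criteria could be combined, the snippet below keeps only the peers that pass every filter. The PeerInfo type, its fields, and the MaxLatency and MinUptime helpers are hypothetical names used for illustration; they are not part of this package.

```go
package main

import "time"

// PeerInfo is a hypothetical summary of what is known about a peer.
type PeerInfo struct {
	ID      string
	Latency time.Duration // last measured round-trip time
	Uptime  time.Duration // how long the peer has been online
}

// Criterion reports whether a peer is acceptable.
type Criterion func(p PeerInfo) bool

// MaxLatency accepts peers whose latency is at most limit.
func MaxLatency(limit time.Duration) Criterion {
	return func(p PeerInfo) bool { return p.Latency <= limit }
}

// MinUptime accepts peers that have been online for at least d.
func MinUptime(d time.Duration) Criterion {
	return func(p PeerInfo) bool { return p.Uptime >= d }
}

// FilterPeers keeps only the peers that satisfy every criterion.
func FilterPeers(peers []PeerInfo, criteria ...Criterion) []PeerInfo {
	var out []PeerInfo
	for _, p := range peers {
		ok := true
		for _, c := range criteria {
			if !c(p) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, p)
		}
	}
	return out
}
```

A caller could write FilterPeers(candidates, MaxLatency(100*time.Millisecond), MinUptime(time.Hour)) to keep only fast, long-running peers. A resource criterion would follow the same shape.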
Network latency

For network latency, DMS should also be able to keep a latency table between ongoing jobs on different CPs. The network package should be able to report it to the master node (SP). The orchestrator can then decide whether or not to replace workers.
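
One possible shape for such a latency table is sketched below. LatencyTable, Record, and SlowerThan are hypothetical names, and keying the table by a plain string ID is an assumption made to keep the example free of libp2p imports.

```go
package main

import (
	"sort"
	"sync"
	"time"
)

// LatencyTable is a hypothetical, concurrency-safe record of the last
// measured round-trip time to each compute provider.
type LatencyTable struct {
	mu  sync.RWMutex
	rtt map[string]time.Duration
}

// NewLatencyTable returns an empty table.
func NewLatencyTable() *LatencyTable {
	return &LatencyTable{rtt: make(map[string]time.Duration)}
}

// Record stores the latest measurement for a peer.
func (t *LatencyTable) Record(peerID string, d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.rtt[peerID] = d
}

// SlowerThan returns the IDs of peers whose latency exceeds limit, sorted
// so that the report is deterministic.
func (t *LatencyTable) SlowerThan(limit time.Duration) []string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	var slow []string
	for id, d := range t.rtt {
		if d > limit {
			slow = append(slow, id)
		}
	}
	sort.Strings(slow)
	return slow
}
```

The list returned by SlowerThan is the kind of report an orchestrator could use when deciding whether to replace workers.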

Thoughts:

  • Filter peers at the time of discovery, based on the above parameters.
  • The SP/orchestrator specifies what pool of CPs it is looking for.
  • A CP connects to the same kind of CP.
  • Can use gossipsub.
Structure and Organisation

Here is a quick overview of the contents of this package:

  • README: The current file, aimed at developers who wish to use and modify the package functionality.

  • conn: This file defines the method to ping a peer.

  • dht: This file contains functionalities of a libp2p node. It includes functionalities for setting up libp2p hosts, performing pings between peers, fetching DHT content, checking the status of a peer and validating data against their signatures.

  • discover: This file contains methods for peer discovery in a libp2p node.

  • filter: This file defines functionalities for peer filtering and connection management in a libp2p node.

  • init: This file defines configurations and initialization logic for a libp2p node.

  • libp2p: This file defines stubs for Libp2p peer management functionalities, including configuration, initialization, events, status, stopping, cleanup, ping, and DHT dump.

  • p2p: This file defines a Libp2p node with core functionalities including discovery, peer management, DHT interaction, and communication channels, with several stub implementations.

Class Diagram

The class diagram for the libp2p sub-package is shown below.

Source file

libp2p Class Diagram

Rendered from source file
!$rootUrlGitlab = "https://gitlab.com/nunet/device-management-service/-/raw/main"
!$packageRelativePath = "/network/libp2p"
!$packageUrlGitlab = $rootUrlGitlab + $packageRelativePath
 
!include $packageUrlGitlab/specs/class_diagram.puml
Functionality

As soon as DMS starts, and if it is onboarded to the network, libp2p.RunNode is executed. This brings up everything related to libp2p. Let's walk through it to see what it does.

  1. RunNode calls NewHost. NewHost itself does several things:
  • It creates a connection manager. This defines the lower and upper limits on the number of peers the current peer will connect to.
  • It then defines a multiaddr filter which is used to deny discovery on the local network. This was added to stop scanning the local network in a data center.
  • NewHost then sets various options for the host and passes them to libp2p.New to create a new host. Options such as NAT traversal are configured at this point.
  2. Back in RunNode, it calls p2p.BootstrapNode(ctx). Bootstrapping is essentially connecting to the initial peers.

  3. Then the function continues by setting up streams. Streams are bidirectional connections between peers. More on this in the next section. Here is an example of setting up a stream handler on the host for a particular protocol:

host.SetStreamHandler(protocol.ID(DepReqProtocolID), depReqStreamHandler)
  4. After that, we have discoverPeers:
go p2p.StartDiscovery(ctx, utils.GetChannelName())
  5. After that, we have the DHT update and get functions, which store information about peers in the peerstore.
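
To illustrate the idea behind the address filter that NewHost sets up, here is a minimal standard-library sketch that drops loopback, link-local, and private addresses from a candidate list. It works on plain IP strings rather than multiaddrs, and the names isLocalAddr and DialableAddrs as well as the exact set of denied ranges are assumptions; the package itself passes a multiaddr filter to libp2p.New.

```go
package main

import "net/netip"

// isLocalAddr reports whether ip belongs to a range that a data-center node
// would typically not want to scan: loopback, link-local, or private space.
func isLocalAddr(ip netip.Addr) bool {
	return ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsPrivate()
}

// DialableAddrs drops unparsable and local addresses from the input and
// returns the rest in their original order.
func DialableAddrs(addrs []string) []string {
	var out []string
	for _, a := range addrs {
		ip, err := netip.ParseAddr(a)
		if err != nil || isLocalAddr(ip) {
			continue
		}
		out = append(out, a)
	}
	return out
}
```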
Streams

Communication between libp2p peers, or more generally between DMS instances, happens over libp2p streams. A DMS can have one or many streams with one or more peers. We have currently adopted the following streams for our use cases.

  1. Ping

We can count this as internal to libp2p; it is used for operational purposes. Unlike ICMP pings, libp2p pings work on streams, and the stream is closed after the ping.

  2. Chat

A utility to enable chat between peers.

  3. VPN

The most recent addition to DMS, where we send IP packets through a libp2p stream.

  4. File Transfer

File transfer is generally used to carry files from one DMS to another, most notably to carry checkpoint files of a job from the CP to the SP.

  5. Deployment Request (DepReq)

Used for deploying a job and for getting its progress.
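
To make the stream-based ping concrete, the sketch below sends a random payload over a bidirectional stream, waits for the echo, and then closes the stream. It uses net.Pipe from the standard library as a stand-in for a libp2p stream. The function names and the payload size are choices made for this sketch, not taken from this package.

```go
package main

import (
	"bytes"
	"crypto/rand"
	"errors"
	"io"
	"net"
	"time"
)

const pingSize = 32 // payload size chosen for this sketch

// echo plays the responder: it copies everything it reads back to the
// sender until the stream is closed.
func echo(s io.ReadWriter) {
	_, _ = io.Copy(s, s)
}

// ping writes a random payload to s, waits for it to be echoed back, and
// returns the round-trip time.
func ping(s io.ReadWriter) (time.Duration, error) {
	out := make([]byte, pingSize)
	if _, err := rand.Read(out); err != nil {
		return 0, err
	}
	start := time.Now()
	if _, err := s.Write(out); err != nil {
		return 0, err
	}
	in := make([]byte, pingSize)
	if _, err := io.ReadFull(s, in); err != nil {
		return 0, err
	}
	if !bytes.Equal(in, out) {
		return 0, errors.New("ping: payload mismatch")
	}
	return time.Since(start), nil
}

// pingOnce opens an in-memory stream, pings over it, and closes it again.
func pingOnce() (time.Duration, error) {
	local, remote := net.Pipe()
	defer local.Close()
	go func() {
		defer remote.Close()
		echo(remote)
	}()
	return ping(local)
}
```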

Current DepReq Stream Handler

Each stream needs to have a handler attached to it. Let's look at the deployment request stream handler. It handles incoming deployment requests from the service provider side. Similarly, some function has to listen for updates on the service provider side as well. More on that in a minute.

The following sequence of events happens on the compute provider side:

  1. It checks whether the InboundDepReqStream variable is set. If it is, it replies to the service provider: DepReq open stream length exceeded. Currently only 1 job is allowed per DepReq stream.
  2. If that is not the case, we go on and read from the stream. We are expecting a
  3. Now is the point to set InboundDepReqStream to the incoming stream value.
  4. We unmarshal the incoming message into a types.DeploymentRequest struct. If that fails, the other party is informed about it.
  5. Otherwise, if everything has gone well so far, we check the txHash value from the depReq and make sure it exists on the blockchain before proceeding. If the txHash is not valid, or validation timed out, we let the other side know.
  6. The final thing we do is put the depReq inside the DepReqQueue.
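
The steps above can be sketched roughly as follows. This is not the package's handler. The DeploymentRequest struct is a reduced stand-in for types.DeploymentRequest, the tx_hash JSON key and the VerifyTx callback are hypothetical, and freeing the stream slot after a failed request is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"errors"
	"sync"
)

// DeploymentRequest is a reduced, hypothetical stand-in for
// types.DeploymentRequest.
type DeploymentRequest struct {
	TxHash string `json:"tx_hash"`
}

var (
	errStreamBusy = errors.New("DepReq open stream length exceeded")
	errBadPayload = errors.New("unable to decode deployment request")
	errBadTxHash  = errors.New("invalid or unconfirmed txHash")
)

// Handler holds the state that the steps above refer to.
type Handler struct {
	mu       sync.Mutex
	inbound  bool                   // stands in for InboundDepReqStream being set
	Queue    chan DeploymentRequest // stands in for DepReqQueue
	VerifyTx func(hash string) bool // hypothetical blockchain lookup
}

// setInbound updates the busy flag under the lock.
func (h *Handler) setInbound(v bool) {
	h.mu.Lock()
	h.inbound = v
	h.mu.Unlock()
}

// Handle processes one raw message read from an incoming stream and returns
// the error that would be reported back to the service provider.
func (h *Handler) Handle(raw []byte) error {
	h.mu.Lock()
	if h.inbound { // step 1: only one job per DepReq stream
		h.mu.Unlock()
		return errStreamBusy
	}
	h.inbound = true // step 3: remember the incoming stream
	h.mu.Unlock()

	var req DeploymentRequest
	if err := json.Unmarshal(raw, &req); err != nil { // step 4
		h.setInbound(false) // assumption: free the slot on failure
		return errBadPayload
	}
	if !h.VerifyTx(req.TxHash) { // step 5
		h.setInbound(false)
		return errBadTxHash
	}
	h.Queue <- req // step 6: hand over to the worker
	return nil
}
```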

After this step, the command is handed over to executor module. Please refer to Relation between libp2p and docker modules.

Deployment Request stream handler can further be segmented into different message types:

MsgDepReq    = "DepReq"
MsgDepResp   = "DepResp"
MsgJobStatus = "JobStatus"
MsgLogStderr = "LogStderr"
MsgLogStdout = "LogStdout"

The above message types are used by various functions inside the stream. The last four are handled on the SP side, and further by the websocket server which started the deployment request. This does not mean the CP does not deal with them.
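
As a small illustration of routing on these message types, the sketch below reports which side primarily consumes a message. The constants are copied from the list above. The handledBy helper and its string results are hypothetical and only restate the paragraph above in code.

```go
package main

const (
	MsgDepReq    = "DepReq"
	MsgDepResp   = "DepResp"
	MsgJobStatus = "JobStatus"
	MsgLogStderr = "LogStderr"
	MsgLogStdout = "LogStdout"
)

// handledBy reports which side primarily consumes a message type:
// "CP" for the compute provider, "SP" for the service provider.
func handledBy(msgType string) string {
	switch msgType {
	case MsgDepReq:
		return "CP"
	case MsgDepResp, MsgJobStatus, MsgLogStderr, MsgLogStdout:
		return "SP"
	default:
		return "unknown"
	}
}
```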

Relation between libp2p and docker modules

When the DepReq stream receives a deployment request, it does some JSON validation and pushes the request to DepReqQueue. This extra step, instead of passing the command directly to the docker package, is there for decoupling and scalability.

There is a messaging.DeploymentWorker() goroutine which is launched at DMS startup in dms.Run().

This messaging.DeploymentWorker() is the crux of job deployment in the current proposed version of DMS. Based on the executor type (currently firecracker and docker), the request is passed to specific functions in different modules.
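
The queue-and-worker pattern described here can be sketched as below. Job, Executor, and deploymentWorker are hypothetical names, and the result strings are made up for the example. The real messaging.DeploymentWorker() is not shown in this document.

```go
package main

import "fmt"

// Job is a hypothetical queued deployment request.
type Job struct {
	ID       string
	Executor string // for example "docker" or "firecracker"
}

// Executor runs a job and returns a human-readable status.
type Executor func(Job) string

// deploymentWorker drains queue until it is closed, hands each job to the
// executor registered for its type, and sends one result per job.
func deploymentWorker(queue <-chan Job, executors map[string]Executor, results chan<- string) {
	defer close(results)
	for job := range queue {
		run, ok := executors[job.Executor]
		if !ok {
			results <- fmt.Sprintf("%s: unknown executor %q", job.ID, job.Executor)
			continue
		}
		results <- run(job)
	}
}
```

Because the stream handler only writes to the queue, executors can be added or replaced without touching the network code.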

PeerFilter Interface

PeerFilter is an interface for filtering peers based on specified criteria.

type PeerFilter interface {
	satisfies(p peer.AddrInfo) bool
}
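
A minimal sketch of how a PeerFilter implementation and PeerPassFilter might fit together is shown below. AddrInfo is a stand-in for peer.AddrInfo with plain strings, and the bodies of satisfies and PeerPassFilter are inferred from the doc comments in this document, so the actual implementation may differ.

```go
package main

// AddrInfo is a standard-library-only stand-in for peer.AddrInfo.
type AddrInfo struct {
	ID    string
	Addrs []string
}

// PeerFilter mirrors the interface above, using the stand-in type.
type PeerFilter interface {
	satisfies(p AddrInfo) bool
}

// NoAddrIDFilter rejects peers that have no listening addresses, as well as
// the peer whose ID matches ID.
type NoAddrIDFilter struct {
	ID string
}

func (f NoAddrIDFilter) satisfies(p AddrInfo) bool {
	return len(p.Addrs) > 0 && p.ID != f.ID
}

// PeerPassFilter returns the peers that satisfy pf, in their original order.
func PeerPassFilter(peers []AddrInfo, pf PeerFilter) []AddrInfo {
	var out []AddrInfo
	for _, p := range peers {
		if pf.satisfies(p) {
			out = append(out, p)
		}
	}
	return out
}
```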
Data Types
  • types.DeploymentResponse: DeploymentResponse is the initial response from the Compute Provider (CP) to the Service Provider (SP). It tells the SP whether the deployment was successful or was declined for operational or validation reasons. Most of the validation is just error checking at the stream handling or executor level.

  • types.DeploymentUpdate: DeploymentUpdate is used to inform the SP about the state of the job. Most updates are handled using a libp2p stream at the network level and a websocket at the user level. There is no REST API defined. This should change in the next iteration. See the proposed section for this.

On the service provider side, DeploymentUpdateListener listens to the stream for any activity from the compute provider regarding updates on the job.

Based on the message type, it takes specific actions, which more or less amount to sending the message to the websocket client. These message types are MsgJobStatus, MsgDepResp, MsgLogStdout and MsgLogStderr.

  • network.libp2p.DHTValidator: TBD
type DHTValidator struct {
	PS peerstore.Peerstore
}
  • network.libp2p.SelfPeer: TBD
type SelfPeer struct {
	ID    string
	Addrs []multiaddr.Multiaddr
}
  • network.libp2p.NoAddrIDFilter: filters out peers with no listening addresses and a peer with a specific ID
type NoAddrIDFilter struct {
	ID peer.ID
}
  • network.libp2p.Libp2p: contains the configuration for a Libp2p instance
type Libp2p struct {
	Host   host.Host
	DHT    *dht.IpfsDHT
	PS     peerstore.Peerstore
	peers  []peer.AddrInfo
	config Libp2pConfig
}

type Libp2pConfig struct {
	PrivateKey     crypto.PrivKey
	ListenAddr     []string
	BootstrapPeers []multiaddr.Multiaddr
	Rendezvous     string
	Server         bool
	Scheduler      *bt.Scheduler
}
  • network.libp2p.Advertisement: TBD

type Advertisement struct {
	PeerID    string `json:"peer_id"`
	Timestamp int64  `json:"timestamp,omitempty"`
	Data      []byte `json:"data"`
}
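
To show what the JSON tags on Advertisement imply, here is a short sketch using encoding/json. The encode helper is hypothetical. Two standard behaviours are worth noting: a zero Timestamp is dropped because of omitempty, and a []byte field is encoded as a base64 string.

```go
package main

import "encoding/json"

// Advertisement repeats the struct above so the example is self-contained.
type Advertisement struct {
	PeerID    string `json:"peer_id"`
	Timestamp int64  `json:"timestamp,omitempty"`
	Data      []byte `json:"data"`
}

// encode returns the JSON form of an advertisement.
func encode(a Advertisement) (string, error) {
	b, err := json.Marshal(a)
	return string(b), err
}
```

For example, encoding Advertisement{PeerID: "peer-1", Data: []byte("hi")} yields {"peer_id":"peer-1","data":"aGk="}.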

  • network.libp2p.OpenStream: TBD
type OpenStream struct {
	ID         int    `json:"id"`
	StreamID   string `json:"stream_id"`
	FromPeer   string `json:"from_peer"`
	TimeOpened string `json:"time_opened"`
}

Note: Data types are expected to change due to DMS refactoring

Testing

TBD

Proposed Functionality / Requirements
List of issues

All issues that are related to the implementation of network package can be found below. These include any proposals for modifications to the package or new data structures needed to cover the requirements of other packages.

References

Documentation

Constants

const (
	MB = 1024 * 1024

	ValidationAccept = pubsub.ValidationAccept
	ValidationReject = pubsub.ValidationReject
	ValidationIgnore = pubsub.ValidationIgnore
)
const (
	IfaceMTU = 1420

	PacketExchangeProtocolID = "/dms/subnet/packet-exchange/0.0.1"
)

Variables

This section is empty.

Functions

func NewHost

func NewHost(ctx context.Context, config *types.Libp2pConfig, appScore func(p peer.ID) float64, scoreInspect pubsub.ExtendedPeerScoreInspectFn) (host.Host, *dht.IpfsDHT, *pubsub.PubSub, error)

NewHost returns a new libp2p host with dht and other related settings.

func PeerPassFilter

func PeerPassFilter(peers []peer.AddrInfo, pf PeerFilter) []peer.AddrInfo

Types

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry manages the registration of stream handlers for different protocols.

func NewHandlerRegistry

func NewHandlerRegistry(host host.Host) *HandlerRegistry

NewHandlerRegistry creates a new handler registry instance.

func (*HandlerRegistry) RegisterHandlerWithBytesCallback

func (r *HandlerRegistry) RegisterHandlerWithBytesCallback(
	messageType types.MessageType,
	s StreamHandler, handler func(data []byte, peerId peer.ID),
) error

RegisterHandlerWithBytesCallback registers a stream handler for a specific protocol and sends the bytes back to callback.

func (*HandlerRegistry) RegisterHandlerWithStreamCallback

func (r *HandlerRegistry) RegisterHandlerWithStreamCallback(messageType types.MessageType, handler StreamHandler) error

RegisterHandlerWithStreamCallback registers a stream handler for a specific protocol.

func (*HandlerRegistry) SendMessageToLocalHandler

func (r *HandlerRegistry) SendMessageToLocalHandler(messageType types.MessageType, data []byte, peerID peer.ID)

SendMessageToLocalHandler sends data to the local handler registered for the given message type.

func (*HandlerRegistry) UnregisterHandler

func (r *HandlerRegistry) UnregisterHandler(messageType types.MessageType)

UnregisterHandler unregisters a stream handler for a specific protocol.
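
The fields of HandlerRegistry are unexported, so the following is only a guess at the general pattern: a mutex-protected map from message type to callback. Registry, Register, Unregister, and SendLocal are hypothetical names, rejecting duplicate registrations is an assumption, and SendLocal returns a bool purely so the example can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler receives the raw bytes of a message and the sender's peer ID.
type Handler func(data []byte, peerID string)

// Registry maps message types to handlers and is safe for concurrent use.
type Registry struct {
	mu       sync.RWMutex
	handlers map[string]Handler
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{handlers: make(map[string]Handler)}
}

// Register adds a handler for messageType and fails if one already exists.
func (r *Registry) Register(messageType string, h Handler) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.handlers[messageType]; exists {
		return fmt.Errorf("handler already registered for %q", messageType)
	}
	r.handlers[messageType] = h
	return nil
}

// Unregister removes the handler for messageType, if any.
func (r *Registry) Unregister(messageType string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.handlers, messageType)
}

// SendLocal delivers data to the handler for messageType and reports
// whether a handler was found.
func (r *Registry) SendLocal(messageType string, data []byte, peerID string) bool {
	r.mu.RLock()
	h, ok := r.handlers[messageType]
	r.mu.RUnlock()
	if !ok {
		return false
	}
	h(data, peerID)
	return true
}
```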

type Libp2p

type Libp2p struct {
	Host host.Host
	DHT  *dht.IpfsDHT
	PS   peerstore.Peerstore
	// contains filtered or unexported fields
}

Libp2p contains the configuration for a Libp2p instance.

TODO-suggestion: maybe we should call it something else like Libp2pPeer, Libp2pHost or just Peer (callers would use libp2p.Peer...)

func New

func New(config *types.Libp2pConfig, fs afero.Fs) (*Libp2p, error)

New creates a libp2p instance.

TODO-Suggestion: move types.Libp2pConfig to here for better readability. Unless there is a reason to keep within types.

func (*Libp2p) AcceptSubnetPeer

func (l *Libp2p) AcceptSubnetPeer(subnetID, peerID, ip string) error

func (*Libp2p) AddSubnetDNSRecords

func (l *Libp2p) AddSubnetDNSRecords(subnetID string, records map[string]string) error

func (*Libp2p) AddSubnetPeer

func (l *Libp2p) AddSubnetPeer(subnetID, peerID, ip string) error

func (*Libp2p) Advertise

func (l *Libp2p) Advertise(ctx context.Context, key string, data []byte) error

Advertise pushes the given data to the DHT under the given key.

func (*Libp2p) BootstrapDHT

func (l *Libp2p) BootstrapDHT(ctx context.Context) error

Start dht bootstrapper

func (*Libp2p) ConnectToBootstrapNodes

func (l *Libp2p) ConnectToBootstrapNodes(ctx context.Context) error

Connect to Bootstrap nodes

func (*Libp2p) CreateSubnet

func (l *Libp2p) CreateSubnet(ctx context.Context, subnetID string, routingTable map[string]string) error

func (*Libp2p) DestroySubnet

func (l *Libp2p) DestroySubnet(subnetID string) error

func (*Libp2p) DiscoverDialPeers

func (l *Libp2p) DiscoverDialPeers(ctx context.Context) error

DiscoverDialPeers discovers peers using rendezvous point

func (*Libp2p) DumpDHTRoutingTable

func (l *Libp2p) DumpDHTRoutingTable() ([]kbucket.PeerInfo, error)

func (*Libp2p) GetBroadcastScore

func (l *Libp2p) GetBroadcastScore() map[peer.ID]*PeerScoreSnapshot

func (*Libp2p) GetHostID

func (l *Libp2p) GetHostID() PeerID

GetHostID returns the host ID.

func (*Libp2p) GetMultiaddr

func (l *Libp2p) GetMultiaddr() ([]multiaddr.Multiaddr, error)

GetMultiaddr returns the peer's multiaddr.

func (*Libp2p) GetPeerIP

func (l *Libp2p) GetPeerIP(p PeerID) string

GetPeerIP gets the ip of the peer from the peer store

func (*Libp2p) GetPeerPubKey

func (l *Libp2p) GetPeerPubKey(peerID PeerID) crypto.PubKey

GetPeerPubKey returns the public key for the given peerID.

func (*Libp2p) HandleMessage

func (l *Libp2p) HandleMessage(messageType string, handler func(data []byte, peerId peer.ID)) error

HandleMessage registers a stream handler for a specific protocol and sends bytes to handler func.

func (*Libp2p) Init

func (l *Libp2p) Init(cfg *config.Config) error

Init initializes a libp2p host with its dependencies.

func (*Libp2p) KnownPeers

func (l *Libp2p) KnownPeers() ([]peer.AddrInfo, error)

func (*Libp2p) MapPort

func (l *Libp2p) MapPort(subnetID, protocol, sourceIP, sourcePort, destIP, destPort string) error

func (*Libp2p) Notify

func (l *Libp2p) Notify(ctx context.Context, preconnected func(peer.ID, []protocol.ID, int), connected, disconnected func(peer.ID), identified, updated func(peer.ID, []protocol.ID)) error

func (*Libp2p) OpenStream

func (l *Libp2p) OpenStream(ctx context.Context, addr string, messageType types.MessageType) (network.Stream, error)

OpenStream opens a stream to a remote address and returns the stream for the caller to handle.

func (*Libp2p) PeerConnected

func (l *Libp2p) PeerConnected(p PeerID) bool

func (*Libp2p) Ping

func (l *Libp2p) Ping(ctx context.Context, peerIDAddress string, timeout time.Duration) (types.PingResult, error)

Ping the remote address. The remote address is the encoded peer id which will be decoded and used here.

TODO (Return error once): something that was confusing me when using this method is that the error is returned twice if any. Once as a field of PingResult and one as a return value.

func (*Libp2p) Publish

func (l *Libp2p) Publish(ctx context.Context, topic string, data []byte) error

Publish publishes data to a topic. The requirements are that only one topic handler should exist per topic.

func (*Libp2p) Query

func (l *Libp2p) Query(ctx context.Context, key string) ([]*commonproto.Advertisement, error)

Query returns all the advertisements in the network related to a key. The network is queried to find providers for the given key, and peers which we aren't connected to can be retrieved.

func (*Libp2p) RegisterBytesMessageHandler

func (l *Libp2p) RegisterBytesMessageHandler(messageType types.MessageType, handler func(data []byte, peerId peer.ID)) error

RegisterBytesMessageHandler registers a stream handler for a specific protocol and sends bytes to handler func.

func (*Libp2p) RegisterStreamMessageHandler

func (l *Libp2p) RegisterStreamMessageHandler(messageType types.MessageType, handler StreamHandler) error

RegisterStreamMessageHandler registers a stream handler for a specific protocol.

func (*Libp2p) RemoveSubnetDNSRecord

func (l *Libp2p) RemoveSubnetDNSRecord(subnetID, name string) error

func (*Libp2p) RemoveSubnetPeer

func (l *Libp2p) RemoveSubnetPeer(subnetID, peerID, ip string) error

func (*Libp2p) ResolveAddress

func (l *Libp2p) ResolveAddress(ctx context.Context, id string) ([]string, error)

ResolveAddress resolves the addresses of the peer with the given ID.

func (*Libp2p) SendMessage

func (l *Libp2p) SendMessage(ctx context.Context, hostID string, msg types.MessageEnvelope, expiry time.Time) error

SendMessage asynchronously sends a message to a peer

func (*Libp2p) SendMessageSync

func (l *Libp2p) SendMessageSync(ctx context.Context, hostID string, msg types.MessageEnvelope, expiry time.Time) error

SendMessageSync synchronously sends a message to a peer

func (*Libp2p) SetBroadcastAppScore

func (l *Libp2p) SetBroadcastAppScore(f func(peer.ID) float64)

func (*Libp2p) SetupBroadcastTopic

func (l *Libp2p) SetupBroadcastTopic(topic string, setup func(*Topic) error) error

func (*Libp2p) Start

func (l *Libp2p) Start() error

Start performs network bootstrapping, peer discovery and protocols handling.

func (*Libp2p) Stat

func (l *Libp2p) Stat() types.NetworkStats

Stat returns the status about the libp2p network.

func (*Libp2p) Stop

func (l *Libp2p) Stop() error

Stop performs a cleanup of any resources used in this package.

func (*Libp2p) Subscribe

func (l *Libp2p) Subscribe(ctx context.Context, topic string, handler func(data []byte), validator Validator) (uint64, error)

Subscribe subscribes to a topic and sends the messages to the handler.

func (*Libp2p) Unadvertise

func (l *Libp2p) Unadvertise(ctx context.Context, key string) error

Unadvertise removes the data from the dht.

func (*Libp2p) UnmapPort

func (l *Libp2p) UnmapPort(subnetID, protocol, sourceIP, sourcePort, destIP, destPort string) error

func (*Libp2p) UnregisterMessageHandler

func (l *Libp2p) UnregisterMessageHandler(messageType string)

UnregisterMessageHandler unregisters a stream handler for a specific protocol.

func (*Libp2p) Unsubscribe

func (l *Libp2p) Unsubscribe(topic string, subID uint64) error

Unsubscribe cancels the subscription to a topic

func (*Libp2p) VisiblePeers

func (l *Libp2p) VisiblePeers() []peer.AddrInfo

type NoAddrIDFilter

type NoAddrIDFilter struct {
	ID peer.ID
}

NoAddrIDFilter filters out peers with no listening addresses and a peer with a specific ID

type PeerFilter

type PeerFilter interface {
	// contains filtered or unexported methods
}

PeerFilter is an interface for filtering peers. Satisfying the filter criteria allows the peer to pass.

type PeerID

type PeerID = peer.ID

type PeerScoreSnapshot

type PeerScoreSnapshot = pubsub.PeerScoreSnapshot

type ProtocolID

type ProtocolID = protocol.ID

type PubSub

type PubSub = pubsub.PubSub

type StreamHandler

type StreamHandler func(stream network.Stream)

StreamHandler is a function type that processes data from a stream.

type SubnetRoutingTable

type SubnetRoutingTable interface {
	Add(peerID peer.ID, addr string)
	Remove(peerID peer.ID, ip string)
	Get(peerID peer.ID) ([]string, bool)
	RemoveByIP(addr string)
	GetByIP(addr string) (peer.ID, bool)
	All() map[peer.ID][]string
	Clear()
}

func NewRoutingTable

func NewRoutingTable() SubnetRoutingTable
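
The document does not show what NewRoutingTable returns internally. As a sketch of one way to satisfy an interface of this shape, the in-memory table below keeps a forward map (peer to addresses) and a reverse map (address to peer). PeerID is a string stand-in for peer.ID, and the rule that an address already owned by a peer is not reassigned is an assumption.

```go
package main

import "sync"

// PeerID is a string stand-in for peer.ID.
type PeerID string

type routingTable struct {
	mu     sync.RWMutex
	byPeer map[PeerID][]string
	byIP   map[string]PeerID
}

func newRoutingTable() *routingTable {
	return &routingTable{byPeer: map[PeerID][]string{}, byIP: map[string]PeerID{}}
}

func (t *routingTable) Add(id PeerID, addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, taken := t.byIP[addr]; taken {
		return // assumption: the first owner of an address wins
	}
	t.byPeer[id] = append(t.byPeer[id], addr)
	t.byIP[addr] = id
}

// removeLocked deletes ip from id's entry; the caller must hold t.mu.
func (t *routingTable) removeLocked(id PeerID, ip string) {
	if owner, ok := t.byIP[ip]; !ok || owner != id {
		return
	}
	delete(t.byIP, ip)
	addrs := t.byPeer[id]
	for i, a := range addrs {
		if a == ip {
			addrs = append(addrs[:i], addrs[i+1:]...)
			break
		}
	}
	if len(addrs) == 0 {
		delete(t.byPeer, id)
		return
	}
	t.byPeer[id] = addrs
}

func (t *routingTable) Remove(id PeerID, ip string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.removeLocked(id, ip)
}

func (t *routingTable) RemoveByIP(addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if id, ok := t.byIP[addr]; ok {
		t.removeLocked(id, addr)
	}
}

// Get returns a copy of the addresses registered for id.
func (t *routingTable) Get(id PeerID) ([]string, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	addrs, ok := t.byPeer[id]
	return append([]string(nil), addrs...), ok
}

func (t *routingTable) GetByIP(addr string) (PeerID, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	id, ok := t.byIP[addr]
	return id, ok
}

// All returns a deep copy of the table.
func (t *routingTable) All() map[PeerID][]string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	out := make(map[PeerID][]string, len(t.byPeer))
	for id, addrs := range t.byPeer {
		out[id] = append([]string(nil), addrs...)
	}
	return out
}

func (t *routingTable) Clear() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.byPeer = map[PeerID][]string{}
	t.byIP = map[string]PeerID{}
}
```

The reverse map makes GetByIP and RemoveByIP constant-time lookups, which matters if they are called for every packet that crosses the subnet.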

type Topic

type Topic = pubsub.Topic

type ValidationResult

type ValidationResult = pubsub.ValidationResult

type Validator

type Validator func([]byte, interface{}) (ValidationResult, interface{})
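
As an example of a function with the Validator shape, the sketch below rejects payloads above a size limit. ValidationResult and its constants are local stand-ins for the pubsub aliases, MaxSize is a hypothetical helper, and since the meaning of the two interface{} values is not documented here, the sketch ignores the second argument and returns nil as the second result.

```go
package main

// ValidationResult and the constants below are local stand-ins for the
// pubsub aliases exported by this package.
type ValidationResult int

const (
	ValidationAccept ValidationResult = iota
	ValidationReject
	ValidationIgnore
)

// MB matches the constant exported by this package.
const MB = 1024 * 1024

// Validator has the same shape as the type above.
type Validator func([]byte, interface{}) (ValidationResult, interface{})

// MaxSize returns a Validator that rejects payloads larger than limit bytes
// and accepts everything else.
func MaxSize(limit int) Validator {
	return func(data []byte, _ interface{}) (ValidationResult, interface{}) {
		if len(data) > limit {
			return ValidationReject, nil
		}
		return ValidationAccept, nil
	}
}
```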
