p2pv2

package
v0.0.0-...-56f3d1a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2024 License: Apache-2.0 Imports: 46 Imported by: 6

README

p2pv2

p2pv2组件实现。

Documentation

Index

Constants

View Source
const ID = "/tls/1.0.0"

ID is the protocol ID (used when negotiating with multistream)

View Source
const (
	ServerName = "p2pv2"
)

Variables

View Source
var (
	ErrEmptyPeer  = errors.New("empty peer")
	ErrNoResponse = errors.New("no response")
)
View Source
var (

	// MaxBroadCastPeers define the maximum number of common peers to broadcast messages
	MaxBroadCastPeers = 20

	DefaultBucketsFilterFactor = 0.5
)
View Source
var (
	ErrGenerateOpts     = errors.New("generate host opts error")
	ErrCreateHost       = errors.New("create host error")
	ErrBindAddress      = errors.New("bind address error")
	ErrCreateKadDht     = errors.New("create kad dht error")
	ErrCreateStreamPool = errors.New("create stream pool error")
	ErrCreateBootStrap  = errors.New("create bootstrap error pool error")
	ErrConnectBootStrap = errors.New("error to connect to all bootstrap")
	ErrLoadAccount      = errors.New("load account error")
	ErrConnect          = errors.New("connect all boot and static peer error")
)

define errors

View Source
var (
	ErrNewStream       = errors.New("new stream error")
	ErrStreamNotValid  = errors.New("stream not valid")
	ErrNoneMessageType = errors.New("none message type")
)

define common errors

View Source
var (
	ErrStreamPoolFull = errors.New("stream pool is full")
)

define common errors

Functions

func GenAccountKey

func GenAccountKey(account string) string

func GenPeerIDKey

func GenPeerIDKey(id peer.ID) string

func Key

func Key(account string) string

func NewP2PServerV2

func NewP2PServerV2() p2p.Server

NewP2PServerV2 creates a P2PServerV2 instance.

func NewTLS

func NewTLS(path, serviceName string) func(key crypto.PrivKey) (*Transport, error)

Types

type BucketsFilter

type BucketsFilter struct {
	// contains filtered or unexported fields
}

BucketsFilter defines a filter that gets all peers in the buckets.

func (*BucketsFilter) Filter

func (bf *BucketsFilter) Filter() ([]peer.ID, error)

Filter 依据Bucket分层广播

type BucketsFilterWithFactor

type BucketsFilterWithFactor struct {
	// contains filtered or unexported fields
}

BucketsFilterWithFactor defines a filter that gets a certain percentage of the peers in each bucket.

func (*BucketsFilterWithFactor) Filter

func (nf *BucketsFilterWithFactor) Filter() ([]peer.ID, error)

Filter 从每个Bucket中挑选占比Factor个peers进行广播 对于每一个Bucket,平均分成若干块,每个块抽取若干个节点

*|<---------------- Bucket ---------------->|
*--------------------------------------------
*|        |        |        |        |      |
*--------------------------------------------
*       split1   split2    split3   split4 split5

type MultiStrategy

type MultiStrategy struct {
	// contains filtered or unexported fields
}

MultiStrategy is a peer filter that contains multiple filters.

func NewMultiStrategy

func NewMultiStrategy(filters []PeerFilter, peerIDs []peer.ID) *MultiStrategy

NewMultiStrategy creates an instance of MultiStrategy.

func (*MultiStrategy) Filter

func (cp *MultiStrategy) Filter() ([]peer.ID, error)

Filter return peer IDs with multiple filters

type NearestBucketFilter

type NearestBucketFilter struct {
	// contains filtered or unexported fields
}

NearestBucketFilter defines a filter that gets the nearest peers to a specified peer ID.

func (*NearestBucketFilter) Filter

func (nf *NearestBucketFilter) Filter() ([]peer.ID, error)

Filter 广播给最近的Bucket

type P2PServerV2

type P2PServerV2 struct {
	// contains filtered or unexported fields
}

P2PServerV2 is the node in the network

func (*P2PServerV2) Context

func (p *P2PServerV2) Context() *netCtx.NetCtx

func (*P2PServerV2) GetPeerIdByAccount

func (p *P2PServerV2) GetPeerIdByAccount(account string) (peer.ID, error)

func (*P2PServerV2) HandleMessage

func (p *P2PServerV2) HandleMessage(stream p2p.Stream, msg *pb.XuperMessage) error

func (*P2PServerV2) Init

func (p *P2PServerV2) Init(ctx *netCtx.NetCtx) error

Init initializes the p2p server using the given config.

func (*P2PServerV2) NewSubscriber

func (p *P2PServerV2) NewSubscriber(typ pb.XuperMessage_MessageType, v interface{}, opts ...p2p.SubscriberOption) p2p.Subscriber

func (*P2PServerV2) PeerID

func (p *P2PServerV2) PeerID() string

PeerID return the peer ID

func (*P2PServerV2) PeerInfo

func (p *P2PServerV2) PeerInfo() pb.PeerInfo

func (*P2PServerV2) Register

func (p *P2PServerV2) Register(sub p2p.Subscriber) error

Register registers a message subscriber to handle messages.

func (*P2PServerV2) SendMessage

func (p *P2PServerV2) SendMessage(ctx xctx.XContext, msg *pb.XuperMessage,
	optFunc ...p2p.OptionFunc) error

SendMessage sends a message to peers using the given filter strategy.

func (*P2PServerV2) SendMessageWithResponse

func (p *P2PServerV2) SendMessageWithResponse(ctx xctx.XContext, msg *pb.XuperMessage,
	optFunc ...p2p.OptionFunc) ([]*pb.XuperMessage, error)

SendMessageWithResponse sends a message to peers using the given filter strategy and expects responses from the peers. 客户端在使用该方法请求带返回的消息时,最好带上log_id, 否则会导致收消息时收到不匹配的消息而影响后续的处理

func (*P2PServerV2) Start

func (p *P2PServerV2) Start()

Start start the node

func (*P2PServerV2) Stop

func (p *P2PServerV2) Stop()

Stop stop the node

func (*P2PServerV2) UnRegister

func (p *P2PServerV2) UnRegister(sub p2p.Subscriber) error

UnRegister remove message subscriber

type PeerFilter

type PeerFilter interface {
	Filter() ([]peer.ID, error)
}

PeerFilter the interface for filter peers

type StaticNodeStrategy

type StaticNodeStrategy struct {
	// contains filtered or unexported fields
}

StaticNodeStrategy is a peer filter that contains static nodes.

func (*StaticNodeStrategy) Filter

func (ss *StaticNodeStrategy) Filter() ([]peer.ID, error)

Filter return static nodes peers

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is the IO wrapper for the underlying P2P connection.

func NewStream

func NewStream(ctx *nctx.NetCtx, srv *P2PServerV2, netStream network.Stream) (*Stream, error)

NewStream creates a Stream instance.

func (*Stream) Close

func (s *Stream) Close()

Close close the connected IO stream

func (*Stream) MultiAddr

func (s *Stream) MultiAddr() ma.Multiaddr

MultiAddr get multi addr

func (*Stream) PeerID

func (s *Stream) PeerID() peer.ID

PeerID get id

func (*Stream) Recv

func (s *Stream) Recv()

Recv loops to read data from the stream.

func (*Stream) Send

func (s *Stream) Send(msg *pb.XuperMessage) error

func (*Stream) SendMessage

func (s *Stream) SendMessage(ctx xctx.XContext, msg *pb.XuperMessage) error

SendMessage will send a message to a peer

func (*Stream) SendMessageWithResponse

func (s *Stream) SendMessageWithResponse(ctx xctx.XContext,
	msg *pb.XuperMessage) (*pb.XuperMessage, error)

SendMessageWithResponse will send a message to a peer and wait for response

func (*Stream) Start

func (s *Stream) Start()

Start used to start

func (*Stream) Valid

func (s *Stream) Valid() bool

type StreamLimit

type StreamLimit struct {
	// contains filtered or unexported fields
}

StreamLimit limits the number of peer IDs from the same IP.

func (*StreamLimit) AddStream

func (sl *StreamLimit) AddStream(addrStr string, peerID peer.ID) bool

AddStream increases the count for the same IP by one per call.

func (*StreamLimit) DelStream

func (sl *StreamLimit) DelStream(addrStr string)

DelStream decreases the count for the same IP by one per call.

func (*StreamLimit) GetStreams

func (sl *StreamLimit) GetStreams() []string

GetStreams gets all NetURLs from the effective streams.

func (*StreamLimit) Init

func (sl *StreamLimit) Init(ctx *nctx.NetCtx)

Init initialize the StreamLimit

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool manage all the stream

func NewStreamPool

func NewStreamPool(ctx *nctx.NetCtx, srv *P2PServerV2) (*StreamPool, error)

NewStreamPool creates a StreamPool instance.

func (*StreamPool) AddStream

func (sp *StreamPool) AddStream(ctx xctx.XContext, stream *Stream) error

AddStream is used to add a new P2P stream into the pool.

func (*StreamPool) DelStream

func (sp *StreamPool) DelStream(stream *Stream) error

DelStream delete a stream

func (*StreamPool) Get

func (sp *StreamPool) Get(ctx xctx.XContext, peerId peer.ID) (*Stream, error)

Get will probe and return a stream

func (*StreamPool) NewStream

func (sp *StreamPool) NewStream(ctx xctx.XContext, netStream network.Stream) (*Stream, error)

NewStream is used to add a new net stream into the pool.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport constructs secure communication sessions for a peer.

func (*Transport) SecureInbound

func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn) (sec.SecureConn, error)

SecureInbound runs the TLS handshake as a server.

func (*Transport) SecureOutbound

func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error)

SecureOutbound runs the TLS handshake as a client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL