Documentation ¶
Index ¶
- Constants
- Variables
- func GenAccountKey(account string) string
- func GenPeerIDKey(id peer.ID) string
- func Key(account string) string
- func NewP2PServerV2() p2p.Server
- func NewTLS(path, serviceName string) func(key crypto.PrivKey) (*Transport, error)
- type BucketsFilter
- type BucketsFilterWithFactor
- type MultiStrategy
- type NearestBucketFilter
- type P2PServerV2
- func (p *P2PServerV2) Context() *netCtx.NetCtx
- func (p *P2PServerV2) GetPeerIdByAccount(account string) (peer.ID, error)
- func (p *P2PServerV2) HandleMessage(stream p2p.Stream, msg *pb.XuperMessage) error
- func (p *P2PServerV2) Init(ctx *netCtx.NetCtx) error
- func (p *P2PServerV2) NewSubscriber(typ pb.XuperMessage_MessageType, v interface{}, opts ...p2p.SubscriberOption) p2p.Subscriber
- func (p *P2PServerV2) PeerID() string
- func (p *P2PServerV2) PeerInfo() pb.PeerInfo
- func (p *P2PServerV2) Register(sub p2p.Subscriber) error
- func (p *P2PServerV2) SendMessage(ctx xctx.XContext, msg *pb.XuperMessage, optFunc ...p2p.OptionFunc) error
- func (p *P2PServerV2) SendMessageWithResponse(ctx xctx.XContext, msg *pb.XuperMessage, optFunc ...p2p.OptionFunc) ([]*pb.XuperMessage, error)
- func (p *P2PServerV2) Start()
- func (p *P2PServerV2) Stop()
- func (p *P2PServerV2) UnRegister(sub p2p.Subscriber) error
- type PeerFilter
- type StaticNodeStrategy
- type Stream
- func (s *Stream) Close()
- func (s *Stream) MultiAddr() ma.Multiaddr
- func (s *Stream) PeerID() peer.ID
- func (s *Stream) Recv()
- func (s *Stream) Send(msg *pb.XuperMessage) error
- func (s *Stream) SendMessage(ctx xctx.XContext, msg *pb.XuperMessage) error
- func (s *Stream) SendMessageWithResponse(ctx xctx.XContext, msg *pb.XuperMessage) (*pb.XuperMessage, error)
- func (s *Stream) Start()
- func (s *Stream) Valid() bool
- type StreamLimit
- type StreamPool
- type Transport
Constants ¶
const ID = "/tls/1.0.0"
ID is the protocol ID (used when negotiating with multistream)
const (
ServerName = "p2pv2"
)
Variables ¶
var ( ErrEmptyPeer = errors.New("empty peer") ErrNoResponse = errors.New("no response") )
var ( // MaxBroadCastPeers define the maximum number of common peers to broadcast messages MaxBroadCastPeers = 20 DefaultBucketsFilterFactor = 0.5 )
var ( ErrGenerateOpts = errors.New("generate host opts error") ErrCreateHost = errors.New("create host error") ErrBindAddress = errors.New("bind address error") ErrCreateKadDht = errors.New("create kad dht error") ErrCreateStreamPool = errors.New("create stream pool error") ErrCreateBootStrap = errors.New("create bootstrap error pool error") ErrConnectBootStrap = errors.New("error to connect to all bootstrap") ErrLoadAccount = errors.New("load account error") ErrConnect = errors.New("connect all boot and static peer error") )
define errors
var ( ErrNewStream = errors.New("new stream error") ErrStreamNotValid = errors.New("stream not valid") ErrNoneMessageType = errors.New("none message type") )
define common errors
var (
ErrStreamPoolFull = errors.New("stream pool is full")
)
define common errors
Functions ¶
func GenAccountKey ¶
func GenPeerIDKey ¶
Types ¶
type BucketsFilter ¶
type BucketsFilter struct {
// contains filtered or unexported fields
}
BucketsFilter define filter that get all peers in buckets
type BucketsFilterWithFactor ¶
type BucketsFilterWithFactor struct {
// contains filtered or unexported fields
}
BucketsFilterWithFactor define filter that get a certain percentage peers in each bucket
func (*BucketsFilterWithFactor) Filter ¶
func (nf *BucketsFilterWithFactor) Filter() ([]peer.ID, error)
Filter 从每个Bucket中挑选占比Factor个peers进行广播 对于每一个Bucket,平均分成若干块,每个块抽取若干个节点
*|<---------------- Bucket ---------------->| *-------------------------------------------- *| | | | | | *-------------------------------------------- * split1 split2 split3 split4 split5
type MultiStrategy ¶
type MultiStrategy struct {
// contains filtered or unexported fields
}
MultiStrategy a peer filter that contains multiple filters
func NewMultiStrategy ¶
func NewMultiStrategy(filters []PeerFilter, peerIDs []peer.ID) *MultiStrategy
NewMultiStrategy create instance of MultiStrategy
type NearestBucketFilter ¶
type NearestBucketFilter struct {
// contains filtered or unexported fields
}
NearestBucketFilter define filter that get nearest peers from a specified peer ID
type P2PServerV2 ¶
type P2PServerV2 struct {
// contains filtered or unexported fields
}
P2PServerV2 is the node in the network
func (*P2PServerV2) Context ¶
func (p *P2PServerV2) Context() *netCtx.NetCtx
func (*P2PServerV2) GetPeerIdByAccount ¶
func (p *P2PServerV2) GetPeerIdByAccount(account string) (peer.ID, error)
func (*P2PServerV2) HandleMessage ¶
func (p *P2PServerV2) HandleMessage(stream p2p.Stream, msg *pb.XuperMessage) error
func (*P2PServerV2) Init ¶
func (p *P2PServerV2) Init(ctx *netCtx.NetCtx) error
Init initialize p2p server using given config
func (*P2PServerV2) NewSubscriber ¶
func (p *P2PServerV2) NewSubscriber(typ pb.XuperMessage_MessageType, v interface{}, opts ...p2p.SubscriberOption) p2p.Subscriber
func (*P2PServerV2) PeerInfo ¶
func (p *P2PServerV2) PeerInfo() pb.PeerInfo
func (*P2PServerV2) Register ¶
func (p *P2PServerV2) Register(sub p2p.Subscriber) error
Register register message subscriber to handle messages
func (*P2PServerV2) SendMessage ¶
func (p *P2PServerV2) SendMessage(ctx xctx.XContext, msg *pb.XuperMessage, optFunc ...p2p.OptionFunc) error
SendMessage send message to peers using given filter strategy
func (*P2PServerV2) SendMessageWithResponse ¶
func (p *P2PServerV2) SendMessageWithResponse(ctx xctx.XContext, msg *pb.XuperMessage, optFunc ...p2p.OptionFunc) ([]*pb.XuperMessage, error)
SendMessageWithResponse send message to peers using given filter strategy, expect response from peers 客户端再使用该方法请求带返回的消息时,最好带上log_id, 否则会导致收消息时收到不匹配的消息而影响后续的处理
func (*P2PServerV2) UnRegister ¶
func (p *P2PServerV2) UnRegister(sub p2p.Subscriber) error
UnRegister remove message subscriber
type PeerFilter ¶
PeerFilter the interface for filter peers
type StaticNodeStrategy ¶
type StaticNodeStrategy struct {
// contains filtered or unexported fields
}
StaticNodeStrategy a peer filter that contains strategy nodes
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is the IO wrapper for underly P2P connection
func (*Stream) SendMessage ¶
SendMessage will send a message to a peer
func (*Stream) SendMessageWithResponse ¶
func (s *Stream) SendMessageWithResponse(ctx xctx.XContext, msg *pb.XuperMessage) (*pb.XuperMessage, error)
SendMessageWithResponse will send a message to a peer and wait for response
type StreamLimit ¶
type StreamLimit struct {
// contains filtered or unexported fields
}
StreamLimit limit the peerID amount of same ip
func (*StreamLimit) AddStream ¶
func (sl *StreamLimit) AddStream(addrStr string, peerID peer.ID) bool
AddStream used to add the amount of same ip, plus one per call
func (*StreamLimit) DelStream ¶
func (sl *StreamLimit) DelStream(addrStr string)
DelStream used to dec the amount of same ip, dec one per call
func (*StreamLimit) GetStreams ¶
func (sl *StreamLimit) GetStreams() []string
GetStreams get all NetURLs from effective streams
func (*StreamLimit) Init ¶
func (sl *StreamLimit) Init(ctx *nctx.NetCtx)
Init initialize the StreamLimit
type StreamPool ¶
type StreamPool struct {
// contains filtered or unexported fields
}
StreamPool manage all the stream
func NewStreamPool ¶
func NewStreamPool(ctx *nctx.NetCtx, srv *P2PServerV2) (*StreamPool, error)
NewStreamPool create StreamPool instance
func (*StreamPool) AddStream ¶
func (sp *StreamPool) AddStream(ctx xctx.XContext, stream *Stream) error
AddStream used to add a new P2P stream into pool
func (*StreamPool) DelStream ¶
func (sp *StreamPool) DelStream(stream *Stream) error
DelStream delete a stream
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport constructs secure communication sessions for a peer.
func (*Transport) SecureInbound ¶
SecureInbound runs the TLS handshake as a server.