Documentation ¶
Overview ¶
Package p2pv2 is the v2 of AmpChain p2p network.
Index ¶
- Constants
- Variables
- func GenerateKeyPairWithPath(path string) error
- func GenerateUniqueRandList(size int, max int) []int
- func GetAuthRequest(v *AChainAddrInfo) (*pb.IdentityAuth, error)
- func GetKeyPairFromPath(path string) (crypto.PrivKey, error)
- func GetPeerIDFromPath(keypath string) (string, error)
- type AChainAddrInfo
- type BucketsFilter
- type BucketsFilterWithFactor
- type CorePeersFilter
- type CorePeersInfo
- type FilterStrategy
- type HandlerMap
- func (hm *HandlerMap) HandleMessage(s *Stream, msg *amperp2p.AmperMessage) error
- func (hm *HandlerMap) IsMsgAsHandled(msg *amperp2p.AmperMessage) bool
- func (hm *HandlerMap) MarkMsgAsHandled(msg *amperp2p.AmperMessage)
- func (hm *HandlerMap) Register(sub *Subscriber) (*Subscriber, error)
- func (hm *HandlerMap) Start()
- func (hm *HandlerMap) Stop()
- func (hm *HandlerMap) UnRegister(sub *Subscriber) error
- type MessageOption
- type MultiStrategy
- type MultiSubscriber
- type NearestBucketFilter
- type Node
- func (no *Node) Context() context.Context
- func (no *Node) ListPeers() []peer.ID
- func (no *Node) NodeID() peer.ID
- func (no *Node) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID) error
- func (no *Node) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, ...) ([]*p2pPb.AmperMessage, error)
- func (no *Node) SetServer(srv *P2PServerV2)
- func (no *Node) Start()
- func (no *Node) Stop()
- func (no *Node) UpdateCorePeers(cp *CorePeersInfo) error
- type P2PServer
- type P2PServerV2
- func (p *P2PServerV2) GetNetURL() string
- func (p *P2PServerV2) GetPeerUrls() []string
- func (p *P2PServerV2) Register(sub *Subscriber) (*Subscriber, error)
- func (p *P2PServerV2) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, opts ...MessageOption) error
- func (p *P2PServerV2) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, opts ...MessageOption) ([]*p2pPb.AmperMessage, error)
- func (p *P2PServerV2) SetAChainAddr(bcname string, info *AChainAddrInfo)
- func (p *P2PServerV2) SetCorePeers(cp *CorePeersInfo) error
- func (p *P2PServerV2) Start()
- func (p *P2PServerV2) Stop()
- func (p *P2PServerV2) UnRegister(sub *Subscriber) error
- type PeersFilter
- type Stream
- func (s *Stream) Authenticate() error
- func (s *Stream) Close()
- func (s *Stream) PeerID() string
- func (s *Stream) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage) error
- func (s *Stream) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage) (*p2pPb.AmperMessage, error)
- func (s *Stream) Start()
- type StreamLimit
- type StreamPool
- func (sp *StreamPool) Add(s net.Stream) *Stream
- func (sp *StreamPool) AddStream(stream *Stream) error
- func (sp *StreamPool) Authenticate(stream *Stream) error
- func (sp *StreamPool) DelStream(stream *Stream) error
- func (sp *StreamPool) FindStream(peer peer.ID) (*Stream, error)
- func (sp *StreamPool) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID) error
- func (sp *StreamPool) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, ...) ([]*p2pPb.AmperMessage, error)
- func (sp *StreamPool) Start()
- func (sp *StreamPool) Stop()
- type Subscriber
Constants ¶
const ( DefaultStrategy FilterStrategy = "DefaultStrategy" BucketsStrategy = "BucketsStrategy" NearestBucketStrategy = "NearestBucketStrategy" BucketsWithFactorStrategy = "BucketsWithFactorStrategy" CorePeersStrategy = "CorePeersStrategy" )
supported filter strategies
const ( MsgChanSize = 50000 MsgHandledCacheSize = 50000 )
define default message config
const ( AmperProtocolID = "/amper/2.0.0" // protocol version MaxBroadCastPeers = 20 // the maximum peers to broadcast messages MaxBroadCastCorePeers = 10 // the maximum core peers to broadcast messages )
define the common config
Variables ¶
var ( ErrSubscribe = errors.New("subscribe error") ErrAlreadyRegisted = errors.New("subscriber already registered") ErrUnregister = errors.New("unregister subscriber error") )
define errors
var ( ErrGenerateOpts = errors.New("generate host opts error") ErrCreateHost = errors.New("create host error") ErrCreateKadDht = errors.New("create kad dht error") ErrCreateStreamPool = errors.New("create stream pool error") ErrCreateBootStrap = errors.New("create bootstrap error pool error") ErrConnectBootStrap = errors.New("error to connect to all bootstrap") ErrConnectCorePeers = errors.New("error to connect to all core peers") ErrInvalidParams = errors.New("invalid params") )
define errors
var ( ErrValidateConfig = errors.New("config not valid") ErrCreateNode = errors.New("create node error") ErrCreateHandlerMap = errors.New("create handlerMap error") )
define errors
var ( ErrTimeout = errors.New("request time out") ErrNullResult = errors.New("request result is null") ErrStrNotValid = errors.New("stream not valid") )
define common errors
var ( ErrStreamNotFound = errors.New("stream not found") ErrStreamPoolFull = errors.New("stream pool is full") ErrAddStream = errors.New("error to add stream") ErrRequest = errors.New("error request from network") ErrAuth = errors.New("error invalid auth request") )
define common errors
Functions ¶
func GenerateKeyPairWithPath ¶
GenerateKeyPairWithPath generate amper net key pair
func GenerateUniqueRandList ¶
GenerateUniqueRandList get a random unique number list
func GetAuthRequest ¶
func GetAuthRequest(v *AChainAddrInfo) (*pb.IdentityAuth, error)
GetAuthRequest get auth request for authentication
func GetKeyPairFromPath ¶
GetKeyPairFromPath get amper net key from file path
func GetPeerIDFromPath ¶
GetPeerIDFromPath return peer id of given private key path
Types ¶
type AChainAddrInfo ¶
AChainAddrInfo my AmpChain addr info
type BucketsFilter ¶
type BucketsFilter struct {
// contains filtered or unexported fields
}
BucketsFilter define filter that get all peers in buckets
type BucketsFilterWithFactor ¶
type BucketsFilterWithFactor struct {
// contains filtered or unexported fields
}
BucketsFilterWithFactor define filter that get a certain percentage peers in each bucket
func (*BucketsFilterWithFactor) Filter ¶
func (nf *BucketsFilterWithFactor) Filter() ([]peer.ID, error)
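Filter selects a Factor proportion of peers from each bucket for broadcasting. Each bucket is divided evenly into several splits, and several nodes are drawn from each split.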
*|<---------------- Bucket ---------------->|
*--------------------------------------------
*|        |        |        |       |       |
*--------------------------------------------
*  split1   split2   split3  split4  split5
type CorePeersFilter ¶
type CorePeersFilter struct {
// contains filtered or unexported fields
}
CorePeersFilter define filter for core peers
func (*CorePeersFilter) Filter ¶
func (cp *CorePeersFilter) Filter() ([]peer.ID, error)
Filter select MaxBroadCastCorePeers random peers from core peers, half from current and half from next
func (*CorePeersFilter) SetRouteName ¶
func (cp *CorePeersFilter) SetRouteName(name string)
SetRouteName set the core route name to filter in AmpChain, the route name is the blockchain name
type CorePeersInfo ¶
type CorePeersInfo struct { Name string // distinguished name of the node routing CurrentTermNum int64 // the current term number CurrentPeerIDs []string // current core peer IDs NextPeerIDs []string // upcoming core peer IDs }
CorePeersInfo defines the peers' info for core nodes. By setting this info, we can keep some core peers always connected directly. It's useful for keeping the DPoS key network secure and for some BFT-like consensus.
type FilterStrategy ¶
type FilterStrategy string
FilterStrategy defines the supported filter strategies
type HandlerMap ¶
type HandlerMap struct {
// contains filtered or unexported fields
}
HandlerMap is the message handler manager. It keeps the mapping between messages and handlers, as well as the recently handled messages.
func NewHandlerMap ¶
func NewHandlerMap(log log.Logger) (*HandlerMap, error)
NewHandlerMap create instance of HandlerMap
func (*HandlerMap) HandleMessage ¶
func (hm *HandlerMap) HandleMessage(s *Stream, msg *amperp2p.AmperMessage) error
HandleMessage handle new messages with registered handlers
func (*HandlerMap) IsMsgAsHandled ¶
func (hm *HandlerMap) IsMsgAsHandled(msg *amperp2p.AmperMessage) bool
IsMsgAsHandled used to check whether the msg has been dealt with.
func (*HandlerMap) MarkMsgAsHandled ¶
func (hm *HandlerMap) MarkMsgAsHandled(msg *amperp2p.AmperMessage)
MarkMsgAsHandled used to mark message has been dealt with.
func (*HandlerMap) Register ¶
func (hm *HandlerMap) Register(sub *Subscriber) (*Subscriber, error)
Register used to register subscriber to handlerMap.
func (*HandlerMap) UnRegister ¶
func (hm *HandlerMap) UnRegister(sub *Subscriber) error
UnRegister is used to unregister a subscriber from the handlerMap.
type MessageOption ¶
type MessageOption func(*msgOptions)
MessageOption define single option function
func WithBcName ¶
func WithBcName(bcname string) MessageOption
WithBcName add bcname to message option
func WithFilters ¶
func WithFilters(filter []FilterStrategy) MessageOption
WithFilters add filter strategies to message option
func WithPercentage ¶
func WithPercentage(percentage float32) MessageOption
WithPercentage add percentage to message option
func WithTargetPeerAddrs ¶
func WithTargetPeerAddrs(peerAddrs []string) MessageOption
WithTargetPeerAddrs add target peer addresses to message option
type MultiStrategy ¶
type MultiStrategy struct {
// contains filtered or unexported fields
}
MultiStrategy a peer filter that contains multiple filters
func NewMultiStrategy ¶
func NewMultiStrategy(node *Node, filters []PeersFilter) *MultiStrategy
NewMultiStrategy create instance of MultiStrategy
type MultiSubscriber ¶
type MultiSubscriber struct {
// contains filtered or unexported fields
}
MultiSubscriber wrap a list of Subscriber of same message
type NearestBucketFilter ¶
type NearestBucketFilter struct {
// contains filtered or unexported fields
}
NearestBucketFilter define filter that get nearest peers from a specified peer ID
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is the node in the network
func (*Node) SendMessage ¶
SendMessage send message to given peers
func (*Node) SendMessageWithResponse ¶
func (no *Node) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.AmperMessage, error)
SendMessageWithResponse send message to given peers, expecting response from peers
func (*Node) SetServer ¶
func (no *Node) SetServer(srv *P2PServerV2)
SetServer set the p2p server of the node
func (*Node) UpdateCorePeers ¶
func (no *Node) UpdateCorePeers(cp *CorePeersInfo) error
UpdateCorePeers update core peers' info and keep connection to core peers
type P2PServer ¶
type P2PServer interface { Start() Stop() // 注册订阅者,支持多个用户订阅同一类消息 Register(sub *Subscriber) (*Subscriber, error) // 注销订阅者,需要根据当时注册时返回的Subscriber实例删除 UnRegister(sub *Subscriber) error SendMessage(context.Context, *p2pPb.AmperMessage, ...MessageOption) error // todo: 将请求的参数改为Option的方式 SendMessageWithResponse(context.Context, *p2pPb.AmperMessage, ...MessageOption) ([]*p2pPb.AmperMessage, error) GetNetURL() string // 查询所连接节点的信息 GetPeerUrls() []string // SetCorePeers set core peers' info to P2P server SetCorePeers(cp *CorePeersInfo) error // SetAChainAddr Set AmpChain address from AmpChaincore SetAChainAddr(bcname string, info *AChainAddrInfo) }
P2PServer is the p2p server interface of Amper
type P2PServerV2 ¶
type P2PServerV2 struct {
// contains filtered or unexported fields
}
P2PServerV2 is the v2 of the AmpChain p2p server. It is an implementation of the P2PServer interface.
func NewP2PServerV2 ¶
NewP2PServerV2 create P2PServerV2 instance
func (*P2PServerV2) GetNetURL ¶
func (p *P2PServerV2) GetNetURL() string
GetNetURL returns the net URL of the amper node, in the form: url = /ip4/127.0.0.1/tcp/<port>/p2p/<peer.Id>
func (*P2PServerV2) GetPeerUrls ¶
func (p *P2PServerV2) GetPeerUrls() []string
GetPeerUrls 查询所连接节点的信息
func (*P2PServerV2) Register ¶
func (p *P2PServerV2) Register(sub *Subscriber) (*Subscriber, error)
Register register message subscribers to handle messages
func (*P2PServerV2) SendMessage ¶
func (p *P2PServerV2) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, opts ...MessageOption) error
SendMessage send message to peers using given filter strategy
func (*P2PServerV2) SendMessageWithResponse ¶
func (p *P2PServerV2) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, opts ...MessageOption) ([]*p2pPb.AmperMessage, error)
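SendMessageWithResponse sends a message to peers using the given filter strategy and expects responses from the peers. When a client uses this method to request a message with a response, it is best to include a log_id; otherwise, mismatched messages may be received, which affects subsequent processing.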
func (*P2PServerV2) SetAChainAddr ¶
func (p *P2PServerV2) SetAChainAddr(bcname string, info *AChainAddrInfo)
SetAChainAddr Set AmpChain address info from core
func (*P2PServerV2) SetCorePeers ¶
func (p *P2PServerV2) SetCorePeers(cp *CorePeersInfo) error
SetCorePeers set core peers' info to P2P server
func (*P2PServerV2) UnRegister ¶
func (p *P2PServerV2) UnRegister(sub *Subscriber) error
UnRegister remove message subscribers
type PeersFilter ¶
PeersFilter the interface for filter peers
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is the IO wrapper for the underlying P2P connection
func (*Stream) Authenticate ¶
Authenticate it's used for identity authentication
func (*Stream) SendMessage ¶
SendMessage will send a message to a peer
func (*Stream) SendMessageWithResponse ¶
func (s *Stream) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage) (*p2pPb.AmperMessage, error)
SendMessageWithResponse will send a message to a peer and wait for response
type StreamLimit ¶
type StreamLimit struct {
// contains filtered or unexported fields
}
StreamLimit limits the number of peer IDs from the same IP
func (*StreamLimit) AddStream ¶
func (sl *StreamLimit) AddStream(addrStr string, peerID peer.ID) bool
AddStream is used to increase the count for the same IP, adding one per call
func (*StreamLimit) DelStream ¶
func (sl *StreamLimit) DelStream(addrStr string)
DelStream is used to decrease the count for the same IP, subtracting one per call
func (*StreamLimit) Init ¶
func (sl *StreamLimit) Init(limit int64, lg log.Logger)
Init initialize the StreamLimit
type StreamPool ¶
type StreamPool struct {
// contains filtered or unexported fields
}
StreamPool manage all the stream
func NewStreamPool ¶
NewStreamPool create StreamPool instance
func (*StreamPool) Add ¶
func (sp *StreamPool) Add(s net.Stream) *Stream
Add used to add a new net stream into pool
func (*StreamPool) AddStream ¶
func (sp *StreamPool) AddStream(stream *Stream) error
AddStream used to add a new P2P stream into pool
func (*StreamPool) Authenticate ¶
func (sp *StreamPool) Authenticate(stream *Stream) error
Authenticate it's used for identity authentication
func (*StreamPool) DelStream ¶
func (sp *StreamPool) DelStream(stream *Stream) error
DelStream delete a stream
func (*StreamPool) FindStream ¶
func (sp *StreamPool) FindStream(peer peer.ID) (*Stream, error)
FindStream gets the stream for the given peer ID
func (*StreamPool) SendMessage ¶
func (sp *StreamPool) SendMessage(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID) error
SendMessage send message to given peer ID
func (*StreamPool) SendMessageWithResponse ¶
func (sp *StreamPool) SendMessageWithResponse(ctx context.Context, msg *p2pPb.AmperMessage, peers []peer.ID, percentage float32) ([]*p2pPb.AmperMessage, error)
SendMessageWithResponse will send a message to peers and collect their responses. withBreak means whether the request waits for all responses.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber define the subscriber of message
func NewSubscriber ¶
func NewSubscriber(msgCh chan *amperp2p.AmperMessage, msgType amperp2p.AmperMessage_MessageType, handler amperHandler, msgFrom string) *Subscriber
NewSubscriber create instance of Subscriber