Documentation ¶
Index ¶
- Constants
- Variables
- func CreatePubSubMsgPayload(appMsgs []*pb.ApplicationMsg, spreadMsg *pb.IHaveOrWant, topicMsg *pb.TopicMsg, ...) ([]byte, error)
- func GetMsgKey(sender string, seq uint64) string
- func GetPubSubMsgWithPayload(payload []byte) (*pb.PubsubMsg, error)
- type ChainPubSub
- func (p *ChainPubSub) AllMetadataOnlyPeers() []peer.ID
- func (p *ChainPubSub) AttachHost(h host.Host) error
- func (p *ChainPubSub) HostNotifiee() host.Notifiee
- func (p *ChainPubSub) ID() peer.ID
- func (p *ChainPubSub) ProtocolID() protocol.ID
- func (p *ChainPubSub) ProtocolMsgHandler() handler.MsgPayloadHandler
- func (p *ChainPubSub) Publish(topic string, msg []byte)
- func (p *ChainPubSub) RemoveBlackPeer(pid peer.ID)
- func (p *ChainPubSub) SetBlackPeer(pid peer.ID)
- func (p *ChainPubSub) Stop() error
- func (p *ChainPubSub) Subscribe(topic string, msgHandler handler.SubMsgHandler)
- func (p *ChainPubSub) Unsubscribe(topic string)
- type MsgBasket
- func (mb *MsgBasket) Cancel()
- func (mb *MsgBasket) HeartBeat()
- func (mb *MsgBasket) PutApplicationMsg(msg ...*pb.ApplicationMsg)
- func (mb *MsgBasket) SendCutOff(topic string, pid peer.ID)
- func (mb *MsgBasket) SendJoinUp(topic string, pid peer.ID)
- func (mb *MsgBasket) SubOrUnSubTopic(topic string, subOrUnSub bool)
- type MsgCache
- type Option
- func WithAppMsgCacheMaxSize(size int) Option
- func WithAppMsgCacheTimeout(timeout time.Duration) Option
- func WithDegreeDesired(desired int32) Option
- func WithDegreeHigh(high int32) Option
- func WithDegreeLow(low int32) Option
- func WithFanOutSize(size int32) Option
- func WithFanOutTimeout(timeout time.Duration) Option
- func WithGossipInterval(interval time.Duration) Option
- func WithGossipSize(size int) Option
- func WithMetadataCacheMaxSize(size int) Option
- func WithMetadataCacheTimeout(timeout time.Duration) Option
- func WithPubSubMessageMaxSize(size int32) Option
- type PeerState
- type TypeOfPeering
Constants ¶
const ( // ProtocolIDTemplate is the template for making pub-sub protocol id. ProtocolIDTemplate = "/chain-pubsub/v0.0.1/chain-%s" // DefaultFanOutSize is the default value of fan-out-size when fan out peering used. DefaultFanOutSize int32 = DefaultDegreeLow // DefaultFanOutTimeout is the default value of fan-out-timeout when fan out peering finished. DefaultFanOutTimeout = 2 * time.Second // DefaultDegreeLow is the default lower boundary of full-msg peering count. DefaultDegreeLow int32 = 4 // DefaultDegreeDesired is the default desired value of full-msg peering count. DefaultDegreeDesired int32 = 6 // DefaultDegreeHigh is the default higher boundary of full-msg peering count. DefaultDegreeHigh int32 = 12 // DefaultMetadataCacheTimeout is the default timeout for metadata cache. DefaultMetadataCacheTimeout = 2 * time.Minute // DefaultMetadataCacheMaxSize is the default max size for metadata cache. DefaultMetadataCacheMaxSize = 120 // DefaultAppMsgCacheTimeout is the default timeout for application msg cache. DefaultAppMsgCacheTimeout = 10 * time.Second // DefaultAppMsgCacheMaxSize is the default max size for application msg cache. DefaultAppMsgCacheMaxSize = 20 // DefaultGossipSize is the default gossip size for spreading metadata. DefaultGossipSize = 3 // DefaultGossipInterval is the default gossip interval(seconds). DefaultGossipInterval = time.Second // DefaultMaxSendApplicationMsgSize is the default size for max send application message size.(M) DefaultMaxSendApplicationMsgSize = 30 // DefaultGossipBatchSendApplicationLength is the default max count of application messages // which sent when gossiping. DefaultGossipBatchSendApplicationLength = 3 // DefaultAppMsgStationsLength is the default length of spread stations can be stored in an application message. DefaultAppMsgStationsLength = 10 )
Variables ¶
Functions ¶
func CreatePubSubMsgPayload ¶
func CreatePubSubMsgPayload( appMsgs []*pb.ApplicationMsg, spreadMsg *pb.IHaveOrWant, topicMsg *pb.TopicMsg, peeringMsg *pb.PeeringMsg) ([]byte, error)
CreatePubSubMsgPayload create payload bytes for PubSub.
func GetMsgKey ¶
GetMsgKey build a cache key with sender and sequence string.
Types ¶
type ChainPubSub ¶
type ChainPubSub struct {
// contains filtered or unexported fields
}
ChainPubSub is an implementation of the broadcast.PubSub interface. It is built on top of application protocol support. Different ChainPubSub instances use different protocols, so they are isolated from each other when spreading messages.
func NewChainPubSub ¶
func NewChainPubSub(chainId string, logger api.Logger, opts ...Option) *ChainPubSub
NewChainPubSub create a new ChainPubSub instance.
func (*ChainPubSub) AllMetadataOnlyPeers ¶
func (p *ChainPubSub) AllMetadataOnlyPeers() []peer.ID
AllMetadataOnlyPeers returns the list of peer.IDs of the peers that communicate with us over a metadata-only link.
func (*ChainPubSub) AttachHost ¶
func (p *ChainPubSub) AttachHost(h host.Host) error
AttachHost set up a host.
func (*ChainPubSub) HostNotifiee ¶
func (p *ChainPubSub) HostNotifiee() host.Notifiee
HostNotifiee return an implementation of host.Notifiee interface. It will be registered in host.Host.Notify method.
func (*ChainPubSub) ProtocolID ¶
func (p *ChainPubSub) ProtocolID() protocol.ID
ProtocolID return the protocol.ID of the PubSub service. The protocol id will be registered in host.RegisterMsgPayloadHandler method.
func (*ChainPubSub) ProtocolMsgHandler ¶
func (p *ChainPubSub) ProtocolMsgHandler() handler.MsgPayloadHandler
ProtocolMsgHandler return a function which type is handler.MsgPayloadHandler. It will be registered in host.Host.RegisterMsgPayloadHandler method.
func (*ChainPubSub) Publish ¶
func (p *ChainPubSub) Publish(topic string, msg []byte)
Publish will push a msg to the network of the topic given.
func (*ChainPubSub) RemoveBlackPeer ¶
func (p *ChainPubSub) RemoveBlackPeer(pid peer.ID)
func (*ChainPubSub) SetBlackPeer ¶
func (p *ChainPubSub) SetBlackPeer(pid peer.ID)
func (*ChainPubSub) Stop ¶
func (p *ChainPubSub) Stop() error
func (*ChainPubSub) Subscribe ¶
func (p *ChainPubSub) Subscribe(topic string, msgHandler handler.SubMsgHandler)
Subscribe registers a sub-msg handler for handling the messages received from the given topic.
func (*ChainPubSub) Unsubscribe ¶
func (p *ChainPubSub) Unsubscribe(topic string)
Unsubscribe stops listening to the given topic and unregisters the sub-msg handler registered for this topic.
type MsgBasket ¶
MsgBasket manages the messages that will be sent uniformly. Use it to reduce the number of calls to the send-message method, and thereby reduce the usage of network resources.
func NewMsgBasket ¶
func NewMsgBasket(pubSub *ChainPubSub) *MsgBasket
NewMsgBasket create a new *MsgBasket instance.
func (*MsgBasket) PutApplicationMsg ¶
func (mb *MsgBasket) PutApplicationMsg(msg ...*pb.ApplicationMsg)
PutApplicationMsg pushes messages into the basket, where they wait to be sent out.
func (*MsgBasket) SendCutOff ¶
SendCutOff send cut-off control msg.
func (*MsgBasket) SendJoinUp ¶
SendJoinUp send join-up control msg.
type MsgCache ¶
type MsgCache struct {
// contains filtered or unexported fields
}
func NewMsgCache ¶
func (*MsgCache) AllApplicationMsg ¶
func (c *MsgCache) AllApplicationMsg() []*pb.ApplicationMsg
AllApplicationMsg return all application message in cache
func (*MsgCache) AllMsgMetadata ¶
func (c *MsgCache) AllMsgMetadata() []*pb.MsgMetadata
AllMsgMetadata get all cached metadata
func (*MsgCache) PutIfNoExists ¶
func (c *MsgCache) PutIfNoExists(msg *pb.ApplicationMsg) bool
PutIfNoExists puts the message into the cache. It returns true if the message did not exist yet, and false if the message already exists.
type Option ¶
type Option func(pubsub *ChainPubSub)
Option for ChainPubSub.
func WithAppMsgCacheMaxSize ¶
WithAppMsgCacheMaxSize set max cache size for full application message.
func WithAppMsgCacheTimeout ¶
WithAppMsgCacheTimeout set cache timeout for full application message.
func WithDegreeDesired ¶
WithDegreeDesired set degree desired for full-msg type.
func WithDegreeHigh ¶
WithDegreeHigh set degree high for full-msg type.
func WithDegreeLow ¶
WithDegreeLow set degree low for full-msg type.
func WithFanOutTimeout ¶
WithFanOutTimeout set fan-out timeout duration.
func WithGossipInterval ¶
WithGossipInterval set heartbeat interval for gossiping.
func WithGossipSize ¶
WithGossipSize set max count of peers that gossiping choose.
func WithMetadataCacheMaxSize ¶
WithMetadataCacheMaxSize set max cache size for metadata.
func WithMetadataCacheTimeout ¶
WithMetadataCacheTimeout sets the cache timeout for metadata.
type PeerState ¶
type PeerState interface { // CacheMsg store the app msg into cache. CacheMsg(applicationMsg *pb.ApplicationMsg) // IsPeerReceivedMsg return whether the app msg exist in cache. IsPeerReceivedMsg(pid peer.ID, applicationMsg *pb.ApplicationMsg) bool // RecordPeerReceivedMsg add a record that who has received the msg. RecordPeerReceivedMsg(pid peer.ID, meta *pb.MsgMetadata) bool // AllMetadataPeerId return the peer.ID list of peers who linking us with metadata-only stat. AllMetadataPeerId() []peer.ID // GetChainPubSub return ChainPubSub instance of this PeerState. GetChainPubSub() *ChainPubSub // Subscribe listen a topic and register a topic sub message handler. Subscribe(topic string, handler handler.SubMsgHandler) error // Unsubscribe cancel listening a topic and unregister the topic sub message handler. Unsubscribe(topic string) // IsSubscribed return whether the topic has subscribed. IsSubscribed(topic string) bool // GetTopics return the list of topics that subscribed before. GetTopics() []string // GetAllMsgMetadata return the list of all the metadata in cache. GetAllMsgMetadata() []*pb.MsgMetadata // IHaveMessage return the list of the metadata of full app messages in cache. IHaveMessage() []*pb.MsgMetadata // IWantMessage return the list of metadata that peer having and not received by me. IWantMessage(peerHaveMessage []*pb.MsgMetadata) []*pb.MsgMetadata // GetMessageListWithMetadataList query full app messages with msg metadata infos. GetMessageListWithMetadataList(metadataList []*pb.MsgMetadata) []*pb.ApplicationMsg // ID return local peer.ID ID() peer.ID }
PeerState stores subscription info and caches the necessary data of messages published or received. It also provides methods that let the gossip service get all the info it needs.
type TypeOfPeering ¶
type TypeOfPeering int
TypeOfPeering is the type of peering stat.
const ( // UnknownTypeOfPeering is unknown peering stat. UnknownTypeOfPeering TypeOfPeering = iota // FullMsgTypeOfPeering is full-msg peering stat. FullMsgTypeOfPeering // MetadataOnlyTypeOfPeering is metadata-only peering stat. MetadataOnlyTypeOfPeering // FanOutTypeOfPeering is fan-out peering stat. FanOutTypeOfPeering )