pubsub

package
v0.0.0-...-dae542e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// Protocol id template and default settings declared by the pubsub package.
const (
	// ProtocolIDTemplate is the format template for building the pub-sub protocol id.
	// It contains a single %s verb; presumably it is filled with the chain id — TODO confirm.
	ProtocolIDTemplate = "/chain-pubsub/v0.0.1/chain-%s"

	// DefaultFanOutSize is the default fan-out size used for fan-out peering.
	// It is defined as DefaultDegreeLow.
	DefaultFanOutSize int32 = DefaultDegreeLow
	// DefaultFanOutTimeout is the default fan-out timeout (2 seconds).
	DefaultFanOutTimeout = 2 * time.Second
	// DefaultDegreeLow is the default lower boundary of the full-msg peering count.
	DefaultDegreeLow int32 = 4
	// DefaultDegreeDesired is the default desired value of the full-msg peering count.
	DefaultDegreeDesired int32 = 6
	// DefaultDegreeHigh is the default higher boundary of the full-msg peering count.
	DefaultDegreeHigh int32 = 12
	// DefaultMetadataCacheTimeout is the default timeout of the metadata cache (2 minutes).
	DefaultMetadataCacheTimeout = 2 * time.Minute
	// DefaultMetadataCacheMaxSize is the default max size of the metadata cache.
	DefaultMetadataCacheMaxSize = 120
	// DefaultAppMsgCacheTimeout is the default timeout of the application msg cache (10 seconds).
	DefaultAppMsgCacheTimeout = 10 * time.Second
	// DefaultAppMsgCacheMaxSize is the default max size of the application msg cache.
	DefaultAppMsgCacheMaxSize = 20
	// DefaultGossipSize is the default gossip size for spreading metadata.
	DefaultGossipSize = 3
	// DefaultGossipInterval is the default gossip interval (one second).
	DefaultGossipInterval = time.Second
	// DefaultMaxSendApplicationMsgSize is the default max size of an application message
	// that can be sent. The original note gives the unit as "M"
	// (presumably megabytes — TODO confirm).
	DefaultMaxSendApplicationMsgSize = 30
	// DefaultGossipBatchSendApplicationLength is the default max count of application messages
	// sent when gossiping.
	DefaultGossipBatchSendApplicationLength = 3
	// DefaultAppMsgStationsLength is the default number of spread stations that can be stored
	// in an application message.
	DefaultAppMsgStationsLength = 10
)

Variables

View Source
// Sentinel errors related to the host.
var (
	// ErrNilHost is returned if the host is nil.
	ErrNilHost = errors.New("nil host")
	// ErrDiffHost is returned if the given host is not the same one that is already in use.
	ErrDiffHost = errors.New("different host")
)

Functions

func CreatePubSubMsgPayload

func CreatePubSubMsgPayload(
	appMsgs []*pb.ApplicationMsg,
	spreadMsg *pb.IHaveOrWant,
	topicMsg *pb.TopicMsg,
	peeringMsg *pb.PeeringMsg) ([]byte, error)

CreatePubSubMsgPayload creates the payload bytes for PubSub.

func GetMsgKey

func GetMsgKey(sender string, seq uint64) string

GetMsgKey builds a cache key string from the sender and the sequence number.

func GetPubSubMsgWithPayload

func GetPubSubMsgWithPayload(payload []byte) (*pb.PubsubMsg, error)

GetPubSubMsgWithPayload parses the payload bytes into a *pb.PubsubMsg.

Types

type ChainPubSub

type ChainPubSub struct {
	// contains filtered or unexported fields
}

ChainPubSub is an implementation of the broadcast.PubSub interface. It is built on top of application protocols. Different ChainPubSub instances use different protocols, so they are isolated from each other when spreading messages.

func NewChainPubSub

func NewChainPubSub(chainId string, logger api.Logger, opts ...Option) *ChainPubSub

NewChainPubSub create a new ChainPubSub instance.

func (*ChainPubSub) AllMetadataOnlyPeers

func (p *ChainPubSub) AllMetadataOnlyPeers() []peer.ID

AllMetadataOnlyPeers returns the list of peer.IDs of the peers that communicate with us over a metadata-only link.

func (*ChainPubSub) AttachHost

func (p *ChainPubSub) AttachHost(h host.Host) error

AttachHost set up a host.

func (*ChainPubSub) HostNotifiee

func (p *ChainPubSub) HostNotifiee() host.Notifiee

HostNotifiee returns an implementation of the host.Notifiee interface. It will be registered via the host.Host.Notify method.

func (*ChainPubSub) ID

func (p *ChainPubSub) ID() peer.ID

ID return the local peer.ID.

func (*ChainPubSub) ProtocolID

func (p *ChainPubSub) ProtocolID() protocol.ID

ProtocolID returns the protocol.ID of the PubSub service. The protocol ID will be registered via the host.RegisterMsgPayloadHandler method.

func (*ChainPubSub) ProtocolMsgHandler

func (p *ChainPubSub) ProtocolMsgHandler() handler.MsgPayloadHandler

ProtocolMsgHandler returns a function of type handler.MsgPayloadHandler. It will be registered via the host.Host.RegisterMsgPayloadHandler method.

func (*ChainPubSub) Publish

func (p *ChainPubSub) Publish(topic string, msg []byte)

Publish pushes a msg to the network of the given topic.

func (*ChainPubSub) RemoveBlackPeer

func (p *ChainPubSub) RemoveBlackPeer(pid peer.ID)

func (*ChainPubSub) SetBlackPeer

func (p *ChainPubSub) SetBlackPeer(pid peer.ID)

func (*ChainPubSub) Stop

func (p *ChainPubSub) Stop() error

func (*ChainPubSub) Subscribe

func (p *ChainPubSub) Subscribe(topic string, msgHandler handler.SubMsgHandler)

Subscribe registers a sub-msg handler for handling the messages received from the given topic.

func (*ChainPubSub) Unsubscribe

func (p *ChainPubSub) Unsubscribe(topic string)

Unsubscribe cancels listening to the given topic and unregisters the sub-msg handler registered for this topic.

type MsgBasket

type MsgBasket struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MsgBasket manages the messages that will be sent out together. Using it reduces the number of calls to the send-msg method, which in turn reduces the network resources occupied.

func NewMsgBasket

func NewMsgBasket(pubSub *ChainPubSub) *MsgBasket

NewMsgBasket create a new *MsgBasket instance.

func (*MsgBasket) Cancel

func (mb *MsgBasket) Cancel()

Cancel basket working.

func (*MsgBasket) HeartBeat

func (mb *MsgBasket) HeartBeat()

HeartBeat pours out the basket.

func (*MsgBasket) PutApplicationMsg

func (mb *MsgBasket) PutApplicationMsg(msg ...*pb.ApplicationMsg)

PutApplicationMsg pushes messages into the basket, where they wait to be sent out.

func (*MsgBasket) SendCutOff

func (mb *MsgBasket) SendCutOff(topic string, pid peer.ID)

SendCutOff send cut-off control msg.

func (*MsgBasket) SendJoinUp

func (mb *MsgBasket) SendJoinUp(topic string, pid peer.ID)

SendJoinUp send join-up control msg.

func (*MsgBasket) SubOrUnSubTopic

func (mb *MsgBasket) SubOrUnSubTopic(topic string, subOrUnSub bool)

SubOrUnSubTopic is called when subscribing or unsubscribing.

type MsgCache

type MsgCache struct {
	// contains filtered or unexported fields
}

func NewMsgCache

func NewMsgCache(
	applicationMsgDuration time.Duration,
	metadataMsgDuration time.Duration,
	applicationMaxLength int,
	metadataMsgMaxLength int) *MsgCache

func (*MsgCache) AllApplicationMsg

func (c *MsgCache) AllApplicationMsg() []*pb.ApplicationMsg

AllApplicationMsg returns all application messages in the cache.

func (*MsgCache) AllMsgMetadata

func (c *MsgCache) AllMsgMetadata() []*pb.MsgMetadata

AllMsgMetadata returns all cached metadata.

func (*MsgCache) Put

func (c *MsgCache) Put(msg *pb.ApplicationMsg) bool

Put message to cache

func (*MsgCache) PutIfNoExists

func (c *MsgCache) PutIfNoExists(msg *pb.ApplicationMsg) bool

PutIfNoExists puts the message into the cache. It returns true if the message did not exist yet, and false if the message already exists.

type Option

type Option func(cps *ChainPubSub)

Option for ChainPubSub.

func WithAppMsgCacheMaxSize

func WithAppMsgCacheMaxSize(size int) Option

WithAppMsgCacheMaxSize set max cache size for full application message.

func WithAppMsgCacheTimeout

func WithAppMsgCacheTimeout(timeout time.Duration) Option

WithAppMsgCacheTimeout set cache timeout for full application message.

func WithDegreeDesired

func WithDegreeDesired(desired int32) Option

WithDegreeDesired set degree desired for full-msg type.

func WithDegreeHigh

func WithDegreeHigh(high int32) Option

WithDegreeHigh set degree high for full-msg type.

func WithDegreeLow

func WithDegreeLow(low int32) Option

WithDegreeLow set degree low for full-msg type.

func WithFanOutSize

func WithFanOutSize(size int32) Option

WithFanOutSize set fan-out size.

func WithFanOutTimeout

func WithFanOutTimeout(timeout time.Duration) Option

WithFanOutTimeout set fan-out timeout duration.

func WithGossipInterval

func WithGossipInterval(interval time.Duration) Option

WithGossipInterval set heartbeat interval for gossiping.

func WithGossipSize

func WithGossipSize(size int) Option

WithGossipSize sets the max count of peers chosen when gossiping.

func WithMetadataCacheMaxSize

func WithMetadataCacheMaxSize(size int) Option

WithMetadataCacheMaxSize set max cache size for metadata.

func WithMetadataCacheTimeout

func WithMetadataCacheTimeout(timeout time.Duration) Option

WithMetadataCacheTimeout sets the cache timeout for metadata.

func WithPubSubMessageMaxSize

func WithPubSubMessageMaxSize(size int32) Option

WithPubSubMessageMaxSize set max size allowed of pub-sub message.

type PeerState

type PeerState interface {
	// CacheMsg stores the app msg into the cache.
	CacheMsg(applicationMsg *pb.ApplicationMsg)
	// IsPeerReceivedMsg reports whether the app msg exists in the cache.
	// NOTE(review): the name and the pid parameter suggest the check is per peer
	// (whether pid has already received the msg) — confirm against the implementation.
	IsPeerReceivedMsg(pid peer.ID, applicationMsg *pb.ApplicationMsg) bool
	// RecordPeerReceivedMsg adds a record that the peer pid has received the msg
	// described by meta.
	// NOTE(review): the meaning of the bool result is not documented — confirm.
	RecordPeerReceivedMsg(pid peer.ID, meta *pb.MsgMetadata) bool
	// AllMetadataPeerId returns the peer.ID list of the peers that are linked to us
	// in metadata-only state.
	AllMetadataPeerId() []peer.ID
	// GetChainPubSub returns the ChainPubSub instance of this PeerState.
	GetChainPubSub() *ChainPubSub
	// Subscribe listens to a topic and registers a sub-msg handler for that topic.
	Subscribe(topic string, handler handler.SubMsgHandler) error
	// Unsubscribe cancels listening to a topic and unregisters its sub-msg handler.
	Unsubscribe(topic string)
	// IsSubscribed reports whether the topic has been subscribed.
	IsSubscribed(topic string) bool
	// GetTopics returns the list of topics that have been subscribed.
	GetTopics() []string
	// GetAllMsgMetadata returns the list of all the metadata in the cache.
	GetAllMsgMetadata() []*pb.MsgMetadata
	// IHaveMessage returns the metadata list of the full app messages in the cache.
	IHaveMessage() []*pb.MsgMetadata
	// IWantMessage returns the list of metadata that the peer has and that we have
	// not received; peerHaveMessage is the metadata list the peer has.
	IWantMessage(peerHaveMessage []*pb.MsgMetadata) []*pb.MsgMetadata
	// GetMessageListWithMetadataList queries the full app messages that match
	// the given msg metadata list.
	GetMessageListWithMetadataList(metadataList []*pb.MsgMetadata) []*pb.ApplicationMsg
	// ID returns the local peer.ID.
	ID() peer.ID
}

PeerState stores subscription info and caches the necessary data of messages published or received. It also provides methods that let the gossip service get all the info it needs.

type TypeOfPeering

type TypeOfPeering int

TypeOfPeering is the type of the peering state.

// Values of TypeOfPeering, numbered from 0 via iota.
const (
	// UnknownTypeOfPeering is the unknown peering state (the zero value).
	UnknownTypeOfPeering TypeOfPeering = iota
	// FullMsgTypeOfPeering is the full-msg peering state.
	FullMsgTypeOfPeering
	// MetadataOnlyTypeOfPeering is the metadata-only peering state.
	MetadataOnlyTypeOfPeering
	// FanOutTypeOfPeering is the fan-out peering state.
	FanOutTypeOfPeering
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL