Documentation ¶
Overview ¶
The pubsub package provides facilities for the Publish/Subscribe pattern of message propagation, also known as overlay multicast. The implementation provides topic-based pubsub, with pluggable routing algorithms.
The main interface to the library is the PubSub object. You can construct this object with the following constructors:
- NewFloodSub creates an instance that uses the floodsub routing algorithm.
- NewGossipSub creates an instance that uses the gossipsub routing algorithm.
- NewRandomSub creates an instance that uses the randomsub routing algorithm.
In addition, there is a generic constructor that creates a pubsub instance with a custom PubSubRouter interface. This procedure is currently reserved for internal use within the package.
Once you have constructed a PubSub instance, you need to establish some connections to your peers; the implementation relies on ambient peer discovery, leaving bootstrap and active peer discovery up to the client.
To publish a message to some topic, use Publish; you don't need to be subscribed to the topic in order to publish.
To subscribe to a topic, use Subscribe; this will give you a subscription interface from which new messages can be pumped.
Index ¶
- Constants
- Variables
- type BackoffConnectorFactory
- type Blacklist
- type CacheEntry
- type DiscoverOpt
- type EventType
- type FloodSubRouter
- func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID)
- func (fs *FloodSubRouter) Attach(p *PubSub)
- func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
- func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
- func (fs *FloodSubRouter) Join(topic string)
- func (fs *FloodSubRouter) Leave(topic string)
- func (fs *FloodSubRouter) Protocols() []protocol.ID
- func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (fs *FloodSubRouter) RemovePeer(peer.ID)
- type GossipSubRouter
- func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (gs *GossipSubRouter) Attach(p *PubSub)
- func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
- func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
- func (gs *GossipSubRouter) Join(topic string)
- func (gs *GossipSubRouter) Leave(topic string)
- func (gs *GossipSubRouter) Protocols() []protocol.ID
- func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (gs *GossipSubRouter) RemovePeer(p peer.ID)
- type LRUBlacklist
- type MapBlacklist
- type Message
- type MessageCache
- type Option
- func WithBlacklist(b Blacklist) Option
- func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
- func WithMessageAuthor(author peer.ID) Option
- func WithMessageSigning(enabled bool) Option
- func WithStrictSignatureVerification(required bool) Option
- func WithValidateThrottle(n int) Option
- func WithValidateWorkers(n int) Option
- type PeerEvent
- type PubOpt
- type PubSub
- func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
- func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
- func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func (p *PubSub) BlacklistPeer(pid peer.ID)
- func (p *PubSub) GetTopics() []string
- func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)
- func (p *PubSub) ListPeers(topic string) []peer.ID
- func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) errordeprecated
- func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
- func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)deprecated
- func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)deprecated
- func (p *PubSub) UnregisterTopicValidator(topic string) error
- type PubSubNotif
- func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)
- func (p *PubSubNotif) Connected(n network.Network, c network.Conn)
- func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)
- func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)
- type PubSubRouter
- type PublishOptions
- type RPC
- type RandomSubRouter
- func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (rs *RandomSubRouter) Attach(p *PubSub)
- func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
- func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
- func (rs *RandomSubRouter) Join(topic string)
- func (rs *RandomSubRouter) Leave(topic string)
- func (rs *RandomSubRouter) Protocols() []protocol.ID
- func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (rs *RandomSubRouter) RemovePeer(p peer.ID)
- type RouterReady
- type SubOpt
- type Subscription
- type Topic
- type TopicEventHandler
- type TopicEventHandlerOpt
- type TopicOpt
- type TopicOptions
- type Validator
- type ValidatorOpt
Constants ¶
const ( FloodSubID = protocol.ID("/floodsub/1.0.0") FloodSubTopicSearchSize = 5 )
const (
GossipSubID = protocol.ID("/meshsub/1.0.0")
)
const (
RandomSubID = protocol.ID("/randomsub/1.0.0")
)
const SignPrefix = "libp2p-pubsub:"
Variables ¶
var ( // DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling DiscoveryPollInitialDelay = 0 * time.Millisecond // DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the // more peers are needed for any topic DiscoveryPollInterval = 1 * time.Second )
var ( // overlay parameters GossipSubD = 6 GossipSubDlo = 4 GossipSubDhi = 12 // gossip parameters GossipSubHistoryLength = 5 GossipSubHistoryGossip = 3 // heartbeat interval GossipSubHeartbeatInitialDelay = 100 * time.Millisecond GossipSubHeartbeatInterval = 1 * time.Second // fanout ttl GossipSubFanoutTTL = 60 * time.Second )
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")
ErrTopicClosed is returned if a Topic is utilized after it has been closed
var (
RandomSubD = 6
)
var (
TimeCacheDuration = 120 * time.Second
)
Functions ¶
This section is empty.
Types ¶
type BackoffConnectorFactory ¶ added in v0.2.0
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
type Blacklist ¶
Blacklist is an interface for peer blacklisting.
func NewLRUBlacklist ¶
NewLRUBlacklist creates a new LRUBlacklist with capacity cap
type CacheEntry ¶
type CacheEntry struct {
// contains filtered or unexported fields
}
type DiscoverOpt ¶ added in v0.2.0
type DiscoverOpt func(*discoverOptions) error
func WithDiscoverConnector ¶ added in v0.2.0
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt
WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
func WithDiscoveryOpts ¶ added in v0.2.0
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt
WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
type FloodSubRouter ¶
type FloodSubRouter struct {
// contains filtered or unexported fields
}
func (*FloodSubRouter) Attach ¶
func (fs *FloodSubRouter) Attach(p *PubSub)
func (*FloodSubRouter) EnoughPeers ¶ added in v0.2.0
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
func (*FloodSubRouter) HandleRPC ¶
func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
func (*FloodSubRouter) Join ¶
func (fs *FloodSubRouter) Join(topic string)
func (*FloodSubRouter) Leave ¶
func (fs *FloodSubRouter) Leave(topic string)
func (*FloodSubRouter) Protocols ¶
func (fs *FloodSubRouter) Protocols() []protocol.ID
func (*FloodSubRouter) RemovePeer ¶
func (fs *FloodSubRouter) RemovePeer(peer.ID)
type GossipSubRouter ¶
type GossipSubRouter struct {
// contains filtered or unexported fields
}
GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.
func (*GossipSubRouter) Attach ¶
func (gs *GossipSubRouter) Attach(p *PubSub)
func (*GossipSubRouter) EnoughPeers ¶ added in v0.2.0
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
func (*GossipSubRouter) HandleRPC ¶
func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
func (*GossipSubRouter) Join ¶
func (gs *GossipSubRouter) Join(topic string)
func (*GossipSubRouter) Leave ¶
func (gs *GossipSubRouter) Leave(topic string)
func (*GossipSubRouter) Protocols ¶
func (gs *GossipSubRouter) Protocols() []protocol.ID
func (*GossipSubRouter) RemovePeer ¶
func (gs *GossipSubRouter) RemovePeer(p peer.ID)
type LRUBlacklist ¶
type LRUBlacklist struct {
// contains filtered or unexported fields
}
LRUBlacklist is a blacklist implementation using an LRU cache
func (LRUBlacklist) Add ¶
func (b LRUBlacklist) Add(p peer.ID)
type MapBlacklist ¶
MapBlacklist is a blacklist implementation using a perfect map
func (MapBlacklist) Add ¶
func (b MapBlacklist) Add(p peer.ID)
type MessageCache ¶
type MessageCache struct {
// contains filtered or unexported fields
}
func NewMessageCache ¶
func NewMessageCache(gossip, history int) *MessageCache
NewMessageCache creates a sliding window cache that remembers messages for as long as `history` slots.
When queried for messages to advertise, the cache only returns messages in the last `gossip` slots.
The `gossip` parameter must be smaller or equal to `history`, or this function will panic.
The slack between `gossip` and `history` accounts for the reaction time between when a message is advertised via IHAVE gossip, and the peer pulls it via an IWANT command.
func (*MessageCache) GetGossipIDs ¶
func (mc *MessageCache) GetGossipIDs(topic string) []string
func (*MessageCache) Put ¶
func (mc *MessageCache) Put(msg *pb.Message)
func (*MessageCache) Shift ¶
func (mc *MessageCache) Shift()
type Option ¶
func WithBlacklist ¶
WithBlacklist provides an implementation of the blacklist; the default is a MapBlacklist
func WithDiscovery ¶ added in v0.2.0
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
func WithMessageAuthor ¶
WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.
func WithMessageSigning ¶
WithMessageSigning enables or disables message signing (enabled by default).
func WithStrictSignatureVerification ¶
WithStrictSignatureVerification is an option to enable or disable strict message signing. When enabled (which is the default), unsigned messages will be discarded.
func WithValidateThrottle ¶
WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192.
func WithValidateWorkers ¶ added in v0.0.3
WithValidateWorkers sets the number of synchronous validation worker goroutines. Defaults to NumCPU.
The synchronous validation workers perform signature validation, apply inline user validators, and schedule asynchronous user validators. You can adjust this parameter to devote less cpu time to synchronous validation.
type PubOpt ¶ added in v0.2.0
type PubOpt func(pub *PublishOptions) error
func WithReadiness ¶ added in v0.2.0
func WithReadiness(ready RouterReady) PubOpt
WithReadiness returns a publishing option for only publishing when the router is ready. This option is not useful unless PubSub is also using WithDiscovery
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is the implementation of the pubsub system.
func NewFloodSub ¶
NewFloodSub returns a new PubSub object using the FloodSubRouter.
func NewFloodsubWithProtocols ¶
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
func NewGossipSub ¶
NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewRandomSub ¶
NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
func (*PubSub) BlacklistPeer ¶
BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (*PubSub) Join ¶ added in v0.2.0
Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if the Topic handle already exists.
func (*PubSub) ListPeers ¶
ListPeers returns a list of peers we are connected to in the given topic.
func (*PubSub) RegisterTopicValidator ¶
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
RegisterTopicValidator registers a validator for topic. By default validators are asynchronous, which means they will run in a separate goroutine. The number of active goroutines is controlled by global and per topic validator throttles; if it exceeds the throttle threshold, messages will be dropped.
func (*PubSub) Subscribe
deprecated
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
Subscribe returns a new Subscription for the given topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.
Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (*PubSub) SubscribeByTopicDescriptor
deprecated
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)
SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor.
Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (*PubSub) UnregisterTopicValidator ¶
UnregisterTopicValidator removes a validator from a topic. Returns an error if there was no validator registered with the topic.
type PubSubNotif ¶
type PubSubNotif PubSub
func (*PubSubNotif) ClosedStream ¶
func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)
func (*PubSubNotif) Disconnected ¶
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)
func (*PubSubNotif) ListenClose ¶
func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)
func (*PubSubNotif) OpenedStream ¶
func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)
type PubSubRouter ¶
type PubSubRouter interface { // Protocols returns the list of protocols supported by the router. Protocols() []protocol.ID // Attach is invoked by the PubSub constructor to attach the router to a // freshly initialized PubSub instance. Attach(*PubSub) // AddPeer notifies the router that a new peer has been connected. AddPeer(peer.ID, protocol.ID) // RemovePeer notifies the router that a peer has been disconnected. RemovePeer(peer.ID) // EnoughPeers returns whether the router needs more peers before it's ready to publish new records. // Suggested (if greater than 0) is a suggested number of peers that the router should need. EnoughPeers(topic string, suggested int) bool // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) // Publish is invoked to forward a new message that has been validated. Publish(peer.ID, *pb.Message) // Join notifies the router that we want to receive and forward messages in a topic. // It is invoked after the subscription announcement. Join(topic string) // Leave notifies the router that we are no longer interested in a topic. // It is invoked after the unsubscription announcement. Leave(topic string) }
PubSubRouter is the message router component of PubSub.
type PublishOptions ¶ added in v0.2.0
type PublishOptions struct {
// contains filtered or unexported fields
}
type RandomSubRouter ¶
type RandomSubRouter struct {
// contains filtered or unexported fields
}
RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects RandomSubD peers and forwards the message to them.
func (*RandomSubRouter) Attach ¶
func (rs *RandomSubRouter) Attach(p *PubSub)
func (*RandomSubRouter) EnoughPeers ¶ added in v0.2.0
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
func (*RandomSubRouter) HandleRPC ¶
func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
func (*RandomSubRouter) Join ¶
func (rs *RandomSubRouter) Join(topic string)
func (*RandomSubRouter) Leave ¶
func (rs *RandomSubRouter) Leave(topic string)
func (*RandomSubRouter) Protocols ¶
func (rs *RandomSubRouter) Protocols() []protocol.ID
func (*RandomSubRouter) RemovePeer ¶
func (rs *RandomSubRouter) RemovePeer(p peer.ID)
type RouterReady ¶ added in v0.2.0
type RouterReady func(rt PubSubRouter, topic string) (bool, error)
RouterReady is a function that decides if a router is ready to publish
func MinTopicSize ¶ added in v0.2.0
func MinTopicSize(size int) RouterReady
MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size. The router ultimately decides the whether it is ready or not, the given size is just a suggestion.
type SubOpt ¶
type SubOpt func(sub *Subscription) error
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given Topic.
func (*Subscription) Cancel ¶
func (sub *Subscription) Cancel()
Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe announcement to the network.
func (*Subscription) Next ¶
func (sub *Subscription) Next(ctx context.Context) (*Message, error)
Next returns the next message in our subscription
func (*Subscription) Topic ¶
func (sub *Subscription) Topic() string
Topic returns the topic string associated with the Subscription
type Topic ¶ added in v0.2.0
type Topic struct {
// contains filtered or unexported fields
}
Topic is the handle for a pubsub topic
func (*Topic) Close ¶ added in v0.2.0
Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. Does not error if the topic is already closed.
func (*Topic) EventHandler ¶ added in v0.2.0
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
EventHandler creates a handle for topic specific events Multiple event handlers may be created and will operate independently of each other
func (*Topic) ListPeers ¶ added in v0.2.0
ListPeers returns a list of peers we are connected to in the given topic.
func (*Topic) Subscribe ¶ added in v0.2.0
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)
Subscribe returns a new Subscription for the topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.
type TopicEventHandler ¶ added in v0.2.0
type TopicEventHandler struct {
// contains filtered or unexported fields
}
TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
func (*TopicEventHandler) Cancel ¶ added in v0.2.0
func (t *TopicEventHandler) Cancel()
Cancel closes the topic event handler
func (*TopicEventHandler) NextPeerEvent ¶ added in v0.2.0
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)
NextPeerEvent returns the next event regarding subscribed peers Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. Unless a peer both Joins and Leaves before NextPeerEvent emits either event all events will eventually be received from NextPeerEvent.
type TopicEventHandlerOpt ¶ added in v0.2.0
type TopicEventHandlerOpt func(t *TopicEventHandler) error
type TopicOptions ¶ added in v0.2.0
type TopicOptions struct{}
type ValidatorOpt ¶
type ValidatorOpt func(addVal *addValReq) error
ValidatorOpt is an option for RegisterTopicValidator.
func WithValidatorConcurrency ¶
func WithValidatorConcurrency(n int) ValidatorOpt
WithValidatorConcurrency is an option that sets the topic validator throttle. This controls the number of active validation goroutines for the topic; the default is 1024.
func WithValidatorInline ¶ added in v0.0.3
func WithValidatorInline(inline bool) ValidatorOpt
WithValidatorInline is an option that sets the validation disposition to synchronous: it will be executed inline in validation front-end, without spawning a new goroutine. This is suitable for simple or cpu-bound validators that do not block.
func WithValidatorTimeout ¶
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt
WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators.