Documentation ¶
Overview ¶
The pubsub package provides facilities for the Publish/Subscribe pattern of message propagation, also known as overlay multicast. The implementation provides topic-based pubsub, with pluggable routing algorithms.
The main interface to the library is the PubSub object. You can construct this object with the following constructors:
- NewFloodSub creates an instance that uses the floodsub routing algorithm.
- NewGossipSub creates an instance that uses the gossipsub routing algorithm.
- NewRandomSub creates an instance that uses the randomsub routing algorithm.
In addition, there is a generic constructor that creates a pubsub instance with a custom PubSubRouter interface. This procedure is currently reserved for internal use within the package.
Once you have constructed a PubSub instance, you need to establish some connections to your peers; the implementation relies on ambient peer discovery, leaving bootstrap and active peer discovery up to the client.
To publish a message to some topic, use Publish; you don't need to be subscribed to the topic in order to publish.
To subscribe to a topic, use Subscribe; this returns a Subscription from which you can read incoming messages.
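The topic-based pattern described above can be illustrated with a small in-process sketch. It is not the package's implementation and uses no libp2p types: `Bus`, `NewBus`, and the channel-based subscription are hypothetical stand-ins chosen to show that a publisher does not need to be subscribed to a topic, and that only subscribers of that topic receive the message.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a hypothetical in-process stand-in for a topic-based pubsub system.
type Bus struct {
	mu   sync.Mutex
	subs map[string][]chan []byte
}

// NewBus returns an empty Bus.
func NewBus() *Bus {
	return &Bus{subs: make(map[string][]chan []byte)}
}

// Subscribe registers interest in topic and returns a channel of messages.
func (b *Bus) Subscribe(topic string) <-chan []byte {
	ch := make(chan []byte, 16)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers data to every current subscriber of topic and returns
// how many subscribers were reached. The caller need not be subscribed.
func (b *Bus) Publish(topic string, data []byte) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	reached := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- data:
			reached++
		default: // subscriber buffer is full; drop rather than block
		}
	}
	return reached
}

func main() {
	bus := NewBus()
	sub := bus.Subscribe("news")
	bus.Publish("news", []byte("hello"))
	fmt.Printf("%s\n", <-sub)
}
```

In the real package the delivery step crosses the network and is handled by the selected router; the sketch only mirrors the shape of the Publish and Subscribe calls.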
Index ¶
- Constants
- Variables
- type Blacklist
- type CacheEntry
- type FloodSubRouter
- func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID)
- func (fs *FloodSubRouter) Attach(p *PubSub)
- func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
- func (fs *FloodSubRouter) Join(topic string)
- func (fs *FloodSubRouter) Leave(topic string)
- func (fs *FloodSubRouter) Protocols() []protocol.ID
- func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (fs *FloodSubRouter) RemovePeer(peer.ID)
- type GossipSubRouter
- func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (gs *GossipSubRouter) Attach(p *PubSub)
- func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
- func (gs *GossipSubRouter) Join(topic string)
- func (gs *GossipSubRouter) Leave(topic string)
- func (gs *GossipSubRouter) Protocols() []protocol.ID
- func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (gs *GossipSubRouter) RemovePeer(p peer.ID)
- type LRUBlacklist
- type MapBlacklist
- type Message
- type MessageCache
- type Option
- type PubSub
- func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
- func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
- func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func (p *PubSub) BlacklistPeer(pid peer.ID)
- func (p *PubSub) GetTopics() []string
- func (p *PubSub) ListPeers(topic string) []peer.ID
- func (p *PubSub) Publish(topic string, data []byte) error
- func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
- func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
- func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)
- func (p *PubSub) UnregisterTopicValidator(topic string) error
- type PubSubNotif
- func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream)
- func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn)
- func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn)
- func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) ListenClose(n inet.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) OpenedStream(n inet.Network, s inet.Stream)
- type PubSubRouter
- type RPC
- type RandomSubRouter
- func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (rs *RandomSubRouter) Attach(p *PubSub)
- func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
- func (rs *RandomSubRouter) Join(topic string)
- func (rs *RandomSubRouter) Leave(topic string)
- func (rs *RandomSubRouter) Protocols() []protocol.ID
- func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (rs *RandomSubRouter) RemovePeer(p peer.ID)
- type SubOpt
- type Subscription
- type Validator
- type ValidatorOpt
Constants ¶
const (
FloodSubID = protocol.ID("/floodsub/1.0.0")
)
const (
GossipSubID = protocol.ID("/meshsub/1.0.0")
)
const (
RandomSubID = protocol.ID("/randomsub/1.0.0")
)
const SignPrefix = "libp2p-pubsub:"
Variables ¶
var (
// overlay parameters
GossipSubD = 6
GossipSubDlo = 4
GossipSubDhi = 12
// gossip parameters
GossipSubHistoryLength = 5
GossipSubHistoryGossip = 3
// heartbeat interval
GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
GossipSubHeartbeatInterval = 1 * time.Second
// fanout ttl
GossipSubFanoutTTL = 60 * time.Second
)
var (
RandomSubD = 6
)
var (
TimeCacheDuration = 120 * time.Second
)
Functions ¶
This section is empty.
Types ¶
type Blacklist ¶
Blacklist is an interface for peer blacklisting.
func NewLRUBlacklist ¶
NewLRUBlacklist creates a new LRUBlacklist with capacity cap.
type CacheEntry ¶
type CacheEntry struct {
// contains filtered or unexported fields
}
type FloodSubRouter ¶
type FloodSubRouter struct {
// contains filtered or unexported fields
}
func (*FloodSubRouter) Attach ¶
func (fs *FloodSubRouter) Attach(p *PubSub)
func (*FloodSubRouter) HandleRPC ¶
func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
func (*FloodSubRouter) Join ¶
func (fs *FloodSubRouter) Join(topic string)
func (*FloodSubRouter) Leave ¶
func (fs *FloodSubRouter) Leave(topic string)
func (*FloodSubRouter) Protocols ¶
func (fs *FloodSubRouter) Protocols() []protocol.ID
func (*FloodSubRouter) RemovePeer ¶
func (fs *FloodSubRouter) RemovePeer(peer.ID)
type GossipSubRouter ¶
type GossipSubRouter struct {
// contains filtered or unexported fields
}
GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.
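The fanout expiry rule can be sketched as follows. This is an illustration under stated assumptions, not the router's code: the table keyed by topic with a last-publish timestamp and the helper `expireFanout` are hypothetical, and only the 60-second value is taken from the documented default of GossipSubFanoutTTL.

```go
package main

import (
	"fmt"
	"time"
)

// fanoutTTL mirrors the documented default of GossipSubFanoutTTL.
const fanoutTTL = 60 * time.Second

// expireFanout (hypothetical helper) deletes every topic whose last publish
// is older than ttl and returns the number of topics removed.
func expireFanout(lastPub map[string]time.Time, now time.Time, ttl time.Duration) int {
	removed := 0
	for topic, t := range lastPub {
		if now.Sub(t) > ttl {
			delete(lastPub, topic) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// demo expires a two-topic table and reports how many topics were removed
// and whether the recently used topic survived.
func demo() (int, bool) {
	now := time.Now()
	lastPub := map[string]time.Time{
		"active": now.Add(-10 * time.Second),
		"stale":  now.Add(-2 * time.Minute),
	}
	removed := expireFanout(lastPub, now, fanoutTTL)
	_, kept := lastPub["active"]
	return removed, kept
}

func main() {
	removed, kept := demo()
	fmt.Println(removed, kept)
}
```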
func (*GossipSubRouter) Attach ¶
func (gs *GossipSubRouter) Attach(p *PubSub)
func (*GossipSubRouter) HandleRPC ¶
func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
func (*GossipSubRouter) Join ¶
func (gs *GossipSubRouter) Join(topic string)
func (*GossipSubRouter) Leave ¶
func (gs *GossipSubRouter) Leave(topic string)
func (*GossipSubRouter) Protocols ¶
func (gs *GossipSubRouter) Protocols() []protocol.ID
func (*GossipSubRouter) RemovePeer ¶
func (gs *GossipSubRouter) RemovePeer(p peer.ID)
type LRUBlacklist ¶
type LRUBlacklist struct {
// contains filtered or unexported fields
}
LRUBlacklist is a blacklist implementation using an LRU cache.
func (LRUBlacklist) Add ¶
func (b LRUBlacklist) Add(p peer.ID)
type MapBlacklist ¶
MapBlacklist is a blacklist implementation using a perfect map.
func (MapBlacklist) Add ¶
func (b MapBlacklist) Add(p peer.ID)
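A map-backed blacklist can be sketched as below. The listing above shows only the Add method, so the `Contains` membership check, the `PeerID` string type (standing in for peer.ID), and the `accept` helper are assumptions made for illustration rather than the package's definitions.

```go
package main

import "fmt"

// PeerID is a stand-in for peer.ID so the sketch has no dependencies.
type PeerID string

// Blacklist is an assumed shape for a peer blacklist: Add mirrors the
// documented method, Contains is a hypothetical membership check.
type Blacklist interface {
	Add(PeerID)
	Contains(PeerID) bool
}

// mapBlacklist keeps every blacklisted peer in a map and never evicts.
type mapBlacklist map[PeerID]struct{}

func (b mapBlacklist) Add(p PeerID) { b[p] = struct{}{} }

func (b mapBlacklist) Contains(p PeerID) bool {
	_, ok := b[p]
	return ok
}

// accept reports whether a message from the given peer should be processed.
func accept(bl Blacklist, from PeerID) bool {
	return !bl.Contains(from)
}

func main() {
	bl := mapBlacklist{}
	bl.Add("peerA")
	fmt.Println(accept(bl, "peerA"), accept(bl, "peerB"))
}
```

An LRU-backed variant would bound memory at the cost of eventually forgetting the least recently used entries, which is presumably the trade-off behind offering both implementations.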
type MessageCache ¶
type MessageCache struct {
// contains filtered or unexported fields
}
func NewMessageCache ¶
func NewMessageCache(gossip, history int) *MessageCache
func (*MessageCache) GetGossipIDs ¶
func (mc *MessageCache) GetGossipIDs(topic string) []string
func (*MessageCache) Put ¶
func (mc *MessageCache) Put(msg *pb.Message)
func (*MessageCache) Shift ¶
func (mc *MessageCache) Shift()
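One plausible reading of these signatures is a sliding window of message history, where Put records into the newest window, Shift ages every window by one step, and only the most recent `gossip` windows are advertised. The sketch below follows that reading with plain strings; `windowCache` and its methods are hypothetical, and the sizes 3 and 5 come from the documented defaults GossipSubHistoryGossip and GossipSubHistoryLength.

```go
package main

import "fmt"

// entry pairs a message ID with its topic.
type entry struct{ id, topic string }

// windowCache is a hypothetical, simplified sliding-window cache.
// history[0] is the newest window; it assumes gossip <= len(history).
type windowCache struct {
	gossip  int
	history [][]entry
}

func newWindowCache(gossip, history int) *windowCache {
	return &windowCache{gossip: gossip, history: make([][]entry, history)}
}

// Put records a message in the newest window.
func (c *windowCache) Put(id, topic string) {
	c.history[0] = append(c.history[0], entry{id, topic})
}

// GossipIDs returns IDs for topic found in the newest `gossip` windows.
func (c *windowCache) GossipIDs(topic string) []string {
	var ids []string
	for _, win := range c.history[:c.gossip] {
		for _, e := range win {
			if e.topic == topic {
				ids = append(ids, e.id)
			}
		}
	}
	return ids
}

// Shift ages every window by one slot, discards the oldest, and opens
// a fresh newest window.
func (c *windowCache) Shift() {
	copy(c.history[1:], c.history)
	c.history[0] = nil
}

func main() {
	c := newWindowCache(3, 5)
	c.Put("m1", "news")
	c.Shift()
	fmt.Println(c.GossipIDs("news"))
}
```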
type Option ¶
func WithBlacklist ¶
WithBlacklist provides an implementation of the blacklist; the default is a MapBlacklist.
func WithMessageAuthor ¶
WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.
func WithMessageSigning ¶
WithMessageSigning enables or disables message signing (enabled by default).
func WithStrictSignatureVerification ¶
WithStrictSignatureVerification enforces message signing. If set, unsigned messages will be discarded.
This currently defaults to false but, as we transition to signing by default, will eventually default to true.
func WithValidateThrottle ¶
WithValidateThrottle sets the upper bound on the number of active validation goroutines.
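An upper bound on concurrently running goroutines is commonly enforced with a buffered channel used as a counting semaphore. The sketch below shows that technique; the `throttle` type is hypothetical, and the choice to reject work when no slot is free (rather than queue it) is an assumption, since the text above does not say what happens at the limit.

```go
package main

import (
	"fmt"
	"sync"
)

// throttle is a hypothetical limiter: slots has one buffer cell per
// permitted concurrent task.
type throttle struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newThrottle(n int) *throttle {
	return &throttle{slots: make(chan struct{}, n)}
}

// tryRun starts fn in a new goroutine if a slot is free and reports
// whether it was started. It never blocks.
func (t *throttle) tryRun(fn func()) bool {
	select {
	case t.slots <- struct{}{}: // acquire a slot
		t.wg.Add(1)
		go func() {
			defer t.wg.Done()
			defer func() { <-t.slots }() // release the slot first
			fn()
		}()
		return true
	default:
		return false // at the limit: reject (assumed policy)
	}
}

// wait blocks until all started tasks have finished.
func (t *throttle) wait() { t.wg.Wait() }

func main() {
	th := newThrottle(2)
	release := make(chan struct{})
	started := 0
	for i := 0; i < 5; i++ {
		if th.tryRun(func() { <-release }) {
			started++
		}
	}
	close(release)
	th.wait()
	fmt.Println("started:", started)
}
```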
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is the implementation of the pubsub system.
func NewFloodSub ¶
NewFloodSub returns a new PubSub object using the FloodSubRouter.
func NewFloodsubWithProtocols ¶
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
NewFloodsubWithProtocols returns a new floodsub-enabled PubSub object using the protocols specified in ps.
func NewGossipSub ¶
NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewRandomSub ¶
NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
func (*PubSub) BlacklistPeer ¶
BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (*PubSub) ListPeers ¶
ListPeers returns a list of peers we are connected to in the given topic.
func (*PubSub) RegisterTopicValidator ¶
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
RegisterTopicValidator registers a validator for topic.
func (*PubSub) Subscribe ¶
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
Subscribe returns a new Subscription for the given topic. Note that subscription is not an instantaneous operation: it may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.
func (*PubSub) SubscribeByTopicDescriptor ¶
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)
SubscribeByTopicDescriptor lets you subscribe to a topic using a pb.TopicDescriptor.
func (*PubSub) UnregisterTopicValidator ¶
UnregisterTopicValidator removes a validator from a topic. Returns an error if there was no validator registered with the topic.
type PubSubNotif ¶
type PubSubNotif PubSub
func (*PubSubNotif) ClosedStream ¶
func (p *PubSubNotif) ClosedStream(n inet.Network, s inet.Stream)
func (*PubSubNotif) Disconnected ¶
func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn)
func (*PubSubNotif) ListenClose ¶
func (p *PubSubNotif) ListenClose(n inet.Network, _ ma.Multiaddr)
func (*PubSubNotif) OpenedStream ¶
func (p *PubSubNotif) OpenedStream(n inet.Network, s inet.Stream)
type PubSubRouter ¶
type PubSubRouter interface {
// Protocols returns the list of protocols supported by the router.
Protocols() []protocol.ID
// Attach is invoked by the PubSub constructor to attach the router to a
// freshly initialized PubSub instance.
Attach(*PubSub)
// AddPeer notifies the router that a new peer has been connected.
AddPeer(peer.ID, protocol.ID)
// RemovePeer notifies the router that a peer has been disconnected.
RemovePeer(peer.ID)
// HandleRPC is invoked to process control messages in the RPC envelope.
// It is invoked after subscriptions and payload messages have been processed.
HandleRPC(*RPC)
// Publish is invoked to forward a new message that has been validated.
Publish(peer.ID, *pb.Message)
// Join notifies the router that we want to receive and forward messages in a topic.
// It is invoked after the subscription announcement.
Join(topic string)
// Leave notifies the router that we are no longer interested in a topic.
// It is invoked after the unsubscription announcement.
Leave(topic string)
}
PubSubRouter is the message router component of PubSub.
type RandomSubRouter ¶
type RandomSubRouter struct {
// contains filtered or unexported fields
}
RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects RandomSubD peers and forwards the message to them.
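The selection step can be sketched as a shuffle followed by truncation. This is a minimal illustration, not the router's code: `pickPeers` is a hypothetical helper, peers are plain strings instead of peer.ID, and the constant 6 mirrors the documented default of RandomSubD.

```go
package main

import (
	"fmt"
	"math/rand"
)

// randomSubD mirrors the documented default of RandomSubD.
const randomSubD = 6

// pickPeers (hypothetical helper) returns up to d peers chosen uniformly
// at random, without modifying the input slice.
func pickPeers(peers []string, d int) []string {
	shuffled := append([]string(nil), peers...) // work on a copy
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	if len(shuffled) > d {
		shuffled = shuffled[:d]
	}
	return shuffled
}

func main() {
	peers := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	targets := pickPeers(peers, randomSubD)
	fmt.Println(len(targets), "peers selected")
}
```

When fewer than d peers are known, the sketch simply forwards to all of them.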
func (*RandomSubRouter) Attach ¶
func (rs *RandomSubRouter) Attach(p *PubSub)
func (*RandomSubRouter) HandleRPC ¶
func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
func (*RandomSubRouter) Join ¶
func (rs *RandomSubRouter) Join(topic string)
func (*RandomSubRouter) Leave ¶
func (rs *RandomSubRouter) Leave(topic string)
func (*RandomSubRouter) Protocols ¶
func (rs *RandomSubRouter) Protocols() []protocol.ID
func (*RandomSubRouter) RemovePeer ¶
func (rs *RandomSubRouter) RemovePeer(p peer.ID)
type SubOpt ¶
type SubOpt func(sub *Subscription) error
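SubOpt follows Go's functional options pattern: each option is a function that mutates the object under construction and may return an error. The sketch below shows the pattern with stand-in types; `subscription`, `subscribe`, and the option `withBufferSize` are hypothetical and are not options offered by this package.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription is a stand-in for the package's Subscription type.
type subscription struct {
	topic   string
	bufSize int
}

// subOpt has the same shape as SubOpt: it configures a subscription
// and may reject invalid settings.
type subOpt func(*subscription) error

// withBufferSize is a hypothetical option used only for illustration.
func withBufferSize(n int) subOpt {
	return func(s *subscription) error {
		if n <= 0 {
			return errors.New("buffer size must be positive")
		}
		s.bufSize = n
		return nil
	}
}

// subscribe applies defaults, then each option in order, and fails on
// the first option that returns an error.
func subscribe(topic string, opts ...subOpt) (*subscription, error) {
	s := &subscription{topic: topic, bufSize: 32}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := subscribe("news", withBufferSize(8))
	fmt.Println(s.bufSize, err)
}
```

The same shape applies to Option and ValidatorOpt, which configure the PubSub instance and topic validators respectively.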
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
func (*Subscription) Cancel ¶
func (sub *Subscription) Cancel()
func (*Subscription) Topic ¶
func (sub *Subscription) Topic() string
type ValidatorOpt ¶
type ValidatorOpt func(addVal *addValReq) error
ValidatorOpt is an option for RegisterTopicValidator.
func WithValidatorConcurrency ¶
func WithValidatorConcurrency(n int) ValidatorOpt
WithValidatorConcurrency is an option that sets the topic validator throttle.
func WithValidatorTimeout ¶
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt
WithValidatorTimeout is an option that sets the topic validator timeout.
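A validator timeout is typically enforced by running the validator under a context with a deadline and treating expiry as a failed validation. The sketch below shows that approach under stated assumptions: the `validator` signature is simplified and hypothetical (the package's Validator type is not shown above), and treating a timeout as rejection is an assumed policy.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// validator is a simplified, hypothetical validator signature.
type validator func(ctx context.Context, data []byte) bool

// validateWithTimeout runs val and returns its verdict, or false if val
// does not finish within timeout.
func validateWithTimeout(val validator, data []byte, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so the goroutine can finish even after we stop waiting.
	result := make(chan bool, 1)
	go func() { result <- val(ctx, data) }()

	select {
	case ok := <-result:
		return ok
	case <-ctx.Done():
		return false // timed out: treat as invalid (assumed policy)
	}
}

// demo validates once with a fast validator and once with a slow one.
func demo() (fast, slow bool) {
	quick := func(ctx context.Context, data []byte) bool { return len(data) > 0 }
	sluggish := func(ctx context.Context, data []byte) bool {
		time.Sleep(200 * time.Millisecond)
		return true
	}
	fast = validateWithTimeout(quick, []byte("msg"), time.Second)
	slow = validateWithTimeout(sluggish, []byte("msg"), 10*time.Millisecond)
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast, slow)
}
```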