Documentation ¶
Overview ¶
The pubsub package provides facilities for the Publish/Subscribe pattern of message propagation, also known as overlay multicast. The implementation provides topic-based pubsub, with pluggable routing algorithms.
The main interface to the library is the PubSub object. You can construct this object with the following constructors:
- NewFloodSub creates an instance that uses the floodsub routing algorithm.
- NewGossipSub creates an instance that uses the gossipsub routing algorithm.
- NewRandomSub creates an instance that uses the randomsub routing algorithm.
In addition, there is a generic constructor that creates a pubsub instance with a custom PubSubRouter interface. This procedure is currently reserved for internal use within the package.
Once you have constructed a PubSub instance, you need to establish some connections to your peers; the implementation relies on ambient peer discovery, leaving bootstrap and active peer discovery up to the client.
To publish a message to some topic, use Publish; you don't need to be subscribed to the topic in order to publish.
To subscribe to a topic, use Subscribe; this will give you a subscription interface from which new messages can be pumped.
Index ¶
- Constants
- Variables
- func DefaultMsgIdFn(pmsg *pb.Message) string
- func RandStringBytes(n int) string
- type BackoffConnectorFactory
- type Blacklist
- type CacheEntry
- type DiscoverOpt
- type EventTracer
- type EventType
- type FloodSubRouter
- func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (fs *FloodSubRouter) Attach(p *PubSub)
- func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
- func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
- func (fs *FloodSubRouter) Join(topic string)
- func (fs *FloodSubRouter) Leave(topic string)
- func (fs *FloodSubRouter) Protocols() []protocol.ID
- func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (fs *FloodSubRouter) RemovePeer(p peer.ID)
- type GossipSubRouter
- func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (gs *GossipSubRouter) Attach(p *PubSub)
- func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
- func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
- func (gs *GossipSubRouter) Join(topic string)
- func (gs *GossipSubRouter) Leave(topic string)
- func (gs *GossipSubRouter) Protocols() []protocol.ID
- func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (gs *GossipSubRouter) RemovePeer(p peer.ID)
- type JSONTracer
- type MapBlacklist
- type Message
- type MessageCache
- type MsgIdFunction
- type Option
- func WithBlacklist(b Blacklist) Option
- func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
- func WithEventTracer(tracer EventTracer) Option
- func WithMaxMessageSize(maxMessageSize int) Option
- func WithMessageAuthor(author peer.ID) Option
- func WithMessageIdFn(fn MsgIdFunction) Option
- func WithMessageSigning(enabled bool) Option
- func WithPeerOutboundQueueSize(size int) Option
- func WithStrictSignatureVerification(required bool) Option
- func WithValidateQueueSize(n int) Option
- func WithValidateThrottle(n int) Option
- func WithValidateWorkers(n int) Option
- type PBTracer
- type PeerEvent
- type PubOpt
- type PubSub
- func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
- func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
- func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func (p *PubSub) BlacklistPeer(pid peer.ID)
- func (p *PubSub) GetTopics() []string
- func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)
- func (p *PubSub) ListPeers(topic string) []peer.ID
- func (p *PubSub) NewEpoch()
- func (p *PubSub) Publish(topic string, data []byte) error
- func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
- func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)deprecated
- func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)deprecated
- func (p *PubSub) UnregisterTopicValidator(topic string) error
- type PubSubNotif
- func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)
- func (p *PubSubNotif) Connected(n network.Network, c network.Conn)
- func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)
- func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)
- func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)
- type PubSubRouter
- type PublishOptions
- type RPC
- type RandomSubRouter
- func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (rs *RandomSubRouter) Attach(p *PubSub)
- func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
- func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
- func (rs *RandomSubRouter) Join(topic string)
- func (rs *RandomSubRouter) Leave(topic string)
- func (rs *RandomSubRouter) Protocols() []protocol.ID
- func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message)
- func (rs *RandomSubRouter) RemovePeer(p peer.ID)
- type RemoteTracer
- type RouterReady
- type StemMessage
- type SubOpt
- type Subscription
- type TimeCachedBlacklist
- type Topic
- type TopicEventHandler
- type TopicEventHandlerOpt
- type TopicOpt
- type TopicOptions
- type Validator
- type ValidatorOpt
Constants ¶
const ( FloodSubID = protocol.ID("/floodsub/1.0.0") FloodSubTopicSearchSize = 5 )
const DandelionEpochInterval = 5 * time.Second
const DandelionStemCheckInterval = 500 * time.Millisecond
const DandelionStemExpireSpan = 60 * time.Second
const DefaultMaxMessageSize = 1 << 20
DefaultMaximumMessageSize is 1mb.
const (
GossipSubID = protocol.ID("/meshsub/1.0.0")
)
const (
RandomSubID = protocol.ID("/randomsub/1.0.0")
)
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
const SignPrefix = "libp2p-pubsub:"
Variables ¶
var ( // DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling DiscoveryPollInitialDelay = 0 * time.Millisecond // DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the // more peers are needed for any topic DiscoveryPollInterval = 1 * time.Second )
var ( // overlay parameters GossipSubD = 6 GossipSubDlo = 4 GossipSubDhi = 12 // gossip parameters GossipSubHistoryLength = 5 GossipSubHistoryGossip = 3 // heartbeat interval GossipSubHeartbeatInitialDelay = 100 * time.Millisecond GossipSubHeartbeatInterval = 1 * time.Second // fanout ttl GossipSubFanoutTTL = 60 * time.Second )
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")
ErrTopicClosed is returned if a Topic is utilized after it has been closed
var MinTraceBatchSize = 16
var (
RandomSubD = 6
)
var (
TimeCacheDuration = 120 * time.Second
)
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
Functions ¶
func DefaultMsgIdFn ¶
msgID returns a unique ID of the passed Message
func RandStringBytes ¶
Types ¶
type BackoffConnectorFactory ¶
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
type CacheEntry ¶
type CacheEntry struct {
// contains filtered or unexported fields
}
type DiscoverOpt ¶
type DiscoverOpt func(*discoverOptions) error
func WithDiscoverConnector ¶
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt
WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
func WithDiscoveryOpts ¶
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt
WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
type EventTracer ¶
type EventTracer interface {
Trace(evt *pb.TraceEvent)
}
Generic event tracer interface
type FloodSubRouter ¶
type FloodSubRouter struct {
// contains filtered or unexported fields
}
func (*FloodSubRouter) Attach ¶
func (fs *FloodSubRouter) Attach(p *PubSub)
func (*FloodSubRouter) EnoughPeers ¶
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
func (*FloodSubRouter) HandleRPC ¶
func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
func (*FloodSubRouter) Join ¶
func (fs *FloodSubRouter) Join(topic string)
func (*FloodSubRouter) Leave ¶
func (fs *FloodSubRouter) Leave(topic string)
func (*FloodSubRouter) Protocols ¶
func (fs *FloodSubRouter) Protocols() []protocol.ID
func (*FloodSubRouter) RemovePeer ¶
func (fs *FloodSubRouter) RemovePeer(p peer.ID)
type GossipSubRouter ¶
type GossipSubRouter struct {
// contains filtered or unexported fields
}
GossipSubRouter is a router that implements the gossipsub protocol. For each topic we have joined, we maintain an overlay through which messages flow; this is the mesh map. For each topic we publish to without joining, we maintain a list of peers to use for injecting our messages in the overlay with stable routes; this is the fanout map. Fanout peer lists are expired if we don't publish any messages to their topic for GossipSubFanoutTTL.
func (*GossipSubRouter) Attach ¶
func (gs *GossipSubRouter) Attach(p *PubSub)
func (*GossipSubRouter) EnoughPeers ¶
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
func (*GossipSubRouter) HandleRPC ¶
func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
func (*GossipSubRouter) Join ¶
func (gs *GossipSubRouter) Join(topic string)
func (*GossipSubRouter) Leave ¶
func (gs *GossipSubRouter) Leave(topic string)
func (*GossipSubRouter) Protocols ¶
func (gs *GossipSubRouter) Protocols() []protocol.ID
func (*GossipSubRouter) RemovePeer ¶
func (gs *GossipSubRouter) RemovePeer(p peer.ID)
type JSONTracer ¶
type JSONTracer struct {
// contains filtered or unexported fields
}
JSONTracer is a tracer that writes events to a file, encoded in ndjson.
func NewJSONTracer ¶
func NewJSONTracer(file string) (*JSONTracer, error)
NewJsonTracer creates a new JSONTracer writing traces to file.
func OpenJSONTracer ¶
OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func (*JSONTracer) Trace ¶
func (t *JSONTracer) Trace(evt *pb.TraceEvent)
type MapBlacklist ¶
MapBlacklist is a blacklist implementation using a perfect map
func (MapBlacklist) Add ¶
func (b MapBlacklist) Add(p peer.ID)
type MessageCache ¶
type MessageCache struct {
// contains filtered or unexported fields
}
func NewMessageCache ¶
func NewMessageCache(gossip, history int) *MessageCache
NewMessageCache creates a sliding window cache that remembers messages for as long as `history` slots.
When queried for messages to advertise, the cache only returns messages in the last `gossip` slots.
The `gossip` parameter must be smaller or equal to `history`, or this function will panic.
The slack between `gossip` and `history` accounts for the reaction time between when a message is advertised via IHAVE gossip, and the peer pulls it via an IWANT command.
func (*MessageCache) GetGossipIDs ¶
func (mc *MessageCache) GetGossipIDs(topic string) []string
func (*MessageCache) Put ¶
func (mc *MessageCache) Put(msg *pb.Message)
func (*MessageCache) SetMsgIdFn ¶
func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction)
func (*MessageCache) Shift ¶
func (mc *MessageCache) Shift()
type MsgIdFunction ¶
MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any implementation of this function by configuring it with the Option from WithMessageIdFn.
type Option ¶
func WithBlacklist ¶
WithBlacklist provides an implementation of the blacklist; the default is a MapBlacklist
func WithDiscovery ¶
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub
func WithEventTracer ¶
func WithEventTracer(tracer EventTracer) Option
WithEventTracer provides a tracer for the pubsub system
func WithMaxMessageSize ¶
WithMaxMessageSize sets the global maximum message size for pubsub wire messages. The default value is 1MiB (DefaultMaxMessageSize).
Observe the following warnings when setting this option.
WARNING #1: Make sure to change the default protocol prefixes for floodsub (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining the public default network, which uses the default max message size, and therefore will cause messages to be dropped.
WARNING #2: Reducing the default max message limit is fine, if you are certain that your application messages will not exceed the new limit. However, be wary of increasing the limit, as pubsub networks are naturally write-amplifying, i.e. for every message we receive, we send D copies of the message to our peers. If those messages are large, the bandwidth requirements will grow linearly. Note that propagation is sent on the uplink, which traditionally is more constrained than the downlink. Instead, consider out-of-band retrieval for large messages, by sending a CID (Content-ID) or another type of locator, such that messages can be fetched on-demand, rather than being pushed proactively. Under this design, you'd use the pubsub layer as a signalling system, rather than a data delivery system.
func WithMessageAuthor ¶
WithMessageAuthor sets the author for outbound messages to the given peer ID (defaults to the host's ID). If message signing is enabled, the private key must be available in the host's peerstore.
func WithMessageIdFn ¶
func WithMessageIdFn(fn MsgIdFunction) Option
WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message. The default ID function is DefaultMsgIdFn (concatenate source and seq nr.), but it can be customized to e.g. the hash of the message.
func WithMessageSigning ¶
WithMessageSigning enables or disables message signing (enabled by default).
func WithPeerOutboundQueueSize ¶
WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer We start dropping messages to a peer if the outbound queue if full
func WithStrictSignatureVerification ¶
WithStrictSignatureVerification is an option to enable or disable strict message signing. When enabled (which is the default), unsigned messages will be discarded.
func WithValidateQueueSize ¶
WithValidateQueueSize sets the buffer of validate queue. Defaults to 32. When queue is full, validation is throttled and new messages are dropped.
func WithValidateThrottle ¶
WithValidateThrottle sets the upper bound on the number of active validation goroutines across all topics. The default is 8192.
func WithValidateWorkers ¶
WithValidateWorkers sets the number of synchronous validation worker goroutines. Defaults to NumCPU.
The synchronous validation workers perform signature validation, apply inline user validators, and schedule asynchronous user validators. You can adjust this parameter to devote less cpu time to synchronous validation.
type PBTracer ¶
type PBTracer struct {
// contains filtered or unexported fields
}
PBTracer is a tracer that writes events to a file, as delimited protobufs.
func NewPBTracer ¶
func OpenPBTracer ¶
OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
func (*PBTracer) Trace ¶
func (t *PBTracer) Trace(evt *pb.TraceEvent)
type PubOpt ¶
type PubOpt func(pub *PublishOptions) error
func WithReadiness ¶
func WithReadiness(ready RouterReady) PubOpt
WithReadiness returns a publishing option for only publishing when the router is ready. This option is not useful unless PubSub is also using WithDiscovery
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is the implementation of the pubsub system.
func NewFloodSub ¶
NewFloodSub returns a new PubSub object using the FloodSubRouter.
func NewFloodsubWithProtocols ¶
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
func NewGossipSub ¶
NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewRandomSub ¶
NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
func (*PubSub) BlacklistPeer ¶
BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (*PubSub) Join ¶
Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if the Topic handle already exists.
func (*PubSub) ListPeers ¶
ListPeers returns a list of peers we are connected to in the given topic.
func (*PubSub) RegisterTopicValidator ¶
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error
RegisterTopicValidator registers a validator for topic. By default validators are asynchronous, which means they will run in a separate goroutine. The number of active goroutines is controlled by global and per topic validator throttles; if it exceeds the throttle threshold, messages will be dropped.
func (*PubSub) Subscribe
deprecated
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
Subscribe returns a new Subscription for the given topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.
Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (*PubSub) SubscribeByTopicDescriptor
deprecated
func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error)
SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor.
Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (*PubSub) UnregisterTopicValidator ¶
UnregisterTopicValidator removes a validator from a topic. Returns an error if there was no validator registered with the topic.
type PubSubNotif ¶
type PubSubNotif PubSub
func (*PubSubNotif) ClosedStream ¶
func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream)
func (*PubSubNotif) Disconnected ¶
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn)
func (*PubSubNotif) ListenClose ¶
func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr)
func (*PubSubNotif) OpenedStream ¶
func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream)
type PubSubRouter ¶
type PubSubRouter interface { // Protocols returns the list of protocols supported by the router. Protocols() []protocol.ID // Attach is invoked by the PubSub constructor to attach the router to a // freshly initialized PubSub instance. Attach(*PubSub) // AddPeer notifies the router that a new peer has been connected. AddPeer(peer.ID, protocol.ID) // RemovePeer notifies the router that a peer has been disconnected. RemovePeer(peer.ID) // EnoughPeers returns whether the router needs more peers before it's ready to publish new records. // Suggested (if greater than 0) is a suggested number of peers that the router should need. EnoughPeers(topic string, suggested int) bool // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) // Publish is invoked to forward a new message that has been validated. Publish(peer.ID, *pb.Message) // Join notifies the router that we want to receive and forward messages in a topic. // It is invoked after the subscription announcement. Join(topic string) // Leave notifies the router that we are no longer interested in a topic. // It is invoked after the unsubscription announcement. Leave(topic string) }
PubSubRouter is the message router component of PubSub.
type PublishOptions ¶
type PublishOptions struct {
// contains filtered or unexported fields
}
type RandomSubRouter ¶
type RandomSubRouter struct {
// contains filtered or unexported fields
}
RandomSubRouter is a router that implements a random propagation strategy. For each message, it selects RandomSubD peers and forwards the message to them.
func (*RandomSubRouter) Attach ¶
func (rs *RandomSubRouter) Attach(p *PubSub)
func (*RandomSubRouter) EnoughPeers ¶
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
func (*RandomSubRouter) HandleRPC ¶
func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
func (*RandomSubRouter) Join ¶
func (rs *RandomSubRouter) Join(topic string)
func (*RandomSubRouter) Leave ¶
func (rs *RandomSubRouter) Leave(topic string)
func (*RandomSubRouter) Protocols ¶
func (rs *RandomSubRouter) Protocols() []protocol.ID
func (*RandomSubRouter) RemovePeer ¶
func (rs *RandomSubRouter) RemovePeer(p peer.ID)
type RemoteTracer ¶
type RemoteTracer struct {
// contains filtered or unexported fields
}
RemoteTracer is a tracer that sends trace events to a remote peer
func NewRemoteTracer ¶
NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func (*RemoteTracer) Trace ¶
func (t *RemoteTracer) Trace(evt *pb.TraceEvent)
type RouterReady ¶
type RouterReady func(rt PubSubRouter, topic string) (bool, error)
RouterReady is a function that decides if a router is ready to publish
func MinTopicSize ¶
func MinTopicSize(size int) RouterReady
MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size. The router ultimately decides the whether it is ready or not, the given size is just a suggestion.
type StemMessage ¶
type StemMessage struct {
// contains filtered or unexported fields
}
type SubOpt ¶
type SubOpt func(sub *Subscription) error
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given Topic.
func (*Subscription) Cancel ¶
func (sub *Subscription) Cancel()
Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe announcement to the network.
func (*Subscription) Next ¶
func (sub *Subscription) Next(ctx context.Context) (*Message, error)
Next returns the next message in our subscription
func (*Subscription) Topic ¶
func (sub *Subscription) Topic() string
Topic returns the topic string associated with the Subscription
type TimeCachedBlacklist ¶
TimeCachedBlacklist is a blacklist implementation using a time cache
func (*TimeCachedBlacklist) Add ¶
func (b *TimeCachedBlacklist) Add(p peer.ID)
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic is the handle for a pubsub topic
func (*Topic) Close ¶
Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. Does not error if the topic is already closed.
func (*Topic) EventHandler ¶
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
EventHandler creates a handle for topic specific events Multiple event handlers may be created and will operate independently of each other
func (*Topic) Subscribe ¶
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)
Subscribe returns a new Subscription for the topic. Note that subscription is not an instanteneous operation. It may take some time before the subscription is processed by the pubsub main loop and propagated to our peers.
type TopicEventHandler ¶
type TopicEventHandler struct {
// contains filtered or unexported fields
}
TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
func (*TopicEventHandler) Cancel ¶
func (t *TopicEventHandler) Cancel()
Cancel closes the topic event handler
func (*TopicEventHandler) NextPeerEvent ¶
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)
NextPeerEvent returns the next event regarding subscribed peers Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. Unless a peer both Joins and Leaves before NextPeerEvent emits either event all events will eventually be received from NextPeerEvent.
type TopicEventHandlerOpt ¶
type TopicEventHandlerOpt func(t *TopicEventHandler) error
type TopicOptions ¶
type TopicOptions struct{}
type ValidatorOpt ¶
type ValidatorOpt func(addVal *addValReq) error
ValidatorOpt is an option for RegisterTopicValidator.
func WithValidatorConcurrency ¶
func WithValidatorConcurrency(n int) ValidatorOpt
WithValidatorConcurrency is an option that sets the topic validator throttle. This controls the number of active validation goroutines for the topic; the default is 1024.
func WithValidatorInline ¶
func WithValidatorInline(inline bool) ValidatorOpt
WithValidatorInline is an option that sets the validation disposition to synchronous: it will be executed inline in validation front-end, without spawning a new goroutine. This is suitable for simple or cpu-bound validators that do not block.
func WithValidatorTimeout ¶
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt
WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators.