Documentation ¶
Index ¶
- func Consume(items [][]byte, w io.WriteCloser) bool
- func CreateGossipStreamer() (*EventBus, *GossipStreamer)
- type Adder
- type Broker
- type CallbackListener
- type ChanListener
- type Collector
- type EventBus
- func (bus *EventBus) AddDefaultTopic(topic topics.Topic)
- func (bus *EventBus) Publish(topic topics.Topic, messageBuffer *bytes.Buffer)
- func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32
- func (bus *EventBus) SubscribeDefault(listener Listener) uint32
- func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)
- type GossipStreamer
- type Listener
- type ListenerType
- type Multicaster
- type Preprocessor
- type ProcessorRegistry
- type Publisher
- type SafeProcessorRegistry
- func (p *SafeProcessorRegistry) Preprocess(topic topics.Topic, messageBuffer *bytes.Buffer) error
- func (p *SafeProcessorRegistry) Register(topic topics.Topic, preprocessors ...Preprocessor) []uint32
- func (p *SafeProcessorRegistry) RemoveAllProcessors()
- func (p *SafeProcessorRegistry) RemoveProcessor(topic topics.Topic, id uint32)
- func (p *SafeProcessorRegistry) RemoveProcessors(topic topics.Topic)
- type SimpleStreamer
- type StreamListener
- type Subscriber
- type TopicListener
- func NewCallbackTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener
- func NewChanTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener
- func NewTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic, ...) TopicListener
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume(items [][]byte, w io.WriteCloser) bool
Consume an item by writing it to the specified WriteCloser. This is used in the StreamListener creation
func CreateGossipStreamer ¶
func CreateGossipStreamer() (*EventBus, *GossipStreamer)
CreateGossipStreamer sets up and event bus, subscribes a SimpleStreamer to the gossip topic, and sets the right preprocessors up for the gossip topic.
Types ¶
type Adder ¶
type Adder struct {
// contains filtered or unexported fields
}
Adder is a very simple Preprocessor for test purposes
type Broker ¶
type Broker interface { ProcessorRegistry Subscriber Publisher }
Broker is an Publisher and an Subscriber
type CallbackListener ¶
type CallbackListener struct {
// contains filtered or unexported fields
}
CallbackListener subscribes using callbacks
func (*CallbackListener) Close ¶
func (c *CallbackListener) Close()
Close as part of the Listener method
type ChanListener ¶
type ChanListener struct {
// contains filtered or unexported fields
}
ChanListener dispatches a message using a channel
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector is a very stupid implementation of the wire.EventCollector interface in case no function would be supplied, it would use a channel to publish the collected packets
func NewSimpleCollector ¶
NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel
type EventBus ¶
type EventBus struct { ProcessorRegistry // contains filtered or unexported fields }
EventBus - box for listeners and callbacks.
func CreateFrameStreamer ¶
func CreateFrameStreamer(topic topics.Topic) (*EventBus, io.WriteCloser)
CreateFrameStreamer sets up and event bus, subscribes a SimpleStreamer to the gossip topic, and sets the right preprocessors up for the gossip topic.
func (*EventBus) AddDefaultTopic ¶
AddDefaultTopic adds a topic to the default multiListener
func (*EventBus) SubscribeDefault ¶
SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (i.e. bus of busses architecture)
type GossipStreamer ¶
type GossipStreamer struct {
*SimpleStreamer
}
func NewGossipStreamer ¶
func NewGossipStreamer(magic protocol.Magic) *GossipStreamer
func (*GossipStreamer) Read ¶
func (ms *GossipStreamer) Read() ([]byte, error)
func (*GossipStreamer) SeenTopics ¶
func (ms *GossipStreamer) SeenTopics() []topics.Topic
SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.
type Listener ¶
type Listener interface { // Notify a listener of a new message Notify(bytes.Buffer) error // Close the listener Close() }
Listener publishes a byte array that subscribers of the EventBus can use
func NewCallbackListener ¶
NewCallbackListener creates a callback based dispatcher
func NewChanListener ¶
NewChanListener creates a channel based dispatcher
func NewStreamListener ¶
func NewStreamListener(w io.WriteCloser) Listener
NewStreamListener creates a new StreamListener
type ListenerType ¶
type ListenerType int
ListenerType is the enum of the type of available listeners
const ( // ChannelType is the Listener type that relies on channels to communicate ChannelType ListenerType = iota // CallbackType is the Listener type that relies on Callbacks to notify // messages CallbackType )
type Multicaster ¶
Multicaster allows for a single Listener to listen to multiple topics
type Preprocessor ¶
Preprocessor is for mutating a message before it gets notified to the subscribers of a topic
type ProcessorRegistry ¶
type ProcessorRegistry interface { Preprocess(topics.Topic, *bytes.Buffer) error Register(topics.Topic, ...Preprocessor) []uint32 RemoveProcessor(topics.Topic, uint32) RemoveProcessors(topics.Topic) RemoveAllProcessors() }
ProcessorRegistry is a registry of TopicProcessor
func NewSafeProcessorRegistry ¶
func NewSafeProcessorRegistry() ProcessorRegistry
NewSafeProcessorRegistry creates a new Preprocessor
type SafeProcessorRegistry ¶
SafeProcessorRegistry allows registration of preprocessors to be applied to incoming Event on a specific topic It is threadsafe
func (*SafeProcessorRegistry) Preprocess ¶
Preprocess applies to a message all preprocessors registered for a topic
func (*SafeProcessorRegistry) Register ¶
func (p *SafeProcessorRegistry) Register(topic topics.Topic, preprocessors ...Preprocessor) []uint32
Register creates a new set of TopicProcessor to a specified topic.
func (*SafeProcessorRegistry) RemoveAllProcessors ¶
func (p *SafeProcessorRegistry) RemoveAllProcessors()
RemoveAllProcessors removes all TopicProcessors from all topics
func (*SafeProcessorRegistry) RemoveProcessor ¶
func (p *SafeProcessorRegistry) RemoveProcessor(topic topics.Topic, id uint32)
RemoveProcessor removes all TopicProcessor previously registered on a given topic using its ID
func (*SafeProcessorRegistry) RemoveProcessors ¶
func (p *SafeProcessorRegistry) RemoveProcessors(topic topics.Topic)
RemoveProcessors removes all TopicProcessor from a topic
type SimpleStreamer ¶
type SimpleStreamer struct { *bufio.Reader *bufio.Writer // contains filtered or unexported fields }
SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.
func NewSimpleStreamer ¶
func NewSimpleStreamer(magic protocol.Magic) *SimpleStreamer
NewSimpleStreamer returns an initialized SimpleStreamer.
func (*SimpleStreamer) Close ¶
func (ms *SimpleStreamer) Close() error
Close implements io.WriteCloser.
func (*SimpleStreamer) Read ¶
func (ms *SimpleStreamer) Read() ([]byte, error)
type StreamListener ¶
type StreamListener struct {
// contains filtered or unexported fields
}
StreamListener uses a ring buffer to dispatch messages
type Subscriber ¶
type Subscriber interface { Subscribe(topics.Topic, Listener) uint32 Unsubscribe(topics.Topic, uint32) }
Subscriber subscribes a channel to Event notifications on a specific topic
type TopicListener ¶
type TopicListener interface {
Quit()
}
TopicListener is a helper interface to connect an EventCollector to the EventBus. Considering that the EventCollector handles finite messages rather than packages, this interface is implemented only by callback and channel subscribers Deprecated: use eventbus.NewChanListener or eventbus.NewCallbackListener instead
func NewCallbackTopicListener ¶
func NewCallbackTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener
func NewChanTopicListener ¶
func NewChanTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener
NewCallbackTopicListener creates the TopicListener listening to a topic on the EventBus. The EventBus, EventCollector and Topic are injected
func NewTopicListener ¶
func NewTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic, listenerType ListenerType) TopicListener
NewTopicListener creates a topic listener that subscribes an EventCollector to a Subscriber with a desidered Listener