Documentation ¶
Index ¶
- Variables
- func Consume(elems []ring.Elem, w ring.Writer) bool
- func CreateGossipStreamer() (*EventBus, *GossipStreamer)
- type Broker
- type CallbackListener
- type ChanListener
- type Collector
- type EventBus
- func (bus *EventBus) AddDefaultTopic(tpcs ...topics.Topic)
- func (e *EventBus) Close()
- func (bus *EventBus) Publish(topic topics.Topic, m message.Message) (errorList []error)
- func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32
- func (bus *EventBus) SubscribeDefault(listener Listener) uint32
- func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)
- type GossipStreamer
- type Listener
- func NewCallbackListener(callback func(message.Message)) Listener
- func NewChanListener(msgChan chan<- message.Message) Listener
- func NewSafeCallbackListener(callback func(message.Message)) Listener
- func NewSafeChanListener(msgChan chan<- message.Message) Listener
- func NewStreamListener(w ring.Writer) Listener
- func NewStreamListenerWithParams(w ring.Writer, bufLen int, mapper func(topic topics.Topic) byte) Listener
- type Multicaster
- type Publisher
- type RouterStreamer
- type SimpleStreamer
- type StreamListener
- type StupidStreamer
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrMsgChanFull underlying queue fails to accept new message due to full buffer.
	ErrMsgChanFull = errors.New("message channel buffer is full")
	// ErrRingBufferClosed underlying ring buffer is closed.
	ErrRingBufferClosed = errors.New("ringbuffer is closed")
)
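A full-buffer error of this kind typically comes from a non-blocking channel send. The following is a minimal sketch of that general Go pattern, not the package's actual code: `trySend` and `errChanFull` are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errChanFull is a hypothetical stand-in for ErrMsgChanFull in this sketch.
var errChanFull = errors.New("message channel buffer is full")

// trySend attempts a non-blocking send. If the channel buffer has no room,
// it returns errChanFull instead of blocking the caller.
func trySend(ch chan<- string, msg string) error {
	select {
	case ch <- msg:
		return nil
	default:
		return errChanFull
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "first"))  // the buffer has room
	fmt.Println(trySend(ch, "second")) // the buffer is already full
}
```

The `select` with a `default` branch is what keeps the sender from blocking; the cost is that the message is dropped and the caller has to decide what to do with the error.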
Functions ¶
func Consume ¶
Consume consumes the given elements by writing them to the specified ring.Writer. It is used when creating a StreamListener.
func CreateGossipStreamer ¶
func CreateGossipStreamer() (*EventBus, *GossipStreamer)
CreateGossipStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.
Types ¶
type Broker ¶
type Broker interface {
	Subscriber
	Publisher
}
Broker is a Publisher and a Subscriber.
type CallbackListener ¶
type CallbackListener struct {
// contains filtered or unexported fields
}
CallbackListener subscribes using callbacks.
func (*CallbackListener) Close ¶
func (c *CallbackListener) Close()
Close is part of the Listener interface.
func (*CallbackListener) Notify ¶
func (c *CallbackListener) Notify(m message.Message) error
Notify passes a copy of the message as a parameter to the callback.
func (*CallbackListener) SetLogLevel ¶ added in v0.4.4
func (c *CallbackListener) SetLogLevel(logrus.Level)
SetLogLevel is an empty implementation.
type ChanListener ¶
type ChanListener struct {
// contains filtered or unexported fields
}
ChanListener dispatches a message using a channel.
func (*ChanListener) Notify ¶
func (c *ChanListener) Notify(m message.Message) error
Notify sends a message to the internal dispatcher channel. It forwards the message if the listener is unsafe. Otherwise, it forwards a message clone.
func (*ChanListener) SetLogLevel ¶ added in v0.4.4
func (c *ChanListener) SetLogLevel(lv logrus.Level)
SetLogLevel updates the log level.
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector is a deliberately naive implementation of the wire.EventCollector interface. If no function is supplied, it uses a channel to publish the collected packets.
func NewSimpleCollector ¶
NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a container for listeners and callbacks.
func CreateFrameStreamer ¶
CreateFrameStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.
func (*EventBus) AddDefaultTopic ¶
AddDefaultTopic adds topics to the default multiListener.
func (*EventBus) Close ¶ added in v0.4.4
func (e *EventBus) Close()
Close closes all topic listeners.
func (*EventBus) Publish ¶
Publish executes the callbacks defined for a topic. The topic is set explicitly because it might differ from the message Category (e.g. in the Gossip case). Publishing is fire-and-forget: if there is no listener for a topic, the message is lost. FIXME: Publish should fail fast and return one error. Since the code is largely asynchronous, we don't expect errors, and if they happen they should be reported as soon as possible.
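To make the subscribe/publish/unsubscribe cycle concrete, here is a minimal self-contained sketch of a topic-keyed bus. It is not the package's EventBus: `miniBus`, `listener`, and the use of plain strings for topics and messages are assumptions made for illustration. It mirrors only what this page states: Subscribe returns a `uint32` id, Publish returns a list of errors, and messages published to a topic without listeners are lost.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a hypothetical stand-in for the Listener interface.
type listener func(msg string) error

// miniBus is an illustrative topic-keyed bus, not the package's EventBus.
type miniBus struct {
	mu        sync.RWMutex
	nextID    uint32
	listeners map[string]map[uint32]listener
}

func newMiniBus() *miniBus {
	return &miniBus{listeners: make(map[string]map[uint32]listener)}
}

// subscribe registers l under topic and returns an id usable for unsubscribe.
func (b *miniBus) subscribe(topic string, l listener) uint32 {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	if b.listeners[topic] == nil {
		b.listeners[topic] = make(map[uint32]listener)
	}
	b.listeners[topic][b.nextID] = l
	return b.nextID
}

// unsubscribe removes the listener with the given id from topic.
func (b *miniBus) unsubscribe(topic string, id uint32) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.listeners[topic], id)
}

// publish notifies every listener of topic and collects their errors.
// With no listener registered, the message is silently dropped.
func (b *miniBus) publish(topic, msg string) []error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	var errs []error
	for _, l := range b.listeners[topic] {
		if err := l(msg); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	bus := newMiniBus()
	var got []string
	id := bus.subscribe("gossip", func(m string) error {
		got = append(got, m)
		return nil
	})
	bus.publish("gossip", "hello")
	bus.publish("other", "lost") // no listener for this topic: dropped
	bus.unsubscribe("gossip", id)
	bus.publish("gossip", "ignored") // listener was removed
	fmt.Println(got)
}
```

Returning a slice of errors, as in this sketch, forces callers to inspect every listener's outcome; the FIXME above suggests a fail-fast variant that returns a single error instead.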
func (*EventBus) SubscribeDefault ¶
SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (i.e. a bus-of-buses architecture).
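The sub-dispatching idea can be sketched as a set of default topics whose messages are all handed to the same handlers, which then re-dispatch them elsewhere. The sketch assumes that a default listener receives every message published on any default topic; this page does not spell out that behavior. `defaultSet` and `handler` are hypothetical names, and a plain map stands in for the child bus.

```go
package main

import "fmt"

// handler receives the topic along with the message so it can re-dispatch.
type handler func(topic, msg string)

// defaultSet is a hypothetical model of a default multi-listener: a set of
// topics plus handlers that receive every message published on those topics.
type defaultSet struct {
	topics   map[string]bool
	handlers []handler
}

func newDefaultSet() *defaultSet {
	return &defaultSet{topics: make(map[string]bool)}
}

// addTopics marks the given topics as default topics.
func (d *defaultSet) addTopics(topics ...string) {
	for _, t := range topics {
		d.topics[t] = true
	}
}

// subscribe registers h for all default topics.
func (d *defaultSet) subscribe(h handler) {
	d.handlers = append(d.handlers, h)
}

// forward hands msg to every handler when topic is a default topic.
// It reports whether topic is a default topic.
func (d *defaultSet) forward(topic, msg string) bool {
	if !d.topics[topic] {
		return false
	}
	for _, h := range d.handlers {
		h(topic, msg)
	}
	return true
}

func main() {
	parent := newDefaultSet()
	parent.addTopics("block", "tx")

	// The map stands in for a second bus that does its own dispatching.
	child := map[string][]string{}
	parent.subscribe(func(topic, msg string) {
		child[topic] = append(child[topic], msg)
	})

	parent.forward("block", "b1")
	parent.forward("tx", "t1")
	parent.forward("ping", "ignored") // not a default topic
	fmt.Println(child)
}
```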
type GossipStreamer ¶
type GossipStreamer struct {
*SimpleStreamer
}
GossipStreamer is a SimpleStreamer which removes the checksum and the topic when reading. It is supposed to be used when testing data that needs to be streamed over the network.
func NewGossipStreamer ¶
func NewGossipStreamer(magic protocol.Magic) *GossipStreamer
NewGossipStreamer creates a new GossipStreamer instance.
func (*GossipStreamer) SeenTopics ¶
func (ms *GossipStreamer) SeenTopics() []topics.Topic
SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.
type Listener ¶
type Listener interface {
	// Notify a listener of a new message.
	Notify(message.Message) error
	// Update Listeners log level. From verbose to silent.
	SetLogLevel(logrus.Level)
	// Close the listener.
	Close()
}
Listener publishes a byte array that subscribers of the EventBus can use.
func NewCallbackListener ¶
NewCallbackListener creates a callback-based dispatcher.
func NewChanListener ¶
NewChanListener creates a channel-based dispatcher. Although the message is passed by value, this is not enough to enforce thread-safety when the listener tries to read or change slices or arrays carried by the message.
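The reason pass-by-value is not enough is a general property of Go: copying a struct copies its slice headers, not the underlying arrays. The sketch below demonstrates this with a hypothetical `msg` type and `clone` method; it does not use the package's message.Message.

```go
package main

import "fmt"

// msg is a hypothetical message type carrying a byte-slice payload.
type msg struct {
	payload []byte
}

// clone returns a deep copy whose payload has its own backing array.
func (m msg) clone() msg {
	p := make([]byte, len(m.payload))
	copy(p, m.payload)
	return msg{payload: p}
}

func main() {
	original := msg{payload: []byte("abc")}

	shallow := original     // copies the struct but shares the backing array
	deep := original.clone() // gets an independent payload

	shallow.payload[0] = 'X' // also changes original.payload
	deep.payload[1] = 'Y'    // leaves original.payload untouched

	fmt.Println(string(original.payload), string(deep.payload))
}
```

If two goroutines held `original` and `shallow`, the write through `shallow` would race with any read through `original`. Handing each listener a clone avoids that at the cost of one allocation and copy per delivery.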
func NewSafeCallbackListener ¶ added in v0.4.0
NewSafeCallbackListener creates a callback-based dispatcher.
func NewSafeChanListener ¶ added in v0.4.0
NewSafeChanListener creates a channel-based dispatcher that is thread-safe.
func NewStreamListener ¶
NewStreamListener creates a new StreamListener.
type Multicaster ¶
Multicaster allows for a single Listener to listen to multiple topics.
type RouterStreamer ¶ added in v0.4.0
type RouterStreamer struct {
// contains filtered or unexported fields
}
RouterStreamer reroutes a gossiped message to a list of EventBus instances.
func NewRouterStreamer ¶ added in v0.4.0
func NewRouterStreamer() *RouterStreamer
NewRouterStreamer instantiates a RouterStreamer with an empty list.
func (*RouterStreamer) Add ¶ added in v0.4.0
func (r *RouterStreamer) Add(p *EventBus)
Add adds a destination EventBus.
func (*RouterStreamer) Close ¶ added in v0.4.0
func (r *RouterStreamer) Close() error
Close implements io.WriteCloser.
func (*RouterStreamer) Read ¶ added in v0.4.0
func (r *RouterStreamer) Read() ([]byte, error)
type SimpleStreamer ¶
type SimpleStreamer struct {
	*bufio.Reader
	*bufio.Writer
	// contains filtered or unexported fields
}
SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.
func NewSimpleStreamer ¶
func NewSimpleStreamer(magic protocol.Magic) *SimpleStreamer
NewSimpleStreamer returns an initialized SimpleStreamer.
func (*SimpleStreamer) Close ¶
func (ms *SimpleStreamer) Close() error
Close implements io.WriteCloser.
func (*SimpleStreamer) Read ¶
func (ms *SimpleStreamer) Read() ([]byte, error)
type StreamListener ¶
type StreamListener struct {
// contains filtered or unexported fields
}
StreamListener uses a ring buffer to dispatch messages. It is inherently thread-safe.
func (*StreamListener) Notify ¶
func (s *StreamListener) Notify(m message.Message) error
Notify puts a message into the Listener's ring buffer. It uses a goroutine so as not to block while the item is put into the ring buffer.
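The non-blocking hand-off described above can be sketched with a small bounded ring buffer whose `put` blocks while the buffer is full, plus a `notify` helper that runs `put` in its own goroutine. This is an illustrative model only: `ring`, `newRing`, `put`, `get`, and `notify` are hypothetical names, and the package's ring buffer (including how it handles closing) is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a hypothetical fixed-size ring buffer. Its size must be > 0.
type ring struct {
	mu       sync.Mutex
	notFull  *sync.Cond
	notEmpty *sync.Cond
	items    [][]byte
	head     int // index of the oldest item
	count    int // number of stored items
}

func newRing(size int) *ring {
	r := &ring{items: make([][]byte, size)}
	r.notFull = sync.NewCond(&r.mu)
	r.notEmpty = sync.NewCond(&r.mu)
	return r
}

// put stores b, blocking while the ring is full.
func (r *ring) put(b []byte) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for r.count == len(r.items) {
		r.notFull.Wait()
	}
	r.items[(r.head+r.count)%len(r.items)] = b
	r.count++
	r.notEmpty.Signal()
}

// get removes and returns the oldest item, blocking while the ring is empty.
func (r *ring) get() []byte {
	r.mu.Lock()
	defer r.mu.Unlock()
	for r.count == 0 {
		r.notEmpty.Wait()
	}
	b := r.items[r.head]
	r.items[r.head] = nil
	r.head = (r.head + 1) % len(r.items)
	r.count--
	r.notFull.Signal()
	return b
}

// notify hands b to a goroutine so the caller returns immediately,
// even when the ring is currently full.
func notify(r *ring, b []byte) {
	go r.put(b)
}

func main() {
	r := newRing(1)
	notify(r, []byte("a"))
	notify(r, []byte("b")) // returns at once although the ring holds one slot
	first, second := r.get(), r.get()
	fmt.Println(len(first), len(second))
}
```

One consequence of spawning a goroutine per notification is that, in this sketch, the order in which concurrent messages reach the buffer is up to the scheduler.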
func (*StreamListener) SetLogLevel ¶ added in v0.4.4
func (s *StreamListener) SetLogLevel(logrus.Level)
SetLogLevel is an empty implementation.
type StupidStreamer ¶ added in v0.3.0
StupidStreamer is a streamer meant to be used when testing internal forwarding of binary packets through the ring buffer. It does *not* add magic, frames or other wrappers to the forwarded packet.
func NewStupidStreamer ¶ added in v0.3.0
func NewStupidStreamer() *StupidStreamer
NewStupidStreamer returns an initialized StupidStreamer.
func (*StupidStreamer) Close ¶ added in v0.3.0
func (sss *StupidStreamer) Close() error
Close implements io.WriteCloser.
func (*StupidStreamer) Read ¶ added in v0.3.0
func (sss *StupidStreamer) Read() ([]byte, error)