Documentation ¶
Index ¶
- func Consume(items [][]byte, w io.WriteCloser) bool
- func CreateGossipStreamer() (*EventBus, *GossipStreamer)
- type Broker
- type CallbackListener
- type ChanListener
- type Collector
- type EventBus
- func (bus *EventBus) AddDefaultTopic(tpcs ...topics.Topic)
- func (bus *EventBus) Publish(topic topics.Topic, m message.Message) (errorList []error)
- func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32
- func (bus *EventBus) SubscribeDefault(listener Listener) uint32
- func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)
- type GossipStreamer
- type Listener
- func NewCallbackListener(callback func(message.Message)) Listener
- func NewChanListener(msgChan chan<- message.Message) Listener
- func NewSafeCallbackListener(callback func(message.Message)) Listener
- func NewSafeChanListener(msgChan chan<- message.Message) Listener
- func NewStreamListener(w io.WriteCloser) Listener
- type Multicaster
- type Publisher
- type RouterStreamer
- type SimpleStreamer
- type StreamListener
- type StupidStreamer
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume(items [][]byte, w io.WriteCloser) bool
Consume consumes the items by writing them to the specified WriteCloser. It is used when creating a StreamListener.
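A minimal sketch of this write-through pattern, not the package's implementation. The names `consumeSketch` and `bufferCloser` are hypothetical, and the meaning given to the returned bool (all writes succeeded) is an assumption; the real Consume may define it differently.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// bufferCloser is a hypothetical helper that turns a bytes.Buffer into an
// io.WriteCloser so the sketch runs without a real connection.
type bufferCloser struct {
	bytes.Buffer
}

// Close is a no-op; it exists only to satisfy io.WriteCloser.
func (b *bufferCloser) Close() error { return nil }

// consumeSketch writes every item to w and reports whether all writes
// succeeded. Assumption: the real Consume may give its bool another meaning.
func consumeSketch(items [][]byte, w io.WriteCloser) bool {
	for _, item := range items {
		if _, err := w.Write(item); err != nil {
			return false
		}
	}
	return true
}

func main() {
	w := &bufferCloser{}
	ok := consumeSketch([][]byte{[]byte("ab"), []byte("cd")}, w)
	fmt.Println(ok, w.String())
}
```

Taking an io.WriteCloser rather than a concrete connection type is what lets a test substitute an in-memory buffer, as done here.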
func CreateGossipStreamer ¶
func CreateGossipStreamer() (*EventBus, *GossipStreamer)
CreateGossipStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.
Types ¶
type Broker ¶
type Broker interface {
	Subscriber
	Publisher
}
Broker is a Publisher and a Subscriber.
type CallbackListener ¶
type CallbackListener struct {
// contains filtered or unexported fields
}
CallbackListener subscribes using callbacks.
func (*CallbackListener) Close ¶
func (c *CallbackListener) Close()
Close is part of the Listener interface.
type ChanListener ¶
type ChanListener struct {
// contains filtered or unexported fields
}
ChanListener dispatches a message using a channel.
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector is a very naive implementation of the wire.EventCollector interface. If no function is supplied, it uses a channel to publish the collected packets.
func NewSimpleCollector ¶
NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a box for listeners and callbacks.
func CreateFrameStreamer ¶
func CreateFrameStreamer(topic topics.Topic) (*EventBus, io.WriteCloser)
CreateFrameStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.
func (*EventBus) AddDefaultTopic ¶
AddDefaultTopic adds topics to the default multiListener.
func (*EventBus) Publish ¶
Publish executes the callbacks defined for a topic. The topic is set explicitly because it might differ from the message Category (e.g. in the Gossip case). Publishing is fire-and-forget: if there is no listener for a topic, the message is lost. FIXME: Publish should fail fast and return a single error. Since the code is largely asynchronous, errors are not expected; if they do happen, they should be reported as soon as possible.
func (*EventBus) SubscribeDefault ¶
SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (e.g. a bus-of-buses architecture).
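A rough sketch of the sub-dispatching idea, assuming that a message published on any default topic is delivered to every default listener. The `defaultBus` type and its function-typed listeners are hypothetical simplifications and do not reflect how EventBus stores its multiListener.

```go
package main

import "fmt"

// topic and msg are hypothetical stand-ins for topics.Topic and message.Message.
type topic string
type msg struct{ payload string }

// defaultBus models only the "default multi-listener" part of a bus.
type defaultBus struct {
	defaultTopics map[topic]bool
	defaults      []func(topic, msg)
}

func newDefaultBus() *defaultBus {
	return &defaultBus{defaultTopics: make(map[topic]bool)}
}

// AddDefaultTopic marks topics whose messages go to the default listeners.
func (b *defaultBus) AddDefaultTopic(ts ...topic) {
	for _, t := range ts {
		b.defaultTopics[t] = true
	}
}

// SubscribeDefault registers f for every default topic at once.
func (b *defaultBus) SubscribeDefault(f func(topic, msg)) {
	b.defaults = append(b.defaults, f)
}

// Publish delivers m to the default listeners if t is a default topic.
func (b *defaultBus) Publish(t topic, m msg) {
	if !b.defaultTopics[t] {
		return
	}
	for _, f := range b.defaults {
		f(t, m)
	}
}

func main() {
	outer, inner := newDefaultBus(), newDefaultBus()
	outer.AddDefaultTopic("block", "tx")
	inner.AddDefaultTopic("block")

	// The inner bus has its own listener.
	inner.SubscribeDefault(func(t topic, m msg) {
		fmt.Println("inner got", t, m.payload)
	})
	// Bus of buses: the outer bus forwards its default topics to the inner bus.
	outer.SubscribeDefault(func(t topic, m msg) {
		inner.Publish(t, m)
	})

	outer.Publish("block", msg{payload: "b1"}) // forwarded and delivered
	outer.Publish("tx", msg{payload: "t1"})    // forwarded, but inner ignores "tx"
}
```

Here the forwarding listener needs to subscribe only once, regardless of how many topics the outer bus later adds as defaults.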
type GossipStreamer ¶
type GossipStreamer struct {
*SimpleStreamer
}
GossipStreamer is a SimpleStreamer which removes the checksum and the topic when reading. It is supposed to be used when testing data that needs to be streamed over the network.
func NewGossipStreamer ¶
func NewGossipStreamer(magic protocol.Magic) *GossipStreamer
NewGossipStreamer creates a new GossipStreamer instance.
func (*GossipStreamer) SeenTopics ¶
func (ms *GossipStreamer) SeenTopics() []topics.Topic
SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.
type Listener ¶
type Listener interface {
	// Notify a listener of a new message.
	Notify(message.Message) error
	// Close the listener.
	Close()
}
Listener publishes a byte array that subscribers of the EventBus can use.
func NewCallbackListener ¶
NewCallbackListener creates a callback-based dispatcher.
func NewChanListener ¶
NewChanListener creates a channel-based dispatcher. Although the message is passed by value, this is not enough to guarantee thread-safety when the listener reads or modifies slices or arrays carried by the message.
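The caveat about slices can be seen in a small, self-contained sketch. The `msg` type and `deepCopy` helper are hypothetical; copying before sending is one way a "safe" listener could avoid the problem, and nothing here claims that the package's safe variants work this way.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for message.Message carrying a byte slice.
type msg struct {
	payload []byte
}

// deepCopy returns a msg whose payload does not share memory with m.
func deepCopy(m msg) msg {
	c := make([]byte, len(m.payload))
	copy(c, m.payload)
	return msg{payload: c}
}

func main() {
	ch := make(chan msg, 2)
	original := msg{payload: []byte("abc")}

	// The struct is copied on send, but the copied slice header still
	// points at the same backing array as original.payload.
	ch <- original
	// A deep copy gets its own backing array.
	ch <- deepCopy(original)

	shared := <-ch
	safe := <-ch

	shared.payload[0] = 'X' // also visible through original.payload
	safe.payload[1] = 'Y'   // touches only the copy

	fmt.Println(string(original.payload)) // prints "Xbc"
	fmt.Println(string(safe.payload))     // prints "aYc"
}
```

If the sender and receiver run in different goroutines, the shared write above becomes a data race, which is why pass-by-value alone is not enough.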
func NewSafeCallbackListener ¶ added in v0.4.0
NewSafeCallbackListener creates a callback-based dispatcher.
func NewSafeChanListener ¶ added in v0.4.0
NewSafeChanListener creates a channel-based dispatcher which is thread-safe.
func NewStreamListener ¶
func NewStreamListener(w io.WriteCloser) Listener
NewStreamListener creates a new StreamListener.
type Multicaster ¶
Multicaster allows for a single Listener to listen to multiple topics.
type RouterStreamer ¶ added in v0.4.0
type RouterStreamer struct {
// contains filtered or unexported fields
}
RouterStreamer reroutes a gossiped message to a list of EventBus instances.
func NewRouterStreamer ¶ added in v0.4.0
func NewRouterStreamer() *RouterStreamer
NewRouterStreamer instantiates a RouterStreamer with an empty list.
func (*RouterStreamer) Add ¶ added in v0.4.0
func (r *RouterStreamer) Add(p *EventBus)
Add adds a destination EventBus.
func (*RouterStreamer) Close ¶ added in v0.4.0
func (r *RouterStreamer) Close() error
Close implements io.WriteCloser.
func (*RouterStreamer) Read ¶ added in v0.4.0
func (r *RouterStreamer) Read() ([]byte, error)
type SimpleStreamer ¶
type SimpleStreamer struct {
	*bufio.Reader
	*bufio.Writer
	// contains filtered or unexported fields
}
SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.
func NewSimpleStreamer ¶
func NewSimpleStreamer(magic protocol.Magic) *SimpleStreamer
NewSimpleStreamer returns an initialized SimpleStreamer.
func (*SimpleStreamer) Close ¶
func (ms *SimpleStreamer) Close() error
Close implements io.WriteCloser.
func (*SimpleStreamer) Read ¶
func (ms *SimpleStreamer) Read() ([]byte, error)
type StreamListener ¶
type StreamListener struct {
// contains filtered or unexported fields
}
StreamListener uses a ring buffer to dispatch messages. It is inherently thread-safe.
type StupidStreamer ¶ added in v0.3.0
StupidStreamer is a streamer meant for testing the internal forwarding of binary packets through the ring buffer. It does *not* add magic, frames, or other wrappers to the forwarded packet.
func NewStupidStreamer ¶ added in v0.3.0
func NewStupidStreamer() *StupidStreamer
NewStupidStreamer returns an initialized StupidStreamer.
func (*StupidStreamer) Close ¶ added in v0.3.0
func (sss *StupidStreamer) Close() error
Close implements io.WriteCloser.
func (*StupidStreamer) Read ¶ added in v0.3.0
func (sss *StupidStreamer) Read() ([]byte, error)