eventbus

package
v0.2.0
Published: Dec 24, 2019 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(items [][]byte, w io.WriteCloser) bool

Consume writes the items to the specified WriteCloser. It is used when creating a StreamListener.

func CreateGossipStreamer

func CreateGossipStreamer() (*EventBus, *GossipStreamer)

CreateGossipStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

Types

type Adder

type Adder struct {
	// contains filtered or unexported fields
}

Adder is a very simple Preprocessor for test purposes

func NewAdder

func NewAdder(tkn string) *Adder

NewAdder creates a new Adder

func (*Adder) Process

func (a *Adder) Process(buf *bytes.Buffer) error

Process a buffer by appending a string to it

type Broker

type Broker interface {
	ProcessorRegistry
	Subscriber
	Publisher
}

Broker is a Publisher, a Subscriber and a ProcessorRegistry

type CallbackListener

type CallbackListener struct {
	// contains filtered or unexported fields
}

CallbackListener subscribes using callbacks

func (*CallbackListener) Close

func (c *CallbackListener) Close()

Close is part of the Listener interface

func (*CallbackListener) Notify

func (c *CallbackListener) Notify(m bytes.Buffer) error

Notify passes a copy of the message as a parameter to the callback

type ChanListener

type ChanListener struct {
	// contains filtered or unexported fields
}

ChanListener dispatches a message using a channel

func (*ChanListener) Close

func (c *ChanListener) Close()

Close has no effect

func (*ChanListener) Notify

func (c *ChanListener) Notify(m bytes.Buffer) error

Notify sends a message to the internal dispatcher channel

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector is a minimal implementation of the wire.EventCollector interface. If no function is supplied, it uses a channel to publish the collected packets.

func NewSimpleCollector

func NewSimpleCollector(rChan chan bytes.Buffer, f func(bytes.Buffer) error) *Collector

NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel

func (*Collector) Collect

func (m *Collector) Collect(b bytes.Buffer) error

Collect redirects a buffer copy to a channel

type EventBus

type EventBus struct {
	ProcessorRegistry
	// contains filtered or unexported fields
}

EventBus - box for listeners and callbacks.

func CreateFrameStreamer

func CreateFrameStreamer(topic topics.Topic) (*EventBus, io.WriteCloser)

CreateFrameStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

func New

func New() *EventBus

New returns new EventBus with empty listeners.

func (*EventBus) AddDefaultTopic

func (bus *EventBus) AddDefaultTopic(topic topics.Topic)

AddDefaultTopic adds a topic to the default multiListener

func (*EventBus) Publish

func (bus *EventBus) Publish(topic topics.Topic, messageBuffer *bytes.Buffer)

Publish executes callback defined for a topic.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32

Subscribe subscribes a Listener to a topic and returns an ID that can be passed to Unsubscribe.

func (*EventBus) SubscribeDefault

func (bus *EventBus) SubscribeDefault(listener Listener) uint32

SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (i.e. bus of busses architecture)

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)

Unsubscribe removes the listener with the given ID from a topic.

type GossipStreamer

type GossipStreamer struct {
	*SimpleStreamer
}

func NewGossipStreamer

func NewGossipStreamer(magic protocol.Magic) *GossipStreamer

func (*GossipStreamer) Read

func (ms *GossipStreamer) Read() ([]byte, error)

func (*GossipStreamer) SeenTopics

func (ms *GossipStreamer) SeenTopics() []topics.Topic

SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.

type Listener

type Listener interface {
	// Notify a listener of a new message
	Notify(bytes.Buffer) error
	// Close the listener
	Close()
}

Listener publishes a byte array that subscribers of the EventBus can use

func NewCallbackListener

func NewCallbackListener(callback func(bytes.Buffer) error) Listener

NewCallbackListener creates a callback based dispatcher

func NewChanListener

func NewChanListener(msgChan chan<- bytes.Buffer) Listener

NewChanListener creates a channel based dispatcher

func NewStreamListener

func NewStreamListener(w io.WriteCloser) Listener

NewStreamListener creates a new StreamListener

type ListenerType

type ListenerType int

ListenerType is the enum of the type of available listeners

const (
	// ChannelType is the Listener type that relies on channels to communicate
	ChannelType ListenerType = iota
	// CallbackType is the Listener type that relies on Callbacks to notify
	// messages
	CallbackType
)

type Multicaster

type Multicaster interface {
	AddDefaultTopic(topics.Topic)
	SubscribeDefault(Listener) uint32
}

Multicaster allows for a single Listener to listen to multiple topics

type Preprocessor

type Preprocessor interface {
	Process(*bytes.Buffer) error
}

Preprocessor is for mutating a message before it gets notified to the subscribers of a topic

type ProcessorRegistry

type ProcessorRegistry interface {
	Preprocess(topics.Topic, *bytes.Buffer) error
	Register(topics.Topic, ...Preprocessor) []uint32
	RemoveProcessor(topics.Topic, uint32)
	RemoveProcessors(topics.Topic)
	RemoveAllProcessors()
}

ProcessorRegistry is a registry of TopicProcessor

func NewSafeProcessorRegistry

func NewSafeProcessorRegistry() ProcessorRegistry

NewSafeProcessorRegistry creates a new ProcessorRegistry

type Publisher

type Publisher interface {
	Publish(topics.Topic, *bytes.Buffer)
}

Publisher publishes serialized messages on a specific topic

type SafeProcessorRegistry

type SafeProcessorRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SafeProcessorRegistry allows registration of preprocessors to be applied to incoming Events on a specific topic. It is thread-safe.

func (*SafeProcessorRegistry) Preprocess

func (p *SafeProcessorRegistry) Preprocess(topic topics.Topic, messageBuffer *bytes.Buffer) error

Preprocess applies to a message all preprocessors registered for a topic

func (*SafeProcessorRegistry) Register

func (p *SafeProcessorRegistry) Register(topic topics.Topic, preprocessors ...Preprocessor) []uint32

Register creates a new set of TopicProcessor to a specified topic.

func (*SafeProcessorRegistry) RemoveAllProcessors

func (p *SafeProcessorRegistry) RemoveAllProcessors()

RemoveAllProcessors removes all TopicProcessors from all topics

func (*SafeProcessorRegistry) RemoveProcessor

func (p *SafeProcessorRegistry) RemoveProcessor(topic topics.Topic, id uint32)

RemoveProcessor removes a TopicProcessor previously registered on a given topic using its ID

func (*SafeProcessorRegistry) RemoveProcessors

func (p *SafeProcessorRegistry) RemoveProcessors(topic topics.Topic)

RemoveProcessors removes all TopicProcessor from a topic

type SimpleStreamer

type SimpleStreamer struct {
	*bufio.Reader
	*bufio.Writer
	// contains filtered or unexported fields
}

SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.

func NewSimpleStreamer

func NewSimpleStreamer(magic protocol.Magic) *SimpleStreamer

NewSimpleStreamer returns an initialized SimpleStreamer.

func (*SimpleStreamer) Close

func (ms *SimpleStreamer) Close() error

Close implements io.WriteCloser.

func (*SimpleStreamer) Read

func (ms *SimpleStreamer) Read() ([]byte, error)

func (*SimpleStreamer) Write

func (ms *SimpleStreamer) Write(p []byte) (n int, err error)

type StreamListener

type StreamListener struct {
	// contains filtered or unexported fields
}

StreamListener uses a ring buffer to dispatch messages

func (*StreamListener) Close

func (s *StreamListener) Close()

Close the internal ringbuffer

func (*StreamListener) Notify

func (s *StreamListener) Notify(m bytes.Buffer) error

Notify puts a message to the Listener's ringbuffer

type Subscriber

type Subscriber interface {
	Subscribe(topics.Topic, Listener) uint32
	Unsubscribe(topics.Topic, uint32)
}

Subscriber subscribes a channel to Event notifications on a specific topic

type TopicListener

type TopicListener interface {
	Quit()
}

TopicListener is a helper interface to connect an EventCollector to the EventBus. Considering that the EventCollector handles finite messages rather than packages, this interface is implemented only by callback and channel subscribers.

Deprecated: use eventbus.NewChanListener or eventbus.NewCallbackListener instead.

func NewCallbackTopicListener

func NewCallbackTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener

func NewChanTopicListener

func NewChanTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic) TopicListener

NewCallbackTopicListener creates the TopicListener listening to a topic on the EventBus. The EventBus, EventCollector and Topic are injected

func NewTopicListener

func NewTopicListener(subscriber Subscriber, collector wire.EventCollector, topic topics.Topic, listenerType ListenerType) TopicListener

NewTopicListener creates a topic listener that subscribes an EventCollector to a Subscriber with the desired Listener type
