Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker - the base broker type perhaps we can extend this to embed into type-specific brokers.
func (*Broker) SendBatch ¶
SendBatch sends a slice of events to subscribers that can handle the events in the slice the events don't have to be of the same type, and most subscribers will ignore unknown events but this will slow down those subscribers, so avoid doing silly things.
func (*Broker) SetStreaming ¶
SetStreaming allows the ability to toggle on and off sending events to the socketClient.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(s Subscriber) int
Subscribe registers a new subscriber, returning the key.
func (*Broker) SubscribeBatch ¶
func (b *Broker) SubscribeBatch(subs ...Subscriber)
func (*Broker) Unsubscribe ¶
Unsubscribe removes subscriber from broker this does not change the state of the subscriber.
type BrokerI ¶
type BrokerI interface { Send(event events.Event) SendBatch(events []events.Event) Subscribe(s Subscriber) int SubscribeBatch(subs ...Subscriber) Unsubscribe(k int) SetStreaming(on bool) bool }
BrokerI interface (horribly named) is declared here to provide a drop-in replacement for broker mocks used throughout in addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated so we don't have to rely on test-only helper functions
type Config ¶
type Config struct { Level encoding.LogLevel `long:"log-level"` Socket SocketConfig `group:"Socket" namespace:"socket"` File FileConfig `group:"File" namespace:"file"` }
Config represents the configuration of the broker.
func NewDefaultConfig ¶
func NewDefaultConfig() Config
NewDefaultConfig creates an instance of config with default values.
type FileClient ¶
type FileConfig ¶
type SocketClient ¶
SocketClient is an interface to send serialized events over a socket.
type SocketConfig ¶
type SocketConfig struct { DialTimeout encoding.Duration `long:"dial-timeout" description:" "` DialRetryInterval encoding.Duration `long:"dial-retry-interval" description:" "` SocketQueueTimeout encoding.Duration `long:"socket-queue-timeout" description:" "` EventChannelBufferSize int `long:"event-channel-buffer-size" description:" "` SocketChannelBufferSize int `long:"socket-channel-buffer-size" description:" "` MaxSendTimeouts int `long:"max-send-timeouts" description:" "` Address string `long:"address" description:"Data node's address"` Port int `long:"port" description:"Data node port"` Enabled encoding.Bool `long:"enabled" description:"Enable streaming of bus events over socket"` Transport string `long:"transport" description:"Transport of socket. tcp/inproc are allowed. Default is TCP"` }
type Subscriber ¶
type Subscriber interface { Push(val ...events.Event) Skip() <-chan struct{} Closed() <-chan struct{} C() chan<- []events.Event Types() []events.Type SetID(id int) ID() int Ack() bool }
Subscriber interface allows pushing values to subscribers, can be set to a Skip state (temporarily not receiving any events), or closed. Otherwise events are pushed