Documentation ¶
Index ¶
- Constants
- func WriteToBufferFile(bufferFile *os.File, bufferSeqNum uint64, event events.Event) error
- type Broker
- func (b *Broker) Send(event events.Event)
- func (b *Broker) SendBatch(events []events.Event)
- func (b *Broker) SetStreaming(on bool) bool
- func (b *Broker) SocketClient() SocketClient
- func (b *Broker) StreamingEnabled() bool
- func (b *Broker) Subscribe(s broker.Subscriber) int
- func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
- func (b *Broker) Unsubscribe(k int)
- type Config
- type FileClient
- type FileClientSend
- type FileConfig
- type Interface
- type SocketClient
- type SocketConfig
- type Stats
Constants ¶
const ( NumberOfSeqNumBytes = 8 NumberOfSizeBytes = 4 )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker - the base broker type perhaps we can extend this to embed into type-specific brokers.
func (*Broker) SendBatch ¶
SendBatch sends a slice of events to subscribers that can handle the events in the slice the events don't have to be of the same type, and most subscribers will ignore unknown events but this will slow down those subscribers, so avoid doing silly things.
func (*Broker) SetStreaming ¶
SetStreaming allows the ability to toggle on and off sending events to the socketClient.
func (*Broker) SocketClient ¶
func (b *Broker) SocketClient() SocketClient
func (*Broker) StreamingEnabled ¶
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(s broker.Subscriber) int
Subscribe registers a new subscriber, returning the key.
func (*Broker) SubscribeBatch ¶
func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
func (*Broker) Unsubscribe ¶
Unsubscribe removes subscriber from broker this does not change the state of the subscriber.
type Config ¶
type Config struct { Level encoding.LogLevel `long:"log-level"` Socket SocketConfig `group:"Socket" namespace:"socket"` File FileConfig `group:"File" namespace:"file"` EventBusClientBufferSize int `long:"event-bus-client-buffer-size"` }
Config represents the configuration of the broker.
func NewDefaultConfig ¶
func NewDefaultConfig() Config
NewDefaultConfig creates an instance of config with default values.
type FileClient ¶
type FileClient struct {
// contains filtered or unexported fields
}
func NewFileClient ¶
func NewFileClient(log *logging.Logger, config *FileConfig) (*FileClient, error)
func (*FileClient) Close ¶
func (fc *FileClient) Close() error
type FileClientSend ¶
type FileConfig ¶
type Interface ¶
type Interface interface { Send(event events.Event) SendBatch(events []events.Event) Subscribe(s broker.Subscriber) int SubscribeBatch(subs ...broker.Subscriber) Unsubscribe(k int) SetStreaming(on bool) bool }
Interface interface (horribly named) is declared here to provide a drop-in replacement for broker mocks used throughout in addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated so we don't have to rely on test-only helper functions
type SocketClient ¶
type SocketClient interface { SendBatch(events []events.Event) error Receive(ctx context.Context) (<-chan events.Event, <-chan error) }
SocketClient is an interface to send serialized events over a socket.
type SocketConfig ¶
type SocketConfig struct { DialTimeout encoding.Duration `long:"dial-timeout" description:" "` DialRetryInterval encoding.Duration `long:"dial-retry-interval" description:" "` SocketQueueTimeout encoding.Duration `long:"socket-queue-timeout" description:" "` EventChannelBufferSize int `long:"event-channel-buffer-size" description:" "` SocketChannelBufferSize int `long:"socket-channel-buffer-size" description:" "` MaxSendTimeouts int `long:"max-send-timeouts" description:" "` Address string `long:"address" description:"Data node's address"` Port int `long:"port" description:"Data node port"` Enabled encoding.Bool `long:"enabled" description:"Enable streaming of bus events over socket"` Transport string `long:"transport" description:"Transport of socket. tcp/inproc are allowed. Default is TCP"` }