Documentation ¶
Index ¶
- Constants
- func WriteRawToBufferFile(bufferFile *os.File, bufferSeqNum uint64, rawEvent []byte) error
- func WriteToBufferFile(bufferFile *os.File, bufferSeqNum uint64, event events.Event) error
- type Broker
- func (b *Broker) OnTick(ctx context.Context, _ time.Time)
- func (b *Broker) ReloadConf(config Config)
- func (b *Broker) Send(event events.Event)
- func (b *Broker) SendBatch(events []events.Event)
- func (b *Broker) SetStreaming(on bool) bool
- func (b *Broker) SocketClient() SocketClient
- func (b *Broker) Stage(event events.Event)
- func (b *Broker) StreamingEnabled() bool
- func (b *Broker) Subscribe(s broker.Subscriber) int
- func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
- func (b *Broker) Unsubscribe(k int)
- type Config
- type FileClient
- type FileClientSend
- type FileConfig
- type Interface
- type SocketClient
- type SocketConfig
- type Stats
Constants ¶
const ( NumberOfSeqNumBytes = 8 NumberOfSizeBytes = 4 )
Variables ¶
This section is empty.
Functions ¶
func WriteRawToBufferFile ¶ added in v0.69.0
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker - the base broker type perhaps we can extend this to embed into type-specific brokers.
func (*Broker) ReloadConf ¶ added in v0.77.0
func (*Broker) SendBatch ¶
SendBatch sends a slice of events to subscribers that can handle the events in the slice the events don't have to be of the same type, and most subscribers will ignore unknown events but this will slow down those subscribers, so avoid doing silly things.
func (*Broker) SetStreaming ¶
SetStreaming allows the ability to toggle on and off sending events to the socketClient.
func (*Broker) SocketClient ¶ added in v0.65.0
func (b *Broker) SocketClient() SocketClient
func (*Broker) StreamingEnabled ¶ added in v0.65.0
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(s broker.Subscriber) int
Subscribe registers a new subscriber, returning the key.
func (*Broker) SubscribeBatch ¶
func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
func (*Broker) Unsubscribe ¶
Unsubscribe removes subscriber from broker this does not change the state of the subscriber.
type Config ¶
type Config struct { Level encoding.LogLevel `long:"log-level"` Socket SocketConfig `group:"Socket" namespace:"socket"` File FileConfig `group:"File" namespace:"file"` EventBusClientBufferSize int `long:"event-bus-client-buffer-size"` }
Config represents the configuration of the broker.
func NewDefaultConfig ¶
func NewDefaultConfig() Config
NewDefaultConfig creates an instance of config with default values.
type FileClient ¶
type FileClient struct {
// contains filtered or unexported fields
}
func NewFileClient ¶ added in v0.57.0
func NewFileClient(log *logging.Logger, config *FileConfig) (*FileClient, error)
func (*FileClient) Close ¶ added in v0.57.0
func (fc *FileClient) Close() error
func (*FileClient) Replace ¶ added in v0.77.0
func (fc *FileClient) Replace(newFC *FileClient)
Replace - in case of a config change, just replace the file we're writing to with the new one.
type FileClientSend ¶ added in v0.57.0
type FileClientSend interface { SendBatch(events []events.Event) error Replace(*FileClient) Close() error }
type FileConfig ¶
type Interface ¶ added in v0.55.0
type Interface interface { Stage(event events.Event) Send(event events.Event) SendBatch(events []events.Event) Subscribe(s broker.Subscriber) int SubscribeBatch(subs ...broker.Subscriber) Unsubscribe(k int) SetStreaming(on bool) bool }
Interface interface (horribly named) is declared here to provide a drop-in replacement for broker mocks used throughout in addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated so we don't have to rely on test-only helper functions
type SocketClient ¶
type SocketClient interface { SendBatch(events []events.Event) error Receive(ctx context.Context) (<-chan events.Event, <-chan error) }
SocketClient is an interface to send serialized events over a socket.
type SocketConfig ¶
type SocketConfig struct { DialTimeout encoding.Duration `description:" " long:"dial-timeout"` DialRetryInterval encoding.Duration `description:" " long:"dial-retry-interval"` SocketQueueTimeout encoding.Duration `description:" " long:"socket-queue-timeout"` EventChannelBufferSize int `description:" " long:"event-channel-buffer-size"` SocketChannelBufferSize int `description:" " long:"socket-channel-buffer-size"` MaxSendTimeouts int `description:" " long:"max-send-timeouts"` Address string `description:"Data node's address" long:"address"` Port int `description:"Data node port" long:"port"` Enabled encoding.Bool `description:"Enable streaming of bus events over socket" long:"enabled"` Transport string `description:"Transport of socket. tcp/inproc are allowed. Default is TCP" long:"transport"` }