README
Event bus
The broker is the entry point for core events. Core engines only ever use a single method of the broker interface. Subscribers don't interact with the broker directly, but are called by the broker (which pushes events to them).
For the core
Where engines previously depended on any number of buffers to push the data that plugins, APIs, and stores needed to handle, they now push events onto the bus via the broker. In the events package, you'll find a number of events (trade event, order event, market data, etc.). These events all have a constructor for convenience. Most events can also be created through the generic constructor. Say, for example, we receive a new order, or the state of an order changes:
func (e *Engine) Foo(ctx context.Context, order *types.Order) {
	// some code, order changes:
	e.broker.Send(events.NewOrderEvent(ctx, *order))
	// or, using the generic constructor
	if evt, err := events.New(ctx, *order); err == nil {
		e.broker.Send(evt)
	}
}
Previously, the buffers needed to be flushed at the end of a block. Now, the end of a block is itself an event (TimeUpdate). If some events or data need to be batched and only processed at the end of the block, the subscriber should subscribe to the time event and use it as the trigger to do so.
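The batching pattern described above could look roughly like the following. This is a minimal sketch, not the package's implementation: `Type`, `Event`, and `batchingSub` are simplified stand-ins invented for illustration, and the real `events` package types are richer.

```go
package main

import "fmt"

// Type and Event are simplified stand-ins for events.Type and events.Event.
type Type int

const (
	TimeUpdate Type = iota // end-of-block signal
	OrderEvent
)

type Event struct {
	Type Type
	Data string
}

// batchingSub (hypothetical) buffers events until it sees a TimeUpdate.
type batchingSub struct {
	pending []Event
	flushed [][]Event // one entry per processed block
}

// Push buffers ordinary events and flushes the buffer on TimeUpdate.
func (s *batchingSub) Push(evts ...Event) {
	for _, e := range evts {
		if e.Type != TimeUpdate {
			s.pending = append(s.pending, e)
			continue
		}
		if len(s.pending) > 0 {
			s.flushed = append(s.flushed, s.pending)
			s.pending = nil
		}
	}
}

func main() {
	s := &batchingSub{}
	s.Push(Event{Type: OrderEvent, Data: "o1"}, Event{Type: OrderEvent, Data: "o2"})
	s.Push(Event{Type: TimeUpdate})
	fmt.Println(len(s.flushed), len(s.flushed[0])) // 1 2
}
```

Nothing is processed until the time event arrives, which mirrors the old "flush at end of block" behaviour without the engine having to know about it.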
Some events (e.g. position-related events) are created in very large numbers. Sending them individually spawns a large number of routines to send out all the data, which is bad for performance. To get around this issue, the broker includes a SendBatch(events []events.Event) function. The events in the slice can be of different types, but the broker derives the subscriber types from the first event in the slice, and assumes that subscribers who listen for this event can either safely ignore any other events in the slice or handle all of them (currently, all subscribers can handle unknown events: they simply ignore them).
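To make the "first event decides" rule concrete, here is a small sketch of that routing. It assumes a broker that keeps a map from event type to subscribers; the `broker`, `sub`, and `Event` types below are hypothetical stand-ins, not the package's own.

```go
package main

import "fmt"

// Type and Event are simplified stand-ins for the events package.
type Type int

const (
	PositionEvent Type = iota
	TradeEvent
)

type Event struct{ Type Type }

// sub (hypothetical) counts the events it understands and ignores the rest.
type sub struct {
	wants Type
	seen  int
}

func (s *sub) Push(evts ...Event) {
	for _, e := range evts {
		if e.Type == s.wants {
			s.seen++
		}
	}
}

// broker (hypothetical) maps event types to interested subscribers.
type broker struct {
	byType map[Type][]*sub
}

// SendBatch routes the whole slice to whoever listens for the first event's type.
func (b *broker) SendBatch(evts []Event) {
	if len(evts) == 0 {
		return
	}
	for _, s := range b.byType[evts[0].Type] {
		s.Push(evts...)
	}
}

func main() {
	pos, trade := &sub{wants: PositionEvent}, &sub{wants: TradeEvent}
	b := &broker{byType: map[Type][]*sub{
		PositionEvent: {pos},
		TradeEvent:    {trade},
	}}
	b.SendBatch([]Event{{Type: PositionEvent}, {Type: TradeEvent}, {Type: PositionEvent}})
	fmt.Println(pos.seen, trade.seen) // 2 0
}
```

In this sketch the trade subscriber never sees the batch, because the first event was a position event. That is a good reason to keep batches homogeneous where possible.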
It might be a good idea to add some placeholder event for things like MarketEvent (which is basically a catch-all event type for things we want to log).
For non-core (subscribers etc...)
The core spits out events, without caring where the data ends up. The subscribers need to be registered with the broker to receive the data and process it. There are 2 main categories of subscribers: required (think of it as ack) and non-required subscribers. If a subscriber is registered as required, the broker will make sure that the subscriber receives all events, in the correct order. The non-required subscribers have a channel onto which the broker pushes events, unless the channel buffer is full. The subscriber is not required, so rather than blocking the broker, the broker is free to skip these subscribers and simply carry on. As a result the latter type of subscriber is not guaranteed to receive every single event, but unlike their required counterparts, they are unable to have a meaningful impact on the performance of the broker.
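The difference between the two delivery modes can be sketched as follows. The `deliver` helper and the `Event` type are hypothetical and exist only to illustrate the idea: a blocking function call for required subscribers, and a non-blocking channel send for the rest.

```go
package main

import "fmt"

// Event is a simplified stand-in for events.Event.
type Event struct{ ID int }

// deliver is a hypothetical helper showing the two delivery modes.
// It reports whether the event reached the subscriber.
func deliver(required bool, push func(Event), ch chan<- Event, e Event) bool {
	if required {
		push(e) // plain call: returns only once the subscriber has handled it
		return true
	}
	select {
	case ch <- e:
		return true
	default:
		return false // buffer full: drop rather than block the broker
	}
}

func main() {
	var got []Event
	push := func(e Event) { got = append(got, e) }
	ch := make(chan Event, 1) // small buffer and nobody reading

	for i := 0; i < 3; i++ {
		deliver(true, push, nil, Event{ID: i})
	}
	delivered := 0
	for i := 0; i < 3; i++ {
		if deliver(false, nil, ch, Event{ID: i}) {
			delivered++
		}
	}
	fmt.Println(len(got), delivered) // 3 1
}
```

The required subscriber sees all three events. The non-required one, with a buffer of one and no reader, keeps only the first and loses the rest without ever stalling the sender.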
Subscribers implement a fairly simple interface:
type Subscriber interface {
	Ack() bool
	Push(val ...events.Event)
	Skip() <-chan struct{}
	Closed() <-chan struct{}
	C() chan<- events.Event
	Types() []events.Type
	SetID(id int)
	ID() int
}
- Ack(): Indicates whether or not this subscriber acks the events it receives. If it does, events have to be passed to the Push function.
- Types: The broker uses this call to determine which events this subscriber wants to receive.
- Push: A required subscriber receives all its events from the broker through a normal function call. This ensures the event is indeed received. This function accepts one or more events. If SendBatch() was called on the broker, ack'ing subscribers receive the entire batch of events in a single call.
- C: Used for non-required subscribers. Returns a write channel onto which the broker attempts to push an event. If this fails (because the buffer is full), the event is dropped for that subscriber.
- Closed: A subscriber can be halted (if it's no longer needed). This function returns a closed channel to indicate that this subscriber is redundant and should be removed.
- Skip: If a subscriber only periodically needs to get data, we can keep it registered but put it in a "paused" state. A paused subscriber returns a closed channel for as long as it is not interested in receiving data.
- ID and SetID: Subscribers have a broker ID (int). This is unique across all subscribers and allows us to remove a specific subscriber manually, should we need to. The broker will also call SetID when the subscriber is registered.
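As an illustration of the method descriptions above, here is a minimal sketch of a type that satisfies the interface. `Event` and `Type` are local stand-ins for `events.Event` and `events.Type`, and `simpleSub`, `Halt`, and `signalled` are hypothetical names. The sketch is not thread-safe and is not the package's own base subscriber.

```go
package main

import "fmt"

// Type and Event are simplified stand-ins for events.Type and events.Event.
type Type int
type Event struct{ Type Type }

// Subscriber mirrors the interface above, using the stand-in types.
type Subscriber interface {
	Ack() bool
	Push(val ...Event)
	Skip() <-chan struct{}
	Closed() <-chan struct{}
	C() chan<- Event
	Types() []Type
	SetID(id int)
	ID() int
}

// closedCh is a shared, already-closed channel used to signal "paused".
var closedCh = func() chan struct{} {
	c := make(chan struct{})
	close(c)
	return c
}()

// simpleSub is a hypothetical, non-thread-safe subscriber.
type simpleSub struct {
	id     int
	ack    bool
	types  []Type
	ch     chan Event
	done   chan struct{}
	paused bool
	got    []Event
}

func newSimpleSub(ack bool, buf int, types ...Type) *simpleSub {
	return &simpleSub{ack: ack, types: types, ch: make(chan Event, buf), done: make(chan struct{})}
}

func (s *simpleSub) Ack() bool               { return s.ack }
func (s *simpleSub) Push(val ...Event)       { s.got = append(s.got, val...) }
func (s *simpleSub) C() chan<- Event         { return s.ch }
func (s *simpleSub) Closed() <-chan struct{} { return s.done }
func (s *simpleSub) Types() []Type           { return s.types }
func (s *simpleSub) SetID(id int)            { s.id = id }
func (s *simpleSub) ID() int                 { return s.id }

// Skip returns a closed channel while paused, and nil (never ready) otherwise.
func (s *simpleSub) Skip() <-chan struct{} {
	if s.paused {
		return closedCh
	}
	return nil
}

// Halt marks the subscriber as no longer needed.
func (s *simpleSub) Halt() { close(s.done) }

// signalled reports whether c is closed, without blocking.
func signalled(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}

// Compile-time check that simpleSub satisfies the interface.
var _ Subscriber = (*simpleSub)(nil)

func main() {
	s := newSimpleSub(true, 0, Type(1))
	s.SetID(7)
	fmt.Println(signalled(s.Skip()), signalled(s.Closed())) // false false
	s.paused = true
	s.Halt()
	fmt.Println(signalled(s.Skip()), signalled(s.Closed())) // true true
}
```

Using closed channels for Skip and Closed lets a broker check both states inside a single `select` without blocking, which is presumably why the interface exposes channels rather than booleans.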
Documentation
Index ¶
- Constants
- func WriteRawToBufferFile(bufferFile *os.File, bufferSeqNum uint64, rawEvent []byte) error
- func WriteToBufferFile(bufferFile *os.File, bufferSeqNum uint64, event events.Event) error
- type Broker
- func (b *Broker) OnTick(ctx context.Context, _ time.Time)
- func (b *Broker) ReloadConf(config Config)
- func (b *Broker) Send(event events.Event)
- func (b *Broker) SendBatch(events []events.Event)
- func (b *Broker) SetStreaming(on bool) bool
- func (b *Broker) SocketClient() SocketClient
- func (b *Broker) Stage(event events.Event)
- func (b *Broker) StreamingEnabled() bool
- func (b *Broker) Subscribe(s broker.Subscriber) int
- func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
- func (b *Broker) Unsubscribe(k int)
- type Config
- type FileClient
- type FileClientSend
- type FileConfig
- type Interface
- type SocketClient
- type SocketConfig
- type Stats
Constants ¶
const (
	NumberOfSeqNumBytes = 8
	NumberOfSizeBytes   = 4
)
Functions ¶
func WriteRawToBufferFile ¶ added in v0.69.0
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the base broker type. Perhaps we can extend this to embed into type-specific brokers.
func (*Broker) ReloadConf ¶ added in v0.77.0
func (*Broker) SendBatch ¶
SendBatch sends a slice of events to subscribers that can handle the events in the slice. The events don't have to be of the same type, and most subscribers will ignore unknown events, but this will slow down those subscribers, so avoid mixing types without good reason.
func (*Broker) SetStreaming ¶
SetStreaming toggles the sending of events to the socket client on and off.
func (*Broker) SocketClient ¶ added in v0.65.0
func (b *Broker) SocketClient() SocketClient
func (*Broker) StreamingEnabled ¶ added in v0.65.0
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(s broker.Subscriber) int
Subscribe registers a new subscriber, returning the key.
func (*Broker) SubscribeBatch ¶
func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
func (*Broker) Unsubscribe ¶
Unsubscribe removes the subscriber from the broker. This does not change the state of the subscriber.
type Config ¶
type Config struct {
	Level                    encoding.LogLevel `long:"log-level"`
	Socket                   SocketConfig      `group:"Socket" namespace:"socket"`
	File                     FileConfig        `group:"File" namespace:"file"`
	EventBusClientBufferSize int               `long:"event-bus-client-buffer-size"`
}
Config represents the configuration of the broker.
func NewDefaultConfig ¶
func NewDefaultConfig() Config
NewDefaultConfig creates an instance of config with default values.
type FileClient ¶
type FileClient struct {
// contains filtered or unexported fields
}
func NewFileClient ¶ added in v0.57.0
func NewFileClient(log *logging.Logger, config *FileConfig) (*FileClient, error)
func (*FileClient) Close ¶ added in v0.57.0
func (fc *FileClient) Close() error
func (*FileClient) Replace ¶ added in v0.77.0
func (fc *FileClient) Replace(newFC *FileClient)
Replace - in case of a config change, just replace the file we're writing to with the new one.
type FileClientSend ¶ added in v0.57.0
type FileClientSend interface {
	SendBatch(events []events.Event) error
	Replace(*FileClient)
	Close() error
}
type FileConfig ¶
type Interface ¶ added in v0.55.0
type Interface interface {
	Stage(event events.Event)
	Send(event events.Event)
	SendBatch(events []events.Event)
	Subscribe(s broker.Subscriber) int
	SubscribeBatch(subs ...broker.Subscriber)
	Unsubscribe(k int)
	SetStreaming(on bool) bool
}
Interface (horribly named) is declared here to provide a drop-in replacement for the broker mocks used throughout. In addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated, so we don't have to rely on test-only helper functions.
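The idea of checking the events actually generated can be sketched with a recording stub. Everything below is hypothetical: `Sender` is a trimmed-down subset of the interface, and `recorder` and `engine` are illustrative names, not the mock shipped with the package.

```go
package main

import "fmt"

// Event is a simplified stand-in for events.Event.
type Event struct{ Name string }

// Sender is a trimmed-down, hypothetical subset of the broker Interface.
type Sender interface {
	Send(e Event)
	SendBatch(evts []Event)
}

// recorder captures everything sent to it, in order.
type recorder struct{ events []Event }

func (r *recorder) Send(e Event)           { r.events = append(r.events, e) }
func (r *recorder) SendBatch(evts []Event) { r.events = append(r.events, evts...) }

// engine is a hypothetical component that emits events via the broker.
type engine struct{ broker Sender }

func (e *engine) submit(order string) {
	e.broker.Send(Event{Name: "order:" + order})
}

func main() {
	rec := &recorder{}
	eng := &engine{broker: rec}
	eng.submit("abc")
	fmt.Println(rec.events[0].Name) // order:abc
}
```

A test can then assert directly on the recorded events instead of relying on helper functions that exist only for testing.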
type SocketClient ¶
type SocketClient interface {
	SendBatch(events []events.Event) error
	Receive(ctx context.Context) (<-chan events.Event, <-chan error)
}
SocketClient is an interface to send serialized events over a socket.
type SocketConfig ¶
type SocketConfig struct {
	DialTimeout             encoding.Duration `description:" " long:"dial-timeout"`
	DialRetryInterval       encoding.Duration `description:" " long:"dial-retry-interval"`
	SocketQueueTimeout      encoding.Duration `description:" " long:"socket-queue-timeout"`
	EventChannelBufferSize  int               `description:" " long:"event-channel-buffer-size"`
	SocketChannelBufferSize int               `description:" " long:"socket-channel-buffer-size"`
	MaxSendTimeouts         int               `description:" " long:"max-send-timeouts"`
	Address                 string            `description:"Data node's address" long:"address"`
	Port                    int               `description:"Data node port" long:"port"`
	Enabled                 encoding.Bool     `description:"Enable streaming of bus events over socket" long:"enabled"`
	Transport               string            `description:"Transport of socket. tcp/inproc are allowed. Default is TCP" long:"transport"`
}