broker

package
v0.79.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: AGPL-3.0 Imports: 32 Imported by: 0

README

Event bus

The broker is the entry point for core events. Core engines only ever use a single method of the broker interface. Subscribers don't interact with the broker directly, but are called by the broker (which pushes events to them).

For the core

Where engines previously depended on any number of buffers to push data to that plugins, API's, and stores needed to handle, they will now push events onto the bus via the broker. In the events package, you'll find a number of events (trade event, order event, market data, etc...). These events all have a constructor for convenience. Most events can even be created through the generic constructor. Say, for example, we receive a new order, or the state of an order changed:

func (e *Engine) Foo(ctx, order *types.Order) {
    // some code, order changes:
    e.broker.Send(events.NewOrderEvent(ctx, *order))
    // or, using the generic constructor
    if evt, err := events.New(ctx, *order); err == nil {
        e.broker.Send(evt)
    }
}

The buffers needed to be flushed at the end of a block. Now, the end of a block is considered an event (TimeUpdate). If some events/data needs to be batched, and only processed at the end of the block, the subscriber should subscribe to the time event, and use that as a trigger/signal to do what is expected.

Some events (e.g. position related events) are created a ton of times. Sending them individually creates a metric ton of routines sending out all the data, which is bad for performance. To get around this issue, the broker includes a SendBatch(events []Events) function. The events in the slice can be of different types, but we derive the subscriber types from the first event in the slice, and assume that subscribers who listen for this event can either safely ignore any other events in the slice, or can handle all of them (currently, all subscribers can handle unknown events - they simply ignore them). It might be a good idea to have some placeholder event added for things like MarketEvent (which basically is a catch-all event type for things we want to log).

For non-core (subscribers etc...)

The core spits out events, without caring where the data ends up. The subscribers need to be registered with the broker to receive the data and process it. There are 2 main categories of subscribers: required (think of it as ack) and non-required subscribers. If a subscriber is registered as required, the broker will make sure that the subscriber receives all events, in the correct order. The non-required subscribers have a channel onto which the broker pushes events, unless the channel buffer is full. The subscriber is not required, so rather than blocking the broker, the broker is free to skip these subscribers and simply carry on. As a result the latter type of subscriber is not guaranteed to receive every single event, but unlike their required counterparts, they are unable to have a meaningful impact on the performance of the broker.

Subscribers implement a fairly simple interface:

type Subscriber interface {
    Ack() bool
	Push(val ...events.Event)
	Skip() <-chan struct{}
	Closed() <-chan struct{}
	C() chan<- events.Event
	Types() []events.Type
	SetID(id int)
	ID() int
}
  • Ack(): Indicates whether or not this subscriber "Ack's" the events it receives. In this case: the event has to be passed to the Push function.
  • Types: The broker uses this call to determine what events this subscriber wants to receive.
  • Push: A required subscriber will receive all its events from the broker through a normal function call. This ensures the event is indeed received. This function accepts one or more events. If SendBatch() was called on the broker, ack'ing subscribers will receive the entire batch of events in a single call.
  • C: This is used for non-required subscribers. This returns a write channel where the prober attempts to push an event onto. If this fails (because the buffer is full), the event is dropped for that subscriber
  • Closed: A subscriber can be halted (if it's no longer needed). This function will return a closed channel indicating that this subscriber is redundant, and should be removed
  • Skip: If a subscriber only periodically needs to get data, we kan keep it registered, but put it in a "paused" state. A paused subscriber will return a closed channel for as long as we're not interested in receiving data.
  • ID and SetID: Subscribers have a broker ID (int). This is unique for all subscribers, and allows us to remove a specific subscriber manually, should we need to. The broker will also call SetID when the subscriber is registered.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBufferFilesEventSource added in v0.69.0

func NewBufferFilesEventSource(bufferFilesDir string, timeBetweenBlocks time.Duration,
	sendChannelBufferSize int, chainID string) (*bufferFileEventSource, error,
)

func NewEventReceiverSender added in v0.65.0

func NewEventReceiverSender(config Config, log *logging.Logger, chainID string) (rawEventReceiverSender, error)

func NewFanOutEventSource

func NewFanOutEventSource(source EventReceiver, sendChannelBufferSize int, expectedNumSubscribers int) *fanOutEventSource

func ReadRawEvent added in v0.73.0

func ReadRawEvent(eventFile *os.File, offset int64) (event []byte, seqNum uint64,
	totalBytesRead uint32, err error,
)

Types

type BlockStore

type BlockStore interface {
	Add(ctx context.Context, b entities.Block) error
	GetLastBlock(ctx context.Context) (entities.Block, error)
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker - the base broker type perhaps we can extend this to embed into type-specific brokers.

func New

func New(ctx context.Context, log *logging.Logger, config Config, chainID string,
	eventsource EventReceiver,
) (*Broker, error)

New creates a new base broker.

func (*Broker) Receive

func (b *Broker) Receive(ctx context.Context) error

func (*Broker) Send

func (b *Broker) Send(event events.Event)

Send sends an event to all subscribers.

func (*Broker) Subscribe

func (b *Broker) Subscribe(s broker.Subscriber) int

Subscribe registers a new subscriber, returning the key.

func (*Broker) SubscribeBatch

func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)

func (*Broker) Unsubscribe

func (b *Broker) Unsubscribe(k int)

Unsubscribe removes subscriber from broker this does not change the state of the subscriber.

type BufferedEventSourceConfig added in v0.61.0

type BufferedEventSourceConfig struct {
	EventsPerFile           int   `description:"the number of events to store in a file buffer, set to 0 to disable the buffer" long:"events-per-file"`
	SendChannelBufferSize   int   `description:"sink event channel buffer size"                                                 long:"send-buffer-size"`
	Archive                 bool  `description:"archives event buffer files after they have been read, default false"           long:"archive"`
	ArchiveMaximumSizeBytes int64 `description:"the maximum size of the archive directory"                                      long:"archive-maximum-size"`
}

type ChainInfoI

type ChainInfoI interface {
	SetChainID(string) error
	GetChainID() (string, error)
}

type Config

type Config struct {
	Level                          encoding.LogLevel         `long:"log-level"`
	SocketConfig                   SocketConfig              `group:"Socket"                                                                           namespace:"socket"`
	SocketServerInboundBufferSize  int                       `long:"socket-server-inbound-buffer-size"`
	SocketServerOutboundBufferSize int                       `long:"socket-server-outbound-buffer-size"`
	FileEventSourceConfig          FileEventSourceConfig     `group:"FileEventSourceConfig"                                                            namespace:"fileeventsource"`
	UseEventFile                   encoding.Bool             `description:"set to true to source events from a file"                                   long:"use-event-file"`
	PanicOnError                   encoding.Bool             `description:"if an error occurs on event push the broker will panic, else log the error" long:"panic-on-error"`
	UseBufferedEventSource         encoding.Bool             `description:"if true datanode will buffer events"                                        long:"use-buffered-event-source"`
	BufferedEventSourceConfig      BufferedEventSourceConfig `group:"BufferedEventSource"                                                              namespace:"bufferedeventsource"`
	EventBusClientBufferSize       int                       `long:"event-bus-client-buffer-size"`
}

Config represents the configuration of the broker.

func NewDefaultConfig

func NewDefaultConfig() Config

NewDefaultConfig creates an instance of config with default values.

type Deserializer added in v0.69.0

type Deserializer struct {
	// contains filtered or unexported fields
}

func NewDeserializer added in v0.69.0

func NewDeserializer(source RawEventReceiver) *Deserializer

func (*Deserializer) Listen added in v0.69.0

func (e *Deserializer) Listen() error

func (*Deserializer) Receive added in v0.69.0

func (e *Deserializer) Receive(ctx context.Context) (<-chan events.Event, <-chan error)

type EventReceiver added in v0.65.0

type EventReceiver interface {
	Listen() error
	Receive(ctx context.Context) (<-chan events.Event, <-chan error)
}

type FileBufferedEventSource added in v0.61.0

type FileBufferedEventSource struct {
	// contains filtered or unexported fields
}

func NewBufferedEventSource added in v0.61.0

func NewBufferedEventSource(ctx context.Context, log *logging.Logger, config BufferedEventSourceConfig,
	source RawEventReceiver, bufferFilesDir string,
	archiveFilesDir string,
) (*FileBufferedEventSource, error)

func (*FileBufferedEventSource) Listen added in v0.61.0

func (m *FileBufferedEventSource) Listen() error

func (*FileBufferedEventSource) Receive added in v0.61.0

func (m *FileBufferedEventSource) Receive(ctx context.Context) (<-chan []byte, <-chan error)

type FileEventSourceConfig

type FileEventSourceConfig struct {
	Directory             string            `description:"the directory container the event files"               long:"directory"`
	TimeBetweenBlocks     encoding.Duration `description:"the time between sending blocks"                       string:"time-between-blocks"`
	SendChannelBufferSize int               `description:"size of channel buffer used to send events to broker " long:"send-buffer-size"`
}

type OrderEventWithVegaTime

type OrderEventWithVegaTime struct {
	events.Order
	// contains filtered or unexported fields
}

func (*OrderEventWithVegaTime) GetOrder

func (oe *OrderEventWithVegaTime) GetOrder() *vega.Order

func (*OrderEventWithVegaTime) VegaTime

func (oe *OrderEventWithVegaTime) VegaTime() time.Time

type ProtocolUpgradeHandler added in v0.63.0

type ProtocolUpgradeHandler interface {
	OnProtocolUpgradeEvent(ctx context.Context, chainID string, lastCommittedBlockHeight int64) error
	GetProtocolUpgradeStarted() bool
}

type RawEventReceiver added in v0.69.0

type RawEventReceiver interface {
	Listen() error
	Receive(ctx context.Context) (<-chan []byte, <-chan error)
}

type SQLBrokerSubscriber added in v0.55.0

type SQLBrokerSubscriber interface {
	SetVegaTime(vegaTime time.Time)
	Flush(ctx context.Context) error
	Push(ctx context.Context, val events.Event) error
	Types() []events.Type
	Name() string
}

type SQLStoreBroker added in v0.57.0

type SQLStoreBroker struct {
	// contains filtered or unexported fields
}

SQLStoreBroker : push events to each subscriber with a single go routine across all types.

func NewSQLStoreBroker added in v0.55.0

func NewSQLStoreBroker(
	log *logging.Logger,
	config Config,
	chainID string,
	eventsource EventReceiver,
	transactionManager TransactionManager,
	blockStore BlockStore,
	onBlockCommitted func(ctx context.Context, chainId string, lastCommittedBlockHeight int64, snapshotTaken bool),
	protocolUpdateHandler ProtocolUpgradeHandler,
	subs []SQLBrokerSubscriber,
) *SQLStoreBroker

func (*SQLStoreBroker) Receive added in v0.57.0

func (b *SQLStoreBroker) Receive(ctx context.Context) error

type SQLStoreEventBroker added in v0.55.0

type SQLStoreEventBroker interface {
	Receive(ctx context.Context) error
}

type SocketConfig

type SocketConfig struct {
	IP                 string `description:" "             long:"ip"`
	Port               int    `description:" "             long:"port"`
	MaxReceiveTimeouts int    `long:"max-receive-timeouts"`
	TransportType      string `long:"transport-type"`
}

type TestInterface added in v0.55.0

type TestInterface interface {
	Send(event events.Event)
	Subscribe(s broker.Subscriber) int
	SubscribeBatch(subs ...broker.Subscriber)
	Unsubscribe(k int)
	Receive(ctx context.Context) error
}

TestInterface interface (horribly named) is declared here to provide a drop-in replacement for broker mocks used throughout in addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated so we don't have to rely on test-only helper functions.

type TransactionManager

type TransactionManager interface {
	WithConnection(ctx context.Context) (context.Context, error)
	WithTransaction(ctx context.Context) (context.Context, error)
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
	RefreshMaterializedViews(ctx context.Context) error
}

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL