Documentation ¶
Index ¶
- func NewBufferFilesEventSource(bufferFilesDir string, timeBetweenBlocks time.Duration, ...) (*bufferFileEventSource, error)
- func NewEventReceiverSender(config Config, log *logging.Logger, chainID string) (rawEventReceiverSender, error)
- func NewFanOutEventSource(source EventReceiver, sendChannelBufferSize int, expectedNumSubscribers int) *fanOutEventSource
- type BlockStore
- type Broker
- type BufferedEventSourceConfig
- type ChainInfoI
- type Config
- type Deserializer
- type EventReceiver
- type FileBufferedEventSource
- type FileEventSourceConfig
- type OrderEventWithFuryTime
- type ProtocolUpgradeHandler
- type RawEventReceiver
- type SQLBrokerSubscriber
- type SQLStoreBroker
- type SQLStoreEventBroker
- type SocketConfig
- type TestInterface
- type TransactionManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewEventReceiverSender ¶
func NewFanOutEventSource ¶
func NewFanOutEventSource(source EventReceiver, sendChannelBufferSize int, expectedNumSubscribers int) *fanOutEventSource
Types ¶
type BlockStore ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the base broker type; perhaps we can extend this to embed into type-specific brokers.
func New ¶
func New(ctx context.Context, log *logging.Logger, config Config, chainID string, eventsource EventReceiver, ) (*Broker, error)
New creates a new base broker.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(s broker.Subscriber) int
Subscribe registers a new subscriber, returning the key.
func (*Broker) SubscribeBatch ¶
func (b *Broker) SubscribeBatch(subs ...broker.Subscriber)
func (*Broker) Unsubscribe ¶
Unsubscribe removes the subscriber from the broker; this does not change the state of the subscriber.
type BufferedEventSourceConfig ¶
type BufferedEventSourceConfig struct { EventsPerFile int `long:"events-per-file" description:"the number of events to store in a file buffer, set to 0 to disable the buffer"` SendChannelBufferSize int `long:"send-buffer-size" description:"sink event channel buffer size"` Archive bool `long:"archive" description:"archives event buffer files after they have been read, default false"` ArchiveMaximumSizeBytes int64 `long:"archive-maximum-size" description:"the maximum size of the archive directory"` }
type ChainInfoI ¶
type Config ¶
type Config struct { Level encoding.LogLevel `long:"log-level"` SocketConfig SocketConfig `group:"Socket" namespace:"socket"` SocketServerInboundBufferSize int `long:"socket-server-inbound-buffer-size"` SocketServerOutboundBufferSize int `long:"socket-server-outbound-buffer-size"` FileEventSourceConfig FileEventSourceConfig `group:"FileEventSourceConfig" namespace:"fileeventsource"` UseEventFile encoding.Bool `long:"use-event-file" description:"set to true to source events from a file"` PanicOnError encoding.Bool `long:"panic-on-error" description:"if an error occurs on event push the broker will panic, else log the error"` UseBufferedEventSource encoding.Bool `long:"use-buffered-event-source" description:"if true datanode will buffer events"` BufferedEventSourceConfig BufferedEventSourceConfig `group:"BufferedEventSource" namespace:"bufferedeventsource"` EventBusClientBufferSize int `long:"event-bus-client-buffer-size"` }
Config represents the configuration of the broker.
func NewDefaultConfig ¶
func NewDefaultConfig() Config
NewDefaultConfig creates an instance of config with default values.
type Deserializer ¶
type Deserializer struct {
// contains filtered or unexported fields
}
func NewDeserializer ¶
func NewDeserializer(source RawEventReceiver) *Deserializer
func (*Deserializer) Listen ¶
func (e *Deserializer) Listen() error
type EventReceiver ¶
type FileBufferedEventSource ¶
type FileBufferedEventSource struct {
// contains filtered or unexported fields
}
func NewBufferedEventSource ¶
func NewBufferedEventSource(ctx context.Context, log *logging.Logger, config BufferedEventSourceConfig, source RawEventReceiver, bufferFilesDir string, archiveFilesDir string, ) (*FileBufferedEventSource, error)
func (*FileBufferedEventSource) Listen ¶
func (m *FileBufferedEventSource) Listen() error
type FileEventSourceConfig ¶
type FileEventSourceConfig struct { Directory string `long:"directory" description:"the directory container the event files"` TimeBetweenBlocks encoding.Duration `string:"time-between-blocks" description:"the time between sending blocks"` SendChannelBufferSize int `long:"send-buffer-size" description:"size of channel buffer used to send events to broker "` }
type OrderEventWithFuryTime ¶
func (*OrderEventWithFuryTime) FuryTime ¶
func (oe *OrderEventWithFuryTime) FuryTime() time.Time
func (*OrderEventWithFuryTime) GetOrder ¶
func (oe *OrderEventWithFuryTime) GetOrder() *fury.Order
type ProtocolUpgradeHandler ¶
type RawEventReceiver ¶
type SQLBrokerSubscriber ¶
type SQLStoreBroker ¶
type SQLStoreBroker struct {
// contains filtered or unexported fields
}
SQLStoreBroker pushes events to each subscriber with a single goroutine across all types.
func NewSQLStoreBroker ¶
func NewSQLStoreBroker( log *logging.Logger, config Config, chainID string, eventsource EventReceiver, transactionManager TransactionManager, blockStore BlockStore, onBlockCommitted func(ctx context.Context, chainId string, lastCommittedBlockHeight int64, snapshotTaken bool), protocolUpdateHandler ProtocolUpgradeHandler, subs []SQLBrokerSubscriber, ) *SQLStoreBroker
type SQLStoreEventBroker ¶
type SocketConfig ¶
type TestInterface ¶
type TestInterface interface { Send(event events.Event) Subscribe(s broker.Subscriber) int SubscribeBatch(subs ...broker.Subscriber) Unsubscribe(k int) Receive(ctx context.Context) error }
TestInterface (horribly named) is declared here to provide a drop-in replacement for the broker mocks used throughout. In addition to providing the classical mockgen functionality, this mock can be used to check the actual events that will be generated, so we don't have to rely on test-only helper functions.