subscribers

package
v0.64.0
Published: Dec 8, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Base

type Base struct {
	// contains filtered or unexported fields
}

func NewBase

func NewBase(ctx context.Context, buf int, ack bool) *Base

func (*Base) Ack

func (b *Base) Ack() bool

Ack reports whether this is a synchronous or an asynchronous subscriber.

func (*Base) C

func (b *Base) C() chan<- []events.Event

C returns the event channel for optional subscribers.

func (*Base) Closed

func (b *Base) Closed() <-chan struct{}

Closed indicates to the broker that the subscriber is closed for business.

func (*Base) Halt

func (b *Base) Halt()

Halt is called by the broker on shutdown; it closes the open channels.

func (*Base) ID

func (b *Base) ID() int

ID returns the subscriber ID.

func (*Base) Pause

func (b *Base) Pause()

Pause pauses the subscriber so that it does not receive events from the channel.

func (*Base) Resume

func (b *Base) Resume()

Resume unpauses the subscriber.

func (*Base) SetID

func (b *Base) SetID(id int)

SetID sets the ID (exposed only to the broker).

func (*Base) Skip

func (b *Base) Skip() <-chan struct{}

Skip lets the broker know that the subscriber is not receiving events.
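The Pause, Resume, Skip, Closed and Halt methods above suggest a channel-based signalling pattern. The following is a minimal sketch of one way such a pattern can work; it is not the package's implementation. The pausable type, its fields and the state helper are hypothetical stand-ins, since the fields of Base are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// pausable is a hypothetical stand-in for the unexported state inside Base.
type pausable struct {
	mu     sync.Mutex
	once   sync.Once
	done   chan struct{} // closed once by Halt
	skip   chan struct{} // closed while the subscriber is paused
	paused bool
}

func newPausable() *pausable {
	return &pausable{done: make(chan struct{}), skip: make(chan struct{})}
}

// Pause closes the skip channel so that reads from Skip() succeed immediately.
func (p *pausable) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.paused {
		p.paused = true
		close(p.skip)
	}
}

// Resume replaces the closed skip channel with a fresh, open one.
func (p *pausable) Resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.paused {
		p.paused = false
		p.skip = make(chan struct{})
	}
}

func (p *pausable) Skip() <-chan struct{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.skip
}

func (p *pausable) Closed() <-chan struct{} { return p.done }

// Halt closes the done channel; sync.Once makes repeated calls safe.
func (p *pausable) Halt() { p.once.Do(func() { close(p.done) }) }

// state shows how a broker-style caller could classify the subscriber
// before deciding whether to send it events. Closed is checked first.
func state(p *pausable) string {
	select {
	case <-p.Closed():
		return "closed"
	default:
	}
	select {
	case <-p.Skip():
		return "skipped"
	default:
		return "active"
	}
}

func main() {
	p := newPausable()
	fmt.Println(state(p)) // active
	p.Pause()
	fmt.Println(state(p)) // skipped
	p.Resume()
	fmt.Println(state(p)) // active
	p.Halt()
	fmt.Println(state(p)) // closed
}
```

Signalling through closed channels lets a caller test the paused and closed states in a select statement without blocking.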

type Broker

type Broker interface {
	Subscribe(s broker.Subscriber) int
	Unsubscribe(id int)
}

type EventFilter

type EventFilter func(events.Event) bool
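An EventFilter is a predicate over events. The sketch below illustrates how such predicates might be built and combined. The Event struct is a hypothetical stand-in for events.Event, the byMarket, byParty and keep helpers are invented for illustration, and the rule that an event must pass every filter is an assumption, since the documentation does not say how multiple filters are combined.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for events.Event.
type Event struct {
	MarketID string
	PartyID  string
}

// EventFilter has the same shape as the documented type, over the stand-in Event.
type EventFilter func(Event) bool

// byMarket accepts events for a single market (illustrative helper).
func byMarket(id string) EventFilter {
	return func(e Event) bool { return e.MarketID == id }
}

// byParty accepts events for a single party (illustrative helper).
func byParty(id string) EventFilter {
	return func(e Event) bool { return e.PartyID == id }
}

// keep returns the events accepted by every filter (assumed AND semantics).
// With no filters, every event is kept.
func keep(evts []Event, filters ...EventFilter) []Event {
	out := make([]Event, 0, len(evts))
	for _, e := range evts {
		ok := true
		for _, f := range filters {
			if !f(e) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	evts := []Event{
		{MarketID: "BTC", PartyID: "alice"},
		{MarketID: "BTC", PartyID: "bob"},
		{MarketID: "ETH", PartyID: "alice"},
	}
	fmt.Println(len(keep(evts, byMarket("BTC"))))                   // 2
	fmt.Println(len(keep(evts, byMarket("BTC"), byParty("alice")))) // 1
}
```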

type MarketDepth

type MarketDepth struct {
	// contains filtered or unexported fields
}

MarketDepth holds all the details about a single market's depth.

type MarketDepthBuilder

type MarketDepthBuilder struct {
	*Base
	// contains filtered or unexported fields
}

MarketDepthBuilder is a subscriber of order events used to build the live market depth structure.

func NewMarketDepthBuilder

func NewMarketDepthBuilder(ctx context.Context, log *logging.Logger, ack bool) *MarketDepthBuilder

NewMarketDepthBuilder creates a market depth subscriber.

func (*MarketDepthBuilder) GetAllOrders

func (mdb *MarketDepthBuilder) GetAllOrders(market string) map[string]*types.Order

func (*MarketDepthBuilder) GetBestAskPrice

func (mdb *MarketDepthBuilder) GetBestAskPrice(market string) *num.Uint

GetBestAskPrice returns the lowest ask price in the book.

func (*MarketDepthBuilder) GetBestBidPrice

func (mdb *MarketDepthBuilder) GetBestBidPrice(market string) *num.Uint

GetBestBidPrice returns the highest bid price in the book.
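By the usual order book convention, the best bid is the highest buy price with resting volume and the best ask is the lowest sell price with resting volume. The sketch below shows that rule on a plain slice of price levels. The level type and the bestBid and bestAsk helpers are hypothetical, and prices are plain uint64 values here, whereas the documented methods return *num.Uint.

```go
package main

import "fmt"

// level is a hypothetical price level: a price and the volume resting at it.
type level struct {
	price  uint64
	volume uint64
}

// bestBid returns the highest price among non-empty buy levels.
// The bool is false when there is no non-empty level.
func bestBid(buys []level) (uint64, bool) {
	var best uint64
	found := false
	for _, l := range buys {
		if l.volume == 0 {
			continue // empty levels do not count
		}
		if !found || l.price > best {
			best, found = l.price, true
		}
	}
	return best, found
}

// bestAsk returns the lowest price among non-empty sell levels.
func bestAsk(sells []level) (uint64, bool) {
	var best uint64
	found := false
	for _, l := range sells {
		if l.volume == 0 {
			continue
		}
		if !found || l.price < best {
			best, found = l.price, true
		}
	}
	return best, found
}

func main() {
	buys := []level{{price: 99, volume: 5}, {price: 100, volume: 0}, {price: 98, volume: 7}}
	sells := []level{{price: 101, volume: 3}, {price: 103, volume: 1}, {price: 102, volume: 0}}
	bid, _ := bestBid(buys)
	ask, _ := bestAsk(sells)
	fmt.Printf("best bid %d, best ask %d\n", bid, ask) // best bid 99, best ask 101
}
```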

func (*MarketDepthBuilder) GetBuyPriceLevels

func (mdb *MarketDepthBuilder) GetBuyPriceLevels(market string) int

GetBuyPriceLevels returns the number of non-empty buy price levels.

func (*MarketDepthBuilder) GetMarketDepth

func (mdb *MarketDepthBuilder) GetMarketDepth(ctx context.Context, market string, limit uint64) (*types.MarketDepth, error)

GetMarketDepth builds up the structure to be sent out to any market depth listeners.

func (*MarketDepthBuilder) GetOrderCount

func (mdb *MarketDepthBuilder) GetOrderCount(market string) int64

GetOrderCount returns the number of live orders for the given market.

func (*MarketDepthBuilder) GetOrderCountAtPrice

func (mdb *MarketDepthBuilder) GetOrderCountAtPrice(market string, side types.Side, price uint64) uint64

GetOrderCountAtPrice returns the number of orders at the given price level.

func (*MarketDepthBuilder) GetPriceLevels

func (mdb *MarketDepthBuilder) GetPriceLevels(market string) int

GetPriceLevels returns the number of non-empty price levels.

func (*MarketDepthBuilder) GetSellPriceLevels

func (mdb *MarketDepthBuilder) GetSellPriceLevels(market string) int

GetSellPriceLevels returns the number of non-empty sell price levels.

func (*MarketDepthBuilder) GetTotalVolume

func (mdb *MarketDepthBuilder) GetTotalVolume(market string) int64

GetTotalVolume returns the total volume in the order book.

func (*MarketDepthBuilder) GetVolumeAtPrice

func (mdb *MarketDepthBuilder) GetVolumeAtPrice(market string, side types.Side, price uint64) uint64

GetVolumeAtPrice returns the order volume at the given price level.

func (*MarketDepthBuilder) Push

func (mdb *MarketDepthBuilder) Push(evts ...events.Event)

Push takes order messages and applies them to the market depth structure.

func (*MarketDepthBuilder) Subscribe

func (mdb *MarketDepthBuilder) Subscribe(updates chan<- *types.MarketDepthUpdate) uint64

Subscribe allows a client to register for updates of the market depth book.

func (*MarketDepthBuilder) Types

func (mdb *MarketDepthBuilder) Types() []events.Type

Types returns all the message types this subscriber wants to receive.

func (*MarketDepthBuilder) Unsubscribe

func (mdb *MarketDepthBuilder) Unsubscribe(id uint64) error

Unsubscribe allows the client to unregister interest in market depth updates.
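Subscribe and Unsubscribe pair a caller-supplied channel with a numeric ID. The sketch below shows one common way to keep such a registry; it is an illustration under assumptions, not the package's implementation. The update and registry types, the publish method and the choice to drop updates for slow clients are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// update is a hypothetical stand-in for *types.MarketDepthUpdate.
type update struct {
	MarketID string
	Seq      uint64
}

var errUnknownID = errors.New("unknown subscriber id")

// registry maps subscription IDs to client channels.
type registry struct {
	mu     sync.Mutex
	nextID uint64
	subs   map[uint64]chan<- update
}

func newRegistry() *registry {
	return &registry{subs: make(map[uint64]chan<- update)}
}

// Subscribe stores the channel and returns a fresh ID for it.
func (r *registry) Subscribe(ch chan<- update) uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.subs[r.nextID] = ch
	return r.nextID
}

// Unsubscribe removes the channel; an unknown ID is reported as an error.
func (r *registry) Unsubscribe(id uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.subs[id]; !ok {
		return errUnknownID
	}
	delete(r.subs, id)
	return nil
}

// publish offers the update to every client and returns how many accepted it.
// A client whose channel is full misses the update instead of blocking the rest.
func (r *registry) publish(u update) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	sent := 0
	for _, ch := range r.subs {
		select {
		case ch <- u:
			sent++
		default:
		}
	}
	return sent
}

func main() {
	r := newRegistry()
	ch := make(chan update, 1)
	id := r.Subscribe(ch)
	fmt.Println(r.publish(update{MarketID: "BTC", Seq: 1})) // 1
	fmt.Println((<-ch).Seq)                                 // 1
	fmt.Println(r.Unsubscribe(id))                          // <nil>
	fmt.Println(r.Unsubscribe(id) != nil)                   // true
}
```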

type MarketStreamEvent

type MarketStreamEvent interface {
	StreamEvent
	StreamMarketMessage() *eventspb.BusEvent
}

type OE

type OE interface {
	events.Event
	Order() *ptypes.Order
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(broker Broker) *Service

func (*Service) ObserveEvents

func (s *Service) ObserveEvents(ctx context.Context, retries int, eTypes []events.Type, batchSize int, filters ...EventFilter) (<-chan []*eventspb.BusEvent, chan<- int)

type StreamEvent

type StreamEvent interface {
	events.Event
	StreamMessage() *eventspb.BusEvent
}

type StreamSub

type StreamSub struct {
	*Base
	// contains filtered or unexported fields
}

func NewStreamSub

func NewStreamSub(ctx context.Context, types []events.Type, batchSize int, filters ...EventFilter) *StreamSub

func (*StreamSub) GetData

func (s *StreamSub) GetData(ctx context.Context) []*eventspb.BusEvent

GetData returns events from the buffer: all of them if bufSize == 0, otherwise at most bufSize events (the rest are kept in the data slice).
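The batching rule described for GetData can be sketched as follows. This is an illustration of the rule only, under assumptions: the buffer type and its take method are hypothetical, events are represented by plain ints, and the locking and context handling that a real subscriber needs are omitted.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for the unexported state in StreamSub.
type buffer struct {
	data      []int // pending events, represented here as ints
	batchSize int   // 0 means "return everything"
}

// take returns all pending events when batchSize is 0, otherwise at most
// batchSize events. Whatever is not returned stays in data for the next call.
func (b *buffer) take() []int {
	n := len(b.data)
	if b.batchSize > 0 && b.batchSize < n {
		n = b.batchSize
	}
	// The three-index slice caps capacity so that appending to the returned
	// batch cannot overwrite the events still held in data.
	out := b.data[:n:n]
	b.data = b.data[n:]
	return out
}

func main() {
	b := &buffer{data: []int{1, 2, 3, 4, 5}, batchSize: 2}
	fmt.Println(b.take()) // [1 2]
	b.batchSize = 0
	fmt.Println(b.take()) // [3 4 5]
}
```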

func (*StreamSub) Halt

func (s *StreamSub) Halt()

func (*StreamSub) Push

func (s *StreamSub) Push(evts ...events.Event)

func (StreamSub) Types

func (s StreamSub) Types() []events.Type

func (*StreamSub) UpdateBatchSize

func (s *StreamSub) UpdateBatchSize(ctx context.Context, size int) []*eventspb.BusEvent

UpdateBatchSize changes the batch size and returns whatever the current buffer contains. It is effectively a poll of current events that ignores the requested batch size.
