run

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 18, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package run provides components for dispatching orders.

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConflateTrade

func ConflateTrade(existing *mkt.Trade, latest *mkt.Trade) *mkt.Trade

ConflateTrade is a convenience function for a trade utl.ConflatingQueue.

func NewCompositeConflatingQueue added in v0.2.0

func NewCompositeConflatingQueue[T mkt.AnyOrder](fn CompositeConflator[T]) *utl.ConflatingQueue[string, *Composite[T]]

NewCompositeConflatingQueue makes a composite queue for an OrderProcess.

func SubscriberQuoteQueueConnector

func SubscriberQuoteQueueConnector(queue *utl.ConflatingQueue[string, *mkt.Quote]) func(*mkt.Quote)

SubscriberQuoteQueueConnector provides the 'onQuote' callback function for a dma.Subscriber.

func SubscriberTradeQueueConnector

func SubscriberTradeQueueConnector(queue *utl.ConflatingQueue[string, *mkt.Trade]) func(*mkt.Trade)

SubscriberTradeQueueConnector provides the 'onTrade' callback function for a dma.Subscriber.
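The two connector functions follow a common closure pattern: capture a queue and return a callback that pushes each update into it. The sketch below illustrates that pattern only. `Quote`, `latestQueue`, `Push` and `quoteConnector` are hypothetical stand-ins, not the real mkt or utl types, and keying the queue by symbol is an assumption.

```go
package main

import "fmt"

// Quote is a stand-in for mkt.Quote; the Symbol and Bid fields are assumed.
type Quote struct {
	Symbol string
	Bid    float64
}

// latestQueue is a hypothetical, much simplified stand-in for
// utl.ConflatingQueue: it keeps only the newest item per key.
type latestQueue struct {
	items map[string]*Quote
}

func newLatestQueue() *latestQueue {
	return &latestQueue{items: make(map[string]*Quote)}
}

// Push stores the quote under the key, replacing any earlier quote.
func (q *latestQueue) Push(key string, quote *Quote) {
	q.items[key] = quote
}

// quoteConnector returns a callback that forwards each quote into the queue.
// Using the symbol as the key is an assumption made for this sketch.
func quoteConnector(q *latestQueue) func(*Quote) {
	return func(quote *Quote) {
		q.Push(quote.Symbol, quote)
	}
}

func main() {
	queue := newLatestQueue()
	onQuote := quoteConnector(queue)

	// Two quotes for the same symbol leave a single, newest entry.
	onQuote(&Quote{Symbol: "ABC", Bid: 10.0})
	onQuote(&Quote{Symbol: "ABC", Bid: 10.5})
	fmt.Println(len(queue.items), queue.items["ABC"].Bid)
}
```

In the real package the returned function would be handed to a dma.Subscriber as its 'onQuote' or 'onTrade' callback.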

Types

type Composite

type Composite[T mkt.AnyOrder] struct {
	Instructions []T
	Reports      []*mkt.Report
	Quote        *mkt.Quote
	Trade        *mkt.Trade
}

Composite is a set of independent updates to be pushed into a utl.ConflatingQueue for an OrderProcess.

func ConflateComposite

func ConflateComposite[T mkt.AnyOrder](existing *Composite[T], latest *Composite[T]) *Composite[T]

ConflateComposite implements CompositeConflator.

type CompositeConflator added in v0.2.0

type CompositeConflator[T mkt.AnyOrder] func(existing *Composite[T], latest *Composite[T]) *Composite[T]

CompositeConflator is any function that can conflate items in an order's utl.ConflatingQueue of *Composite updates.
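A function with the CompositeConflator shape might merge two pending updates as sketched below. The merge policy shown (append instructions and reports, keep the newest quote and trade) is an assumption for illustration and is not confirmed to be what ConflateComposite does. `Report`, `Quote`, `Trade`, `Composite` and `conflate` are local stand-ins.

```go
package main

import "fmt"

// Stand-ins for mkt.Report, mkt.Quote and mkt.Trade; fields are assumed.
type Report struct{ ID string }
type Quote struct{ Bid float64 }
type Trade struct{ Price float64 }

// Composite mirrors the field layout of run.Composite, with the
// mkt.AnyOrder constraint relaxed to any for this sketch.
type Composite[T any] struct {
	Instructions []T
	Reports      []*Report
	Quote        *Quote
	Trade        *Trade
}

// conflate merges latest into existing under an assumed policy:
// instructions and reports accumulate in order, while the quote and
// trade are replaced only when latest carries a newer one.
func conflate[T any](existing, latest *Composite[T]) *Composite[T] {
	if existing == nil {
		return latest
	}
	if latest == nil {
		return existing
	}
	merged := &Composite[T]{
		Instructions: append(append([]T{}, existing.Instructions...), latest.Instructions...),
		Reports:      append(append([]*Report{}, existing.Reports...), latest.Reports...),
		Quote:        existing.Quote,
		Trade:        existing.Trade,
	}
	if latest.Quote != nil {
		merged.Quote = latest.Quote
	}
	if latest.Trade != nil {
		merged.Trade = latest.Trade
	}
	return merged
}

func main() {
	a := &Composite[string]{Instructions: []string{"new"}, Quote: &Quote{Bid: 10}}
	b := &Composite[string]{Instructions: []string{"amend"}, Quote: &Quote{Bid: 11}}
	m := conflate(a, b)
	fmt.Println(m.Instructions, m.Quote.Bid)
}
```

Accumulating instructions and reports rather than overwriting them reflects the idea that those updates must not be lost, whereas only the most recent market data is usually of interest.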

type Delegate added in v0.2.0

type Delegate[T mkt.AnyOrder] interface {
	// Action the [*Composite] update. Return true if the order is now complete
	// and no further action is necessary.
	Action(*Composite[T]) bool
	// CleanUp instructs the [Delegate] to prepare for the [Dispatcher] to exit.
	// This does not mean the order is cancelled.
	CleanUp()
}

Delegate is the interface for an OrderProcess to delegate work on an order.

type DelegateFactory added in v0.2.0

type DelegateFactory[T mkt.AnyOrder] interface {
	New(order T) Delegate[T]
}

DelegateFactory is used by Dispatcher to manufacture a Delegate for new orders.
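As an illustration of how the Delegate and DelegateFactory interfaces fit together, the sketch below implements both against local stand-in types. `Order`, `Report`, `Composite`, `fillTracker` and `fillTrackerFactory` are hypothetical, as is the idea that a report carries a filled quantity. The real interfaces are generic over mkt.AnyOrder.

```go
package main

import "fmt"

// Order and Report are stand-ins for the mkt types; fields are assumed.
type Order struct {
	ID       string
	Quantity int
}
type Report struct{ Filled int }

// Composite is reduced to the one field this sketch needs.
type Composite struct{ Reports []*Report }

// Delegate mirrors the method set of run.Delegate without generics.
type Delegate interface {
	Action(*Composite) bool
	CleanUp()
}

// fillTracker is a hypothetical Delegate that completes once the
// reported fills reach the order quantity.
type fillTracker struct {
	order   Order
	filled  int
	cleaned bool
}

// Action applies the update and reports whether the order is complete.
func (d *fillTracker) Action(update *Composite) bool {
	for _, r := range update.Reports {
		d.filled += r.Filled
	}
	return d.filled >= d.order.Quantity
}

// CleanUp records that the dispatcher is exiting; the order is not cancelled.
func (d *fillTracker) CleanUp() { d.cleaned = true }

// fillTrackerFactory mirrors run.DelegateFactory: one Delegate per new order.
type fillTrackerFactory struct{}

func (fillTrackerFactory) New(order Order) Delegate {
	return &fillTracker{order: order}
}

func main() {
	d := fillTrackerFactory{}.New(Order{ID: "o1", Quantity: 100})
	fmt.Println(d.Action(&Composite{Reports: []*Report{{Filled: 40}}}))
	fmt.Println(d.Action(&Composite{Reports: []*Report{{Filled: 60}}}))
}
```

Returning true from Action is what tells the owning process that no further work is needed for that order.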

type Dispatcher

type Dispatcher[T mkt.AnyOrder] struct {
	// contains filtered or unexported fields
}

Dispatcher owns all orders and routes information to them.

func NewDispatcher

func NewDispatcher[T mkt.AnyOrder](
	instructions chan T,
	factory DelegateFactory[T],
	conflator CompositeConflator[T],
	reports chan *mkt.Report,
	subscriber dma.Subscribable,
	quotes *utl.ConflatingQueue[string, *mkt.Quote],
	trades *utl.ConflatingQueue[string, *mkt.Trade],
	onError func(string, error),
) *Dispatcher[T]

NewDispatcher returns a *Dispatcher ready to use.

func (*Dispatcher[T]) Run

func (x *Dispatcher[T]) Run(ctx context.Context, shutdown *sync.WaitGroup)

Run dispatches until the given context is cancelled. Cancellation signals that dispatching must stop, not that orders are cancelled.
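The (context, *sync.WaitGroup) signature suggests the usual Go shutdown pattern, sketched here with a hypothetical `worker` in place of a Dispatcher. The documentation does not state who calls Add and Done on the WaitGroup; this sketch assumes the caller calls Add(1) and Run calls Done on exit.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a Dispatcher.
type worker struct{ stopped bool }

// Run blocks until ctx is cancelled, then marks the worker stopped.
// Calling Done here is an assumption about the contract.
func (w *worker) Run(ctx context.Context, shutdown *sync.WaitGroup) {
	defer shutdown.Done()
	<-ctx.Done()
	w.stopped = true
}

// runAndStop starts the worker, cancels it, and waits for a clean exit.
func runAndStop(w *worker) {
	ctx, cancel := context.WithCancel(context.Background())
	var shutdown sync.WaitGroup
	shutdown.Add(1)
	go w.Run(ctx, &shutdown)

	cancel()        // signal that the worker must stop
	shutdown.Wait() // block until Run has returned
}

func main() {
	w := &worker{}
	runAndStop(w)
	fmt.Println(w.stopped)
}
```

Waiting on the WaitGroup after cancelling gives each component a chance to finish cleanly, for example by calling CleanUp on its Delegate, before the program exits.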

type OrderProcess

type OrderProcess[T mkt.AnyOrder] struct {
	// contains filtered or unexported fields
}

An OrderProcess runs for the lifetime of an order.

func NewOrderProcess added in v0.2.0

func NewOrderProcess[T mkt.AnyOrder](order T, factory DelegateFactory[T], conflate CompositeConflator[T]) *OrderProcess[T]

NewOrderProcess returns an *OrderProcess for a new order.

func (*OrderProcess[T]) Definition added in v0.2.0

func (x *OrderProcess[T]) Definition() *mkt.Order

Definition returns the mkt.Order.Definition for the Dispatcher.

func (*OrderProcess[T]) Queue added in v0.2.0

func (x *OrderProcess[T]) Queue() *utl.ConflatingQueue[string, *Composite[T]]

Queue returns the queue for the Dispatcher.

func (*OrderProcess[T]) Run

func (x *OrderProcess[T]) Run(ctx context.Context, shutdown *sync.WaitGroup, completed chan<- string)

Run until the context is cancelled or the Delegate has completed.

type Reporter added in v0.2.2

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter feeds *mkt.Report values into the reports channel read by the Dispatcher.

func (*Reporter) Acknowledge added in v0.2.2

func (x *Reporter) Acknowledge()

Acknowledge signals that the last *mkt.Report sent on the channel has been processed.

func (*Reporter) OnReport added in v0.2.2

func (x *Reporter) OnReport(report *mkt.Report)

OnReport sends the *mkt.Report onto the channel. It blocks until the report has been acknowledged (Reporter.Acknowledge), as reports must not be lost. Typically the caller will be reading from a checkpointed stream, such as Redis.
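The send-then-wait-for-acknowledgement behaviour can be sketched with two unbuffered channels. This is a minimal illustration of the pattern, not the package's implementation: `reporter`, `newReporter` and `consume` are hypothetical, and the real Reporter has unexported fields that are not documented here.

```go
package main

import "fmt"

// Report is a stand-in for mkt.Report; the ID field is assumed.
type Report struct{ ID string }

// reporter is a hypothetical sketch of the acknowledge pattern.
type reporter struct {
	reports chan *Report
	acks    chan struct{}
}

func newReporter() *reporter {
	return &reporter{
		reports: make(chan *Report),
		acks:    make(chan struct{}),
	}
}

// OnReport sends the report and blocks until it has been acknowledged.
func (r *reporter) OnReport(report *Report) {
	r.reports <- report
	<-r.acks
}

// Acknowledge releases the sender blocked in OnReport.
func (r *reporter) Acknowledge() {
	r.acks <- struct{}{}
}

// consume plays the consumer's role: it processes n reports and
// acknowledges each one only after it has been handled.
func consume(r *reporter, n int) []string {
	ids := make([]string, 0, n)
	for i := 0; i < n; i++ {
		report := <-r.reports
		ids = append(ids, report.ID)
		r.Acknowledge()
	}
	return ids
}

func main() {
	r := newReporter()
	done := make(chan []string)
	go func() { done <- consume(r, 2) }()

	r.OnReport(&Report{ID: "a"}) // returns only after the acknowledgement
	r.OnReport(&Report{ID: "b"})
	fmt.Println(<-done)
}
```

Because OnReport returns only after the acknowledgement, a caller reading from a checkpointed stream can advance its checkpoint once OnReport returns.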
