Documentation ¶
Index ¶
- Variables
- func RunMarketParamUpdaterTaskLoop(ctx context.Context, configs types.PricefeedMutableMarketConfigs, ...)
- func RunPriceUpdaterTaskLoop(ctx context.Context, exchangeToMarketPrices types.ExchangeToMarketPrices, ...) error
- type Client
- type SubTaskRunner
- type SubTaskRunnerImpl
- func (s *SubTaskRunnerImpl) StartMarketParamUpdater(ctx context.Context, ticker *time.Ticker, stop <-chan bool, ...)
- func (s *SubTaskRunnerImpl) StartPriceEncoder(exchangeId types.ExchangeId, configs types.PricefeedMutableMarketConfigs, ...)
- func (s *SubTaskRunnerImpl) StartPriceFetcher(ticker *time.Ticker, stop <-chan bool, ...)
- func (s *SubTaskRunnerImpl) StartPriceUpdater(c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, ...)
Constants ¶
This section is empty.
Variables ¶
var ( HttpClient = http.Client{ Transport: &http.Transport{MaxConnsPerHost: constants.MaxConnectionsPerExchange}, } )
Functions ¶
func RunMarketParamUpdaterTaskLoop ¶
func RunMarketParamUpdaterTaskLoop( ctx context.Context, configs types.PricefeedMutableMarketConfigs, pricesQueryClient pricetypes.QueryClient, logger log.Logger, isPastGracePeriod bool, )
RunMarketParamUpdaterTaskLoop queries all market params from the query client, and then updates the shared, in-memory `PricefeedMutableMarketConfigs` object with the latest market params.
func RunPriceUpdaterTaskLoop ¶
func RunPriceUpdaterTaskLoop( ctx context.Context, exchangeToMarketPrices types.ExchangeToMarketPrices, priceFeedServiceClient api.PriceFeedServiceClient, logger log.Logger, ) error
RunPriceUpdaterTaskLoop copies the map of current `exchangeId -> MarketPriceTimestamp`, transforms the map values into a market price update request and sends the request to the socket where the pricefeed server is listening.
Types ¶
type Client ¶
type Client struct { // include HealthCheckable to track the health of the daemon. daemontypes.HealthCheckable // contains filtered or unexported fields }
Client encapsulates the logic for executing and cleanly stopping all subtasks associated with the pricefeed client daemon. Access to the client's internal state is synchronized. The pricefeed daemon is a job that periodically queries external exchanges and transmits price data to the pricefeed service, which is then used by the application to compute index prices for proposing and validating oracle price updates on the blockchain. Note: price fetchers manage their own subtasks by blocking on their completion on every subtask run. When the price fetcher is stopped, it will wait for all of its own subtasks to complete before returning.
func StartNewClient ¶
func StartNewClient( ctx context.Context, daemonFlags flags.DaemonFlags, appFlags appflags.Flags, logger log.Logger, grpcClient daemontypes.GrpcClient, exchangeIdToQueryConfig map[types.ExchangeId]*types.ExchangeQueryConfig, exchangeIdToExchangeDetails map[types.ExchangeId]types.ExchangeQueryDetails, subTaskRunner SubTaskRunner, ) (client *Client)
StartNewClient initializes and starts a new pricefeed daemon as a subtask of the calling process. The pricefeed daemon is a job that periodically queries external exchanges and transmits price data to the pricefeed service, which is then used by the application to compute index prices for proposing and validating oracle price updates on the blockchain. Note: the daemon will panic if it fails to start up.
type SubTaskRunner ¶
type SubTaskRunner interface { StartPriceUpdater( c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, exchangeToMarketPrices types.ExchangeToMarketPrices, priceFeedServiceClient api.PriceFeedServiceClient, logger log.Logger, ) StartPriceEncoder( exchangeId types.ExchangeId, configs types.PricefeedMutableMarketConfigs, exchangeToMarketPrices types.ExchangeToMarketPrices, logger log.Logger, bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse, ) StartPriceFetcher( ticker *time.Ticker, stop <-chan bool, configs types.PricefeedMutableMarketConfigs, exchangeQueryConfig types.ExchangeQueryConfig, exchangeDetails types.ExchangeQueryDetails, queryHandler handler.ExchangeQueryHandler, logger log.Logger, bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse, ) StartMarketParamUpdater( ctx context.Context, ticker *time.Ticker, stop <-chan bool, configs types.PricefeedMutableMarketConfigs, pricesQueryClient pricetypes.QueryClient, logger log.Logger, ) }
SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunnerImpl ¶
type SubTaskRunnerImpl struct{}
SubTaskRunnerImpl is the struct that implements the `SubTaskRunner` interface.
func (*SubTaskRunnerImpl) StartMarketParamUpdater ¶
func (s *SubTaskRunnerImpl) StartMarketParamUpdater( ctx context.Context, ticker *time.Ticker, stop <-chan bool, configs types.PricefeedMutableMarketConfigs, pricesQueryClient pricetypes.QueryClient, logger log.Logger, )
StartMarketParamUpdater periodically starts a goroutine to update the market parameters that control which markets the daemon queries and how they are queried and computed from each exchange.
func (*SubTaskRunnerImpl) StartPriceEncoder ¶
func (s *SubTaskRunnerImpl) StartPriceEncoder( exchangeId types.ExchangeId, configs types.PricefeedMutableMarketConfigs, exchangeToMarketPrices types.ExchangeToMarketPrices, logger log.Logger, bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse, )
StartPriceEncoder continuously reads from a buffered channel, reading encoded API responses for exchange requests and inserting them into an `ExchangeToMarketPrices` cache, performing currency conversions based on the index price of other markets as necessary. StartPriceEncoder reads price fetcher responses from a shared channel, and does not need a ticker or stop signal from the daemon to exit. It marks itself as done in the daemon's wait group when the price fetcher closes the shared channel.
func (*SubTaskRunnerImpl) StartPriceFetcher ¶
func (s *SubTaskRunnerImpl) StartPriceFetcher( ticker *time.Ticker, stop <-chan bool, configs types.PricefeedMutableMarketConfigs, exchangeQueryConfig types.ExchangeQueryConfig, exchangeDetails types.ExchangeQueryDetails, queryHandler handler.ExchangeQueryHandler, logger log.Logger, bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse, )
StartPriceFetcher periodically starts goroutines to "fetch" market prices from a specific exchange. Each goroutine does the following: 1) query a single market price from a specific exchange 2) transform response to `MarketPriceTimestamp` 3) send transformed response to a buffered channel that's shared across multiple goroutines NOTE: the subtask response shared channel has a buffer size and goroutines will block if the buffer is full. NOTE: the price fetcher kicks off 1 to n go routines every time the subtask loop runs, but the subtask loop blocks until all go routines are done. This means that these go routines are not tracked by the wait group.
func (*SubTaskRunnerImpl) StartPriceUpdater ¶
func (s *SubTaskRunnerImpl) StartPriceUpdater( c *Client, ctx context.Context, ticker *time.Ticker, stop <-chan bool, exchangeToMarketPrices types.ExchangeToMarketPrices, priceFeedServiceClient api.PriceFeedServiceClient, logger log.Logger, )
StartPriceUpdater periodically runs a task loop to send price updates to the pricefeed server via: 1) Get `MarketPriceTimestamps` for all exchanges in an `ExchangeToMarketPrices` struct. 2) Transform `MarketPriceTimestamps` and exchange ids into an `UpdateMarketPricesRequest` struct. StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group to signal task completion.