Documentation ¶
Index ¶
- Constants
- Variables
- func MinimumPollingInterval(c Config) time.Duration
- func NewORM(db *gorm.DB, txm transmitter, strategy bulletprooftxmanager.TxStrategy) *orm
- func ValidatedFluxMonitorSpec(config ValidationConfig, ts string) (job.Job, error)
- type Config
- type ContractFlags
- type ContractSubmitter
- type Delegate
- type DeviationChecker
- type DeviationThresholds
- type Flags
- type FluxAggregatorContractSubmitter
- type FluxMonitor
- type FluxMonitorRoundStatsV2
- type KeyStore
- type KeyStoreInterface
- type ORM
- type PaymentChecker
- type PollManager
- func (pm *PollManager) Awaken(roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) DrumbeatTicks() <-chan time.Time
- func (pm *PollManager) Hibernate()
- func (pm *PollManager) HibernationTimerTicks() <-chan time.Time
- func (pm *PollManager) IdleTimerTicks() <-chan time.Time
- func (pm *PollManager) Poll() <-chan PollRequest
- func (pm *PollManager) PollTickerTicks() <-chan time.Time
- func (pm *PollManager) Reset(roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) ResetIdleTimer(roundStartedAtUTC uint64)
- func (pm *PollManager) RetryTickerTicks() <-chan time.Time
- func (pm *PollManager) RoundTimerTicks() <-chan time.Time
- func (pm *PollManager) ShouldPerformInitialPoll() bool
- func (pm *PollManager) Start(hibernate bool, roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) StartRetryTicker() bool
- func (pm *PollManager) Stop()
- func (pm *PollManager) StopRetryTicker()
- type PollManagerConfig
- type PollRequest
- type PollRequestType
- type SubmissionChecker
- type ValidationConfig
Constants ¶
const ( PriorityFlagChangedLog uint = 0 PriorityNewRoundLog uint = 1 PriorityAnswerUpdatedLog uint = 2 )
const DefaultHibernationPollPeriod = 168 * time.Hour
const MinFundedRounds int64 = 3
MinFundedRounds defines the minimum number of rounds that need to be paid to oracles on a contract.
Variables ¶
var ( // ErrNotEligible defines when the round is not eligible for submission ErrNotEligible = errors.New("not eligible to submit") // ErrUnderfunded defines when the aggregator does not have sufficient funds ErrUnderfunded = errors.New("aggregator is underfunded") // ErrPaymentTooLow defines when the round payment is too low ErrPaymentTooLow = errors.New("round payment amount < minimum contract payment") )
var FluxAggregatorABI = eth.MustGetABI(flux_aggregator_wrapper.FluxAggregatorABI)
FluxAggregatorABI initializes the Flux Aggregator ABI
Functions ¶
func MinimumPollingInterval ¶ added in v1.10.17
MinimumPollingInterval returns the minimum duration between polling ticks
func NewORM ¶
func NewORM(db *gorm.DB, txm transmitter, strategy bulletprooftxmanager.TxStrategy) *orm
NewORM initializes a new ORM
func ValidatedFluxMonitorSpec ¶
func ValidatedFluxMonitorSpec(config ValidationConfig, ts string) (job.Job, error)
Types ¶
type Config ¶
type Config interface { DefaultHTTPTimeout() models.Duration FlagsContractAddress() string MinimumContractPayment() *assets.Link EvmGasLimitDefault() uint64 EvmMaxQueuedTransactions() uint64 FMDefaultTransactionQueueDepth() uint32 }
Config defines the Flux Monitor configuration.
type ContractFlags ¶
type ContractFlags struct {
flags_wrapper.FlagsInterface
}
ContractFlags wraps a flags contract
func (*ContractFlags) Contract ¶
func (f *ContractFlags) Contract() flags_wrapper.FlagsInterface
Contract returns the flags contract
func (*ContractFlags) ContractExists ¶
func (f *ContractFlags) ContractExists() bool
ContractExists returns whether a flag contract exists
type ContractSubmitter ¶
type ContractSubmitter interface {
Submit(db *gorm.DB, roundID *big.Int, submission *big.Int) error
}
ContractSubmitter defines an interface to submit an eth tx.
type Delegate ¶
type Delegate struct {
// contains filtered or unexported fields
}
Delegate represents a Flux Monitor delegate
func NewDelegate ¶
func NewDelegate( ethKeyStore keystore.Eth, jobORM job.ORM, pipelineORM pipeline.ORM, pipelineRunner pipeline.Runner, db *gorm.DB, chainSet evm.ChainSet, lggr logger.Logger, ) *Delegate
NewDelegate constructs a new delegate
func (Delegate) AfterJobCreated ¶
func (Delegate) BeforeJobDeleted ¶
type DeviationChecker ¶
type DeviationChecker struct {
Thresholds DeviationThresholds
}
DeviationChecker checks the deviation of the next answer against the current answer.
func NewDeviationChecker ¶
func NewDeviationChecker(rel, abs float64) *DeviationChecker
NewDeviationChecker constructs a new deviation checker with thresholds.
func NewZeroDeviationChecker ¶
func NewZeroDeviationChecker() *DeviationChecker
NewZeroDeviationChecker constructs a new deviation checker with 0 as thresholds.
func (*DeviationChecker) OutsideDeviation ¶
func (c *DeviationChecker) OutsideDeviation(curAnswer, nextAnswer decimal.Decimal) bool
OutsideDeviation checks whether the next price is outside the threshold. If both thresholds are zero (default value), always returns true.
type DeviationThresholds ¶
type DeviationThresholds struct { Rel float64 // Relative change required, i.e. |new-old|/|old| >= Rel Abs float64 // Absolute change required, i.e. |new-old| >= Abs }
DeviationThresholds carries parameters used by the threshold-trigger logic
type Flags ¶
type FluxAggregatorContractSubmitter ¶
type FluxAggregatorContractSubmitter struct { flux_aggregator_wrapper.FluxAggregatorInterface // contains filtered or unexported fields }
FluxAggregatorContractSubmitter submits the polled answer in an eth tx.
func NewFluxAggregatorContractSubmitter ¶
func NewFluxAggregatorContractSubmitter( contract flux_aggregator_wrapper.FluxAggregatorInterface, orm ORM, keyStore KeyStoreInterface, gasLimit uint64, ) *FluxAggregatorContractSubmitter
NewFluxAggregatorContractSubmitter constructs a new FluxAggregatorContractSubmitter
type FluxMonitor ¶
type FluxMonitor struct { utils.StartStopOnce // contains filtered or unexported fields }
FluxMonitor polls external price adapters via HTTP to check for price swings.
func NewFluxMonitor ¶
func NewFluxMonitor( pipelineRunner pipeline.Runner, jobSpec job.Job, spec pipeline.Spec, db *gorm.DB, orm ORM, jobORM job.ORM, pipelineORM pipeline.ORM, keyStore KeyStoreInterface, pollManager *PollManager, paymentChecker *PaymentChecker, contractAddress common.Address, contractSubmitter ContractSubmitter, deviationChecker *DeviationChecker, submissionChecker *SubmissionChecker, flags Flags, fluxAggregator flux_aggregator_wrapper.FluxAggregatorInterface, logBroadcaster log.Broadcaster, fmLogger logger.Logger, ) (*FluxMonitor, error)
NewFluxMonitor returns a new instance of FluxMonitor.
func NewFromJobSpec ¶
func NewFromJobSpec( jobSpec job.Job, db *gorm.DB, orm ORM, jobORM job.ORM, pipelineORM pipeline.ORM, keyStore KeyStoreInterface, ethClient eth.Client, logBroadcaster log.Broadcaster, pipelineRunner pipeline.Runner, cfg Config, lggr logger.Logger, ) (*FluxMonitor, error)
NewFromJobSpec constructs an instance of FluxMonitor with sane defaults and validation.
func (*FluxMonitor) Close ¶
func (fm *FluxMonitor) Close() error
Close implements the job.Service interface. It stops this instance from polling, cleaning up resources.
func (*FluxMonitor) HandleLog ¶
func (fm *FluxMonitor) HandleLog(broadcast log.Broadcast)
HandleLog processes the contract logs
func (*FluxMonitor) IsHibernating ¶
func (fm *FluxMonitor) IsHibernating() bool
func (*FluxMonitor) JobID ¶
func (fm *FluxMonitor) JobID() int32
JobID implements the listener.Listener interface.
func (*FluxMonitor) SetOracleAddress ¶
func (fm *FluxMonitor) SetOracleAddress() error
SetOracleAddress sets the oracle address which matches the node's keys. If none match, it uses the first available key
func (*FluxMonitor) Start ¶
func (fm *FluxMonitor) Start() error
Start implements the job.Service interface. It begins the CSP consumer in a single goroutine to poll the price adapters and listen to NewRound events.
type FluxMonitorRoundStatsV2 ¶
type FluxMonitorRoundStatsV2 struct { ID uint64 `gorm:"primary key;not null;auto_increment"` PipelineRunID null.Int64 `gorm:"default:null"` Aggregator common.Address `gorm:"not null"` RoundID uint32 `gorm:"not null"` NumNewRoundLogs uint64 `gorm:"not null;default 0"` NumSubmissions uint64 `gorm:"not null;default 0"` }
FluxMonitorRoundStatsV2 defines the stats for a round
type KeyStore ¶
KeyStore implements KeyStoreInterface
func NewKeyStore ¶
NewKeyStore initializes a new keystore
type KeyStoreInterface ¶
type KeyStoreInterface interface { SendingKeys() ([]ethkey.KeyV2, error) GetRoundRobinAddress(...common.Address) (common.Address, error) }
KeyStoreInterface defines an interface to interact with the keystore
type ORM ¶
type ORM interface { MostRecentFluxMonitorRoundID(aggregator common.Address) (uint32, error) DeleteFluxMonitorRoundsBackThrough(aggregator common.Address, roundID uint32) error FindOrCreateFluxMonitorRoundStats(aggregator common.Address, roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, error) UpdateFluxMonitorRoundStats(db *gorm.DB, aggregator common.Address, roundID uint32, runID int64, newRoundLogsAddition uint) error CreateEthTransaction(db *gorm.DB, fromAddress, toAddress common.Address, payload []byte, gasLimit uint64) error }
ORM defines an interface for database commands related to Flux Monitor v2
type PaymentChecker ¶
type PaymentChecker struct { // The minimum amount for a payment set in the ENV Var MinContractPayment *assets.Link // The minimum amount for a payment set in the job MinJobPayment *assets.Link }
PaymentChecker provides helper functions to check whether payments are valid
func NewPaymentChecker ¶
func NewPaymentChecker(minContractPayment, minJobPayment *assets.Link) *PaymentChecker
NewPaymentChecker constructs a new payment checker
func (*PaymentChecker) SufficientFunds ¶
func (c *PaymentChecker) SufficientFunds(availableFunds *big.Int, paymentAmount *big.Int, oracleCount uint8) bool
SufficientFunds checks if the contract has sufficient funding to pay all the oracles on a contract for a minimum number of rounds, based on the payment amount in the contract
func (*PaymentChecker) SufficientPayment ¶
func (c *PaymentChecker) SufficientPayment(payment *big.Int) bool
SufficientPayment checks if the available payment is enough to submit an answer. It compares the payment amount on chain with the min payment amount listed in the job / ENV var.
type PollManager ¶
type PollManager struct {
// contains filtered or unexported fields
}
PollManager manages the tickers/timers which cause the Flux Monitor to start a poll. It contains the following types of tickers and timers, which determine when to initiate a poll:
HibernationTimer - The PollManager can be set to hibernate, which disables all other ticker/timers, and enables the hibernation timer. Upon expiry of the hibernation timer, a poll is requested. When the PollManager is awakened, the other tickers and timers are enabled with the current round state, and the hibernation timer is disabled.
PollTicker - The poll ticker requests a poll at a given interval defined in PollManagerConfig. Disabling this through config will permanently disable the ticker, even through resets.
IdleTimer - The idle timer requests a poll after no poll has taken place since the last round was started and the IdleTimerPeriod has elapsed. This can also be known as a heartbeat.
RoundTimer - The round timer requests a poll when the round state provided by the contract has timed out.
RetryTicker - The retry ticker requests a poll with a backoff duration. This is started when the idle timer fails, and will poll with a maximum backoff of either 1 hour or the idle timer period, whichever is lower.
func NewPollManager ¶
func NewPollManager(cfg PollManagerConfig, logger logger.Logger) (*PollManager, error)
NewPollManager initializes a new PollManager
func (*PollManager) Awaken ¶
func (pm *PollManager) Awaken(roundState flux_aggregator_wrapper.OracleRoundState)
Awaken sets hibernation to false, stops the hibernation timer and starts all other tickers
func (*PollManager) DrumbeatTicks ¶
func (pm *PollManager) DrumbeatTicks() <-chan time.Time
DrumbeatTicks ticks on a cron schedule when the drumbeat ticker is activated
func (*PollManager) Hibernate ¶
func (pm *PollManager) Hibernate()
Hibernate sets hibernation to true, starts the hibernation timer and stops all other ticker/timers
func (*PollManager) HibernationTimerTicks ¶
func (pm *PollManager) HibernationTimerTicks() <-chan time.Time
HibernationTimerTicks ticks after a given period
func (*PollManager) IdleTimerTicks ¶
func (pm *PollManager) IdleTimerTicks() <-chan time.Time
IdleTimerTicks ticks after a given period
func (*PollManager) Poll ¶
func (pm *PollManager) Poll() <-chan PollRequest
Poll returns a channel which the manager will use to send polling requests
Note: In the future, we should change the tickers above to send their request through this channel to simplify the listener.
func (*PollManager) PollTickerTicks ¶
func (pm *PollManager) PollTickerTicks() <-chan time.Time
PollTickerTicks ticks on a given interval
func (*PollManager) Reset ¶
func (pm *PollManager) Reset(roundState flux_aggregator_wrapper.OracleRoundState)
Reset resets the timers except for the hibernation timer. Will not reset if hibernating.
func (*PollManager) ResetIdleTimer ¶
func (pm *PollManager) ResetIdleTimer(roundStartedAtUTC uint64)
ResetIdleTimer resets the idle timer unless hibernating
func (*PollManager) RetryTickerTicks ¶
func (pm *PollManager) RetryTickerTicks() <-chan time.Time
RetryTickerTicks ticks with a backoff when the retry ticker is activated
func (*PollManager) RoundTimerTicks ¶
func (pm *PollManager) RoundTimerTicks() <-chan time.Time
RoundTimerTicks ticks after a given period
func (*PollManager) ShouldPerformInitialPoll ¶
func (pm *PollManager) ShouldPerformInitialPoll() bool
ShouldPerformInitialPoll determines whether to perform an initial poll
func (*PollManager) Start ¶
func (pm *PollManager) Start(hibernate bool, roundState flux_aggregator_wrapper.OracleRoundState)
Start initializes all the timers and determines whether to go into immediate hibernation.
func (*PollManager) StartRetryTicker ¶
func (pm *PollManager) StartRetryTicker() bool
StartRetryTicker starts the retry ticker
func (*PollManager) StopRetryTicker ¶
func (pm *PollManager) StopRetryTicker()
StopRetryTicker stops the retry ticker
type PollManagerConfig ¶
type PollManagerConfig struct { IsHibernating bool PollTickerInterval time.Duration PollTickerDisabled bool IdleTimerPeriod time.Duration IdleTimerDisabled bool DrumbeatSchedule string DrumbeatEnabled bool DrumbeatRandomDelay time.Duration HibernationPollPeriod time.Duration MinRetryBackoffDuration time.Duration MaxRetryBackoffDuration time.Duration }
type PollRequest ¶
type PollRequest struct { Type PollRequestType Timestamp time.Time }
PollRequest defines a request to initiate a poll
type PollRequestType ¶
type PollRequestType int
PollRequestType defines which method was used to request a poll
const ( PollRequestTypeUnknown PollRequestType = iota PollRequestTypeInitial PollRequestTypePoll PollRequestTypeIdle PollRequestTypeRound PollRequestTypeHibernation PollRequestTypeRetry PollRequestTypeAwaken PollRequestTypeDrumbeat )
type SubmissionChecker ¶
SubmissionChecker checks whether an answer is inside the allowable range.
func NewSubmissionChecker ¶
func NewSubmissionChecker(min *big.Int, max *big.Int) *SubmissionChecker
NewSubmissionChecker initializes a new SubmissionChecker