Documentation ¶
Index ¶
- Constants
- Variables
- func MinimumPollingInterval(c JobPipelineConfig) time.Duration
- func ValidatedFluxMonitorSpec(config ValidationConfig, ts string) (job.Job, error)
- type Config
- type ContractFlags
- type ContractSubmitter
- type Delegate
- func (d *Delegate) AfterJobCreated(spec job.Job)
- func (d *Delegate) BeforeJobCreated(spec job.Job)
- func (d *Delegate) BeforeJobDeleted(spec job.Job)
- func (d *Delegate) JobType() job.Type
- func (d *Delegate) OnDeleteJob(context.Context, job.Job) error
- func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []job.ServiceCtx, err error)
- type DelegateConfig
- type DeviationChecker
- type DeviationThresholds
- type EvmFeeConfig
- type EvmTransactionsConfig
- type Flags
- type FluxAggregatorContractSubmitter
- type FluxMonitor
- func (fm *FluxMonitor) Close() error
- func (fm *FluxMonitor) HandleLog(ctx context.Context, broadcast log.Broadcast)
- func (fm *FluxMonitor) IsHibernating() bool
- func (fm *FluxMonitor) JobID() int32
- func (fm *FluxMonitor) SetOracleAddress() error
- func (fm *FluxMonitor) Start(context.Context) error
- func (fm *FluxMonitor) Transact(ctx context.Context, fn func(sqlutil.DataSource) error) error
- type FluxMonitorConfig
- type FluxMonitorRoundStatsV2
- type JobPipelineConfig
- type KeyStore
- type KeyStoreInterface
- type ORM
- type PaymentChecker
- type PollManager
- func (pm *PollManager) Awaken(roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) DrumbeatTicks() <-chan time.Time
- func (pm *PollManager) Hibernate()
- func (pm *PollManager) HibernationTimerTicks() <-chan time.Time
- func (pm *PollManager) IdleTimerTicks() <-chan time.Time
- func (pm *PollManager) Poll() <-chan PollRequest
- func (pm *PollManager) PollTickerTicks() <-chan time.Time
- func (pm *PollManager) Reset(roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) ResetIdleTimer(roundStartedAtUTC uint64)
- func (pm *PollManager) RetryTickerTicks() <-chan time.Time
- func (pm *PollManager) RoundTimerTicks() <-chan time.Time
- func (pm *PollManager) ShouldPerformInitialPoll() bool
- func (pm *PollManager) Start(hibernate bool, roundState flux_aggregator_wrapper.OracleRoundState)
- func (pm *PollManager) StartRetryTicker() bool
- func (pm *PollManager) Stop()
- func (pm *PollManager) StopRetryTicker()
- type PollManagerConfig
- type PollRequest
- type PollRequestType
- type SubmissionChecker
- type ValidationConfig
Constants ¶
const ( PriorityFlagChangedLog uint = 0 PriorityNewRoundLog uint = 1 PriorityAnswerUpdatedLog uint = 2 )
const DefaultHibernationPollPeriod = 24 * time.Hour
DefaultHibernationPollPeriod defines the hibernation polling period
const MinFundedRounds int64 = 3
MinFundedRounds defines the minimum number of rounds that needs to be paid to oracles on a contract
Variables ¶
var ( // ErrNotEligible defines when the round is not eligible for submission ErrNotEligible = errors.New("not eligible to submit") // ErrUnderfunded defines when the aggregator does not have sufficient funds ErrUnderfunded = errors.New("aggregator is underfunded") // ErrPaymentTooLow defines when the round payment is too low ErrPaymentTooLow = errors.New("round payment amount < minimum contract payment") )
var FluxAggregatorABI = evmtypes.MustGetABI(flux_aggregator_wrapper.FluxAggregatorABI)
FluxAggregatorABI initializes the Flux Aggregator ABI
Functions ¶
func MinimumPollingInterval ¶
func MinimumPollingInterval(c JobPipelineConfig) time.Duration
MinimumPollingInterval returns the minimum duration between polling ticks
func ValidatedFluxMonitorSpec ¶
func ValidatedFluxMonitorSpec(config ValidationConfig, ts string) (job.Job, error)
Types ¶
type Config ¶
type Config interface { FlagsContractAddress() string // Evm MinContractPayment() *assets.Link // Evm }
Config defines the Flux Monitor configuration.
type ContractFlags ¶
type ContractFlags struct {
flags_wrapper.FlagsInterface
}
ContractFlags wraps the a contract
func (*ContractFlags) Contract ¶
func (f *ContractFlags) Contract() flags_wrapper.FlagsInterface
Contract returns the flags contract
func (*ContractFlags) ContractExists ¶
func (f *ContractFlags) ContractExists() bool
ContractExists returns whether a flag contract exists
type ContractSubmitter ¶
type ContractSubmitter interface {
Submit(ctx context.Context, roundID *big.Int, submission *big.Int, idempotencyKey *string) error
}
ContractSubmitter defines an interface to submit an eth tx.
type Delegate ¶
type Delegate struct {
// contains filtered or unexported fields
}
Delegate represents a Flux Monitor delegate
func NewDelegate ¶
func NewDelegate( cfg DelegateConfig, ethKeyStore keystore.Eth, jobORM job.ORM, pipelineORM pipeline.ORM, pipelineRunner pipeline.Runner, ds sqlutil.DataSource, legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger, ) *Delegate
NewDelegate constructs a new delegate
func (*Delegate) AfterJobCreated ¶
func (*Delegate) BeforeJobCreated ¶
func (*Delegate) BeforeJobDeleted ¶
func (*Delegate) ServicesForSpec ¶
func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []job.ServiceCtx, err error)
ServicesForSpec returns the flux monitor service for the job spec
type DelegateConfig ¶ added in v2.12.0
type DelegateConfig interface { FluxMonitor() config.FluxMonitor JobPipeline() config.JobPipeline }
type DeviationChecker ¶
type DeviationChecker struct { Thresholds DeviationThresholds // contains filtered or unexported fields }
DeviationChecker checks the deviation of the next answer against the current answer.
func NewDeviationChecker ¶
func NewDeviationChecker(rel, abs float64, lggr logger.Logger) *DeviationChecker
NewDeviationChecker constructs a new deviation checker with thresholds.
func NewZeroDeviationChecker ¶
func NewZeroDeviationChecker(lggr logger.Logger) *DeviationChecker
NewZeroDeviationChecker constructs a new deviation checker with 0 as thresholds.
func (*DeviationChecker) OutsideDeviation ¶
func (c *DeviationChecker) OutsideDeviation(curAnswer, nextAnswer decimal.Decimal) bool
OutsideDeviation checks whether the next price is outside the threshold. If both thresholds are zero (default value), always returns true.
type DeviationThresholds ¶
type DeviationThresholds struct { Rel float64 // Relative change required, i.e. |new-old|/|old| >= Rel Abs float64 // Absolute change required, i.e. |new-old| >= Abs }
DeviationThresholds carries parameters used by the threshold-trigger logic
type EvmFeeConfig ¶ added in v2.3.0
type EvmFeeConfig interface { LimitDefault() uint64 // Evm LimitJobType() config.LimitJobType }
type EvmTransactionsConfig ¶ added in v2.3.0
type EvmTransactionsConfig interface {
MaxQueued() uint64 // Evm
}
type Flags ¶
type FluxAggregatorContractSubmitter ¶
type FluxAggregatorContractSubmitter struct { flux_aggregator_wrapper.FluxAggregatorInterface // contains filtered or unexported fields }
FluxAggregatorContractSubmitter submits the polled answer in an eth tx.
func NewFluxAggregatorContractSubmitter ¶
func NewFluxAggregatorContractSubmitter( contract flux_aggregator_wrapper.FluxAggregatorInterface, orm ORM, keyStore KeyStoreInterface, gasLimit uint64, forwardingAllowed bool, chainID *big.Int, ) *FluxAggregatorContractSubmitter
NewFluxAggregatorContractSubmitter constructs a new NewFluxAggregatorContractSubmitter
type FluxMonitor ¶
type FluxMonitor struct { services.StateMachine // contains filtered or unexported fields }
FluxMonitor polls external price adapters via HTTP to check for price swings.
func NewFluxMonitor ¶
func NewFluxMonitor( pipelineRunner pipeline.Runner, jobSpec job.Job, spec pipeline.Spec, ds sqlutil.DataSource, orm ORM, jobORM job.ORM, pipelineORM pipeline.ORM, keyStore KeyStoreInterface, pollManager *PollManager, paymentChecker *PaymentChecker, contractAddress common.Address, contractSubmitter ContractSubmitter, deviationChecker *DeviationChecker, submissionChecker *SubmissionChecker, flags Flags, fluxAggregator flux_aggregator_wrapper.FluxAggregatorInterface, logBroadcaster log.Broadcaster, fmLogger logger.Logger, chainID *big.Int, ) (*FluxMonitor, error)
NewFluxMonitor returns a new instance of PollingDeviationChecker.
func NewFromJobSpec ¶
func NewFromJobSpec( jobSpec job.Job, ds sqlutil.DataSource, orm ORM, jobORM job.ORM, pipelineORM pipeline.ORM, keyStore KeyStoreInterface, ethClient evmclient.Client, logBroadcaster log.Broadcaster, pipelineRunner pipeline.Runner, cfg Config, fcfg EvmFeeConfig, jcfg JobPipelineConfig, lggr logger.Logger, ) (*FluxMonitor, error)
NewFromJobSpec constructs an instance of FluxMonitor with sane defaults and validation.
func (*FluxMonitor) Close ¶
func (fm *FluxMonitor) Close() error
Close implements the job.Service interface. It stops this instance from polling, cleaning up resources.
func (*FluxMonitor) HandleLog ¶
func (fm *FluxMonitor) HandleLog(ctx context.Context, broadcast log.Broadcast)
HandleLog processes the contract logs
func (*FluxMonitor) IsHibernating ¶
func (fm *FluxMonitor) IsHibernating() bool
func (*FluxMonitor) JobID ¶
func (fm *FluxMonitor) JobID() int32
JobID implements the listener.Listener interface.
func (*FluxMonitor) SetOracleAddress ¶
func (fm *FluxMonitor) SetOracleAddress() error
SetOracleAddress sets the oracle address which matches the node's keys. If none match, it uses the first available key
func (*FluxMonitor) Start ¶
func (fm *FluxMonitor) Start(context.Context) error
Start implements the job.Service interface. It begins the CSP consumer in a single goroutine to poll the price adapters and listen to NewRound events.
func (*FluxMonitor) Transact ¶ added in v2.12.0
func (fm *FluxMonitor) Transact(ctx context.Context, fn func(sqlutil.DataSource) error) error
type FluxMonitorConfig ¶ added in v2.3.0
type FluxMonitorConfig interface {
DefaultTransactionQueueDepth() uint32
}
type FluxMonitorRoundStatsV2 ¶
type FluxMonitorRoundStatsV2 struct { ID uint64 PipelineRunID null.Int64 Aggregator common.Address RoundID uint32 NumNewRoundLogs uint64 NumSubmissions uint64 }
FluxMonitorRoundStatsV2 defines the stats for a round
type JobPipelineConfig ¶ added in v2.3.0
type JobPipelineConfig interface {
DefaultHTTPTimeout() commonconfig.Duration
}
type KeyStore ¶
KeyStore implements KeyStoreInterface
func NewKeyStore ¶
NewKeyStore initializes a new keystore
type KeyStoreInterface ¶
type KeyStoreInterface interface { EnabledKeysForChain(ctx context.Context, chainID *big.Int) ([]ethkey.KeyV2, error) GetRoundRobinAddress(ctx context.Context, chainID *big.Int, addrs ...common.Address) (common.Address, error) }
KeyStoreInterface defines an interface to interact with the keystore
type ORM ¶
type ORM interface { MostRecentFluxMonitorRoundID(ctx context.Context, aggregator common.Address) (uint32, error) DeleteFluxMonitorRoundsBackThrough(ctx context.Context, aggregator common.Address, roundID uint32) error FindOrCreateFluxMonitorRoundStats(ctx context.Context, aggregator common.Address, roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, error) UpdateFluxMonitorRoundStats(ctx context.Context, aggregator common.Address, roundID uint32, runID int64, newRoundLogsAddition uint) error CreateEthTransaction(ctx context.Context, fromAddress, toAddress common.Address, payload []byte, gasLimit uint64, idempotencyKey *string) error CountFluxMonitorRoundStats(ctx context.Context) (count int, err error) WithDataSource(sqlutil.DataSource) ORM }
ORM defines an interface for database commands related to Flux Monitor v2
func NewORM ¶
func NewORM(ds sqlutil.DataSource, lggr logger.Logger, txm transmitter, strategy types.TxStrategy, checker txmgr.TransmitCheckerSpec) ORM
NewORM initializes a new ORM
type PaymentChecker ¶
type PaymentChecker struct { // The minimum amount for a payment set in the ENV Var MinContractPayment *assets.Link // The minimum amount for a payment set in the job MinJobPayment *assets.Link }
PaymentChecker provides helper functions to check whether payments are valid
func NewPaymentChecker ¶
func NewPaymentChecker(minContractPayment, minJobPayment *assets.Link) *PaymentChecker
NewPaymentChecker constructs a new payment checker
func (*PaymentChecker) SufficientFunds ¶
func (c *PaymentChecker) SufficientFunds(availableFunds *big.Int, paymentAmount *big.Int, oracleCount uint8) bool
SufficientFunds checks if the contract has sufficient funding to pay all the oracles on a contract for a minimum number of rounds, based on the payment amount in the contract
func (*PaymentChecker) SufficientPayment ¶
func (c *PaymentChecker) SufficientPayment(payment *big.Int) bool
SufficientPayment checks if the available payment is enough to submit an answer. It compares the payment amount on chain with the min payment amount listed in the job / ENV var.
type PollManager ¶
type PollManager struct {
// contains filtered or unexported fields
}
PollManager manages the tickers/timers which cause the Flux Monitor to start a poll. It contains 4 types of tickers and timers which determine when to initiate a poll
HibernationTimer - The PollManager can be set to hibernate, which disables all other ticker/timers, and enables the hibernation timer. Upon expiry of the hibernation timer, a poll is requested. When the PollManager is awakened, the other tickers and timers are enabled with the current round state, and the hibernation timer is disabled.
PollTicker - The poll ticker requests a poll at a given interval defined in PollManagerConfig. Disabling this through config will permanently disable the ticker, even through a resets.
IdleTimer - The idle timer requests a poll after no poll has taken place since the last round was start and the IdleTimerPeriod has elapsed. This can also be known as a heartbeat.
RoundTimer - The round timer requests a poll when the round state provided by the contract has timed out.
RetryTicker - The retry ticker requests a poll with a backoff duration. This is started when the idle timer fails, and will poll with a maximum backoff of either 1 hour or the idle timer period if it is lower
func NewPollManager ¶
func NewPollManager(cfg PollManagerConfig, logger logger.Logger) (*PollManager, error)
NewPollManager initializes a new PollManager
func (*PollManager) Awaken ¶
func (pm *PollManager) Awaken(roundState flux_aggregator_wrapper.OracleRoundState)
Awaken sets hibernation to false, stops the hibernation timer and starts all other tickers
func (*PollManager) DrumbeatTicks ¶
func (pm *PollManager) DrumbeatTicks() <-chan time.Time
DrumbeatTicks ticks on a cron schedule when the drumbeat ticker is activated
func (*PollManager) Hibernate ¶
func (pm *PollManager) Hibernate()
Hibernate sets hibernation to true, starts the hibernation timer and stops all other ticker/timers
func (*PollManager) HibernationTimerTicks ¶
func (pm *PollManager) HibernationTimerTicks() <-chan time.Time
HibernationTimerTicks ticks after a given period
func (*PollManager) IdleTimerTicks ¶
func (pm *PollManager) IdleTimerTicks() <-chan time.Time
IdleTimerTicks ticks after a given period
func (*PollManager) Poll ¶
func (pm *PollManager) Poll() <-chan PollRequest
Poll returns a channel which the manager will use to send polling requests
Note: In the future, we should change the tickers above to send their request through this channel to simplify the listener.
func (*PollManager) PollTickerTicks ¶
func (pm *PollManager) PollTickerTicks() <-chan time.Time
PollTickerTicks ticks on a given interval
func (*PollManager) Reset ¶
func (pm *PollManager) Reset(roundState flux_aggregator_wrapper.OracleRoundState)
Reset resets the timers except for the hibernation timer. Will not reset if hibernating.
func (*PollManager) ResetIdleTimer ¶
func (pm *PollManager) ResetIdleTimer(roundStartedAtUTC uint64)
ResetIdleTimer resets the idle timer unless hibernating
func (*PollManager) RetryTickerTicks ¶
func (pm *PollManager) RetryTickerTicks() <-chan time.Time
RetryTickerTicks ticks with a backoff when the retry ticker is activated
func (*PollManager) RoundTimerTicks ¶
func (pm *PollManager) RoundTimerTicks() <-chan time.Time
RoundTimerTicks ticks after a given period
func (*PollManager) ShouldPerformInitialPoll ¶
func (pm *PollManager) ShouldPerformInitialPoll() bool
ShouldPerformInitialPoll determines whether to perform an initial poll
func (*PollManager) Start ¶
func (pm *PollManager) Start(hibernate bool, roundState flux_aggregator_wrapper.OracleRoundState)
Start initializes all the timers and determines whether to go into immediate hibernation.
func (*PollManager) StartRetryTicker ¶
func (pm *PollManager) StartRetryTicker() bool
StartRetryTicker starts the retry ticker
func (*PollManager) StopRetryTicker ¶
func (pm *PollManager) StopRetryTicker()
StopRetryTicker stops the retry ticker
type PollManagerConfig ¶
type PollManagerConfig struct { IsHibernating bool PollTickerInterval time.Duration PollTickerDisabled bool IdleTimerPeriod time.Duration IdleTimerDisabled bool DrumbeatSchedule string DrumbeatEnabled bool DrumbeatRandomDelay time.Duration HibernationPollPeriod time.Duration MinRetryBackoffDuration time.Duration MaxRetryBackoffDuration time.Duration }
type PollRequest ¶
type PollRequest struct { Type PollRequestType Timestamp time.Time }
PollRequest defines a request to initiate a poll
type PollRequestType ¶
type PollRequestType int
PollRequestType defines which method was used to request a poll
const ( PollRequestTypeUnknown PollRequestType = iota PollRequestTypeInitial PollRequestTypePoll PollRequestTypeIdle PollRequestTypeRound PollRequestTypeHibernation PollRequestTypeRetry PollRequestTypeAwaken PollRequestTypeDrumbeat )
type SubmissionChecker ¶
SubmissionChecker checks whether an answer is inside the allowable range.
func NewSubmissionChecker ¶
func NewSubmissionChecker(min *big.Int, max *big.Int) *SubmissionChecker
NewSubmissionChecker initializes a new SubmissionChecker
type ValidationConfig ¶
type ValidationConfig interface {
DefaultHTTPTimeout() commonconfig.Duration
}