fluxmonitorv2

package
v2.14.0-rc0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2024 License: MIT Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PriorityFlagChangedLog   uint = 0
	PriorityNewRoundLog      uint = 1
	PriorityAnswerUpdatedLog uint = 2
)
View Source
const DefaultHibernationPollPeriod = 24 * time.Hour

DefaultHibernationPollPeriod defines the hibernation polling period

View Source
const MinFundedRounds int64 = 3

MinFundedRounds defines the minimum number of rounds that needs to be paid to oracles on a contract

Variables

View Source
var (
	// ErrNotEligible defines when the round is not eligible for submission
	ErrNotEligible = errors.New("not eligible to submit")
	// ErrUnderfunded defines when the aggregator does not have sufficient funds
	ErrUnderfunded = errors.New("aggregator is underfunded")
	// ErrPaymentTooLow defines when the round payment is too low
	ErrPaymentTooLow = errors.New("round payment amount < minimum contract payment")
)

FluxAggregatorABI initializes the Flux Aggregator ABI

Functions

func MinimumPollingInterval

func MinimumPollingInterval(c JobPipelineConfig) time.Duration

MinimumPollingInterval returns the minimum duration between polling ticks

func ValidatedFluxMonitorSpec

func ValidatedFluxMonitorSpec(config ValidationConfig, ts string) (job.Job, error)

Types

type Config

type Config interface {
	FlagsContractAddress() string     // Evm
	MinContractPayment() *assets.Link // Evm
}

Config defines the Flux Monitor configuration.

type ContractFlags

type ContractFlags struct {
	flags_wrapper.FlagsInterface
}

ContractFlags wraps a flags contract.

func (*ContractFlags) Contract

Contract returns the flags contract

func (*ContractFlags) ContractExists

func (f *ContractFlags) ContractExists() bool

ContractExists returns whether a flag contract exists

func (*ContractFlags) IsLowered

func (f *ContractFlags) IsLowered(contractAddr common.Address) (bool, error)

IsLowered determines whether the flag is lowered for a given contract. If a contract does not exist, it is considered to be lowered

type ContractSubmitter

type ContractSubmitter interface {
	Submit(ctx context.Context, roundID *big.Int, submission *big.Int, idempotencyKey *string) error
}

ContractSubmitter defines an interface to submit an eth tx.

type Delegate

type Delegate struct {
	// contains filtered or unexported fields
}

Delegate represents a Flux Monitor delegate

func NewDelegate

func NewDelegate(
	cfg DelegateConfig,
	ethKeyStore keystore.Eth,
	jobORM job.ORM,
	pipelineORM pipeline.ORM,
	pipelineRunner pipeline.Runner,
	ds sqlutil.DataSource,
	legacyChains legacyevm.LegacyChainContainer,
	lggr logger.Logger,
) *Delegate

NewDelegate constructs a new delegate

func (*Delegate) AfterJobCreated

func (d *Delegate) AfterJobCreated(spec job.Job)

func (*Delegate) BeforeJobCreated

func (d *Delegate) BeforeJobCreated(spec job.Job)

func (*Delegate) BeforeJobDeleted

func (d *Delegate) BeforeJobDeleted(spec job.Job)

func (*Delegate) JobType

func (d *Delegate) JobType() job.Type

JobType implements the job.Delegate interface

func (*Delegate) OnDeleteJob

func (d *Delegate) OnDeleteJob(context.Context, job.Job) error

func (*Delegate) ServicesForSpec

func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []job.ServiceCtx, err error)

ServicesForSpec returns the flux monitor service for the job spec

type DelegateConfig added in v2.12.0

type DelegateConfig interface {
	FluxMonitor() config.FluxMonitor
	JobPipeline() config.JobPipeline
}

type DeviationChecker

type DeviationChecker struct {
	Thresholds DeviationThresholds
	// contains filtered or unexported fields
}

DeviationChecker checks the deviation of the next answer against the current answer.

func NewDeviationChecker

func NewDeviationChecker(rel, abs float64, lggr logger.Logger) *DeviationChecker

NewDeviationChecker constructs a new deviation checker with thresholds.

func NewZeroDeviationChecker

func NewZeroDeviationChecker(lggr logger.Logger) *DeviationChecker

NewZeroDeviationChecker constructs a new deviation checker with 0 as thresholds.

func (*DeviationChecker) OutsideDeviation

func (c *DeviationChecker) OutsideDeviation(curAnswer, nextAnswer decimal.Decimal) bool

OutsideDeviation checks whether the next price is outside the threshold. If both thresholds are zero (default value), always returns true.

type DeviationThresholds

type DeviationThresholds struct {
	Rel float64 // Relative change required, i.e. |new-old|/|old| >= Rel
	Abs float64 // Absolute change required, i.e. |new-old| >= Abs
}

DeviationThresholds carries parameters used by the threshold-trigger logic

type EvmFeeConfig added in v2.3.0

type EvmFeeConfig interface {
	LimitDefault() uint64 // Evm
	LimitJobType() config.LimitJobType
}

type EvmTransactionsConfig added in v2.3.0

type EvmTransactionsConfig interface {
	MaxQueued() uint64 // Evm
}

type Flags

type Flags interface {
	ContractExists() bool
	IsLowered(contractAddr common.Address) (bool, error)
	Address() common.Address
	ParseLog(log types.Log) (generated.AbigenLog, error)
}

func NewFlags

func NewFlags(addrHex string, ethClient evmclient.Client) (Flags, error)

NewFlags constructs a new Flags from a flags contract address

type FluxAggregatorContractSubmitter

type FluxAggregatorContractSubmitter struct {
	flux_aggregator_wrapper.FluxAggregatorInterface
	// contains filtered or unexported fields
}

FluxAggregatorContractSubmitter submits the polled answer in an eth tx.

func NewFluxAggregatorContractSubmitter

func NewFluxAggregatorContractSubmitter(
	contract flux_aggregator_wrapper.FluxAggregatorInterface,
	orm ORM,
	keyStore KeyStoreInterface,
	gasLimit uint64,
	forwardingAllowed bool,
	chainID *big.Int,
) *FluxAggregatorContractSubmitter

NewFluxAggregatorContractSubmitter constructs a new FluxAggregatorContractSubmitter.

func (*FluxAggregatorContractSubmitter) Submit

func (c *FluxAggregatorContractSubmitter) Submit(ctx context.Context, roundID *big.Int, submission *big.Int, idempotencyKey *string) error

Submit submits the answer by writing an EthTx for the txmgr to pick up.

type FluxMonitor

type FluxMonitor struct {
	services.StateMachine
	// contains filtered or unexported fields
}

FluxMonitor polls external price adapters via HTTP to check for price swings.

func NewFluxMonitor

func NewFluxMonitor(
	pipelineRunner pipeline.Runner,
	jobSpec job.Job,
	spec pipeline.Spec,
	ds sqlutil.DataSource,
	orm ORM,
	jobORM job.ORM,
	pipelineORM pipeline.ORM,
	keyStore KeyStoreInterface,
	pollManager *PollManager,
	paymentChecker *PaymentChecker,
	contractAddress common.Address,
	contractSubmitter ContractSubmitter,
	deviationChecker *DeviationChecker,
	submissionChecker *SubmissionChecker,
	flags Flags,
	fluxAggregator flux_aggregator_wrapper.FluxAggregatorInterface,
	logBroadcaster log.Broadcaster,
	fmLogger logger.Logger,
	chainID *big.Int,
) (*FluxMonitor, error)

NewFluxMonitor returns a new instance of FluxMonitor.

func NewFromJobSpec

func NewFromJobSpec(
	jobSpec job.Job,
	ds sqlutil.DataSource,
	orm ORM,
	jobORM job.ORM,
	pipelineORM pipeline.ORM,
	keyStore KeyStoreInterface,
	ethClient evmclient.Client,
	logBroadcaster log.Broadcaster,
	pipelineRunner pipeline.Runner,
	cfg Config,
	fcfg EvmFeeConfig,
	jcfg JobPipelineConfig,
	lggr logger.Logger,
) (*FluxMonitor, error)

NewFromJobSpec constructs an instance of FluxMonitor with sane defaults and validation.

func (*FluxMonitor) Close

func (fm *FluxMonitor) Close() error

Close implements the job.Service interface. It stops this instance from polling, cleaning up resources.

func (*FluxMonitor) HandleLog

func (fm *FluxMonitor) HandleLog(ctx context.Context, broadcast log.Broadcast)

HandleLog processes the contract logs

func (*FluxMonitor) IsHibernating

func (fm *FluxMonitor) IsHibernating() bool

func (*FluxMonitor) JobID

func (fm *FluxMonitor) JobID() int32

JobID implements the listener.Listener interface.

func (*FluxMonitor) SetOracleAddress

func (fm *FluxMonitor) SetOracleAddress() error

SetOracleAddress sets the oracle address which matches the node's keys. If none match, it uses the first available key

func (*FluxMonitor) Start

func (fm *FluxMonitor) Start(context.Context) error

Start implements the job.Service interface. It begins the CSP consumer in a single goroutine to poll the price adapters and listen to NewRound events.

func (*FluxMonitor) Transact added in v2.12.0

func (fm *FluxMonitor) Transact(ctx context.Context, fn func(sqlutil.DataSource) error) error

type FluxMonitorConfig added in v2.3.0

type FluxMonitorConfig interface {
	DefaultTransactionQueueDepth() uint32
}

type FluxMonitorRoundStatsV2

type FluxMonitorRoundStatsV2 struct {
	ID              uint64
	PipelineRunID   null.Int64
	Aggregator      common.Address
	RoundID         uint32
	NumNewRoundLogs uint64
	NumSubmissions  uint64
}

FluxMonitorRoundStatsV2 defines the stats for a round

type JobPipelineConfig added in v2.3.0

type JobPipelineConfig interface {
	DefaultHTTPTimeout() commonconfig.Duration
}

type KeyStore

type KeyStore struct {
	keystore.Eth
}

KeyStore implements KeyStoreInterface

func NewKeyStore

func NewKeyStore(ks keystore.Eth) *KeyStore

NewKeyStore initializes a new keystore

type KeyStoreInterface

type KeyStoreInterface interface {
	EnabledKeysForChain(ctx context.Context, chainID *big.Int) ([]ethkey.KeyV2, error)
	GetRoundRobinAddress(ctx context.Context, chainID *big.Int, addrs ...common.Address) (common.Address, error)
}

KeyStoreInterface defines an interface to interact with the keystore

type ORM

type ORM interface {
	MostRecentFluxMonitorRoundID(ctx context.Context, aggregator common.Address) (uint32, error)
	DeleteFluxMonitorRoundsBackThrough(ctx context.Context, aggregator common.Address, roundID uint32) error
	FindOrCreateFluxMonitorRoundStats(ctx context.Context, aggregator common.Address, roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, error)
	UpdateFluxMonitorRoundStats(ctx context.Context, aggregator common.Address, roundID uint32, runID int64, newRoundLogsAddition uint) error
	CreateEthTransaction(ctx context.Context, fromAddress, toAddress common.Address, payload []byte, gasLimit uint64, idempotencyKey *string) error
	CountFluxMonitorRoundStats(ctx context.Context) (count int, err error)

	WithDataSource(sqlutil.DataSource) ORM
}

ORM defines an interface for database commands related to Flux Monitor v2

func NewORM

func NewORM(ds sqlutil.DataSource, lggr logger.Logger, txm transmitter, strategy types.TxStrategy, checker txmgr.TransmitCheckerSpec) ORM

NewORM initializes a new ORM

type PaymentChecker

type PaymentChecker struct {
	// The minimum amount for a payment set in the ENV Var
	MinContractPayment *assets.Link
	// The minimum amount for a payment set in the job
	MinJobPayment *assets.Link
}

PaymentChecker provides helper functions to check whether payments are valid

func NewPaymentChecker

func NewPaymentChecker(minContractPayment, minJobPayment *assets.Link) *PaymentChecker

NewPaymentChecker constructs a new payment checker

func (*PaymentChecker) SufficientFunds

func (c *PaymentChecker) SufficientFunds(availableFunds *big.Int, paymentAmount *big.Int, oracleCount uint8) bool

SufficientFunds checks if the contract has sufficient funding to pay all the oracles on a contract for a minimum number of rounds, based on the payment amount in the contract

func (*PaymentChecker) SufficientPayment

func (c *PaymentChecker) SufficientPayment(payment *big.Int) bool

SufficientPayment checks if the available payment is enough to submit an answer. It compares the payment amount on chain with the min payment amount listed in the job / ENV var.

type PollManager

type PollManager struct {
	// contains filtered or unexported fields
}

PollManager manages the tickers/timers which cause the Flux Monitor to start a poll. It contains the following types of tickers and timers, which determine when to initiate a poll:

HibernationTimer - The PollManager can be set to hibernate, which disables all other ticker/timers, and enables the hibernation timer. Upon expiry of the hibernation timer, a poll is requested. When the PollManager is awakened, the other tickers and timers are enabled with the current round state, and the hibernation timer is disabled.

PollTicker - The poll ticker requests a poll at a given interval defined in PollManagerConfig. Disabling this through config will permanently disable the ticker, even through a reset.

IdleTimer - The idle timer requests a poll after no poll has taken place since the last round was started and the IdleTimerPeriod has elapsed. This can also be known as a heartbeat.

RoundTimer - The round timer requests a poll when the round state provided by the contract has timed out.

RetryTicker - The retry ticker requests a poll with a backoff duration. This is started when the idle timer fails, and will poll with a maximum backoff of either 1 hour or the idle timer period if it is lower

func NewPollManager

func NewPollManager(cfg PollManagerConfig, logger logger.Logger) (*PollManager, error)

NewPollManager initializes a new PollManager

func (*PollManager) Awaken

func (pm *PollManager) Awaken(roundState flux_aggregator_wrapper.OracleRoundState)

Awaken sets hibernation to false, stops the hibernation timer and starts all other tickers

func (*PollManager) DrumbeatTicks

func (pm *PollManager) DrumbeatTicks() <-chan time.Time

DrumbeatTicks ticks on a cron schedule when the drumbeat ticker is activated

func (*PollManager) Hibernate

func (pm *PollManager) Hibernate()

Hibernate sets hibernation to true, starts the hibernation timer and stops all other ticker/timers

func (*PollManager) HibernationTimerTicks

func (pm *PollManager) HibernationTimerTicks() <-chan time.Time

HibernationTimerTicks ticks after a given period

func (*PollManager) IdleTimerTicks

func (pm *PollManager) IdleTimerTicks() <-chan time.Time

IdleTimerTicks ticks after a given period

func (*PollManager) Poll

func (pm *PollManager) Poll() <-chan PollRequest

Poll returns a channel which the manager will use to send polling requests

Note: In the future, we should change the tickers above to send their request through this channel to simplify the listener.

func (*PollManager) PollTickerTicks

func (pm *PollManager) PollTickerTicks() <-chan time.Time

PollTickerTicks ticks on a given interval

func (*PollManager) Reset

Reset resets the timers except for the hibernation timer. Will not reset if hibernating.

func (*PollManager) ResetIdleTimer

func (pm *PollManager) ResetIdleTimer(roundStartedAtUTC uint64)

ResetIdleTimer resets the idle timer unless hibernating

func (*PollManager) RetryTickerTicks

func (pm *PollManager) RetryTickerTicks() <-chan time.Time

RetryTickerTicks ticks with a backoff when the retry ticker is activated

func (*PollManager) RoundTimerTicks

func (pm *PollManager) RoundTimerTicks() <-chan time.Time

RoundTimerTicks ticks after a given period

func (*PollManager) ShouldPerformInitialPoll

func (pm *PollManager) ShouldPerformInitialPoll() bool

ShouldPerformInitialPoll determines whether to perform an initial poll

func (*PollManager) Start

func (pm *PollManager) Start(hibernate bool, roundState flux_aggregator_wrapper.OracleRoundState)

Start initializes all the timers and determines whether to go into immediate hibernation.

func (*PollManager) StartRetryTicker

func (pm *PollManager) StartRetryTicker() bool

StartRetryTicker starts the retry ticker

func (*PollManager) Stop

func (pm *PollManager) Stop()

Stop stops all timers/tickers

func (*PollManager) StopRetryTicker

func (pm *PollManager) StopRetryTicker()

StopRetryTicker stops the retry ticker

type PollManagerConfig

type PollManagerConfig struct {
	IsHibernating           bool
	PollTickerInterval      time.Duration
	PollTickerDisabled      bool
	IdleTimerPeriod         time.Duration
	IdleTimerDisabled       bool
	DrumbeatSchedule        string
	DrumbeatEnabled         bool
	DrumbeatRandomDelay     time.Duration
	HibernationPollPeriod   time.Duration
	MinRetryBackoffDuration time.Duration
	MaxRetryBackoffDuration time.Duration
}

type PollRequest

type PollRequest struct {
	Type      PollRequestType
	Timestamp time.Time
}

PollRequest defines a request to initiate a poll

type PollRequestType

type PollRequestType int

PollRequestType defines which method was used to request a poll

const (
	PollRequestTypeUnknown PollRequestType = iota
	PollRequestTypeInitial
	PollRequestTypePoll
	PollRequestTypeIdle
	PollRequestTypeRound
	PollRequestTypeHibernation
	PollRequestTypeRetry
	PollRequestTypeAwaken
	PollRequestTypeDrumbeat
)

type SubmissionChecker

type SubmissionChecker struct {
	Min decimal.Decimal
	Max decimal.Decimal
}

SubmissionChecker checks whether an answer is inside the allowable range.

func NewSubmissionChecker

func NewSubmissionChecker(min *big.Int, max *big.Int) *SubmissionChecker

NewSubmissionChecker initializes a new SubmissionChecker

func (*SubmissionChecker) IsValid

func (c *SubmissionChecker) IsValid(answer decimal.Decimal) bool

IsValid checks if the submission is between the min and max

type ValidationConfig

type ValidationConfig interface {
	DefaultHTTPTimeout() commonconfig.Duration
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL