Documentation ¶
Index ¶
- Constants
- Variables
- func BumpGas(config Config, originalGasPrice *big.Int) (*big.Int, error)
- func CheckEthTxQueueCapacity(db *gorm.DB, fromAddress common.Address, maxQueuedTransactions uint64) (err error)
- func CountUnconfirmedTransactions(db *gorm.DB, fromAddress common.Address) (count uint32, err error)
- func FindEthTxesRequiringResend(db *gorm.DB, olderThan time.Time, maxInFlightTransactions uint32) (attempts []models.EthTxAttempt, err error)
- func FindEthTxsRequiringGasBump(db *gorm.DB, address gethCommon.Address, ...) (etxs []models.EthTx, err error)
- func FindEthTxsRequiringRebroadcast(db *gorm.DB, address gethCommon.Address, ...) (etxs []models.EthTx, err error)
- func FindEthTxsRequiringResubmissionDueToInsufficientEth(db *gorm.DB, address gethCommon.Address) (etxs []models.EthTx, err error)
- func GetNextNonce(db *gorm.DB, address gethCommon.Address) (int64, error)
- func IncrementNextNonce(db *gorm.DB, address gethCommon.Address, currentNonce int64) error
- func SendEther(db *gorm.DB, from, to common.Address, value assets.Eth, gasLimit uint64) (etx models.EthTx, err error)
- type BulletproofTxManager
- func (b *BulletproofTxManager) Close() (merr error)
- func (b *BulletproofTxManager) Connect(*models.Head) error
- func (b *BulletproofTxManager) CreateEthTransaction(db *gorm.DB, fromAddress, toAddress common.Address, payload []byte, ...) (etx models.EthTx, err error)
- func (b *BulletproofTxManager) OnNewLongestChain(ctx context.Context, head models.Head)
- func (b *BulletproofTxManager) Start() (merr error)
- func (b *BulletproofTxManager) Trigger(addr common.Address)
- type Config
- type EthBroadcaster
- type EthConfirmer
- func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error
- func (ec *EthConfirmer) Close() error
- func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head models.Head) error
- func (ec *EthConfirmer) ForceRebroadcast(beginningNonce uint, endingNonce uint, gasPriceWei uint64, ...) error
- func (ec *EthConfirmer) ProcessHead(ctx context.Context, head models.Head) error
- func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
- func (ec *EthConfirmer) SetBroadcastBeforeBlockNum(blockNum int64) error
- func (ec *EthConfirmer) Start() error
- type EthResender
- type KeyStore
- type Log
- type NSinserttx
- type NonceSyncer
- type NullTxManager
- func (n *NullTxManager) Close() error
- func (n *NullTxManager) Connect(*models.Head) error
- func (n *NullTxManager) CreateEthTransaction(*gorm.DB, common.Address, common.Address, []byte, uint64, interface{}) (etx models.EthTx, err error)
- func (n *NullTxManager) Healthy() error
- func (n *NullTxManager) OnNewLongestChain(context.Context, models.Head)
- func (n *NullTxManager) Ready() error
- func (n *NullTxManager) Start() error
- func (n *NullTxManager) Trigger(common.Address)
- type Reaper
- type ReaperConfig
- type Receipt
- type TxManager
Constants ¶
const EthMaxInFlightTransactionsWarningLabel = `` /* 308-byte string literal not displayed */
const EthMaxQueuedTransactionsLabel = `` /* 592-byte string literal not displayed */
const EthNodeConnectivityProblemLabel = `` /* 150-byte string literal not displayed */
const InFlightTransactionRecheckInterval = 1 * time.Second
InFlightTransactionRecheckInterval controls how often the EthBroadcaster will poll the unconfirmed queue to see if it is allowed to send another transaction
Variables ¶
var ( // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt // This most likely happened because an external wallet used the account for this nonce ErrCouldNotGetReceipt = "could not get receipt" )
Functions ¶
func BumpGas ¶
BumpGas computes the next gas price to attempt as the largest of: - A configured percentage bump (ETH_GAS_BUMP_PERCENT) on top of the baseline price. - A configured fixed amount of Wei (ETH_GAS_PRICE_WEI) on top of the baseline price. The baseline price is the maximum of the previous gas price attempt and the node's current gas price.
func CheckEthTxQueueCapacity ¶
func CheckEthTxQueueCapacity(db *gorm.DB, fromAddress common.Address, maxQueuedTransactions uint64) (err error)
CheckEthTxQueueCapacity returns an error if inserting this transaction would exceed the maximum queue size.
func CountUnconfirmedTransactions ¶
func CountUnconfirmedTransactions(db *gorm.DB, fromAddress common.Address) (count uint32, err error)
CountUnconfirmedTransactions returns the number of unconfirmed transactions
func FindEthTxesRequiringResend ¶
func FindEthTxesRequiringResend(db *gorm.DB, olderThan time.Time, maxInFlightTransactions uint32) (attempts []models.EthTxAttempt, err error)
FindEthTxesRequiringResend returns the highest priced attempt for each eth_tx that was last sent before or at the given time (up to limit)
func FindEthTxsRequiringGasBump ¶
func FindEthTxsRequiringGasBump(db *gorm.DB, address gethCommon.Address, blockNum, gasBumpThreshold, depth int64) (etxs []models.EthTx, err error)
FindEthTxsRequiringGasBump returns transactions that have all attempts which are unconfirmed for at least gasBumpThreshold blocks, limited by limit pending transactions
It also returns eth_txes that are unconfirmed with no eth_tx_attempts
func FindEthTxsRequiringRebroadcast ¶
func FindEthTxsRequiringRebroadcast(db *gorm.DB, address gethCommon.Address, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32) (etxs []models.EthTx, err error)
FindEthTxsRequiringRebroadcast returns attempts that hit insufficient eth, and attempts that need bumping, in nonce ASC order
func FindEthTxsRequiringResubmissionDueToInsufficientEth ¶
func FindEthTxsRequiringResubmissionDueToInsufficientEth(db *gorm.DB, address gethCommon.Address) (etxs []models.EthTx, err error)
FindEthTxsRequiringResubmissionDueToInsufficientEth returns transactions that need to be re-sent because they hit an out-of-eth error on a previous block
func GetNextNonce ¶
GetNextNonce returns keys.next_nonce for the given address
func IncrementNextNonce ¶
IncrementNextNonce increments keys.next_nonce by 1
Types ¶
type BulletproofTxManager ¶
type BulletproofTxManager struct { utils.StartStopOnce // contains filtered or unexported fields }
func NewBulletproofTxManager ¶
func NewBulletproofTxManager(db *gorm.DB, ethClient eth.Client, config Config, keyStore KeyStore, advisoryLocker postgres.AdvisoryLocker, eventBroadcaster postgres.EventBroadcaster) *BulletproofTxManager
func (*BulletproofTxManager) Close ¶
func (b *BulletproofTxManager) Close() (merr error)
func (*BulletproofTxManager) Connect ¶
func (b *BulletproofTxManager) Connect(*models.Head) error
Connect solely exists to conform to HeadTrackable
func (*BulletproofTxManager) CreateEthTransaction ¶
func (b *BulletproofTxManager) CreateEthTransaction(db *gorm.DB, fromAddress, toAddress common.Address, payload []byte, gasLimit uint64, meta interface{}) (etx models.EthTx, err error)
CreateEthTransaction inserts a new transaction
func (*BulletproofTxManager) OnNewLongestChain ¶
func (b *BulletproofTxManager) OnNewLongestChain(ctx context.Context, head models.Head)
OnNewLongestChain conforms to HeadTrackable
func (*BulletproofTxManager) Start ¶
func (b *BulletproofTxManager) Start() (merr error)
func (*BulletproofTxManager) Trigger ¶
func (b *BulletproofTxManager) Trigger(addr common.Address)
Trigger forces the EthBroadcaster to check early for the given address
type Config ¶
type Config interface { ChainID() *big.Int EthFinalityDepth() uint EthGasBumpPercent() uint16 EthGasBumpThreshold() uint64 EthGasBumpTxDepth() uint16 EthGasBumpWei() *big.Int EthGasLimitDefault() uint64 EthGasLimitMultiplier() float32 EthGasPriceDefault() *big.Int EthMaxGasPriceWei() *big.Int EthMaxQueuedTransactions() uint64 EthNonceAutoSync() bool EthRPCDefaultBatchSize() uint32 EthTxReaperThreshold() time.Duration EthTxReaperInterval() time.Duration EthTxResendAfterThreshold() time.Duration TriggerFallbackDBPollInterval() time.Duration EthMaxInFlightTransactions() uint32 OptimismGasFees() bool }
Config encompasses config used by bulletprooftxmanager package Unless otherwise specified, these should support changing at runtime
type EthBroadcaster ¶
type EthBroadcaster struct { utils.StartStopOnce // contains filtered or unexported fields }
EthBroadcaster monitors eth_txes for transactions that need to be broadcast, assigns nonces and ensures that at least one eth node somewhere has received the transaction successfully.
This does not guarantee delivery! A whole host of other things can subsequently go wrong such as transactions being evicted from the mempool, eth nodes going offline etc. Responsibility for ensuring eventual inclusion into the chain falls on the shoulders of the ethConfirmer.
What EthBroadcaster does guarantee is: - a monotonic series of increasing nonces for eth_txes that can all eventually be confirmed if you retry enough times - transition of eth_txes out of unstarted into either fatal_error or unconfirmed - existence of a saved eth_tx_attempt
func NewEthBroadcaster ¶
func NewEthBroadcaster(db *gorm.DB, ethClient eth.Client, config Config, keystore KeyStore, advisoryLocker postgres.AdvisoryLocker, eventBroadcaster postgres.EventBroadcaster, allKeys []ethkey.Key) *EthBroadcaster
NewEthBroadcaster returns a new concrete EthBroadcaster
func (*EthBroadcaster) Close ¶
func (eb *EthBroadcaster) Close() error
func (*EthBroadcaster) ProcessUnstartedEthTxs ¶
func (eb *EthBroadcaster) ProcessUnstartedEthTxs(key ethkey.Key) error
func (*EthBroadcaster) Start ¶
func (eb *EthBroadcaster) Start() error
func (*EthBroadcaster) Trigger ¶
func (eb *EthBroadcaster) Trigger(addr gethCommon.Address)
Trigger forces the monitor for a particular address to recheck for new eth_txes Logs error and does nothing if address was not registered on startup
type EthConfirmer ¶
type EthConfirmer struct { utils.StartStopOnce // contains filtered or unexported fields }
func NewEthConfirmer ¶
func NewEthConfirmer(db *gorm.DB, ethClient eth.Client, config Config, keystore KeyStore, advisoryLocker postgres.AdvisoryLocker, keys []ethkey.Key) *EthConfirmer
NewEthConfirmer instantiates a new eth confirmer
func (*EthConfirmer) CheckForReceipts ¶
func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error
func (*EthConfirmer) Close ¶
func (ec *EthConfirmer) Close() error
func (*EthConfirmer) EnsureConfirmedTransactionsInLongestChain ¶
func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head models.Head) error
EnsureConfirmedTransactionsInLongestChain finds all confirmed eth_txes up to the depth of the given chain and ensures that every one has a receipt with a block hash that is in the given chain.
If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.
func (*EthConfirmer) ForceRebroadcast ¶
func (ec *EthConfirmer) ForceRebroadcast(beginningNonce uint, endingNonce uint, gasPriceWei uint64, address gethCommon.Address, overrideGasLimit uint64) error
ForceRebroadcast sends a transaction for every nonce in the given nonce range at the given gas price. If an eth_tx exists for this nonce, we re-send the existing eth_tx with the supplied parameters. If an eth_tx doesn't exist for this nonce, we send a zero transaction. This operates completely orthogonal to the normal EthConfirmer and can result in untracked attempts! Only for emergency usage. Deliberately does not take the advisory lock (we don't write to the database so this is safe from a data integrity perspective). This is in case of some unforeseen scenario where the node is refusing to release the lock. KISS.
func (*EthConfirmer) ProcessHead ¶
ProcessHead takes all required transactions for the confirmer on a new head
func (*EthConfirmer) RebroadcastWhereNecessary ¶
func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
func (*EthConfirmer) SetBroadcastBeforeBlockNum ¶
func (ec *EthConfirmer) SetBroadcastBeforeBlockNum(blockNum int64) error
SetBroadcastBeforeBlockNum updates already broadcast attempts with the current block number. This is safe no matter how old the head is because if the attempt is already broadcast it _must_ have been before this head.
func (*EthConfirmer) Start ¶
func (ec *EthConfirmer) Start() error
type EthResender ¶
type EthResender struct {
// contains filtered or unexported fields
}
EthResender periodically picks up transactions that have been languishing unconfirmed for a configured amount of time without being sent, and sends their highest priced attempt again. This helps to defend against geth/parity silently dropping txes, or txes being ejected from the mempool.
Previously we relied on the bumper to do this for us implicitly but there can occasionally be problems with this (e.g. abnormally long block times, or if gas bumping is disabled)
func NewEthResender ¶
func (*EthResender) Start ¶
func (er *EthResender) Start()
func (*EthResender) Stop ¶
func (er *EthResender) Stop()
type KeyStore ¶
type KeyStore interface { AllKeys() (keys []ethkey.Key, err error) SignTx(fromAddress common.Address, tx *gethTypes.Transaction, chainID *big.Int) (*gethTypes.Transaction, error) SubscribeToKeyChanges() (ch chan struct{}, unsub func()) }
KeyStore encompasses the subset of keystore used by bulletprooftxmanager
type Log ¶
type Log struct { Address common.Address `json:"address"` Topics []common.Hash `json:"topics"` Data []byte `json:"data"` BlockNumber uint64 `json:"blockNumber"` TxHash common.Hash `json:"transactionHash"` TxIndex uint `json:"transactionIndex"` BlockHash common.Hash `json:"blockHash"` Index uint `json:"logIndex"` Removed bool `json:"removed"` }
Log represents a contract log event.
Copied from go-ethereum: https://github.com/ethereum/go-ethereum/blob/ce9a289fa48e0d2593c4aaa7e207c8a5dd3eaa8a/core/types/log.go
We use our own version because Geth's version specifies various gencodec:"required" fields which cause unhelpful errors when unmarshalling from an empty JSON object which can happen in the batch fetcher.
func FromGethLog ¶
FromGethLog converts a gethTypes.Log to a Log
func (*Log) UnmarshalJSON ¶
UnmarshalJSON unmarshals from JSON.
type NSinserttx ¶
type NSinserttx struct { Etx models.EthTx Attempt models.EthTxAttempt }
NSinserttx represents an EthTx and Attempt to be inserted together
type NonceSyncer ¶
type NonceSyncer struct {
// contains filtered or unexported fields
}
NonceSyncer manages the delicate task of syncing the local nonce with the chain nonce in case of divergence.
On startup, we check each key for the nonce value on chain and compare it to our local value.
Usually the on-chain nonce will be the same as (or lower than) the next_nonce in the DB, in which case we do nothing.
If we are restoring from a backup however, or another wallet has used the account, the chain nonce might be higher than our local one. In this scenario, we must fastforward the local nonce to match the chain nonce.
The problem with doing this is that now Chainlink does not have any ownership or control over potentially pending transactions with nonces between our local highest nonce and the chain nonce. If one of those transactions is pushed out of the mempool or re-org'd out of the chain, we run the risk of being stuck with a gap in the nonce sequence that will never be filled.
The solution is to query the chain for our own transactions and take ownership of them by writing them to the database and letting the EthConfirmer handle them as it would any other transaction.
This is not quite as straightforward as one might expect. We cannot query transactions from our account to infinite depth (geth does not support this). The best we can do is to query for all transactions sent within the past ETH_FINALITY_DEPTH blocks and find the ones sent by our address(es).
This gives us re-org protection up to ETH_FINALITY_DEPTH deep in the worst case, which is in line with our other guarantees.
func NewNonceSyncer ¶
func NewNonceSyncer(db *gorm.DB, ethClient eth.Client) *NonceSyncer
NewNonceSyncer returns a new syncer
type NullTxManager ¶
type NullTxManager struct {
ErrMsg string
}
func (*NullTxManager) Close ¶
func (n *NullTxManager) Close() error
func (*NullTxManager) CreateEthTransaction ¶
func (*NullTxManager) Healthy ¶
func (n *NullTxManager) Healthy() error
func (*NullTxManager) OnNewLongestChain ¶
func (n *NullTxManager) OnNewLongestChain(context.Context, models.Head)
func (*NullTxManager) Ready ¶
func (n *NullTxManager) Ready() error
func (*NullTxManager) Start ¶
func (n *NullTxManager) Start() error
func (*NullTxManager) Trigger ¶
func (n *NullTxManager) Trigger(common.Address)
type Reaper ¶
type Reaper struct {
// contains filtered or unexported fields
}
Reaper handles periodic database cleanup for BPTXM
func NewReaper ¶
func NewReaper(db *gorm.DB, config ReaperConfig) *Reaper
NewReaper instantiates a new reaper object
func (*Reaper) ReapEthTxes ¶
ReapEthTxes deletes old eth_txes
func (*Reaper) ReapJobRuns ¶
ReapJobRuns removes old job runs HACK: This isn't quite the right place for it, but since we are killing the old pipeline this code is temporary anyway
func (*Reaper) SetLatestBlockNum ¶
SetLatestBlockNum should be called on every new highest block number
type ReaperConfig ¶
type ReaperConfig interface { EthTxReaperInterval() time.Duration EthTxReaperThreshold() time.Duration EthFinalityDepth() uint }
ReaperConfig is the config subset used by the reaper
type Receipt ¶
type Receipt struct { PostState []byte `json:"root"` Status uint64 `json:"status"` CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` Bloom gethTypes.Bloom `json:"logsBloom"` Logs []*Log `json:"logs"` TxHash common.Hash `json:"transactionHash"` ContractAddress common.Address `json:"contractAddress"` GasUsed uint64 `json:"gasUsed"` BlockHash common.Hash `json:"blockHash,omitempty"` BlockNumber *big.Int `json:"blockNumber,omitempty"` TransactionIndex uint `json:"transactionIndex"` }
Receipt represents an ethereum receipt.
Copied from go-ethereum: https://github.com/ethereum/go-ethereum/blob/ce9a289fa48e0d2593c4aaa7e207c8a5dd3eaa8a/core/types/receipt.go#L50
We use our own version because Geth's version specifies various gencodec:"required" fields which cause unhelpful errors when unmarshalling from an empty JSON object which can happen in the batch fetcher.
func FromGethReceipt ¶
FromGethReceipt converts a gethTypes.Receipt to a Receipt
func (Receipt) IsUnmined ¶
IsUnmined returns true if the receipt is for a TX that has not been mined yet. Supposedly according to the spec this should never happen, but Parity does it anyway.
func (Receipt) IsZero ¶
IsZero returns true if receipt is the zero receipt Batch calls to the RPC will return a pointer to an empty Receipt struct Easiest way to check if the receipt was missing is to see if the hash is 0x0 Real receipts will always have the TxHash set
func (Receipt) MarshalJSON ¶
MarshalJSON marshals Receipt as JSON. Copied from: https://github.com/ethereum/go-ethereum/blob/ce9a289fa48e0d2593c4aaa7e207c8a5dd3eaa8a/core/types/gen_receipt_json.go
func (*Receipt) UnmarshalJSON ¶
UnmarshalJSON unmarshals from JSON.