Documentation ¶
Index ¶
- Constants
- Variables
- func CheckEthTxQueueCapacity(q pg.Queryer, fromAddress common.Address, maxQueuedTransactions uint64, ...) (err error)
- func CountUnconfirmedTransactions(q pg.Q, fromAddress common.Address, chainID big.Int) (count uint32, err error)
- func CountUnstartedTransactions(q pg.Q, fromAddress common.Address, chainID big.Int) (count uint32, err error)
- type ChainKeyStore
- func (c *ChainKeyStore) NewDynamicFeeAttempt(etx EthTx, fee gas.DynamicFee, gasLimit uint32) (attempt EthTxAttempt, err error)
- func (c *ChainKeyStore) NewLegacyAttempt(etx EthTx, gasPrice *assets.Wei, gasLimit uint32) (attempt EthTxAttempt, err error)
- func (c *ChainKeyStore) SignTx(address common.Address, tx *gethTypes.Transaction) (common.Hash, []byte, error)
- type CheckerFactory
- type Config
- type DropOldestStrategy
- type EthBroadcaster
- type EthConfirmer
- func (ec *EthConfirmer) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
- func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error
- func (ec *EthConfirmer) Close() error
- func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head *evmtypes.Head) error
- func (ec *EthConfirmer) ForceRebroadcast(beginningNonce uint, endingNonce uint, gasPriceWei uint64, ...) error
- func (ec *EthConfirmer) ProcessHead(ctx context.Context, head *evmtypes.Head) error
- func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
- func (ec *EthConfirmer) ResumePendingTaskRuns(ctx context.Context, head *evmtypes.Head) error
- func (ec *EthConfirmer) SetBroadcastBeforeBlockNum(blockNum int64) error
- func (ec *EthConfirmer) Start(_ context.Context) error
- type EthReceipt
- type EthResender
- type EthTx
- func FindEthTxsRequiringGasBump(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, ...) (etxs []*EthTx, err error)
- func FindEthTxsRequiringRebroadcast(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, ...) (etxs []*EthTx, err error)
- func FindEthTxsRequiringResubmissionDueToInsufficientEth(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, ...) (etxs []*EthTx, err error)
- type EthTxAttempt
- func (a EthTxAttempt) DynamicFee() gas.DynamicFee
- func (a EthTxAttempt) GetBroadcastBeforeBlockNum() *int64
- func (a EthTxAttempt) GetChainSpecificGasLimit() uint32
- func (a EthTxAttempt) GetGasPrice() *assets.Wei
- func (a EthTxAttempt) GetHash() common.Hash
- func (a EthTxAttempt) GetSignedTx() (*types.Transaction, error)
- func (a EthTxAttempt) GetTxType() int
- type EthTxAttemptState
- type EthTxMeta
- type EthTxState
- type KeyStore
- type NSinserttx
- type NewTx
- type NonceSyncer
- type NonceSyncerKeyStore
- type NullTxManager
- func (n *NullTxManager) Close() error
- func (n *NullTxManager) CreateEthTransaction(NewTx, ...pg.QOpt) (etx EthTx, err error)
- func (n *NullTxManager) GetForwarderForEOA(addr common.Address) (fwdr common.Address, err error)
- func (n *NullTxManager) GetGasEstimator() gas.Estimator
- func (n *NullTxManager) Healthy() error
- func (n *NullTxManager) OnNewLongestChain(context.Context, *evmtypes.Head)
- func (n *NullTxManager) Ready() error
- func (n *NullTxManager) RegisterResumeCallback(fn ResumeCallback)
- func (n *NullTxManager) Reset(f func(), addr common.Address, abandon bool) error
- func (n *NullTxManager) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)
- func (n *NullTxManager) Start(context.Context) error
- func (n *NullTxManager) Trigger(common.Address)
- type NullableEIP2930AccessList
- type ORM
- type Reaper
- type ReaperConfig
- type ResumeCallback
- type SendEveryStrategy
- type SimulateChecker
- type TransmitChecker
- type TransmitCheckerFactory
- type TransmitCheckerSpec
- type TransmitCheckerType
- type TxManager
- type TxStrategy
- type Txm
- func (b *Txm) Close() (merr error)
- func (b *Txm) CreateEthTransaction(newTx NewTx, qs ...pg.QOpt) (etx EthTx, err error)
- func (b *Txm) GetForwarderForEOA(eoa common.Address) (forwarder common.Address, err error)
- func (b *Txm) GetGasEstimator() gas.Estimator
- func (b *Txm) OnNewLongestChain(ctx context.Context, head *evmtypes.Head)
- func (b *Txm) RegisterResumeCallback(fn ResumeCallback)
- func (b *Txm) Reset(callback func(), addr common.Address, abandon bool) (err error)
- func (b *Txm) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)
- func (b *Txm) Start(ctx context.Context) (merr error)
- func (b *Txm) Trigger(addr common.Address)
- type VRFV1Checker
- type VRFV2Checker
Constants ¶
const ( // InFlightTransactionRecheckInterval controls how often the EthBroadcaster // will poll the unconfirmed queue to see if it is allowed to send another // transaction InFlightTransactionRecheckInterval = 1 * time.Second // TransmitCheckTimeout controls the maximum amount of time that will be // spent on the transmit check. TransmitCheckTimeout = 2 * time.Second )
const ( EthTxUnstarted = EthTxState("unstarted") EthTxInProgress = EthTxState("in_progress") EthTxFatalError = EthTxState("fatal_error") EthTxUnconfirmed = EthTxState("unconfirmed") EthTxConfirmed = EthTxState("confirmed") EthTxConfirmedMissingReceipt = EthTxState("confirmed_missing_receipt") EthTxAttemptInProgress = EthTxAttemptState("in_progress") EthTxAttemptInsufficientEth = EthTxAttemptState("insufficient_eth") EthTxAttemptBroadcast = EthTxAttemptState("broadcast") // TransmitCheckerTypeSimulate is a checker that simulates the transaction before executing on // chain. TransmitCheckerTypeSimulate = TransmitCheckerType("simulate") // TransmitCheckerTypeVRFV1 is a checker that will not submit VRF V1 fulfillment requests that // have already been fulfilled. This could happen if the request was fulfilled by another node. TransmitCheckerTypeVRFV1 = TransmitCheckerType("vrf_v1") // TransmitCheckerTypeVRFV2 is a checker that will not submit VRF V2 fulfillment requests that // have already been fulfilled. This could happen if the request was fulfilled by another node. TransmitCheckerTypeVRFV2 = TransmitCheckerType("vrf_v2") )
Variables ¶
var ( // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt // This most likely happened because an external wallet used the account for this nonce ErrCouldNotGetReceipt = "could not get receipt" )
Functions ¶
func CheckEthTxQueueCapacity ¶
func CheckEthTxQueueCapacity(q pg.Queryer, fromAddress common.Address, maxQueuedTransactions uint64, chainID big.Int) (err error)
CheckEthTxQueueCapacity returns an error if inserting this transaction would exceed the maximum queue size.
Types ¶
type ChainKeyStore ¶
type ChainKeyStore struct {
// contains filtered or unexported fields
}
func NewChainKeyStore ¶
func NewChainKeyStore(chainID big.Int, config Config, keystore KeyStore) ChainKeyStore
func (*ChainKeyStore) NewDynamicFeeAttempt ¶
func (c *ChainKeyStore) NewDynamicFeeAttempt(etx EthTx, fee gas.DynamicFee, gasLimit uint32) (attempt EthTxAttempt, err error)
func (*ChainKeyStore) NewLegacyAttempt ¶
func (c *ChainKeyStore) NewLegacyAttempt(etx EthTx, gasPrice *assets.Wei, gasLimit uint32) (attempt EthTxAttempt, err error)
type CheckerFactory ¶
CheckerFactory is a real implementation of TransmitCheckerFactory.
func (*CheckerFactory) BuildChecker ¶
func (c *CheckerFactory) BuildChecker(spec TransmitCheckerSpec) (TransmitChecker, error)
BuildChecker satisfies the TransmitCheckerFactory interface.
type Config ¶
type Config interface { gas.Config pg.QConfig EthTxReaperInterval() time.Duration EthTxReaperThreshold() time.Duration EthTxResendAfterThreshold() time.Duration EvmGasBumpThreshold() uint64 EvmGasBumpTxDepth() uint16 EvmGasLimitDefault() uint32 EvmMaxInFlightTransactions() uint32 EvmMaxQueuedTransactions() uint64 EvmNonceAutoSync() bool EvmUseForwarders() bool EvmRPCDefaultBatchSize() uint32 KeySpecificMaxGasPriceWei(addr common.Address) *assets.Wei TriggerFallbackDBPollInterval() time.Duration }
Config encompasses config used by the txmgr package. Unless otherwise specified, these should support changing at runtime.
type DropOldestStrategy ¶
type DropOldestStrategy struct {
// contains filtered or unexported fields
}
DropOldestStrategy will send the newest N transactions; older ones will be removed from the queue.
func NewDropOldestStrategy ¶
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy
NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded.
func (DropOldestStrategy) PruneQueue ¶
func (s DropOldestStrategy) PruneQueue(q pg.Queryer) (n int64, err error)
func (DropOldestStrategy) Subject ¶
func (s DropOldestStrategy) Subject() uuid.NullUUID
type EthBroadcaster ¶
type EthBroadcaster struct { ChainKeyStore utils.StartStopOnce // contains filtered or unexported fields }
EthBroadcaster monitors eth_txes for transactions that need to be broadcast, assigns nonces and ensures that at least one eth node somewhere has received the transaction successfully.
This does not guarantee delivery! A whole host of other things can subsequently go wrong such as transactions being evicted from the mempool, eth nodes going offline etc. Responsibility for ensuring eventual inclusion into the chain falls on the shoulders of the ethConfirmer.
What EthBroadcaster does guarantee is:
- a monotonic series of increasing nonces for eth_txes that can all eventually be confirmed if you retry enough times
- transition of eth_txes out of unstarted into either fatal_error or unconfirmed
- existence of a saved eth_tx_attempt
func NewEthBroadcaster ¶
func NewEthBroadcaster(db *sqlx.DB, ethClient evmclient.Client, config Config, keystore KeyStore, eventBroadcaster pg.EventBroadcaster, keyStates []ethkey.State, estimator gas.Estimator, resumeCallback ResumeCallback, logger logger.Logger, checkerFactory TransmitCheckerFactory) *EthBroadcaster
NewEthBroadcaster returns a new concrete EthBroadcaster
func (*EthBroadcaster) Close ¶
func (eb *EthBroadcaster) Close() error
Close closes the EthBroadcaster
func (*EthBroadcaster) ProcessUnstartedEthTxs ¶
func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, keyState ethkey.State) (err error, retryable bool)
ProcessUnstartedEthTxs picks up and handles all eth_txes in the queue.
(Lint directive carried over from the source comment: revive:disable:error-return)
func (*EthBroadcaster) Start ¶
func (eb *EthBroadcaster) Start(ctx context.Context) error
Start starts EthBroadcaster service. The provided context can be used to terminate Start sequence.
func (*EthBroadcaster) Trigger ¶
func (eb *EthBroadcaster) Trigger(addr gethCommon.Address)
Trigger forces the monitor for a particular address to recheck for new eth_txes. It logs an error and does nothing if the address was not registered on startup.
type EthConfirmer ¶
type EthConfirmer struct { utils.StartStopOnce ChainKeyStore // contains filtered or unexported fields }
EthConfirmer is a broad service which performs four different tasks in sequence on every new longest chain:
Step 1: Mark that all currently pending transaction attempts were broadcast before this block
Step 2: Check pending transactions for receipts
Step 3: See if any transactions have exceeded the gas bumping block threshold and, if so, bump them
Step 4: Check confirmed transactions to make sure they are still in the longest chain (reorg protection)
func NewEthConfirmer ¶
func NewEthConfirmer(db *sqlx.DB, ethClient evmclient.Client, config Config, keystore KeyStore, keyStates []ethkey.State, estimator gas.Estimator, resumeCallback ResumeCallback, lggr logger.Logger) *EthConfirmer
NewEthConfirmer instantiates a new eth confirmer
func (*EthConfirmer) CheckConfirmedMissingReceipt ¶
func (ec *EthConfirmer) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
CheckConfirmedMissingReceipt will attempt to re-send any transaction in the state of "confirmed_missing_receipt". If we get back any type of senderror other than "nonce too low" it means that this transaction isn't actually confirmed and needs to be put back into "unconfirmed" state, so that it can enter the gas bumping cycle. This is necessary in rare cases (e.g. Polygon) where network conditions are extremely hostile.
For example, assume the following scenario:
0. We are connected to multiple primary nodes via load balancer
1. We send a transaction, it is confirmed, and we get a receipt
2. A new head comes in from RPC node 1 indicating that this transaction was re-org'd, so we put it back into unconfirmed state
3. We re-send that transaction to RPC node 2 **which hasn't caught up to this re-org yet**
4. RPC node 2 still has an old view of the chain, so it returns us "nonce too low" indicating "no problem this transaction is already mined"
5. Now the transaction is marked "confirmed_missing_receipt" but the latest chain does not actually include it
6. Now we are reliant on the EthResender to propagate it, and this transaction will not be gas bumped, so in the event of gas spikes it could languish or even be evicted from the mempool and hold up the queue
7. Even if/when RPC node 2 catches up, the transaction is still stuck in state "confirmed_missing_receipt"
This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.
func (*EthConfirmer) CheckForReceipts ¶
func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error
CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (*EthConfirmer) Close ¶
func (ec *EthConfirmer) Close() error
Close is a comment to appease the linter
func (*EthConfirmer) EnsureConfirmedTransactionsInLongestChain ¶
func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head *evmtypes.Head) error
EnsureConfirmedTransactionsInLongestChain finds all confirmed eth_txes up to the depth of the given chain and ensures that every one has a receipt with a block hash that is in the given chain.
If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.
func (*EthConfirmer) ForceRebroadcast ¶
func (ec *EthConfirmer) ForceRebroadcast(beginningNonce uint, endingNonce uint, gasPriceWei uint64, address gethCommon.Address, overrideGasLimit uint32) error
ForceRebroadcast sends a transaction for every nonce in the given nonce range at the given gas price. If an eth_tx exists for this nonce, we re-send the existing eth_tx with the supplied parameters. If an eth_tx doesn't exist for this nonce, we send a zero transaction. This operates completely orthogonal to the normal EthConfirmer and can result in untracked attempts! Only for emergency usage. This is in case of some unforeseen scenario where the node is refusing to release the lock. KISS.
func (*EthConfirmer) ProcessHead ¶
ProcessHead takes all required actions for the confirmer on a new head
func (*EthConfirmer) RebroadcastWhereNecessary ¶
func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
RebroadcastWhereNecessary bumps gas or resends transactions that were previously out-of-eth
func (*EthConfirmer) ResumePendingTaskRuns ¶
ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (*EthConfirmer) SetBroadcastBeforeBlockNum ¶
func (ec *EthConfirmer) SetBroadcastBeforeBlockNum(blockNum int64) error
SetBroadcastBeforeBlockNum updates already broadcast attempts with the current block number. This is safe no matter how old the head is because if the attempt is already broadcast it _must_ have been before this head.
type EthReceipt ¶
type EthResender ¶
type EthResender struct {
// contains filtered or unexported fields
}
EthResender periodically picks up transactions that have been languishing unconfirmed for a configured amount of time without being sent, and sends their highest priced attempt again. This helps to defend against geth/parity silently dropping txes, or txes being ejected from the mempool.
Previously we relied on the bumper to do this for us implicitly but there can occasionally be problems with this (e.g. abnormally long block times, or if gas bumping is disabled)
func NewEthResender ¶
func NewEthResender(lggr logger.Logger, db *sqlx.DB, ethClient evmclient.Client, ks KeyStore, pollInterval time.Duration, config Config) *EthResender
NewEthResender creates a new concrete EthResender
func (*EthResender) Start ¶
func (er *EthResender) Start()
Start is a comment which satisfies the linter
func (*EthResender) Stop ¶
func (er *EthResender) Stop()
Stop is a comment which satisfies the linter
type EthTx ¶
type EthTx struct { ID int64 Nonce *int64 FromAddress common.Address ToAddress common.Address EncodedPayload []byte Value assets.Eth // GasLimit on the EthTx is always the conceptual gas limit, which is not // necessarily the same as the on-chain encoded value (i.e. Optimism) GasLimit uint32 Error null.String // BroadcastAt is updated every time an attempt for this eth_tx is re-sent // In almost all cases it will be within a second or so of the actual send time. BroadcastAt *time.Time // InitialBroadcastAt is recorded once, the first ever time this eth_tx is sent InitialBroadcastAt *time.Time CreatedAt time.Time State EthTxState EthTxAttempts []EthTxAttempt `json:"-"` // Marshalled EthTxMeta // Used for additional context around transactions which you want to log // at send time. Meta *datatypes.JSON Subject uuid.NullUUID EVMChainID utils.Big PipelineTaskRunID uuid.NullUUID MinConfirmations cnull.Uint32 // AccessList is optional and only has an effect on DynamicFee transactions // on chains that support it (e.g. Ethereum Mainnet after London hard fork) AccessList NullableEIP2930AccessList // TransmitChecker defines the check that should be performed before a transaction is submitted on // chain. TransmitChecker *datatypes.JSON }
func FindEthTxsRequiringGasBump ¶
func FindEthTxsRequiringGasBump(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, blockNum, gasBumpThreshold, depth int64, chainID big.Int) (etxs []*EthTx, err error)
FindEthTxsRequiringGasBump returns transactions whose attempts have all been unconfirmed for at least gasBumpThreshold blocks, limited to a bounded number of pending transactions (presumably the depth argument)
It also returns eth_txes that are unconfirmed with no eth_tx_attempts
func FindEthTxsRequiringRebroadcast ¶
func FindEthTxsRequiringRebroadcast(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID big.Int) (etxs []*EthTx, err error)
FindEthTxsRequiringRebroadcast returns attempts that hit insufficient eth, and attempts that need bumping, in nonce ASC order
func FindEthTxsRequiringResubmissionDueToInsufficientEth ¶
func FindEthTxsRequiringResubmissionDueToInsufficientEth(ctx context.Context, q pg.Q, lggr logger.Logger, address gethCommon.Address, chainID big.Int) (etxs []*EthTx, err error)
FindEthTxsRequiringResubmissionDueToInsufficientEth returns transactions that need to be re-sent because they hit an out-of-eth error on a previous block
func (EthTx) GetChecker ¶
func (e EthTx) GetChecker() (TransmitCheckerSpec, error)
GetChecker returns an EthTx's transmit checker spec in struct form, unmarshalling it from JSON first.
type EthTxAttempt ¶
type EthTxAttempt struct { ID int64 EthTxID int64 EthTx EthTx // GasPrice applies to LegacyTx GasPrice *assets.Wei // GasTipCap and GasFeeCap are used instead for DynamicFeeTx GasTipCap *assets.Wei GasFeeCap *assets.Wei // ChainSpecificGasLimit on the EthTxAttempt is always the same as the on-chain encoded value for gas limit ChainSpecificGasLimit uint32 SignedRawTx []byte Hash common.Hash CreatedAt time.Time BroadcastBeforeBlockNum *int64 State EthTxAttemptState EthReceipts []EthReceipt `json:"-"` TxType int }
func FindEthTxAttemptsRequiringResend ¶
func FindEthTxAttemptsRequiringResend(db *sqlx.DB, olderThan time.Time, maxInFlightTransactions uint32, chainID big.Int, address common.Address) (attempts []EthTxAttempt, err error)
FindEthTxAttemptsRequiringResend returns the highest priced attempt for each eth_tx that was last sent before or at the given time (up to limit)
func (EthTxAttempt) DynamicFee ¶
func (a EthTxAttempt) DynamicFee() gas.DynamicFee
func (EthTxAttempt) GetBroadcastBeforeBlockNum ¶ added in v1.10.0
func (a EthTxAttempt) GetBroadcastBeforeBlockNum() *int64
func (EthTxAttempt) GetChainSpecificGasLimit ¶ added in v1.10.0
func (a EthTxAttempt) GetChainSpecificGasLimit() uint32
func (EthTxAttempt) GetGasPrice ¶ added in v1.10.0
func (a EthTxAttempt) GetGasPrice() *assets.Wei
func (EthTxAttempt) GetHash ¶ added in v1.10.0
func (a EthTxAttempt) GetHash() common.Hash
func (EthTxAttempt) GetSignedTx ¶
func (a EthTxAttempt) GetSignedTx() (*types.Transaction, error)
GetSignedTx decodes the SignedRawTx into a types.Transaction struct
func (EthTxAttempt) GetTxType ¶ added in v1.10.0
func (a EthTxAttempt) GetTxType() int
type EthTxAttemptState ¶
type EthTxAttemptState string
type EthTxMeta ¶
type EthTxMeta struct { JobID *int32 `json:"JobID,omitempty"` // Pipeline fields FailOnRevert null.Bool `json:"FailOnRevert,omitempty"` // VRF-only fields RequestID *common.Hash `json:"RequestID,omitempty"` RequestTxHash *common.Hash `json:"RequestTxHash,omitempty"` // Batch variants of the above RequestIDs []common.Hash `json:"RequestIDs,omitempty"` RequestTxHashes []common.Hash `json:"RequestTxHashes,omitempty"` // Used for the VRFv2 - max link this tx will bill // should it get bumped MaxLink *string `json:"MaxLink,omitempty"` // Used for the VRFv2 - the subscription ID of the // requester of the VRF. SubID *uint64 `json:"SubId,omitempty"` // Used for keepers UpkeepID *string `json:"UpkeepID,omitempty"` // Used only for forwarded txs, tracks the original destination address. // When this is set, it indicates tx is forwarded through To address. FwdrDestAddress *common.Address `json:"ForwarderDestAddress,omitempty"` }
EthTxMeta contains fields of the transaction metadata. Not all fields are guaranteed to be present.
type EthTxState ¶
type EthTxState string
type KeyStore ¶
type KeyStore interface { CheckEnabled(address common.Address, chainID *big.Int) error EnabledKeysForChain(chainID *big.Int) (keys []ethkey.KeyV2, err error) GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) GetStatesForChain(chainID *big.Int) ([]ethkey.State, error) IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error SignTx(fromAddress common.Address, tx *gethTypes.Transaction, chainID *big.Int) (*gethTypes.Transaction, error) SubscribeToKeyChanges() (ch chan struct{}, unsub func()) }
KeyStore encompasses the subset of keystore used by txmgr
type NSinserttx ¶
type NSinserttx struct { Etx EthTx Attempt EthTxAttempt }
NSinserttx represents an EthTx and Attempt to be inserted together
type NewTx ¶
type NewTx struct { FromAddress common.Address ToAddress common.Address EncodedPayload []byte GasLimit uint32 Meta *EthTxMeta ForwarderAddress common.Address // Pipeline variables - if you aren't calling this from ethtx task within // the pipeline, you don't need these variables MinConfirmations null.Uint32 PipelineTaskRunID *uuid.UUID Strategy TxStrategy // Checker defines the check that should be run before a transaction is submitted on chain. Checker TransmitCheckerSpec }
type NonceSyncer ¶
type NonceSyncer struct {
// contains filtered or unexported fields
}
NonceSyncer manages the delicate task of syncing the local nonce with the chain nonce in case of divergence.
On startup, we check each key for the nonce value on chain and compare it to our local value.
Usually the on-chain nonce will be the same as (or lower than) the next_nonce in the DB, in which case we do nothing.
If we are restoring from a backup however, or another wallet has used the account, the chain nonce might be higher than our local one. In this scenario, we must fast-forward the local nonce to match the chain nonce.
The problem with doing this is that now Chainlink does not have any ownership or control over potentially pending transactions with nonces between our local highest nonce and the chain nonce. If one of those transactions is pushed out of the mempool or re-org'd out of the chain, we run the risk of being stuck with a gap in the nonce sequence that will never be filled.
The solution is to query the chain for our own transactions and take ownership of them by writing them to the database and letting the EthConfirmer handle them as it would any other transaction.
This is not quite as straightforward as one might expect. We cannot query transactions from our account to infinite depth (geth does not support this). The best we can do is to query for all transactions sent within the past ETH_FINALITY_DEPTH blocks and find the ones sent by our address(es).
This gives us re-org protection up to ETH_FINALITY_DEPTH deep in the worst case, which is in line with our other guarantees.
func NewNonceSyncer ¶
func NewNonceSyncer(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, ethClient evmclient.Client, kst NonceSyncerKeyStore) *NonceSyncer
NewNonceSyncer returns a new syncer
type NonceSyncerKeyStore ¶ added in v1.8.0
type NullTxManager ¶
type NullTxManager struct {
ErrMsg string
}
func (*NullTxManager) Close ¶
func (n *NullTxManager) Close() error
Close is a no-op for NullTxManager.
func (*NullTxManager) CreateEthTransaction ¶
func (*NullTxManager) GetForwarderForEOA ¶ added in v1.9.0
func (*NullTxManager) GetGasEstimator ¶
func (n *NullTxManager) GetGasEstimator() gas.Estimator
func (*NullTxManager) Healthy ¶
func (n *NullTxManager) Healthy() error
func (*NullTxManager) OnNewLongestChain ¶
func (n *NullTxManager) OnNewLongestChain(context.Context, *evmtypes.Head)
func (*NullTxManager) Ready ¶
func (n *NullTxManager) Ready() error
func (*NullTxManager) RegisterResumeCallback ¶
func (n *NullTxManager) RegisterResumeCallback(fn ResumeCallback)
func (*NullTxManager) Reset ¶ added in v1.9.0
func (n *NullTxManager) Reset(f func(), addr common.Address, abandon bool) error
func (*NullTxManager) SendEther ¶
func (n *NullTxManager) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)
SendEther does nothing, null functionality
func (*NullTxManager) Start ¶
func (n *NullTxManager) Start(context.Context) error
Start is a no-op for NullTxManager.
func (*NullTxManager) Trigger ¶
func (n *NullTxManager) Trigger(common.Address)
Trigger is a no-op for NullTxManager.
type NullableEIP2930AccessList ¶
type NullableEIP2930AccessList struct { AccessList types.AccessList Valid bool }
func NullableEIP2930AccessListFrom ¶
func NullableEIP2930AccessListFrom(al types.AccessList) (n NullableEIP2930AccessList)
func (NullableEIP2930AccessList) MarshalJSON ¶
func (e NullableEIP2930AccessList) MarshalJSON() ([]byte, error)
func (*NullableEIP2930AccessList) Scan ¶
func (e *NullableEIP2930AccessList) Scan(value interface{}) error
Scan populates the NullableEIP2930AccessList from its serialization in the database
func (*NullableEIP2930AccessList) UnmarshalJSON ¶
func (e *NullableEIP2930AccessList) UnmarshalJSON(input []byte) error
type ORM ¶
type ORM interface { EthTransactions(offset, limit int) ([]EthTx, int, error) EthTransactionsWithAttempts(offset, limit int) ([]EthTx, int, error) EthTxAttempts(offset, limit int) ([]EthTxAttempt, int, error) FindEthTxAttempt(hash common.Hash) (*EthTxAttempt, error) FindEthTxAttemptsByEthTxIDs(ids []int64) ([]EthTxAttempt, error) FindEthTxByHash(hash common.Hash) (*EthTx, error) InsertEthTxAttempt(attempt *EthTxAttempt) error InsertEthTx(etx *EthTx) error InsertEthReceipt(receipt *EthReceipt) error FindEthTxWithAttempts(etxID int64) (etx EthTx, err error) }
type Reaper ¶
type Reaper struct {
// contains filtered or unexported fields
}
Reaper handles periodic database cleanup for Txm
func (*Reaper) ReapEthTxes ¶
ReapEthTxes deletes old eth_txes
func (*Reaper) SetLatestBlockNum ¶
SetLatestBlockNum should be called on every new highest block number
type ReaperConfig ¶
type ReaperConfig interface { EthTxReaperInterval() time.Duration EthTxReaperThreshold() time.Duration EvmFinalityDepth() uint32 }
ReaperConfig is the config subset used by the reaper
type ResumeCallback ¶
ResumeCallback is assumed to be idempotent
type SendEveryStrategy ¶
type SendEveryStrategy struct{}
SendEveryStrategy will always send the tx
func (SendEveryStrategy) PruneQueue ¶
func (SendEveryStrategy) PruneQueue(pg.Queryer) (int64, error)
func (SendEveryStrategy) Subject ¶
func (SendEveryStrategy) Subject() uuid.NullUUID
type SimulateChecker ¶
SimulateChecker simulates transactions, producing an error if they revert on chain.
func (*SimulateChecker) Check ¶
func (s *SimulateChecker) Check( ctx context.Context, l logger.Logger, tx EthTx, a EthTxAttempt, ) error
Check satisfies the TransmitChecker interface.
type TransmitChecker ¶
type TransmitChecker interface { // Check the given transaction. If the transaction should not be sent, an error indicating why // is returned. Errors should only be returned if the checker can confirm that a transaction // should not be sent, other errors (for example connection or other unexpected errors) should // be logged and swallowed. Check(ctx context.Context, l logger.Logger, tx EthTx, a EthTxAttempt) error }
TransmitChecker determines whether a transaction should be submitted on-chain.
var ( // NoChecker is a TransmitChecker that always determines a transaction should be submitted. NoChecker TransmitChecker = noChecker{} )
type TransmitCheckerFactory ¶
type TransmitCheckerFactory interface { // BuildChecker builds a new TransmitChecker based on the given spec. BuildChecker(spec TransmitCheckerSpec) (TransmitChecker, error) }
TransmitCheckerFactory creates a transmit checker based on a spec.
type TransmitCheckerSpec ¶
type TransmitCheckerSpec struct { // CheckerType is the type of check that should be performed. Empty indicates no check. CheckerType TransmitCheckerType `json:",omitempty"` // VRFCoordinatorAddress is the address of the VRF coordinator that should be used to perform // VRF transmit checks. This should be set iff CheckerType is TransmitCheckerTypeVRFV2. VRFCoordinatorAddress *common.Address `json:",omitempty"` // VRFRequestBlockNumber is the block number in which the provided VRF request has been made. // This should be set iff CheckerType is TransmitCheckerTypeVRFV2. VRFRequestBlockNumber *big.Int `json:",omitempty"` }
TransmitCheckerSpec defines the check that should be performed before a transaction is submitted on chain.
type TransmitCheckerType ¶
type TransmitCheckerType string
TransmitCheckerType describes the type of check that should be performed before a transaction is executed on-chain.
type TxManager ¶
type TxManager interface { httypes.HeadTrackable services.ServiceCtx Trigger(addr common.Address) CreateEthTransaction(newTx NewTx, qopts ...pg.QOpt) (etx EthTx, err error) GetForwarderForEOA(eoa common.Address) (forwarder common.Address, err error) GetGasEstimator() gas.Estimator RegisterResumeCallback(fn ResumeCallback) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error) Reset(f func(), addr common.Address, abandon bool) error }
type TxStrategy ¶
type TxStrategy interface { // Subject will be saved to eth_txes.subject if not null Subject() uuid.NullUUID // PruneQueue is called after eth_tx insertion PruneQueue(q pg.Queryer) (n int64, err error) }
TxStrategy controls how txes are queued and sent
func NewQueueingTxStrategy ¶
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy TxStrategy)
NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.
func NewSendEveryStrategy ¶
func NewSendEveryStrategy() TxStrategy
NewSendEveryStrategy creates a new TxStrategy that does not drop transactions.
type Txm ¶
type Txm struct { utils.StartStopOnce // contains filtered or unexported fields }
func NewTxm ¶
func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore KeyStore, eventBroadcaster pg.EventBroadcaster, lggr logger.Logger, checkerFactory TransmitCheckerFactory, logPoller logpoller.LogPoller) *Txm
NewTxm creates a new Txm with the given configuration.
func (*Txm) CreateEthTransaction ¶
CreateEthTransaction inserts a new transaction
func (*Txm) GetForwarderForEOA ¶ added in v1.9.0
GetForwarderForEOA calls forwarderMgr to get a proper forwarder for a given EOA.
func (*Txm) GetGasEstimator ¶
GetGasEstimator returns the gas estimator, mostly useful for tests
func (*Txm) OnNewLongestChain ¶
OnNewLongestChain conforms to HeadTrackable
func (*Txm) RegisterResumeCallback ¶
func (b *Txm) RegisterResumeCallback(fn ResumeCallback)
func (*Txm) Reset ¶ added in v1.9.0
Reset stops EthBroadcaster/EthConfirmer, executes callback, then starts them again
func (*Txm) SendEther ¶
func (b *Txm) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)
SendEther creates a transaction that transfers the given value of ether
type VRFV1Checker ¶
type VRFV1Checker struct { // Callbacks checks whether a VRF V1 request has already been fulfilled on the VRFCoordinator // Solidity contract Callbacks func(opts *bind.CallOpts, reqID [32]byte) (v1.Callbacks, error) }
VRFV1Checker is an implementation of TransmitChecker that checks whether a VRF V1 fulfillment has already been fulfilled.
func (*VRFV1Checker) Check ¶
func (v *VRFV1Checker) Check( ctx context.Context, l logger.Logger, tx EthTx, _ EthTxAttempt, ) error
Check satisfies the TransmitChecker interface.
type VRFV2Checker ¶
type VRFV2Checker struct { // GetCommitment checks whether a VRF V2 request has been fulfilled on the VRFCoordinatorV2 // Solidity contract. GetCommitment func(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error) // HeaderByNumber fetches the header given the number. If nil is provided, // the latest header is fetched. HeaderByNumber func(ctx context.Context, n *big.Int) (*gethtypes.Header, error) // RequestBlockNumber is the block number of the VRFV2 request. RequestBlockNumber *big.Int }
VRFV2Checker is an implementation of TransmitChecker that checks whether a VRF V2 fulfillment has already been fulfilled.
func (*VRFV2Checker) Check ¶
func (v *VRFV2Checker) Check( ctx context.Context, l logger.Logger, tx EthTx, _ EthTxAttempt, ) error
Check satisfies the TransmitChecker interface.