Documentation ¶
Index ¶
- Constants
- Variables
- func DbEthTxAttemptToEthTxAttempt(dbEthTxAttempt DbEthTxAttempt, evmAttempt *EvmTxAttempt)
- func DbEthTxToEthTx(dbEthTx DbEthTx, evmEthTx *EvmTx)
- func DbReceiptFromEvmReceipt(evmReceipt *EvmReceipt) dbReceipt
- func GetGethSignedTx(signedRawTx []byte) (*types.Transaction, error)
- func NewEvmTxAttemptBuilder(chainID big.Int, config Config, keystore TxAttemptSigner[common.Address], ...) *evmTxAttemptBuilder
- func NewEvmTxmClient(c evmclient.Client) *evmTxmClient
- func NewEvmTxmConfig(c Config) *evmTxmConfig
- func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy)
- func NewSendEveryStrategy() txmgrtypes.TxStrategy
- type Broadcaster
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close() error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport() map[string]error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name() string
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback(callback ResumeCallback)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start(_ context.Context) error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SyncNonce(ctx context.Context, addr ADDR)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Trigger(addr ADDR)
- type CheckerFactory
- type Config
- type Confirmer
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckForReceipts(ctx context.Context, blockNum int64) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close() error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) FindEthTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, ...) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ...)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ForceRebroadcast(seqs []SEQ, fee FEE, address ADDR, overrideGasLimit uint32) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport() map[string]error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name() string
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessHead(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ResumePendingTaskRuns(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback(callback ResumeCallback)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start(_ context.Context) error
- type DbEthTx
- type DbEthTxAttempt
- type DropOldestStrategy
- type EthTxMeta
- type EvmAccessList
- type EvmBroadcaster
- type EvmBroadcasterConfig
- type EvmConfirmer
- type EvmConfirmerConfig
- type EvmFwdMgr
- type EvmKeyStore
- type EvmNewTx
- type EvmNonceSyncer
- type EvmPriorAttempt
- type EvmReaper
- type EvmReaperConfig
- type EvmReceipt
- type EvmReceiptPlus
- type EvmResender
- type EvmResenderConfig
- type EvmTransmitChecker
- type EvmTransmitCheckerFactory
- type EvmTransmitCheckerSpec
- type EvmTx
- type EvmTxAttempt
- type EvmTxAttemptBuilder
- type EvmTxManager
- type EvmTxStore
- type EvmTxm
- type EvmTxmClient
- type EvmTxmConfig
- type NonceSyncer
- type NullEvmTxManager
- type NullTxManager
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Close() error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) CreateEthTransaction(txmgrtypes.NewTx[ADDR, TX_HASH], ...pg.QOpt) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ...)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) HealthReport() map[string]error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Name() string
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) OnNewLongestChain(context.Context, *evmtypes.Head)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Ready() error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) RegisterResumeCallback(fn ResumeCallback)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Reset(f func(), addr ADDR, abandon bool) error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) SendEther(chainID *big.Int, from, to ADDR, value assets.Eth, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ...)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Start(context.Context) error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Trigger(ADDR)
- type ProcessUnstartedTxs
- type Reaper
- type Resender
- type ResumeCallback
- type SendEveryStrategy
- type SimulateChecker
- type TransmitChecker
- type TransmitCheckerFactory
- type TxAttemptSigner
- type TxManager
- type Txm
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close() (merr error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CreateEthTransaction(newTx txmgrtypes.NewTx[ADDR, TX_HASH], qs ...pg.QOpt) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ...)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport() map[string]error
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name() string
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) OnNewLongestChain(ctx context.Context, head HEAD)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RegisterResumeCallback(fn ResumeCallback)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Reset(callback func(), addr ADDR, abandon bool) (err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SendEther(chainID CHAIN_ID, from, to ADDR, value assets.Eth, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ...)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start(ctx context.Context) (merr error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Trigger(addr ADDR)
- type VRFV1Checker
- type VRFV2Checker
Constants ¶
const ( // InFlightTransactionRecheckInterval controls how often the Broadcaster // will poll the unconfirmed queue to see if it is allowed to send another // transaction InFlightTransactionRecheckInterval = 1 * time.Second // TransmitCheckTimeout controls the maximum amount of time that will be // spent on the transmit check. TransmitCheckTimeout = 2 * time.Second )
const ( // TODO: change Eth prefix: https://smartcontract-it.atlassian.net/browse/BCI-1198 EthTxUnstarted = txmgrtypes.TxState("unstarted") EthTxInProgress = txmgrtypes.TxState("in_progress") EthTxFatalError = txmgrtypes.TxState("fatal_error") EthTxUnconfirmed = txmgrtypes.TxState("unconfirmed") EthTxConfirmed = txmgrtypes.TxState("confirmed") EthTxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt") // TransmitCheckerTypeSimulate is a checker that simulates the transaction before executing on // chain. TransmitCheckerTypeSimulate = txmgrtypes.TransmitCheckerType("simulate") // TransmitCheckerTypeVRFV1 is a checker that will not submit VRF V1 fulfillment requests that // have already been fulfilled. This could happen if the request was fulfilled by another node. TransmitCheckerTypeVRFV1 = txmgrtypes.TransmitCheckerType("vrf_v1") // TransmitCheckerTypeVRFV2 is a checker that will not submit VRF V2 fulfillment requests that // have already been fulfilled. This could happen if the request was fulfilled by another node. TransmitCheckerTypeVRFV2 = txmgrtypes.TransmitCheckerType("vrf_v2") )
const DefaultResenderPollInterval = 5 * time.Second
pollInterval is the maximum amount of time in addition to EthTxResendAfterThreshold that we will wait before resending an attempt
Variables ¶
var ( // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt // This most likely happened because an external wallet used the account for this nonce ErrCouldNotGetReceipt = "could not get receipt" )
var ErrInvalidQOpt = errors.New("evmTxStore: Invalid QOpt")
var ErrKeyNotUpdated = errors.New("evmTxStore: Key not updated")
Functions ¶
func DbEthTxAttemptToEthTxAttempt ¶ added in v2.1.0
func DbEthTxAttemptToEthTxAttempt(dbEthTxAttempt DbEthTxAttempt, evmAttempt *EvmTxAttempt)
func DbEthTxToEthTx ¶ added in v2.1.0
func DbReceiptFromEvmReceipt ¶ added in v2.1.0
func DbReceiptFromEvmReceipt(evmReceipt *EvmReceipt) dbReceipt
func GetGethSignedTx ¶ added in v2.2.0
func GetGethSignedTx(signedRawTx []byte) (*types.Transaction, error)
GetGethSignedTx decodes the SignedRawTx into a types.Transaction struct
func NewEvmTxAttemptBuilder ¶
func NewEvmTxAttemptBuilder(chainID big.Int, config Config, keystore TxAttemptSigner[common.Address], estimator gas.EvmFeeEstimator) *evmTxAttemptBuilder
func NewEvmTxmClient ¶ added in v2.2.0
func NewEvmTxmConfig ¶ added in v2.1.0
func NewEvmTxmConfig(c Config) *evmTxmConfig
func NewQueueingTxStrategy ¶
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy)
NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.
func NewSendEveryStrategy ¶
func NewSendEveryStrategy() txmgrtypes.TxStrategy
NewSendEveryStrategy creates a new TxStrategy that does not drop transactions.
Types ¶
type Broadcaster ¶ added in v2.2.0
type Broadcaster[ CHAIN_ID txmgrtypes.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, FEE_UNIT txmgrtypes.Unit, ] struct { txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD] utils.StartStopOnce // contains filtered or unexported fields }
Broadcaster monitors eth_txes for transactions that need to be broadcast, assigns nonces and ensures that at least one eth node somewhere has received the transaction successfully.
This does not guarantee delivery! A whole host of other things can subsequently go wrong such as transactions being evicted from the mempool, eth nodes going offline etc. Responsibility for ensuring eventual inclusion into the chain falls on the shoulders of the ethConfirmer.
What Broadcaster does guarantee is: - a monotonic series of increasing nonces for eth_txes that can all eventually be confirmed if you retry enough times - transition of eth_txes out of unstarted into either fatal_error or unconfirmed - existence of a saved eth_tx_attempt
func NewBroadcaster ¶ added in v2.2.0
func NewBroadcaster[ CHAIN_ID txmgrtypes.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, FEE_UNIT txmgrtypes.Unit, ]( txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], config txmgrtypes.BroadcasterConfig[FEE_UNIT], keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], eventBroadcaster pg.EventBroadcaster, txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], nonceSyncer NonceSyncer[ADDR, TX_HASH, BLOCK_HASH], logger logger.Logger, checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], autoSyncNonce bool, parseAddress func(string) (ADDR, error), ) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close() error
Close closes the Broadcaster
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport() map[string]error
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name() string
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessUnstartedTxs ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error)
ProcessUnstartedTxs picks up and handles all eth_txes in the queue revive:disable:error-return
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback(callback ResumeCallback)
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start(_ context.Context) error
Start starts Broadcaster service. The provided context can be used to terminate Start sequence.
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SyncNonce ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SyncNonce(ctx context.Context, addr ADDR)
syncNonce tries to sync the key nonce, retrying indefinitely until success
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Trigger ¶ added in v2.2.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Trigger(addr ADDR)
Trigger forces the monitor for a particular address to recheck for new eth_txes Logs error and does nothing if address was not registered on startup
type CheckerFactory ¶
CheckerFactory is a real implementation of TransmitCheckerFactory.
func (*CheckerFactory) BuildChecker ¶
func (c *CheckerFactory) BuildChecker(spec EvmTransmitCheckerSpec) (EvmTransmitChecker, error)
BuildChecker satisfies the TransmitCheckerFactory interface.
type Config ¶
type Config interface { gas.Config pg.QConfig EthTxReaperInterval() time.Duration EthTxReaperThreshold() time.Duration EthTxResendAfterThreshold() time.Duration EvmGasBumpThreshold() uint64 EvmGasBumpTxDepth() uint32 EvmGasLimitDefault() uint32 EvmMaxInFlightTransactions() uint32 EvmMaxQueuedTransactions() uint64 EvmNonceAutoSync() bool EvmUseForwarders() bool EvmRPCDefaultBatchSize() uint32 KeySpecificMaxGasPriceWei(addr common.Address) *assets.Wei TriggerFallbackDBPollInterval() time.Duration }
Config encompasses config used by txmgr package Unless otherwise specified, these should support changing at runtime
type Confirmer ¶ added in v2.2.0
type Confirmer[ CHAIN_ID txmgrtypes.ID, HEAD commontypes.Head[BLOCK_HASH], ADDR commontypes.Hashable, TX_HASH commontypes.Hashable, BLOCK_HASH commontypes.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, FEE_UNIT txmgrtypes.Unit, ] struct { utils.StartStopOnce txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD] // contains filtered or unexported fields }
Confirmer is a broad service which performs four different tasks in sequence on every new longest chain Step 1: Mark that all currently pending transaction attempts were broadcast before this block Step 2: Check pending transactions for receipts Step 3: See if any transactions have exceeded the gas bumping block threshold and, if so, bump them Step 4: Check confirmed transactions to make sure they are still in the longest chain (reorg protection)
func NewConfirmer ¶ added in v2.2.0
func NewConfirmer[ CHAIN_ID txmgrtypes.ID, HEAD commontypes.Head[BLOCK_HASH], ADDR commontypes.Hashable, TX_HASH commontypes.Hashable, BLOCK_HASH commontypes.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, FEE_UNIT txmgrtypes.Unit, ]( txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], config txmgrtypes.ConfirmerConfig[FEE_UNIT], keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], lggr logger.Logger, isReceiptNil func(R) bool, ) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckConfirmedMissingReceipt ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
CheckConfirmedMissingReceipt will attempt to re-send any transaction in the state of "confirmed_missing_receipt". If we get back any type of senderror other than "nonce too low" it means that this transaction isn't actually confirmed and needs to be put back into "unconfirmed" state, so that it can enter the gas bumping cycle. This is necessary in rare cases (e.g. Polygon) where network conditions are extremely hostile.
For example, assume the following scenario:
0. We are connected to multiple primary nodes via load balancer 1. We send a transaction, it is confirmed and, we get a receipt 2. A new head comes in from RPC node 1 indicating that this transaction was re-org'd, so we put it back into unconfirmed state 3. We re-send that transaction to a RPC node 2 **which hasn't caught up to this re-org yet** 4. RPC node 2 still has an old view of the chain, so it returns us "nonce too low" indicating "no problem this transaction is already mined" 5. Now the transaction is marked "confirmed_missing_receipt" but the latest chain does not actually include it 6. Now we are reliant on the EthResender to propagate it, and this transaction will not be gas bumped, so in the event of gas spikes it could languish or even be evicted from the mempool and hold up the queue 7. Even if/when RPC node 2 catches up, the transaction is still stuck in state "confirmed_missing_receipt"
This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckForReceipts ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CheckForReceipts(ctx context.Context, blockNum int64) error
CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Close() error
Close is a comment to appease the linter
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) EnsureConfirmedTransactionsInLongestChain ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
EnsureConfirmedTransactionsInLongestChain finds all confirmed eth_txes up to the depth of the given chain and ensures that every one has a receipt with a block hash that is in the given chain.
If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) FindEthTxsRequiringRebroadcast ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) FindEthTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error)
FindEthTxsRequiringRebroadcast returns attempts that hit insufficient eth, and attempts that need bumping, in nonce ASC order
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ForceRebroadcast ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ForceRebroadcast(seqs []SEQ, fee FEE, address ADDR, overrideGasLimit uint32) error
ForceRebroadcast sends a transaction for every nonce in the given nonce range at the given gas price. If an eth_tx exists for this nonce, we re-send the existing eth_tx with the supplied parameters. If an eth_tx doesn't exist for this nonce, we send a zero transaction. This operates completely orthogonal to the normal Confirmer and can result in untracked attempts! Only for emergency usage. This is in case of some unforeseen scenario where the node is refusing to release the lock. KISS.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport ¶ added in v2.2.0
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Name ¶ added in v2.2.0
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessHead ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ProcessHead(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
ProcessHead takes all required transactions for the confirmer on a new head
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RebroadcastWhereNecessary ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
RebroadcastWhereNecessary bumps gas or resends transactions that were previously out-of-eth
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ResumePendingTaskRuns ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) ResumePendingTaskRuns(ctx context.Context, head commontypes.Head[BLOCK_HASH]) error
ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback ¶ added in v2.2.0
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SetResumeCallback(callback ResumeCallback)
type DbEthTx ¶ added in v2.1.0
type DbEthTx struct { ID int64 Nonce *int64 FromAddress common.Address ToAddress common.Address EncodedPayload []byte Value assets.Eth // GasLimit on the EthTx is always the conceptual gas limit, which is not // necessarily the same as the on-chain encoded value (i.e. Optimism) GasLimit uint32 Error nullv4.String // BroadcastAt is updated every time an attempt for this eth_tx is re-sent // In almost all cases it will be within a second or so of the actual send time. BroadcastAt *time.Time // InitialBroadcastAt is recorded once, the first ever time this eth_tx is sent CreatedAt time.Time State txmgrtypes.TxState // Marshalled EthTxMeta // Used for additional context around transactions which you want to log // at send time. Meta *datatypes.JSON Subject uuid.NullUUID PipelineTaskRunID uuid.NullUUID MinConfirmations null.Uint32 EVMChainID utils.Big // AccessList is optional and only has an effect on DynamicFee transactions // on chains that support it (e.g. Ethereum Mainnet after London hard fork) AccessList EvmAccessList // TransmitChecker defines the check that should be performed before a transaction is submitted on // chain. TransmitChecker *datatypes.JSON InitialBroadcastAt *time.Time }
Directly maps to columns of database table "eth_txes". This is exported, as tests and other external code still directly reads DB using this schema.
func DbEthTxFromEthTx ¶ added in v2.1.0
type DbEthTxAttempt ¶ added in v2.1.0
type DbEthTxAttempt struct { ID int64 EthTxID int64 GasPrice *assets.Wei SignedRawTx []byte Hash common.Hash BroadcastBeforeBlockNum *int64 State txmgrtypes.TxAttemptState CreatedAt time.Time ChainSpecificGasLimit uint32 TxType int GasTipCap *assets.Wei GasFeeCap *assets.Wei }
Directly maps to columns of database table "eth_tx_attempts". This is exported, as tests and other external code still directly reads DB using this schema.
func DbEthTxAttemptFromEthTxAttempt ¶ added in v2.1.0
func DbEthTxAttemptFromEthTxAttempt(ethTxAttempt *EvmTxAttempt) DbEthTxAttempt
type DropOldestStrategy ¶
type DropOldestStrategy struct {
// contains filtered or unexported fields
}
DropOldestStrategy will send the newest N transactions, older ones will be removed from the queue
func NewDropOldestStrategy ¶
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy
NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded.
func (DropOldestStrategy) PruneQueue ¶
func (s DropOldestStrategy) PruneQueue(pruneService txmgrtypes.UnstartedTxQueuePruner, qopt pg.QOpt) (n int64, err error)
func (DropOldestStrategy) Subject ¶
func (s DropOldestStrategy) Subject() uuid.NullUUID
type EthTxMeta ¶
type EthTxMeta = txmgrtypes.TxMeta[common.Address, common.Hash] // TODO: change Eth prefix: https://smartcontract-it.atlassian.net/browse/BCI-1198
Type aliases for EVM
type EvmAccessList ¶ added in v2.2.0
type EvmAccessList struct { AccessList types.AccessList Valid bool }
EvmAccessList is a nullable EIP2930 access list Used in the AdditionalParameters field in Tx Is optional and only has an effect on DynamicFee transactions on chains that support it (e.g. Ethereum Mainnet after London hard fork)
func EvmAccessListFrom ¶ added in v2.2.0
func EvmAccessListFrom(al types.AccessList) (n EvmAccessList)
func (EvmAccessList) MarshalJSON ¶ added in v2.2.0
func (e EvmAccessList) MarshalJSON() ([]byte, error)
func (*EvmAccessList) Scan ¶ added in v2.2.0
func (e *EvmAccessList) Scan(value interface{}) error
Scan returns the selector from its serialization in the database
func (*EvmAccessList) UnmarshalJSON ¶ added in v2.2.0
func (e *EvmAccessList) UnmarshalJSON(input []byte) error
type EvmBroadcaster ¶ added in v2.1.0
type EvmBroadcaster = Broadcaster[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList, *assets.Wei]
Type aliases for EVM
func NewEvmBroadcaster ¶ added in v2.2.0
func NewEvmBroadcaster( txStore EvmTxStore, evmClient EvmTxmClient, config txmgrtypes.BroadcasterConfig[*assets.Wei], keystore EvmKeyStore, eventBroadcaster pg.EventBroadcaster, txAttemptBuilder EvmTxAttemptBuilder, nonceSyncer EvmNonceSyncer, logger logger.Logger, checkerFactory EvmTransmitCheckerFactory, autoSyncNonce bool, ) *EvmBroadcaster
NewEvmBroadcaster returns a new concrete EvmBroadcaster
type EvmBroadcasterConfig ¶ added in v2.1.0
type EvmBroadcasterConfig txmgrtypes.BroadcasterConfig[*assets.Wei]
type EvmConfirmer ¶ added in v2.1.0
type EvmConfirmer = Confirmer[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList, *assets.Wei]
Type aliases for EVM
func NewEvmConfirmer ¶ added in v2.2.0
func NewEvmConfirmer( txStore EvmTxStore, evmClient EvmTxmClient, config txmgrtypes.ConfirmerConfig[*assets.Wei], keystore EvmKeyStore, txAttemptBuilder EvmTxAttemptBuilder, lggr logger.Logger, ) *EvmConfirmer
NewEvmConfirmer instantiates a new EVM confirmer
type EvmConfirmerConfig ¶ added in v2.1.0
type EvmConfirmerConfig txmgrtypes.ConfirmerConfig[*assets.Wei]
type EvmFwdMgr ¶ added in v2.1.0
type EvmFwdMgr = txmgrtypes.ForwarderManager[common.Address]
Type aliases for EVM
type EvmKeyStore ¶ added in v2.1.0
Type aliases for EVM
type EvmNonceSyncer ¶ added in v2.1.0
Type aliases for EVM
func NewNonceSyncer ¶
func NewNonceSyncer( txStore EvmTxStore, lggr logger.Logger, ethClient evmclient.Client, kst EvmKeyStore, ) EvmNonceSyncer
NewNonceSyncer returns a new syncer
type EvmPriorAttempt ¶ added in v2.1.0
type EvmPriorAttempt = txmgrtypes.PriorAttempt[gas.EvmFee, common.Hash]
Type aliases for EVM
type EvmReaper ¶ added in v2.2.0
Type aliases for EVM
func NewEvmReaper ¶ added in v2.2.0
func NewEvmReaper(lggr logger.Logger, store txmgrtypes.TxHistoryReaper[*big.Int], config EvmReaperConfig, chainID *big.Int) *EvmReaper
NewEvmReaper instantiates a new EVM-specific reaper object
type EvmReaperConfig ¶ added in v2.1.0
type EvmReaperConfig txmgrtypes.ReaperConfig
type EvmReceipt ¶
Type aliases for EVM
func DbReceiptToEvmReceipt ¶ added in v2.1.0
func DbReceiptToEvmReceipt(receipt *dbReceipt) EvmReceipt
type EvmReceiptPlus ¶
type EvmReceiptPlus = txmgrtypes.ReceiptPlus[*evmtypes.Receipt]
Type aliases for EVM
type EvmResender ¶ added in v2.1.0
type EvmResender = Resender[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee, *evmtypes.Receipt, EvmAccessList]
Type aliases for EVM
func NewEvmResender ¶ added in v2.2.0
func NewEvmResender( lggr logger.Logger, txStore EvmTxStore, evmClient EvmTxmClient, ks EvmKeyStore, pollInterval time.Duration, config EvmResenderConfig, ) *EvmResender
NewEvnResender creates a new concrete EvmResender
type EvmResenderConfig ¶ added in v2.1.0
type EvmResenderConfig txmgrtypes.ResenderConfig
type EvmTransmitChecker ¶ added in v2.1.0
type EvmTransmitChecker = TransmitChecker[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
var ( // NoChecker is a TransmitChecker that always determines a transaction should be submitted. NoChecker EvmTransmitChecker = noChecker{} )
type EvmTransmitCheckerFactory ¶ added in v2.1.0
type EvmTransmitCheckerFactory = TransmitCheckerFactory[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTransmitCheckerSpec ¶ added in v2.1.0
type EvmTransmitCheckerSpec = txmgrtypes.TransmitCheckerSpec[common.Address]
type EvmTx ¶ added in v2.1.0
type EvmTx = txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTxAttempt ¶ added in v2.1.0
type EvmTxAttempt = txmgrtypes.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTxAttemptBuilder ¶ added in v2.1.0
type EvmTxAttemptBuilder = txmgrtypes.TxAttemptBuilder[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTxManager ¶ added in v2.1.0
type EvmTxManager = TxManager[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTxStore ¶ added in v2.1.0
type EvmTxStore = txmgrtypes.TxStore[common.Address, *big.Int, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
func NewTxStore ¶ added in v2.1.0
type EvmTxm ¶ added in v2.1.0
type EvmTxm = Txm[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList, *assets.Wei]
Type aliases for EVM
func NewTxm ¶
func NewTxm( db *sqlx.DB, ethClient evmclient.Client, cfg EvmTxmConfig, keyStore EvmKeyStore, eventBroadcaster pg.EventBroadcaster, lggr logger.Logger, checkerFactory EvmTransmitCheckerFactory, fwdMgr EvmFwdMgr, txAttemptBuilder EvmTxAttemptBuilder, txStore EvmTxStore, nonceSyncer EvmNonceSyncer, ethBroadcaster *EvmBroadcaster, ethConfirmer *EvmConfirmer, ethResender *EvmResender, q pg.Q, ) *EvmTxm
NewTxm creates a new Txm with the given configuration.
type EvmTxmClient ¶ added in v2.2.0
type EvmTxmClient = txmgrtypes.TxmClient[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type EvmTxmConfig ¶ added in v2.1.0
type EvmTxmConfig txmgrtypes.TxmConfig[*assets.Wei]
type NonceSyncer ¶
type NonceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable] interface { Sync(ctx context.Context, addr ADDR) (err error) }
NonceSyncer manages the delicate task of syncing the local nonce with the chain nonce in case of divergence.
On startup, we check each key for the nonce value on chain and compare it to our local value.
Usually the on-chain nonce will be the same as (or lower than) the next_nonce in the DB, in which case we do nothing.
If we are restoring from a backup however, or another wallet has used the account, the chain nonce might be higher than our local one. In this scenario, we must fastforward the local nonce to match the chain nonce.
The problem with doing this is that now Chainlink does not have any ownership or control over potentially pending transactions with nonces between our local highest nonce and the chain nonce. If one of those transactions is pushed out of the mempool or re-org'd out of the chain, we run the risk of being stuck with a gap in the nonce sequence that will never be filled.
The solution is to query the chain for our own transactions and take ownership of them by writing them to the database and letting the EthConfirmer handle them as it would any other transaction.
This is not quite as straightforward as one might expect. We cannot query transactions from our account to infinite depth (geth does not support this). The best we can do is to query for all transactions sent within the past EVM.FinalityDepth blocks and find the ones sent by our address(es).
This gives us re-org protection up to EVM.FinalityDepth deep in the worst case, which is in line with our other guarantees.
type NullEvmTxManager ¶ added in v2.1.0
type NullEvmTxManager = NullTxManager[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee, EvmAccessList]
Type aliases for EVM
type NullTxManager ¶
type NullTxManager[ CHAIN_ID txmgrtypes.ID, HEAD commontypes.Head[BLOCK_HASH], ADDR commontypes.Hashable, TX_HASH, BLOCK_HASH commontypes.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, ] struct { ErrMsg string }
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Close ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Close() error
Close does noop for NullTxManager.
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) CreateEthTransaction ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) CreateEthTransaction(txmgrtypes.NewTx[ADDR, TX_HASH], ...pg.QOpt) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) GetForwarderForEOA ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) HealthReport ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) HealthReport() map[string]error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Name ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Name() string
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) OnNewLongestChain ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) OnNewLongestChain(context.Context, *evmtypes.Head)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Ready ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Ready() error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) RegisterResumeCallback ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) RegisterResumeCallback(fn ResumeCallback)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Reset ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Reset(f func(), addr ADDR, abandon bool) error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) SendEther ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) SendEther(chainID *big.Int, from, to ADDR, value assets.Eth, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error)
SendEther does nothing, null functionality
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Start ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Start(context.Context) error
Start does noop for NullTxManager.
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Trigger ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) Trigger(ADDR)
Trigger does noop for NullTxManager.
type ProcessUnstartedTxs ¶ added in v2.2.0
type Reaper ¶
type Reaper[CHAIN_ID txmgrtypes.ID] struct { // contains filtered or unexported fields }
Reaper handles periodic database cleanup for Txm
func NewReaper ¶
func NewReaper[CHAIN_ID txmgrtypes.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID]
NewReaper instantiates a new reaper object
func (*Reaper[CHAIN_ID]) SetLatestBlockNum ¶
SetLatestBlockNum should be called on every new highest block number
type Resender ¶ added in v2.2.0
type Resender[ CHAIN_ID txmgrtypes.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], ADD any, ] struct { // contains filtered or unexported fields }
EthResender periodically picks up transactions that have been languishing unconfirmed for a configured amount of time without being sent, and sends their highest priced attempt again. This helps to defend against geth/parity silently dropping txes, or txes being ejected from the mempool.
Previously we relied on the bumper to do this for us implicitly but there can occasionally be problems with this (e.g. abnormally long block times, or if gas bumping is disabled)
func NewResender ¶ added in v2.2.0
func NewResender[ CHAIN_ID txmgrtypes.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], ADD any, ]( lggr logger.Logger, txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], pollInterval time.Duration, config txmgrtypes.ResenderConfig, ) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE, R, ADD]
type ResumeCallback ¶
ResumeCallback is assumed to be idempotent
type SendEveryStrategy ¶
type SendEveryStrategy struct{}
SendEveryStrategy will always send the tx
func (SendEveryStrategy) PruneQueue ¶
func (SendEveryStrategy) PruneQueue(pruneService txmgrtypes.UnstartedTxQueuePruner, qopt pg.QOpt) (int64, error)
func (SendEveryStrategy) Subject ¶
func (SendEveryStrategy) Subject() uuid.NullUUID
type SimulateChecker ¶
SimulateChecker simulates transactions, producing an error if they revert on chain.
func (*SimulateChecker) Check ¶
func (s *SimulateChecker) Check( ctx context.Context, l logger.Logger, tx EvmTx, a EvmTxAttempt, ) error
Check satisfies the TransmitChecker interface.
type TransmitChecker ¶
type TransmitChecker[ CHAIN_ID txmgrtypes.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, ] interface { // Check the given transaction. If the transaction should not be sent, an error indicating why // is returned. Errors should only be returned if the checker can confirm that a transaction // should not be sent, other errors (for example connection or other unexpected errors) should // be logged and swallowed. Check(ctx context.Context, l logger.Logger, tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD]) error }
TransmitChecker determines whether a transaction should be submitted on-chain.
type TransmitCheckerFactory ¶
type TransmitCheckerFactory[ CHAIN_ID txmgrtypes.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, ] interface { // BuildChecker builds a new TransmitChecker based on the given spec. BuildChecker(spec txmgrtypes.TransmitCheckerSpec[ADDR]) (TransmitChecker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], error) }
TransmitCheckerFactory creates a transmit checker based on a spec.
type TxAttemptSigner ¶
type TxAttemptSigner[ADDR commontypes.Hashable] interface { SignTx(fromAddress ADDR, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) }
type TxManager ¶
type TxManager[ CHAIN_ID txmgrtypes.ID, HEAD commontypes.Head[BLOCK_HASH], ADDR commontypes.Hashable, TX_HASH commontypes.Hashable, BLOCK_HASH commontypes.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, ] interface { txmgrtypes.HeadTrackable[HEAD, BLOCK_HASH] services.ServiceCtx Trigger(addr ADDR) CreateEthTransaction(newTx txmgrtypes.NewTx[ADDR, TX_HASH], qopts ...pg.QOpt) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) RegisterResumeCallback(fn ResumeCallback) SendEther(chainID *big.Int, from, to ADDR, value assets.Eth, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error) Reset(f func(), addr ADDR, abandon bool) error }
TxManager is the main component of the transaction manager. It is also the interface to external callers.
type Txm ¶
type Txm[ CHAIN_ID txmgrtypes.ID, HEAD commontypes.Head[BLOCK_HASH], ADDR commontypes.Hashable, TX_HASH commontypes.Hashable, BLOCK_HASH commontypes.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ txmgrtypes.Sequence, FEE txmgrtypes.Fee, ADD any, FEE_UNIT txmgrtypes.Unit, ] struct { utils.StartStopOnce // contains filtered or unexported fields }
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CreateEthTransaction ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) CreateEthTransaction(newTx txmgrtypes.NewTx[ADDR, TX_HASH], qs ...pg.QOpt) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error)
CreateEthTransaction inserts a new transaction
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) GetForwarderForEOA ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
Calls forwarderMgr to get a proper forwarder for a given EOA.
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) HealthReport ¶
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) OnNewLongestChain ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) OnNewLongestChain(ctx context.Context, head HEAD)
OnNewLongestChain conforms to HeadTrackable
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RegisterResumeCallback ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) RegisterResumeCallback(fn ResumeCallback)
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Reset ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Reset(callback func(), addr ADDR, abandon bool) (err error)
Reset stops EthBroadcaster/EthConfirmer, executes callback, then starts them again
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SendEther ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) SendEther(chainID CHAIN_ID, from, to ADDR, value assets.Eth, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD], err error)
SendEther creates a transaction that transfers the given value of ether
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE, ADD, FEE_UNIT]) Start(ctx context.Context) (merr error)
Start starts Txm service. The provided context can be used to terminate Start sequence.
type VRFV1Checker ¶
type VRFV1Checker struct { // Callbacks checks whether a VRF V1 request has already been fulfilled on the VRFCoordinator // Solidity contract Callbacks func(opts *bind.CallOpts, reqID [32]byte) (v1.Callbacks, error) Client evmclient.Client }
VRFV1Checker is an implementation of TransmitChecker that checks whether a VRF V1 fulfillment has already been fulfilled.
func (*VRFV1Checker) Check ¶
func (v *VRFV1Checker) Check( ctx context.Context, l logger.Logger, tx EvmTx, _ EvmTxAttempt, ) error
Check satisfies the TransmitChecker interface.
type VRFV2Checker ¶
type VRFV2Checker struct { // GetCommitment checks whether a VRF V2 request has been fulfilled on the VRFCoordinatorV2 // Solidity contract. GetCommitment func(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error) // HeadByNumber fetches the head given the number. If nil is provided, // the latest header is fetched. HeadByNumber func(ctx context.Context, n *big.Int) (*types.Head, error) // RequestBlockNumber is the block number of the VRFV2 request. RequestBlockNumber *big.Int }
VRFV2Checker is an implementation of TransmitChecker that checks whether a VRF V2 fulfillment has already been fulfilled.
func (*VRFV2Checker) Check ¶
func (v *VRFV2Checker) Check( ctx context.Context, l logger.Logger, tx EvmTx, _ EvmTxAttempt, ) error
Check satisfies the TransmitChecker interface.