Documentation ¶
Index ¶
- Constants
- Variables
- func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy)
- func NewSendEveryStrategy() txmgrtypes.TxStrategy
- type Broadcaster
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close() error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport() map[string]error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name() string
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetResumeCallback(callback ResumeCallback)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(_ context.Context) error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger(addr ADDR)
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestCloseInternal() error
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestDisableUnstartedTxAutoProcessing()
- func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error
- type Confirmer
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, ...) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ...)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ForceRebroadcast(seqs []SEQ, fee FEE, address ADDR, overrideGasLimit uint32) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthReport() map[string]error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Name() string
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ProcessHead(ctx context.Context, head types.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetResumeCallback(callback ResumeCallback)
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestCloseInternal() error
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestSetClient(client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])
- func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestStartInternal() error
- type DropOldestStrategy
- type NullTxManager
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close() error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport() map[string]error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name() string
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) OnNewLongestChain(context.Context, HEAD)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Ready() error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(addr ADDR, abandon bool) error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, ...) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(context.Context) error
- func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger(ADDR)
- type ProcessUnstartedTxs
- type Reaper
- type Resender
- type ResumeCallback
- type SendEveryStrategy
- type SequenceSyncer
- type TransmitChecker
- type TransmitCheckerFactory
- type TxManager
- type Txm
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (merr error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthReport() map[string]error
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Name() string
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) OnNewLongestChain(ctx context.Context, head HEAD)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr ADDR, abandon bool) (err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, ...) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) (merr error)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(addr ADDR)
- func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAbandon(addr ADDR) (err error)
Constants ¶
const ( // InFlightTransactionRecheckInterval controls how often the Broadcaster // will poll the unconfirmed queue to see if it is allowed to send another // transaction InFlightTransactionRecheckInterval = 1 * time.Second // TransmitCheckTimeout controls the maximum amount of time that will be // spent on the transmit check. TransmitCheckTimeout = 2 * time.Second )
const ( TxUnstarted = txmgrtypes.TxState("unstarted") TxInProgress = txmgrtypes.TxState("in_progress") TxFatalError = txmgrtypes.TxState("fatal_error") TxUnconfirmed = txmgrtypes.TxState("unconfirmed") TxConfirmed = txmgrtypes.TxState("confirmed") TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt") )
const ( // pollInterval is the maximum amount of time in addition to // TxResendAfterThreshold that we will wait before resending an attempt DefaultResenderPollInterval = 5 * time.Second )
Variables ¶
var ErrTxRemoved = errors.New("tx removed")
Functions ¶
func NewQueueingTxStrategy ¶
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy)
NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.
func NewSendEveryStrategy ¶
func NewSendEveryStrategy() txmgrtypes.TxStrategy
NewSendEveryStrategy creates a new TxStrategy that does not drop transactions.
Types ¶
type Broadcaster ¶
type Broadcaster[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] struct { txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] utils.StartStopOnce // contains filtered or unexported fields }
Broadcaster monitors txes for transactions that need to be broadcast, assigns sequences and ensures that at least one node somewhere has received the transaction successfully.
This does not guarantee delivery! A whole host of other things can subsequently go wrong such as transactions being evicted from the mempool, nodes going offline etc. Responsibility for ensuring eventual inclusion into the chain falls on the shoulders of the confirmer.
What Broadcaster does guarantee is: - a monotonic series of increasing sequences for txes that can all eventually be confirmed if you retry enough times - transition of txes out of unstarted into either fatal_error or unconfirmed - existence of a saved tx_attempt
func NewBroadcaster ¶
func NewBroadcaster[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ]( txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE], client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], config txmgrtypes.BroadcasterChainConfig, feeConfig txmgrtypes.BroadcasterFeeConfig, txConfig txmgrtypes.BroadcasterTransactionsConfig, listenerConfig txmgrtypes.BroadcasterListenerConfig, keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], eventBroadcaster pg.EventBroadcaster, txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], logger logger.Logger, checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], autoSyncSequence bool, parseAddress func(string) (ADDR, error), generateNextSequence types.GenerateNextSequenceFunc[SEQ], ) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close() error
Close closes the Broadcaster
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence ¶ added in v2.7.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error)
Used to get the next usable sequence for a transaction
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport() map[string]error
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence ¶ added in v2.7.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ)
Used to increment the sequence in the mapping to have the next usable one available for the next transaction
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name() string
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error)
ProcessUnstartedTxs picks up and handles all txes in the queue revive:disable:error-return
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence ¶ added in v2.7.0
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ)
Used to set the next sequence explicitly to a certain value
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetResumeCallback ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetResumeCallback(callback ResumeCallback)
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(_ context.Context) error
Start starts Broadcaster service. The provided context can be used to terminate Start sequence.
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR)
syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger(addr ADDR)
Trigger forces the monitor for a particular address to recheck for new txes Logs error and does nothing if address was not registered on startup
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestCloseInternal ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestCloseInternal() error
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestDisableUnstartedTxAutoProcessing ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestDisableUnstartedTxAutoProcessing()
func (*Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal ¶
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error
type Confirmer ¶
type Confirmer[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ] struct { utils.StartStopOnce txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] // contains filtered or unexported fields }
Confirmer is a broad service which performs four different tasks in sequence on every new longest chain Step 1: Mark that all currently pending transaction attempts were broadcast before this block Step 2: Check pending transactions for receipts Step 3: See if any transactions have exceeded the gas bumping block threshold and, if so, bump them Step 4: Check confirmed transactions to make sure they are still in the longest chain (reorg protection)
func NewConfirmer ¶
func NewConfirmer[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ]( txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], chainConfig txmgrtypes.ConfirmerChainConfig, feeConfig txmgrtypes.ConfirmerFeeConfig, txConfig txmgrtypes.ConfirmerTransactionsConfig, dbConfig txmgrtypes.ConfirmerDatabaseConfig, keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], lggr logger.Logger, isReceiptNil func(R) bool, ) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckConfirmedMissingReceipt ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckConfirmedMissingReceipt(ctx context.Context) (err error)
CheckConfirmedMissingReceipt will attempt to re-send any transaction in the state of "confirmed_missing_receipt". If we get back any type of senderror other than "sequence too low" it means that this transaction isn't actually confirmed and needs to be put back into "unconfirmed" state, so that it can enter the gas bumping cycle. This is necessary in rare cases (e.g. Polygon) where network conditions are extremely hostile.
For example, assume the following scenario:
0. We are connected to multiple primary nodes via load balancer 1. We send a transaction, it is confirmed and, we get a receipt 2. A new head comes in from RPC node 1 indicating that this transaction was re-org'd, so we put it back into unconfirmed state 3. We re-send that transaction to a RPC node 2 **which hasn't caught up to this re-org yet** 4. RPC node 2 still has an old view of the chain, so it returns us "sequence too low" indicating "no problem this transaction is already mined" 5. Now the transaction is marked "confirmed_missing_receipt" but the latest chain does not actually include it 6. Now we are reliant on the Resender to propagate it, and this transaction will not be gas bumped, so in the event of gas spikes it could languish or even be evicted from the mempool and hold up the queue 7. Even if/when RPC node 2 catches up, the transaction is still stuck in state "confirmed_missing_receipt"
This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error
CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close ¶
Close is a comment to appease the linter
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error
EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth of the given chain and ensures that every one has a receipt with a block hash that is in the given chain.
If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringRebroadcast ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxsRequiringRebroadcast returns attempts that hit insufficient native tokens, and attempts that need bumping, in sequence ASC order
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ForceRebroadcast ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ForceRebroadcast(seqs []SEQ, fee FEE, address ADDR, overrideGasLimit uint32) error
ForceRebroadcast sends a transaction for every sequence in the given sequence range at the given gas price. If an tx exists for this sequence, we re-send the existing tx with the supplied parameters. If an tx doesn't exist for this sequence, we send a zero transaction. This operates completely orthogonal to the normal Confirmer and can result in untracked attempts! Only for emergency usage. This is in case of some unforeseen scenario where the node is refusing to release the lock. KISS.
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthReport ¶
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ProcessHead ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ProcessHead(ctx context.Context, head types.Head[BLOCK_HASH]) error
ProcessHead takes all required transactions for the confirmer on a new head
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RebroadcastWhereNecessary ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error
RebroadcastWhereNecessary bumps gas or resends transactions that were previously out-of-funds
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error
ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetResumeCallback ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetResumeCallback(callback ResumeCallback)
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) error
Start is a comment to appease the linter
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestCloseInternal ¶
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestSetClient ¶
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestSetClient(client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])
func (*Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestStartInternal ¶
type DropOldestStrategy ¶
type DropOldestStrategy struct {
// contains filtered or unexported fields
}
DropOldestStrategy will send the newest N transactions, older ones will be removed from the queue
func NewDropOldestStrategy ¶
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy
NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded.
func (DropOldestStrategy) PruneQueue ¶
func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error)
func (DropOldestStrategy) Subject ¶
func (s DropOldestStrategy) Subject() uuid.NullUUID
type NullTxManager ¶
type NullTxManager[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] struct { ErrMsg string }
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close() error
Close does noop for NullTxManager.
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CreateTransaction ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetForwarderForEOA(addr ADDR) (fwdr ADDR, err error)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HealthReport() map[string]error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Name() string
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) OnNewLongestChain ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) OnNewLongestChain(context.Context, HEAD)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Ready ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Ready() error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) RegisterResumeCallback ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback)
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Reset(addr ADDR, abandon bool) error
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SendNativeToken ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
SendNativeToken does nothing, null functionality
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(context.Context) error
Start does noop for NullTxManager.
func (*NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger ¶
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trigger(ADDR)
Trigger does noop for NullTxManager.
type ProcessUnstartedTxs ¶
type Reaper ¶
Reaper handles periodic database cleanup for Txm
func NewReaper ¶
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID]
NewReaper instantiates a new reaper object
func (*Reaper[CHAIN_ID]) SetLatestBlockNum ¶
SetLatestBlockNum should be called on every new highest block number
type Resender ¶
type Resender[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] struct { // contains filtered or unexported fields }
Resender periodically picks up transactions that have been languishing unconfirmed for a configured amount of time without being sent, and sends their highest priced attempt again. This helps to defend against geth/parity silently dropping txes, or txes being ejected from the mempool.
Previously we relied on the bumper to do this for us implicitly but there can occasionally be problems with this (e.g. abnormally long block times, or if gas bumping is disabled)
func NewResender ¶
func NewResender[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ]( lggr logger.Logger, txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE], client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], pollInterval time.Duration, config txmgrtypes.ResenderChainConfig, txConfig txmgrtypes.ResenderTransactionsConfig, ) *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
func (*Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start ¶
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start()
Start is a comment which satisfies the linter
func (*Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Stop ¶
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Stop()
Stop is a comment which satisfies the linter
func (*Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestResendUnconfirmed ¶
type ResumeCallback ¶
ResumeCallback is assumed to be idempotent
type SendEveryStrategy ¶
type SendEveryStrategy struct{}
SendEveryStrategy will always send the tx
func (SendEveryStrategy) PruneQueue ¶
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error)
func (SendEveryStrategy) Subject ¶
func (SendEveryStrategy) Subject() uuid.NullUUID
type SequenceSyncer ¶
type TransmitChecker ¶
type TransmitChecker[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] interface { // Check the given transaction. If the transaction should not be sent, an error indicating why // is returned. Errors should only be returned if the checker can confirm that a transaction // should not be sent, other errors (for example connection or other unexpected errors) should // be logged and swallowed. Check(ctx context.Context, l logger.Logger, tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error }
TransmitChecker determines whether a transaction should be submitted on-chain.
type TransmitCheckerFactory ¶
type TransmitCheckerFactory[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] interface { // BuildChecker builds a new TransmitChecker based on the given spec. BuildChecker(spec txmgrtypes.TransmitCheckerSpec[ADDR]) (TransmitChecker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) }
TransmitCheckerFactory creates a transmit checker based on a spec.
type TxManager ¶
type TxManager[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ] interface { types.HeadTrackable[HEAD, BLOCK_HASH] services.Service Trigger(addr ADDR) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) RegisterResumeCallback(fn ResumeCallback) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) Reset(addr ADDR, abandon bool) error }
TxManager is the main component of the transaction manager. It is also the interface to external callers.
type Txm ¶
type Txm[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ] struct { utils.StartStopOnce // contains filtered or unexported fields }
func NewTxm ¶
func NewTxm[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee, ]( chainId CHAIN_ID, cfg txmgrtypes.TransactionManagerChainConfig, txCfg txmgrtypes.TransactionManagerTransactionsConfig, keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], lggr logger.Logger, checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fwdMgr txmgrtypes.ForwarderManager[ADDR], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
NewTxm creates a new Txm with the given configuration.
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
CreateTransaction inserts a new transaction
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
Calls forwarderMgr to get a proper forwarder for a given EOA.
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthReport ¶
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) OnNewLongestChain ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) OnNewLongestChain(ctx context.Context, head HEAD)
OnNewLongestChain conforms to HeadTrackable
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback)
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr ADDR, abandon bool) (err error)
Reset stops Broadcaster/Confirmer, executes callback, then starts them again
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNativeToken ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
SendNativeToken creates a transaction that transfers the given value of native tokens
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) (merr error)
Start starts Txm service. The provided context can be used to terminate Start sequence.
func (*Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger ¶
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(addr ADDR)
Trigger forces the Broadcaster to check early for the given address