Versions in this module Expand all Collapse all v0 v0.5.0 Jun 23, 2023 v0.4.0 Jun 22, 2023 v0.3.0 Jun 22, 2023 v0.2.0 Jun 22, 2023 Changes in this version + const EthTxAttemptBroadcast + const EthTxAttemptInProgress + const EthTxAttemptInsufficientEth + const EthTxConfirmed + const EthTxConfirmedMissingReceipt + const EthTxFatalError + const EthTxInProgress + const EthTxUnconfirmed + const EthTxUnstarted + const InFlightTransactionRecheckInterval + const TransmitCheckTimeout + const TransmitCheckerTypeSimulate + const TransmitCheckerTypeVRFV1 + const TransmitCheckerTypeVRFV2 + var ErrCouldNotGetReceipt = "could not get receipt" + var Max256BitUInt = big.NewInt(0).Exp(big.NewInt(2), big.NewInt(256), nil) + func CheckEthTxQueueCapacity(q pg.Queryer, fromAddress common.Address, maxQueuedTransactions uint64, ...) (err error) + func CountUnconfirmedTransactions(q pg.Q, fromAddress common.Address, chainID big.Int) (count uint32, err error) + func CountUnstartedTransactions(q pg.Q, fromAddress common.Address, chainID big.Int) (count uint32, err error) + type ChainKeyStore struct + func NewChainKeyStore(chainID big.Int, config Config, keystore KeyStore) ChainKeyStore + func (c *ChainKeyStore) NewDynamicFeeAttempt(etx EthTx, fee gas.DynamicFee, gasLimit uint32) (attempt EthTxAttempt, err error) + func (c *ChainKeyStore) NewLegacyAttempt(etx EthTx, gasPrice *assets.Wei, gasLimit uint32) (attempt EthTxAttempt, err error) + func (c *ChainKeyStore) SignTx(address common.Address, tx *gethTypes.Transaction) (common.Hash, []byte, error) + type CheckerFactory struct + Client evmclient.Client + func (c *CheckerFactory) BuildChecker(spec TransmitCheckerSpec) (TransmitChecker, error) + type Config interface + EthTxReaperInterval func() time.Duration + EthTxReaperThreshold func() time.Duration + EthTxResendAfterThreshold func() time.Duration + EvmGasBumpThreshold func() uint64 + EvmGasBumpTxDepth func() uint16 + EvmGasLimitDefault func() uint32 + EvmMaxInFlightTransactions func() uint32 + EvmMaxQueuedTransactions func() uint64 + EvmNonceAutoSync func() bool + EvmRPCDefaultBatchSize func() uint32 + EvmUseForwarders func() bool + KeySpecificMaxGasPriceWei func(addr common.Address) *assets.Wei + TriggerFallbackDBPollInterval func() time.Duration + type DropOldestStrategy struct + func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy + func (s DropOldestStrategy) PruneQueue(q pg.Queryer) (n int64, err error) + func (s DropOldestStrategy) Subject() uuid.NullUUID + type EthBroadcaster struct + func NewEthBroadcaster(db *sqlx.DB, ethClient evmclient.Client, config Config, keystore KeyStore, ...) *EthBroadcaster + func (eb *EthBroadcaster) Close() error + func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, keyState ethkey.State) (err error, retryable bool) + func (eb *EthBroadcaster) Start(ctx context.Context) error + func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) + func (eb *EthBroadcaster) Trigger(addr gethCommon.Address) + type EthConfirmer struct + func NewEthConfirmer(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore, ...) *EthConfirmer + func (ec *EthConfirmer) CheckConfirmedMissingReceipt(ctx context.Context) (err error) + func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error + func (ec *EthConfirmer) Close() error + func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head *evmtypes.Head) error + func (ec *EthConfirmer) FindEthTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address gethCommon.Address, ...) (etxs []*EthTx, err error) + func (ec *EthConfirmer) ForceRebroadcast(beginningNonce uint, endingNonce uint, gasPriceWei uint64, ...) error + func (ec *EthConfirmer) HealthReport() map[string]error + func (ec *EthConfirmer) Name() string + func (ec *EthConfirmer) ProcessHead(ctx context.Context, head *evmtypes.Head) error + func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error + func (ec *EthConfirmer) ResumePendingTaskRuns(ctx context.Context, head *evmtypes.Head) error + func (ec *EthConfirmer) Start(_ context.Context) error + type EthReceipt struct + BlockHash common.Hash + BlockNumber int64 + CreatedAt time.Time + ID int64 + Receipt evmtypes.Receipt + TransactionIndex uint + TxHash common.Hash + type EthReceiptsPlus struct + FailOnRevert bool + ID uuid.UUID + Receipt evmtypes.Receipt + type EthResender struct + func NewEthResender(lggr logger.Logger, orm ORM, ethClient evmclient.Client, ks KeyStore, ...) *EthResender + func (er *EthResender) Start() + func (er *EthResender) Stop() + type EthTx struct + AccessList NullableEIP2930AccessList + BroadcastAt *time.Time + CreatedAt time.Time + EVMChainID utils.Big + EncodedPayload []byte + Error null.String + EthTxAttempts []EthTxAttempt + FromAddress common.Address + GasLimit uint32 + ID int64 + InitialBroadcastAt *time.Time + Meta *datatypes.JSON + MinConfirmations cnull.Uint32 + Nonce *int64 + PipelineTaskRunID uuid.NullUUID + State EthTxState + Subject uuid.NullUUID + ToAddress common.Address + TransmitChecker *datatypes.JSON + Value assets.Eth + func (e EthTx) GetChecker() (TransmitCheckerSpec, error) + func (e EthTx) GetError() error + func (e EthTx) GetID() string + func (e EthTx) GetLogger(lgr logger.Logger) logger.Logger + func (e EthTx) GetMeta() (*EthTxMeta, error) + type EthTxAttempt struct + BroadcastBeforeBlockNum *int64 + ChainSpecificGasLimit uint32 + CreatedAt time.Time + EthReceipts []EthReceipt + EthTx EthTx + EthTxID int64 + GasFeeCap *assets.Wei + GasPrice *assets.Wei + GasTipCap *assets.Wei + Hash common.Hash + ID int64 + SignedRawTx []byte + State EthTxAttemptState + TxType int + func (a EthTxAttempt) DynamicFee() gas.DynamicFee + func (a EthTxAttempt) GetBroadcastBeforeBlockNum() *int64 + func (a EthTxAttempt) GetChainSpecificGasLimit() uint32 + func (a EthTxAttempt) GetGasPrice() *assets.Wei + func (a EthTxAttempt) GetHash() common.Hash + func (a EthTxAttempt) GetSignedTx() (*types.Transaction, error) + func (a EthTxAttempt) GetTxType() int + type EthTxAttemptState string + type EthTxMeta struct + FailOnRevert null.Bool + FwdrDestAddress *common.Address + JobID *int32 + MaxLink *string + RequestID *common.Hash + RequestIDs []common.Hash + RequestTxHash *common.Hash + RequestTxHashes []common.Hash + SubID *uint64 + UpkeepID *string + type EthTxState string + type KeyStore interface + CheckEnabled func(address common.Address, chainID *big.Int) error + EnabledKeysForChain func(chainID *big.Int) (keys []ethkey.KeyV2, err error) + GetNextNonce func(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) + GetStatesForChain func(chainID *big.Int) ([]ethkey.State, error) + IncrementNextNonce func(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error + SignTx func(fromAddress common.Address, tx *gethTypes.Transaction, chainID *big.Int) (*gethTypes.Transaction, error) + SubscribeToKeyChanges func() (ch chan struct{}, unsub func()) + type NSinserttx struct + Attempt EthTxAttempt + Etx EthTx + type NewTx struct + Checker TransmitCheckerSpec + EncodedPayload []byte + ForwarderAddress common.Address + FromAddress common.Address + GasLimit uint32 + Meta *EthTxMeta + MinConfirmations null.Uint32 + PipelineTaskRunID *uuid.UUID + Strategy TxStrategy + ToAddress common.Address + type NonceSyncer struct + func NewNonceSyncer(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, ethClient evmclient.Client, ...) *NonceSyncer + func (s NonceSyncer) Sync(ctx context.Context, keyState ethkey.State) (err error) + type NonceSyncerKeyStore interface + GetNextNonce func(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) + type NullTxManager struct + ErrMsg string + func (n *NullTxManager) Close() error + func (n *NullTxManager) CreateEthTransaction(NewTx, ...pg.QOpt) (etx EthTx, err error) + func (n *NullTxManager) GetForwarderForEOA(addr common.Address) (fwdr common.Address, err error) + func (n *NullTxManager) GetGasEstimator() gas.Estimator + func (n *NullTxManager) HealthReport() map[string]error + func (n *NullTxManager) Healthy() error + func (n *NullTxManager) Name() string + func (n *NullTxManager) OnNewLongestChain(context.Context, *evmtypes.Head) + func (n *NullTxManager) Ready() error + func (n *NullTxManager) RegisterResumeCallback(fn ResumeCallback) + func (n *NullTxManager) Reset(f func(), addr common.Address, abandon bool) error + func (n *NullTxManager) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error) + func (n *NullTxManager) Start(context.Context) error + func (n *NullTxManager) Trigger(common.Address) + type NullableEIP2930AccessList struct + AccessList types.AccessList + Valid bool + func NullableEIP2930AccessListFrom(al types.AccessList) (n NullableEIP2930AccessList) + func (e *NullableEIP2930AccessList) Scan(value interface{}) error + func (e *NullableEIP2930AccessList) UnmarshalJSON(input []byte) error + func (e NullableEIP2930AccessList) MarshalJSON() ([]byte, error) + func (e NullableEIP2930AccessList) Value() (driver.Value, error) + type ORM interface + Close func() + DeleteInProgressAttempt func(ctx context.Context, attempt EthTxAttempt) error + EthTransactions func(offset, limit int) ([]EthTx, int, error) + EthTransactionsWithAttempts func(offset, limit int) ([]EthTx, int, error) + EthTxAttempts func(offset, limit int) ([]EthTxAttempt, int, error) + FindEthReceiptsPendingConfirmation func(ctx context.Context, blockNum int64, chainID big.Int) (receiptsPlus []EthReceiptsPlus, err error) + FindEthTxAttempt func(hash common.Hash) (*EthTxAttempt, error) + FindEthTxAttemptConfirmedByEthTxIDs func(ids []int64) ([]EthTxAttempt, error) + FindEthTxAttemptsByEthTxIDs func(ids []int64) ([]EthTxAttempt, error) + FindEthTxAttemptsRequiringReceiptFetch func(chainID big.Int) (attempts []EthTxAttempt, err error) + FindEthTxAttemptsRequiringResend func(olderThan time.Time, maxInFlightTransactions uint32, chainID big.Int, ...) (attempts []EthTxAttempt, err error) + FindEthTxByHash func(hash common.Hash) (*EthTx, error) + FindEthTxWithAttempts func(etxID int64) (etx EthTx, err error) + FindEthTxWithNonce func(fromAddress common.Address, nonce uint) (etx *EthTx, err error) + FindEthTxsRequiringGasBump func(ctx context.Context, address common.Address, ...) (etxs []*EthTx, err error) + FindEthTxsRequiringResubmissionDueToInsufficientEth func(address common.Address, chainID big.Int, qopts ...pg.QOpt) (etxs []*EthTx, err error) + FindEtxAttemptsConfirmedMissingReceipt func(chainID big.Int) (attempts []EthTxAttempt, err error) + FindTransactionsConfirmedInBlockRange func(highBlockNumber, lowBlockNumber int64, chainID big.Int) (etxs []*EthTx, err error) + GetInProgressEthTxAttempts func(ctx context.Context, address common.Address, chainID big.Int) (attempts []EthTxAttempt, err error) + InsertEthReceipt func(receipt *EthReceipt) error + InsertEthTx func(etx *EthTx) error + InsertEthTxAttempt func(attempt *EthTxAttempt) error + LoadEthTxAttempts func(etx *EthTx, qopts ...pg.QOpt) error + LoadEthTxesAttempts func(etxs []*EthTx, qopts ...pg.QOpt) error + MarkAllConfirmedMissingReceipt func(chainID big.Int) (err error) + MarkOldTxesMissingReceiptAsErrored func(blockNum int64, finalityDepth uint32, chainID big.Int, qopts ...pg.QOpt) error + PreloadEthTxes func(attempts []EthTxAttempt) error + SaveConfirmedMissingReceiptAttempt func(ctx context.Context, timeout time.Duration, attempt *EthTxAttempt, ...) error + SaveFetchedReceipts func(receipts []evmtypes.Receipt, chainID big.Int) (err error) + SaveInProgressAttempt func(attempt *EthTxAttempt) error + SaveInsufficientEthAttempt func(timeout time.Duration, attempt *EthTxAttempt, broadcastAt time.Time) error + SaveReplacementInProgressAttempt func(oldAttempt EthTxAttempt, replacementAttempt *EthTxAttempt, qopts ...pg.QOpt) error + SaveSentAttempt func(timeout time.Duration, attempt *EthTxAttempt, broadcastAt time.Time) error + SetBroadcastBeforeBlockNum func(blockNum int64, chainID big.Int) error + UpdateBroadcastAts func(now time.Time, etxIDs []int64) error + UpdateEthTxForRebroadcast func(etx EthTx, etxAttempt EthTxAttempt) error + UpdateEthTxsUnconfirmed func(ids []int64) error + func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM + type Reaper struct + func NewReaper(lggr logger.Logger, db *sqlx.DB, config ReaperConfig, chainID big.Int) *Reaper + func (r *Reaper) ReapEthTxes(headNum int64) error + func (r *Reaper) SetLatestBlockNum(latestBlockNum int64) + func (r *Reaper) Start() + func (r *Reaper) Stop() + type ReaperConfig interface + EthTxReaperInterval func() time.Duration + EthTxReaperThreshold func() time.Duration + EvmFinalityDepth func() uint32 + type ResumeCallback func(id uuid.UUID, result interface{}, err error) error + type SendEveryStrategy struct + func (SendEveryStrategy) PruneQueue(pg.Queryer) (int64, error) + func (SendEveryStrategy) Subject() uuid.NullUUID + type SimulateChecker struct + Client evmclient.Client + func (s *SimulateChecker) Check(ctx context.Context, l logger.Logger, tx EthTx, a EthTxAttempt) error + type TransmitChecker interface + Check func(ctx context.Context, l logger.Logger, tx EthTx, a EthTxAttempt) error + var NoChecker TransmitChecker = noChecker{} + type TransmitCheckerFactory interface + BuildChecker func(spec TransmitCheckerSpec) (TransmitChecker, error) + type TransmitCheckerSpec struct + CheckerType TransmitCheckerType + VRFCoordinatorAddress *common.Address + VRFRequestBlockNumber *big.Int + type TransmitCheckerType string + type TxManager interface + CreateEthTransaction func(newTx NewTx, qopts ...pg.QOpt) (etx EthTx, err error) + GetForwarderForEOA func(eoa common.Address) (forwarder common.Address, err error) + GetGasEstimator func() gas.Estimator + RegisterResumeCallback func(fn ResumeCallback) + Reset func(f func(), addr common.Address, abandon bool) error + SendEther func(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error) + Trigger func(addr common.Address) + type TxStrategy interface + PruneQueue func(q pg.Queryer) (n int64, err error) + Subject func() uuid.NullUUID + func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy TxStrategy) + func NewSendEveryStrategy() TxStrategy + type Txm struct + func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore KeyStore, ...) *Txm + func (b *Txm) Close() (merr error) + func (b *Txm) CreateEthTransaction(newTx NewTx, qs ...pg.QOpt) (etx EthTx, err error) + func (b *Txm) GetForwarderForEOA(eoa common.Address) (forwarder common.Address, err error) + func (b *Txm) GetGasEstimator() gas.Estimator + func (b *Txm) HealthReport() map[string]error + func (b *Txm) Name() string + func (b *Txm) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) + func (b *Txm) RegisterResumeCallback(fn ResumeCallback) + func (b *Txm) Reset(callback func(), addr common.Address, abandon bool) (err error) + func (b *Txm) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error) + func (b *Txm) Start(ctx context.Context) (merr error) + func (b *Txm) Trigger(addr common.Address) + type VRFV1Checker struct + Callbacks func(opts *bind.CallOpts, reqID [32]byte) (v1.Callbacks, error) + Client evmclient.Client + func (v *VRFV1Checker) Check(ctx context.Context, l logger.Logger, tx EthTx, _ EthTxAttempt) error + type VRFV2Checker struct + GetCommitment func(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error) + HeadByNumber func(ctx context.Context, n *big.Int) (*types.Head, error) + RequestBlockNumber *big.Int + func (v *VRFV2Checker) Check(ctx context.Context, l logger.Logger, tx EthTx, _ EthTxAttempt) error