txmgr

package
v2.0.0-beta0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: MIT Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
// Timing constants for the txmgr package.
const (
	// InFlightTransactionRecheckInterval controls how often the EthBroadcaster
	// will poll the unconfirmed queue to see if it is allowed to send another
	// transaction.
	InFlightTransactionRecheckInterval = 1 * time.Second

	// TransmitCheckTimeout controls the maximum amount of time that will be
	// spent on the transmit check.
	TransmitCheckTimeout = 2 * time.Second
)
View Source
const (
	// Declared EthTxState values; EthTx.State holds one of these.
	// The EthBroadcaster documentation states that it moves eth_txes out of
	// unstarted into either fatal_error or unconfirmed. The remaining
	// transitions are not described in this listing.
	EthTxUnstarted               = EthTxState("unstarted")
	EthTxInProgress              = EthTxState("in_progress")
	EthTxFatalError              = EthTxState("fatal_error")
	EthTxUnconfirmed             = EthTxState("unconfirmed")
	EthTxConfirmed               = EthTxState("confirmed")
	EthTxConfirmedMissingReceipt = EthTxState("confirmed_missing_receipt")

	// TransmitCheckerTypeSimulate is a checker that simulates the transaction before executing on
	// chain.
	TransmitCheckerTypeSimulate = TransmitCheckerType("simulate")

	// TransmitCheckerTypeVRFV1 is a checker that will not submit VRF V1 fulfillment requests that
	// have already been fulfilled. This could happen if the request was fulfilled by another node.
	TransmitCheckerTypeVRFV1 = TransmitCheckerType("vrf_v1")

	// TransmitCheckerTypeVRFV2 is a checker that will not submit VRF V2 fulfillment requests that
	// have already been fulfilled. This could happen if the request was fulfilled by another node.
	TransmitCheckerTypeVRFV2 = TransmitCheckerType("vrf_v2")
)

Variables

View Source
var (
	// ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt.
	// This most likely happened because an external wallet used the account for this nonce.
	// Despite the Err prefix, this is a plain string, not an error value, so it
	// cannot be matched with errors.Is.
	ErrCouldNotGetReceipt = "could not get receipt"
)
View Source
// ErrInvalidQOpt is a sentinel error with the message "orm: Invalid QOpt".
// NOTE(review): presumably returned by the ORM when it is handed an unusable
// pg.QOpt; the producing code is not visible here — confirm.
var ErrInvalidQOpt = errors.New("orm: Invalid QOpt")
View Source
// ErrKeyNotUpdated is a sentinel error with the message "orm: Key not updated".
// NOTE(review): the condition that produces it is not visible here — confirm
// against the ORM implementation.
var ErrKeyNotUpdated = errors.New("orm: Key not updated")
View Source
// Max256BitUInt is 2^256, computed by big.Int exponentiation with a nil modulus.
// Despite the name, this is one greater than the largest value that fits in
// 256 bits (2^256 - 1), so it is only correct as an exclusive upper bound.
// NOTE(review): confirm that callers compare against it exclusively.
var Max256BitUInt = big.NewInt(0).Exp(big.NewInt(2), big.NewInt(256), nil)

Functions

func NewEvmTxAttemptBuilder

func NewEvmTxAttemptBuilder(chainID big.Int, config Config, keystore TxAttemptSigner, estimator gas.EvmFeeEstimator) *evmTxAttemptBuilder

func NewQueueingTxStrategy

func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy)

NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.

func NewSendEveryStrategy

func NewSendEveryStrategy() txmgrtypes.TxStrategy

NewSendEveryStrategy creates a new TxStrategy that does not drop transactions.

Types

type CheckerFactory

// CheckerFactory is a real implementation of TransmitCheckerFactory.
type CheckerFactory struct {
	// Client is the EVM client held by the factory.
	// NOTE(review): presumably handed to the checkers that BuildChecker
	// creates — confirm.
	Client evmclient.Client
}

CheckerFactory is a real implementation of TransmitCheckerFactory.

func (*CheckerFactory) BuildChecker

func (c *CheckerFactory) BuildChecker(spec TransmitCheckerSpec) (TransmitChecker, error)

BuildChecker satisfies the TransmitCheckerFactory interface.

type Config

// Config encompasses config used by the txmgr package.
// Unless otherwise specified, these should support changing at runtime.
type Config interface {
	// Embedded configuration interfaces from the gas and pg packages.
	gas.Config
	pg.QConfig
	// EthTxReaperInterval and EthTxReaperThreshold are also declared by
	// ReaperConfig.
	EthTxReaperInterval() time.Duration
	EthTxReaperThreshold() time.Duration
	EthTxResendAfterThreshold() time.Duration
	EvmGasBumpThreshold() uint64
	EvmGasBumpTxDepth() uint16
	EvmGasLimitDefault() uint32
	EvmMaxInFlightTransactions() uint32
	EvmMaxQueuedTransactions() uint64
	EvmNonceAutoSync() bool
	EvmUseForwarders() bool
	EvmRPCDefaultBatchSize() uint32
	KeySpecificMaxGasPriceWei(addr common.Address) *assets.Wei
	TriggerFallbackDBPollInterval() time.Duration
}

Config encompasses config used by the txmgr package. Unless otherwise specified, these should support changing at runtime.

type DropOldestStrategy

type DropOldestStrategy struct {
	// contains filtered or unexported fields
}

DropOldestStrategy will send the newest N transactions, older ones will be removed from the queue

func NewDropOldestStrategy

func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy

NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the queue size is exceeded.

func (DropOldestStrategy) PruneQueue

func (s DropOldestStrategy) PruneQueue(pruneService txmgrtypes.UnstartedTxQueuePruner, qopt pg.QOpt) (n int64, err error)

func (DropOldestStrategy) Subject

func (s DropOldestStrategy) Subject() uuid.NullUUID

type EthBroadcaster

type EthBroadcaster struct {
	txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt]

	utils.StartStopOnce
	// contains filtered or unexported fields
}

EthBroadcaster monitors eth_txes for transactions that need to be broadcast, assigns nonces and ensures that at least one eth node somewhere has received the transaction successfully.

This does not guarantee delivery! A whole host of other things can subsequently go wrong such as transactions being evicted from the mempool, eth nodes going offline etc. Responsibility for ensuring eventual inclusion into the chain falls on the shoulders of the ethConfirmer.

What EthBroadcaster does guarantee is:

- a monotonic series of increasing nonces for eth_txes that can all eventually be confirmed if you retry enough times
- transition of eth_txes out of unstarted into either fatal_error or unconfirmed
- existence of a saved eth_tx_attempt

func NewEthBroadcaster

func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore,
	eventBroadcaster pg.EventBroadcaster,
	keyStates []ethkey.State, resumeCallback ResumeCallback,
	txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt],
	logger logger.Logger, checkerFactory TransmitCheckerFactory, autoSyncNonce bool) *EthBroadcaster

NewEthBroadcaster returns a new concrete EthBroadcaster

func (*EthBroadcaster) Close

func (eb *EthBroadcaster) Close() error

Close closes the EthBroadcaster

func (*EthBroadcaster) HealthReport

func (eb *EthBroadcaster) HealthReport() map[string]error

func (*EthBroadcaster) Name

func (eb *EthBroadcaster) Name() string

func (*EthBroadcaster) ProcessUnstartedEthTxs

func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, keyState ethkey.State) (err error, retryable bool)

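ProcessUnstartedEthTxs picks up and handles all eth_txes in the queue. It returns the error first and a retryable flag second; the `revive:disable:error-return` text is a linter directive for that return order, not part of the description.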

func (*EthBroadcaster) Start

func (eb *EthBroadcaster) Start(ctx context.Context) error

Start starts EthBroadcaster service. The provided context can be used to terminate Start sequence.

func (*EthBroadcaster) SyncNonce

func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State)

SyncNonce tries to sync the key nonce, retrying indefinitely until success.

func (*EthBroadcaster) Trigger

func (eb *EthBroadcaster) Trigger(addr gethCommon.Address)

Trigger forces the monitor for a particular address to recheck for new eth_txes. It logs an error and does nothing if the address was not registered on startup.

type EthConfirmer

type EthConfirmer struct {
	utils.StartStopOnce

	txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt]
	// contains filtered or unexported fields
}

EthConfirmer is a broad service which performs four different tasks in sequence on every new longest chain:

Step 1: Mark that all currently pending transaction attempts were broadcast before this block.
Step 2: Check pending transactions for receipts.
Step 3: See if any transactions have exceeded the gas bumping block threshold and, if so, bump them.
Step 4: Check confirmed transactions to make sure they are still in the longest chain (reorg protection).

func NewEthConfirmer

func NewEthConfirmer(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore,
	keyStates []ethkey.State, resumeCallback ResumeCallback,
	txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt],
	lggr logger.Logger) *EthConfirmer

NewEthConfirmer instantiates a new eth confirmer

func (*EthConfirmer) CheckConfirmedMissingReceipt

func (ec *EthConfirmer) CheckConfirmedMissingReceipt(ctx context.Context) (err error)

CheckConfirmedMissingReceipt will attempt to re-send any transaction in the state of "confirmed_missing_receipt". If we get back any type of senderror other than "nonce too low" it means that this transaction isn't actually confirmed and needs to be put back into "unconfirmed" state, so that it can enter the gas bumping cycle. This is necessary in rare cases (e.g. Polygon) where network conditions are extremely hostile.

For example, assume the following scenario:

0. We are connected to multiple primary nodes via load balancer.
1. We send a transaction, it is confirmed, and we get a receipt.
2. A new head comes in from RPC node 1 indicating that this transaction was re-org'd, so we put it back into unconfirmed state.
3. We re-send that transaction to RPC node 2 **which hasn't caught up to this re-org yet**.
4. RPC node 2 still has an old view of the chain, so it returns us "nonce too low", indicating "no problem, this transaction is already mined".
5. Now the transaction is marked "confirmed_missing_receipt" but the latest chain does not actually include it.
6. Now we are reliant on the EthResender to propagate it, and this transaction will not be gas bumped, so in the event of gas spikes it could languish or even be evicted from the mempool and hold up the queue.
7. Even if/when RPC node 2 catches up, the transaction is still stuck in state "confirmed_missing_receipt".

This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.

func (*EthConfirmer) CheckForReceipts

func (ec *EthConfirmer) CheckForReceipts(ctx context.Context, blockNum int64) error

CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number

func (*EthConfirmer) Close

func (ec *EthConfirmer) Close() error

Close is a comment to appease the linter

func (*EthConfirmer) EnsureConfirmedTransactionsInLongestChain

func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head txmgrtypes.Head) error

EnsureConfirmedTransactionsInLongestChain finds all confirmed eth_txes up to the depth of the given chain and ensures that every one has a receipt with a block hash that is in the given chain.

If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast.

func (*EthConfirmer) FindEthTxsRequiringRebroadcast

func (ec *EthConfirmer) FindEthTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address gethCommon.Address, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID big.Int) (etxs []*EthTx, err error)

FindEthTxsRequiringRebroadcast returns attempts that hit insufficient eth, and attempts that need bumping, in nonce ASC order

func (*EthConfirmer) ForceRebroadcast

func (ec *EthConfirmer) ForceRebroadcast(beginningNonce int64, endingNonce int64, gasPriceWei uint64, address gethCommon.Address, overrideGasLimit uint32) error

ForceRebroadcast sends a transaction for every nonce in the given nonce range at the given gas price. If an eth_tx exists for this nonce, we re-send the existing eth_tx with the supplied parameters. If an eth_tx doesn't exist for this nonce, we send a zero transaction. This operates completely orthogonal to the normal EthConfirmer and can result in untracked attempts! Only for emergency usage. This is in case of some unforeseen scenario where the node is refusing to release the lock. KISS.

func (*EthConfirmer) HealthReport

func (ec *EthConfirmer) HealthReport() map[string]error

func (*EthConfirmer) Name

func (ec *EthConfirmer) Name() string

func (*EthConfirmer) ProcessHead

func (ec *EthConfirmer) ProcessHead(ctx context.Context, head txmgrtypes.Head) error

ProcessHead takes all required transactions for the confirmer on a new head

func (*EthConfirmer) RebroadcastWhereNecessary

func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeight int64) error

RebroadcastWhereNecessary bumps gas or resends transactions that were previously out-of-eth

func (*EthConfirmer) ResumePendingTaskRuns

func (ec *EthConfirmer) ResumePendingTaskRuns(ctx context.Context, head txmgrtypes.Head) error

ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts

func (*EthConfirmer) Start

func (ec *EthConfirmer) Start(_ context.Context) error

Start is a comment to appease the linter

type EthResender

type EthResender struct {
	// contains filtered or unexported fields
}

EthResender periodically picks up transactions that have been languishing unconfirmed for a configured amount of time without being sent, and sends their highest priced attempt again. This helps to defend against geth/parity silently dropping txes, or txes being ejected from the mempool.

Previously we relied on the bumper to do this for us implicitly but there can occasionally be problems with this (e.g. abnormally long block times, or if gas bumping is disabled)

func NewEthResender

func NewEthResender(lggr logger.Logger, orm ORM, ethClient evmclient.Client, ks KeyStore, pollInterval time.Duration, config Config) *EthResender

NewEthResender creates a new concrete EthResender

func (*EthResender) Start

func (er *EthResender) Start()

Start is a comment which satisfies the linter

func (*EthResender) Stop

func (er *EthResender) Stop()

Stop is a comment which satisfies the linter

type EthTx

// EthTx is a transaction tracked by the txmgr package.
//
// Nonce is a pointer and may therefore be nil.
// NOTE(review): presumably nil until the EthBroadcaster assigns a nonce — confirm.
// State holds one of the EthTxState constants. EthTxAttempts is excluded from
// JSON output by its `json:"-"` tag.
type EthTx struct {
	ID             int64
	Nonce          *int64
	FromAddress    common.Address
	ToAddress      common.Address
	EncodedPayload []byte
	Value          assets.Eth
	// GasLimit on the EthTx is always the conceptual gas limit, which is not
	// necessarily the same as the on-chain encoded value (e.g. Optimism).
	GasLimit uint32
	Error    null.String
	// BroadcastAt is updated every time an attempt for this eth_tx is re-sent.
	// In almost all cases it will be within a second or so of the actual send time.
	BroadcastAt *time.Time
	// InitialBroadcastAt is recorded once, the first ever time this eth_tx is sent.
	InitialBroadcastAt *time.Time
	CreatedAt          time.Time
	State              EthTxState
	EthTxAttempts      []EthTxAttempt `json:"-"`
	// Meta is the marshalled EthTxMeta.
	// Used for additional context around transactions which you want to log
	// at send time. GetMeta unmarshals it into an EthTxMeta.
	Meta       *datatypes.JSON
	Subject    uuid.NullUUID
	EVMChainID utils.Big

	// Pipeline-related fields. NewTx carries fields of the same names, which
	// it documents as only needed when called from the ethtx pipeline task.
	PipelineTaskRunID uuid.NullUUID
	MinConfirmations  cnull.Uint32

	// AccessList is optional and only has an effect on DynamicFee transactions
	// on chains that support it (e.g. Ethereum Mainnet after London hard fork).
	AccessList NullableEIP2930AccessList

	// TransmitChecker defines the check that should be performed before a transaction is submitted on
	// chain. It is stored as JSON; GetChecker unmarshals it into a TransmitCheckerSpec.
	TransmitChecker *datatypes.JSON
}

func (EthTx) GetChecker

func (e EthTx) GetChecker() (TransmitCheckerSpec, error)

GetChecker returns an EthTx's transmit checker spec in struct form, unmarshalling it from JSON first.

func (EthTx) GetError

func (e EthTx) GetError() error

func (EthTx) GetID

func (e EthTx) GetID() string

GetID allows EthTx to be used as jsonapi.MarshalIdentifier

func (EthTx) GetLogger

func (e EthTx) GetLogger(lgr logger.Logger) logger.Logger

GetLogger returns a new logger with metadata fields.

func (EthTx) GetMeta

func (e EthTx) GetMeta() (*EthTxMeta, error)

GetMeta returns an EthTx's meta in struct form, unmarshalling it from JSON first.

type EthTxAttempt

// EthTxAttempt is a single attempt belonging to an EthTx. The parent is
// referenced by EthTxID and carried in the EthTx field.
//
// SignedRawTx holds the signed transaction bytes; GetSignedTx decodes them
// into a types.Transaction. EthReceipts is excluded from JSON output by its
// `json:"-"` tag.
type EthTxAttempt struct {
	ID      int64
	EthTxID int64
	EthTx   EthTx
	// GasPrice applies to LegacyTx
	GasPrice *assets.Wei
	// GasTipCap and GasFeeCap are used instead for DynamicFeeTx
	GasTipCap *assets.Wei
	GasFeeCap *assets.Wei
	// ChainSpecificGasLimit on the EthTxAttempt is always the same as the on-chain encoded value for gas limit
	ChainSpecificGasLimit   uint32
	SignedRawTx             []byte
	Hash                    common.Hash
	CreatedAt               time.Time
	BroadcastBeforeBlockNum *int64
	State                   txmgrtypes.TxAttemptState
	EthReceipts             []EvmReceipt `json:"-"`
	TxType                  int
}

func (EthTxAttempt) Fee

func (a EthTxAttempt) Fee() (fee gas.EvmFee)

func (EthTxAttempt) GetBroadcastBeforeBlockNum

func (a EthTxAttempt) GetBroadcastBeforeBlockNum() *int64

func (EthTxAttempt) GetChainSpecificGasLimit

func (a EthTxAttempt) GetChainSpecificGasLimit() uint32

func (EthTxAttempt) GetHash

func (a EthTxAttempt) GetHash() common.Hash

func (EthTxAttempt) GetSignedTx

func (a EthTxAttempt) GetSignedTx() (*types.Transaction, error)

GetSignedTx decodes the SignedRawTx into a types.Transaction struct

func (EthTxAttempt) GetTxType

func (a EthTxAttempt) GetTxType() int

func (EthTxAttempt) String

func (a EthTxAttempt) String() string

type EthTxMeta

// EthTxMeta contains fields of the transaction metadata.
// Not all fields are guaranteed to be present.
//
// Every field is tagged omitempty. Two JSON keys differ from their Go field
// names: SubID is serialized as "SubId" and FwdrDestAddress as
// "ForwarderDestAddress".
// NOTE(review): encoding/json ignores omitempty on struct-typed values; if
// null.Bool is a struct, FailOnRevert is always emitted — confirm.
type EthTxMeta struct {
	JobID *int32 `json:"JobID,omitempty"`

	// Pipeline fields
	FailOnRevert null.Bool `json:"FailOnRevert,omitempty"`

	// VRF-only fields
	RequestID     *common.Hash `json:"RequestID,omitempty"`
	RequestTxHash *common.Hash `json:"RequestTxHash,omitempty"`
	// Batch variants of the above
	RequestIDs      []common.Hash `json:"RequestIDs,omitempty"`
	RequestTxHashes []common.Hash `json:"RequestTxHashes,omitempty"`
	// Used for the VRFv2 - max link this tx will bill
	// should it get bumped
	MaxLink *string `json:"MaxLink,omitempty"`
	// Used for the VRFv2 - the subscription ID of the
	// requester of the VRF.
	SubID *uint64 `json:"SubId,omitempty"`

	// Used for keepers
	UpkeepID *string `json:"UpkeepID,omitempty"`

	// Used only for forwarded txs, tracks the original destination address.
	// When this is set, it indicates tx is forwarded through To address.
	FwdrDestAddress *common.Address `json:"ForwarderDestAddress,omitempty"`
}

EthTxMeta contains fields of the transaction metadata. Not all fields are guaranteed to be present.

type EthTxState

// EthTxState is the string-based state of an EthTx. Its declared values are
// the EthTx* constants (EthTxUnstarted, EthTxInProgress, EthTxFatalError,
// EthTxUnconfirmed, EthTxConfirmed, EthTxConfirmedMissingReceipt).
type EthTxState string

type EvmReceiptPlus

// EvmReceiptPlus is an alias for txmgrtypes.ReceiptPlus instantiated with
// evmtypes.Receipt.
type EvmReceiptPlus = txmgrtypes.ReceiptPlus[evmtypes.Receipt]

type KeyStore

// KeyStore encompasses the subset of keystore used by txmgr.
// Its GetNextNonce method has the same signature as the one declared by
// NonceSyncerKeyStore.
type KeyStore interface {
	CheckEnabled(address common.Address, chainID *big.Int) error
	EnabledKeysForChain(chainID *big.Int) (keys []ethkey.KeyV2, err error)
	GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error)
	GetStatesForChain(chainID *big.Int) ([]ethkey.State, error)
	IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error
	// SubscribeToKeyChanges returns a channel together with an unsubscribe function.
	SubscribeToKeyChanges() (ch chan struct{}, unsub func())
}

KeyStore encompasses the subset of keystore used by txmgr

type NSinserttx

// NSinserttx represents an EthTx and Attempt to be inserted together.
// NOTE(review): the NS prefix presumably refers to the NonceSyncer — confirm.
type NSinserttx struct {
	Etx     EthTx
	Attempt EthTxAttempt
}

NSinserttx represents an EthTx and Attempt to be inserted together

type NewTx

// NewTx holds the parameters for a new transaction. It is the argument type
// of TxManager.CreateEthTransaction.
type NewTx struct {
	FromAddress      common.Address
	ToAddress        common.Address
	EncodedPayload   []byte
	GasLimit         uint32
	Meta             *EthTxMeta
	ForwarderAddress common.Address

	// Pipeline variables - if you aren't calling this from ethtx task within
	// the pipeline, you don't need these variables
	MinConfirmations  null.Uint32
	PipelineTaskRunID *uuid.UUID

	// Strategy is the TxStrategy for this transaction. NewQueueingTxStrategy
	// and NewSendEveryStrategy construct values of this type.
	Strategy txmgrtypes.TxStrategy

	// Checker defines the check that should be run before a transaction is submitted on chain.
	Checker TransmitCheckerSpec
}

type NonceSyncer

type NonceSyncer struct {
	// contains filtered or unexported fields
}

NonceSyncer manages the delicate task of syncing the local nonce with the chain nonce in case of divergence.

On startup, we check each key for the nonce value on chain and compare it to our local value.

Usually the on-chain nonce will be the same as (or lower than) the next_nonce in the DB, in which case we do nothing.

If we are restoring from a backup however, or another wallet has used the account, the chain nonce might be higher than our local one. In this scenario, we must fastforward the local nonce to match the chain nonce.

The problem with doing this is that now Chainlink does not have any ownership or control over potentially pending transactions with nonces between our local highest nonce and the chain nonce. If one of those transactions is pushed out of the mempool or re-org'd out of the chain, we run the risk of being stuck with a gap in the nonce sequence that will never be filled.

The solution is to query the chain for our own transactions and take ownership of them by writing them to the database and letting the EthConfirmer handle them as it would any other transaction.

This is not quite as straightforward as one might expect. We cannot query transactions from our account to infinite depth (geth does not support this). The best we can do is to query for all transactions sent within the past EVM.FinalityDepth blocks and find the ones sent by our address(es).

This gives us re-org protection up to EVM.FinalityDepth deep in the worst case, which is in line with our other guarantees.

func NewNonceSyncer

func NewNonceSyncer(orm ORM, lggr logger.Logger, ethClient evmclient.Client, kst NonceSyncerKeyStore) *NonceSyncer

NewNonceSyncer returns a new syncer

func (NonceSyncer) Sync

func (s NonceSyncer) Sync(ctx context.Context, keyState ethkey.State) (err error)

Sync syncs the nonce for the key described by the given key state.

This should only be called once, before the EthBroadcaster has started. Calling it later is not safe and could lead to races.

type NonceSyncerKeyStore

// NonceSyncerKeyStore is the keystore subset accepted by NewNonceSyncer. Its
// single method has the same signature as KeyStore.GetNextNonce.
type NonceSyncerKeyStore interface {
	GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error)
}

type NullTxManager

// NullTxManager declares the same method set as TxManager. Its Start, Close
// and Trigger methods are documented as no-ops, and SendEther as doing nothing.
type NullTxManager struct {
	// ErrMsg is a message string held by the manager.
	// NOTE(review): presumably used to build the errors its methods
	// return — confirm.
	ErrMsg string
}

func (*NullTxManager) Close

func (n *NullTxManager) Close() error

Close does noop for NullTxManager.

func (*NullTxManager) CreateEthTransaction

func (n *NullTxManager) CreateEthTransaction(NewTx, ...pg.QOpt) (etx EthTx, err error)

func (*NullTxManager) GetForwarderForEOA

func (n *NullTxManager) GetForwarderForEOA(addr common.Address) (fwdr common.Address, err error)

func (*NullTxManager) HealthReport

func (n *NullTxManager) HealthReport() map[string]error

func (*NullTxManager) Name

func (n *NullTxManager) Name() string

func (*NullTxManager) OnNewLongestChain

func (n *NullTxManager) OnNewLongestChain(context.Context, *evmtypes.Head)

func (*NullTxManager) Ready

func (n *NullTxManager) Ready() error

func (*NullTxManager) RegisterResumeCallback

func (n *NullTxManager) RegisterResumeCallback(fn ResumeCallback)

func (*NullTxManager) Reset

func (n *NullTxManager) Reset(f func(), addr common.Address, abandon bool) error

func (*NullTxManager) SendEther

func (n *NullTxManager) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)

SendEther does nothing, null functionality

func (*NullTxManager) Start

func (n *NullTxManager) Start(context.Context) error

Start does noop for NullTxManager.

func (*NullTxManager) Trigger

func (n *NullTxManager) Trigger(common.Address)

Trigger does noop for NullTxManager.

type NullableEIP2930AccessList

// NullableEIP2930AccessList pairs a types.AccessList with a Valid flag. It
// has Scan and Value methods for database storage and custom JSON marshalling.
// NOTE(review): presumably Valid == false represents a NULL value, following
// the database/sql Null* convention — confirm.
type NullableEIP2930AccessList struct {
	AccessList types.AccessList
	Valid      bool
}

func NullableEIP2930AccessListFrom

func NullableEIP2930AccessListFrom(al types.AccessList) (n NullableEIP2930AccessList)

func (NullableEIP2930AccessList) MarshalJSON

func (e NullableEIP2930AccessList) MarshalJSON() ([]byte, error)

func (*NullableEIP2930AccessList) Scan

func (e *NullableEIP2930AccessList) Scan(value interface{}) error

Scan populates the NullableEIP2930AccessList from its serialization in the database.

func (*NullableEIP2930AccessList) UnmarshalJSON

func (e *NullableEIP2930AccessList) UnmarshalJSON(input []byte) error

func (NullableEIP2930AccessList) Value

Value returns this instance serialized for database storage

type ORM

func NewORM

func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM

type Reaper

type Reaper struct {
	// contains filtered or unexported fields
}

Reaper handles periodic database cleanup for Txm

func NewReaper

func NewReaper(lggr logger.Logger, db *sqlx.DB, config ReaperConfig, chainID big.Int) *Reaper

NewReaper instantiates a new reaper object

func (*Reaper) ReapEthTxes

func (r *Reaper) ReapEthTxes(headNum int64) error

ReapEthTxes deletes old eth_txes

func (*Reaper) SetLatestBlockNum

func (r *Reaper) SetLatestBlockNum(latestBlockNum int64)

SetLatestBlockNum should be called on every new highest block number

func (*Reaper) Start

func (r *Reaper) Start()

Start the reaper. Should only be called once.

func (*Reaper) Stop

func (r *Reaper) Stop()

Stop the reaper. Should only be called once.

type ReaperConfig

// ReaperConfig is the config subset used by the reaper; NewReaper accepts it.
// EthTxReaperInterval and EthTxReaperThreshold are also declared by Config.
type ReaperConfig interface {
	EthTxReaperInterval() time.Duration
	EthTxReaperThreshold() time.Duration
	EvmFinalityDepth() uint32
}

ReaperConfig is the config subset used by the reaper

type ResumeCallback

// ResumeCallback is a callback taking a UUID, a result value and an error,
// and returning an error. It is assumed to be idempotent.
// NOTE(review): id is presumably the pipeline task run ID (compare
// EthTx.PipelineTaskRunID) — confirm.
type ResumeCallback func(id uuid.UUID, result interface{}, err error) error

ResumeCallback is assumed to be idempotent

type SendEveryStrategy

// SendEveryStrategy will always send the tx. It is an empty struct and so
// carries no state.
type SendEveryStrategy struct{}

SendEveryStrategy will always send the tx

func (SendEveryStrategy) PruneQueue

func (SendEveryStrategy) PruneQueue(pruneService txmgrtypes.UnstartedTxQueuePruner, qopt pg.QOpt) (int64, error)

func (SendEveryStrategy) Subject

func (SendEveryStrategy) Subject() uuid.NullUUID

type SimulateChecker

// SimulateChecker simulates transactions, producing an error if they revert on chain.
type SimulateChecker struct {
	// Client is the EVM client held by the checker.
	// NOTE(review): presumably used to perform the simulation call — confirm.
	Client evmclient.Client
}

SimulateChecker simulates transactions, producing an error if they revert on chain.

func (*SimulateChecker) Check

func (s *SimulateChecker) Check(
	ctx context.Context,
	l logger.Logger,
	tx EthTx,
	a EthTxAttempt,
) error

Check satisfies the TransmitChecker interface.

type TransmitChecker

// TransmitChecker determines whether a transaction should be submitted on-chain.
// SimulateChecker, VRFV1Checker and VRFV2Checker each declare a Check method
// with this signature.
type TransmitChecker interface {

	// Check the given transaction. If the transaction should not be sent, an error indicating why
	// is returned. Errors should only be returned if the checker can confirm that a transaction
	// should not be sent, other errors (for example connection or other unexpected errors) should
	// be logged and swallowed.
	Check(ctx context.Context, l logger.Logger, tx EthTx, a EthTxAttempt) error
}

TransmitChecker determines whether a transaction should be submitted on-chain.

var (
	// NoChecker is a TransmitChecker that always determines a transaction should be submitted.
	// It is a value of the unexported noChecker type, which is not shown in this listing.
	NoChecker TransmitChecker = noChecker{}
)

type TransmitCheckerFactory

// TransmitCheckerFactory creates a transmit checker based on a spec.
// CheckerFactory is documented as the real implementation.
type TransmitCheckerFactory interface {
	// BuildChecker builds a new TransmitChecker based on the given spec.
	BuildChecker(spec TransmitCheckerSpec) (TransmitChecker, error)
}

TransmitCheckerFactory creates a transmit checker based on a spec.

type TransmitCheckerSpec

// TransmitCheckerSpec defines the check that should be performed before a transaction is submitted
// on chain. All fields are tagged omitempty and use their Go field names as JSON keys.
type TransmitCheckerSpec struct {
	// CheckerType is the type of check that should be performed. Empty indicates no check.
	CheckerType TransmitCheckerType `json:",omitempty"`

	// VRFCoordinatorAddress is the address of the VRF coordinator that should be used to perform
	// VRF transmit checks. This should be set iff CheckerType is TransmitCheckerTypeVRFV2.
	// NOTE(review): a TransmitCheckerTypeVRFV1 checker also exists and presumably needs a
	// coordinator address too — confirm whether "iff VRFV2" is still accurate.
	VRFCoordinatorAddress *common.Address `json:",omitempty"`

	// VRFRequestBlockNumber is the block number in which the provided VRF request has been made.
	// This should be set iff CheckerType is TransmitCheckerTypeVRFV2.
	VRFRequestBlockNumber *big.Int `json:",omitempty"`
}

TransmitCheckerSpec defines the check that should be performed before a transaction is submitted on chain.

type TransmitCheckerType

// TransmitCheckerType describes the type of check that should be performed
// before a transaction is executed on-chain. Its declared values are
// TransmitCheckerTypeSimulate, TransmitCheckerTypeVRFV1 and
// TransmitCheckerTypeVRFV2; TransmitCheckerSpec documents the empty value as
// meaning no check.
type TransmitCheckerType string

TransmitCheckerType describes the type of check that should be performed before a transaction is executed on-chain.

type TxAttemptSigner

// TxAttemptSigner is the signing interface accepted by NewEvmTxAttemptBuilder
// as its keystore argument.
type TxAttemptSigner interface {
	// SignTx takes a from address, a transaction and a chain ID, and returns
	// a transaction or an error.
	SignTx(fromAddress common.Address, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error)
}

type TxManager

// TxManager is the main component of the transaction manager. It is also the
// interface to external callers. Txm and NullTxManager both declare this
// method set.
type TxManager interface {
	// Embedded head-tracking and service lifecycle interfaces.
	txmgrtypes.HeadTrackable[*evmtypes.Head]
	services.ServiceCtx
	Trigger(addr common.Address)
	CreateEthTransaction(newTx NewTx, qopts ...pg.QOpt) (etx EthTx, err error)
	GetForwarderForEOA(eoa common.Address) (forwarder common.Address, err error)
	RegisterResumeCallback(fn ResumeCallback)
	SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)
	Reset(f func(), addr common.Address, abandon bool) error
}

TxManager is the main component of the transaction manager. It is also the interface to external callers.

type Txm

type Txm struct {
	utils.StartStopOnce
	// contains filtered or unexported fields
}

func NewTxm

func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore KeyStore, eventBroadcaster pg.EventBroadcaster, lggr logger.Logger, checkerFactory TransmitCheckerFactory,
	fwdMgr txmgrtypes.ForwarderManager[common.Address],
	txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, common.Address, common.Hash, EthTx, EthTxAttempt],
) *Txm

NewTxm creates a new Txm with the given configuration.

func (*Txm) Close

func (b *Txm) Close() (merr error)

func (*Txm) CreateEthTransaction

func (b *Txm) CreateEthTransaction(newTx NewTx, qs ...pg.QOpt) (etx EthTx, err error)

CreateEthTransaction inserts a new transaction

func (*Txm) GetForwarderForEOA

func (b *Txm) GetForwarderForEOA(eoa common.Address) (forwarder common.Address, err error)

GetForwarderForEOA calls forwarderMgr to get a proper forwarder for a given EOA.

func (*Txm) HealthReport

func (b *Txm) HealthReport() map[string]error

func (*Txm) Name

func (b *Txm) Name() string

func (*Txm) OnNewLongestChain

func (b *Txm) OnNewLongestChain(ctx context.Context, head *evmtypes.Head)

OnNewLongestChain conforms to HeadTrackable

func (*Txm) RegisterResumeCallback

func (b *Txm) RegisterResumeCallback(fn ResumeCallback)

func (*Txm) Reset

func (b *Txm) Reset(callback func(), addr common.Address, abandon bool) (err error)

Reset stops EthBroadcaster/EthConfirmer, executes callback, then starts them again

func (*Txm) SendEther

func (b *Txm) SendEther(chainID *big.Int, from, to common.Address, value assets.Eth, gasLimit uint32) (etx EthTx, err error)

SendEther creates a transaction that transfers the given value of ether

func (*Txm) Start

func (b *Txm) Start(ctx context.Context) (merr error)

Start starts Txm service. The provided context can be used to terminate Start sequence.

func (*Txm) Trigger

func (b *Txm) Trigger(addr common.Address)

Trigger forces the EthBroadcaster to check early for the given address

type VRFV1Checker

// VRFV1Checker is an implementation of TransmitChecker that checks whether a
// VRF V1 fulfillment has already been fulfilled.
type VRFV1Checker struct {

	// Callbacks checks whether a VRF V1 request has already been fulfilled on the VRFCoordinator
	// Solidity contract. It takes call options and a 32-byte request ID.
	Callbacks func(opts *bind.CallOpts, reqID [32]byte) (v1.Callbacks, error)

	// Client is the EVM client held by the checker.
	// NOTE(review): how Check uses it is not visible here — confirm.
	Client evmclient.Client
}

VRFV1Checker is an implementation of TransmitChecker that checks whether a VRF V1 fulfillment has already been fulfilled.

func (*VRFV1Checker) Check

func (v *VRFV1Checker) Check(
	ctx context.Context,
	l logger.Logger,
	tx EthTx,
	_ EthTxAttempt,
) error

Check satisfies the TransmitChecker interface.

type VRFV2Checker

// VRFV2Checker is an implementation of TransmitChecker that checks whether a
// VRF V2 fulfillment has already been fulfilled.
type VRFV2Checker struct {

	// GetCommitment checks whether a VRF V2 request has been fulfilled on the VRFCoordinatorV2
	// Solidity contract. It takes call options and the request ID and returns 32 bytes.
	GetCommitment func(opts *bind.CallOpts, requestID *big.Int) ([32]byte, error)

	// HeadByNumber fetches the head given the number. If nil is provided,
	// the latest header is fetched.
	HeadByNumber func(ctx context.Context, n *big.Int) (*types.Head, error)

	// RequestBlockNumber is the block number of the VRFV2 request.
	// NOTE(review): presumably populated from
	// TransmitCheckerSpec.VRFRequestBlockNumber — confirm.
	RequestBlockNumber *big.Int
}

VRFV2Checker is an implementation of TransmitChecker that checks whether a VRF V2 fulfillment has already been fulfilled.

func (*VRFV2Checker) Check

func (v *VRFV2Checker) Check(
	ctx context.Context,
	l logger.Logger,
	tx EthTx,
	_ EthTxAttempt,
) error

Check satisfies the TransmitChecker interface.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL