mempool

package
v1.0.1
Published: Sep 18, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	MempoolChannel = p2p.ChannelID(0x30)

	// PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind
	PeerCatchupSleepIntervalMS = 100

	// UnknownPeerID is the peer ID to use when running CheckTx when there is
	// no peer (e.g. RPC)
	UnknownPeerID uint16 = 0

	MaxActiveIDs = math.MaxUint16
)
const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "mempool"
)

Variables

This section is empty.

Functions

func GetChannelDescriptor

func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor

GetChannelDescriptor produces an instance of a descriptor for this package's required channels.

Types

type IDs

type IDs struct {
	// contains filtered or unexported fields
}

func NewMempoolIDs

func NewMempoolIDs() *IDs

func (*IDs) GetForPeer

func (ids *IDs) GetForPeer(peerID types.NodeID) uint16

GetForPeer returns an ID reserved for the peer.

func (*IDs) Reclaim

func (ids *IDs) Reclaim(peerID types.NodeID)

Reclaim returns the ID reserved for the peer back to the unused pool.

func (*IDs) ReserveForPeer

func (ids *IDs) ReserveForPeer(peerID types.NodeID)

ReserveForPeer searches for the next unused ID and assigns it to the provided peer.
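
The reserve, look-up, and reclaim cycle described above can be illustrated with a small stand-alone sketch. It is not the package's implementation: idPool, its fields, and the use of plain strings in place of types.NodeID are assumptions made for illustration, and ID 0 is kept out of circulation on the assumption that it mirrors UnknownPeerID.

```go
package main

import (
	"fmt"
	"sync"
)

// idPool is a hypothetical stand-in for IDs. It maps each peer to a compact
// uint16 so that two bytes, not a full node ID, are stored per transaction.
type idPool struct {
	mu      sync.Mutex
	peerIDs map[string]uint16   // peer -> reserved ID
	active  map[uint16]struct{} // IDs currently in use
	nextID  uint16
}

func newIDPool() *idPool {
	return &idPool{
		peerIDs: make(map[string]uint16),
		// Assumption: ID 0 stays reserved for "no peer", like UnknownPeerID.
		active: map[uint16]struct{}{0: {}},
		nextID: 1,
	}
}

// reserveForPeer assigns the next unused ID to peer.
func (p *idPool) reserveForPeer(peer string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.peerIDs[peer]; ok {
		return // peer already holds an ID
	}
	if len(p.active) == 1<<16 {
		panic("idPool: all IDs are in use")
	}
	for {
		if _, used := p.active[p.nextID]; !used {
			break
		}
		p.nextID++ // uint16 arithmetic wraps from 65535 to 0
	}
	p.peerIDs[peer] = p.nextID
	p.active[p.nextID] = struct{}{}
}

// getForPeer returns the ID reserved for peer, or 0 if none is reserved.
func (p *idPool) getForPeer(peer string) uint16 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.peerIDs[peer]
}

// reclaim returns the peer's ID to the unused pool.
func (p *idPool) reclaim(peer string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if id, ok := p.peerIDs[peer]; ok {
		delete(p.active, id)
		delete(p.peerIDs, peer)
	}
}

func main() {
	pool := newIDPool()
	pool.reserveForPeer("peer-a")
	pool.reserveForPeer("peer-b")
	fmt.Println(pool.getForPeer("peer-a"), pool.getForPeer("peer-b")) // 1 2
	pool.reclaim("peer-a")
	fmt.Println(pool.getForPeer("peer-a")) // 0
}
```

In this sketch a reclaimed ID only becomes available again once the counter wraps around; whether the real IDs type reuses reclaimed IDs sooner is not stated in the documentation above.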

type LRUTxCache

type LRUTxCache struct {
	// contains filtered or unexported fields
}

LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache only stores the hash of the raw transaction.

func NewLRUTxCache

func NewLRUTxCache(cacheSize int) *LRUTxCache

func (*LRUTxCache) GetList

func (c *LRUTxCache) GetList() *list.List

GetList returns the underlying linked-list that backs the LRU cache. Note, this should be used for testing purposes only!

func (*LRUTxCache) Push

func (c *LRUTxCache) Push(tx types.Tx) bool

func (*LRUTxCache) Remove

func (c *LRUTxCache) Remove(tx types.Tx)

func (*LRUTxCache) Reset

func (c *LRUTxCache) Reset()
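
As a rough illustration of a hash-only LRU cache with Push, Remove, and Reset, the following stand-alone sketch uses container/list and SHA-256. The names lruTxCache and txKey, the choice of hash, and the eviction details are assumptions and are not taken from the package source. The sketch assumes a cache size greater than zero.

```go
package main

import (
	"container/list"
	"crypto/sha256"
	"fmt"
	"sync"
)

// txKey is a hypothetical key type: the SHA-256 hash of the raw transaction.
type txKey [sha256.Size]byte

// lruTxCache stores only transaction hashes, never the raw bytes.
type lruTxCache struct {
	mu    sync.Mutex
	size  int
	items map[txKey]*list.Element
	order *list.List // front = least recently used, back = most recent
}

func newLRUTxCache(size int) *lruTxCache {
	return &lruTxCache{size: size, items: make(map[txKey]*list.Element), order: list.New()}
}

// push returns true if tx was newly added and false if it was already cached.
func (c *lruTxCache) push(tx []byte) bool {
	key := txKey(sha256.Sum256(tx))
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		c.order.MoveToBack(el) // refresh recency
		return false
	}
	if c.order.Len() >= c.size {
		if oldest := c.order.Front(); oldest != nil {
			delete(c.items, oldest.Value.(txKey))
			c.order.Remove(oldest)
		}
	}
	c.items[key] = c.order.PushBack(key)
	return true
}

// remove drops tx from the cache if it is present.
func (c *lruTxCache) remove(tx []byte) {
	key := txKey(sha256.Sum256(tx))
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		delete(c.items, key)
		c.order.Remove(el)
	}
}

// reset empties the cache.
func (c *lruTxCache) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = make(map[txKey]*list.Element)
	c.order.Init()
}

func main() {
	c := newLRUTxCache(2)
	fmt.Println(c.push([]byte("a"))) // true: newly added
	fmt.Println(c.push([]byte("a"))) // false: already cached
	c.push([]byte("b"))
	c.push([]byte("c"))              // evicts "a", the least recently used
	fmt.Println(c.push([]byte("a"))) // true: "a" had been evicted
}
```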

type Mempool

type Mempool interface {
	// CheckTx executes a new transaction against the application to determine
	// its validity and whether it should be added to the mempool.
	CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error

	// RemoveTxByKey removes a transaction, identified by its key,
	// from the mempool.
	RemoveTxByKey(txKey types.TxKey) error

	HasTx(txKey types.TxKey) bool

	GetTxsForKeys(txKeys []types.TxKey) types.Txs

	// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
	// bytes total with the condition that the total gasWanted must be less than
	// maxGas.
	//
	// If both maxes are negative, there is no cap on the size of all returned
	// transactions (~ all available transactions).
	ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

	// ReapMaxTxs reaps up to max transactions from the mempool. If max is
	// negative, there is no cap on the size of all returned transactions
	// (~ all available transactions).
	ReapMaxTxs(max int) types.Txs

	// Lock locks the mempool. The consensus must be able to hold lock to safely
	// update.
	Lock()

	// Unlock unlocks the mempool.
	Unlock()

	// Update informs the mempool that the given txs were committed and can be
	// discarded.
	//
	// NOTE:
	// 1. This should be called *after* block is committed by consensus.
	// 2. Lock/Unlock must be managed by the caller.
	Update(
		ctx context.Context,
		blockHeight int64,
		blockTxs types.Txs,
		txResults []*abci.ExecTxResult,
		newPreFn PreCheckFunc,
		newPostFn PostCheckFunc,
		recheck bool,
	) error

	// FlushAppConn flushes the mempool connection to ensure async callback calls
	// are done, e.g. from CheckTx.
	//
	// NOTE:
	// 1. Lock/Unlock must be managed by caller.
	FlushAppConn(context.Context) error

	// Flush removes all transactions from the mempool and caches.
	Flush()

	// TxsAvailable returns a channel which fires once for every height, and only
	// when transactions are available in the mempool.
	//
	// NOTE:
	// 1. The returned channel may be nil if EnableTxsAvailable was not called.
	TxsAvailable() <-chan struct{}

	// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
	// trigger once every height when transactions are available.
	EnableTxsAvailable()

	// Size returns the number of transactions in the mempool.
	Size() int

	// SizeBytes returns the total size of all txs in the mempool.
	SizeBytes() int64

	TxStore() *TxStore
}

Mempool defines the mempool interface.

Updates to the mempool need to be synchronized with committing a block so applications can reset their transient state on Commit.
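
The locking notes on Update and FlushAppConn suggest a call order along the following lines. This is a sketch under stated assumptions, not code from the consensus engine: lockableMempool is a hypothetical subset of the Mempool interface with a simplified Update signature, commitBlock and its commit callback are invented for illustration, and recorder is a fake that only logs the order of calls.

```go
package main

import (
	"context"
	"fmt"
)

// lockableMempool is a hypothetical, simplified subset of Mempool.
type lockableMempool interface {
	Lock()
	Unlock()
	FlushAppConn(context.Context) error
	Update(ctx context.Context, height int64, txs [][]byte) error
}

// commitBlock holds the mempool lock across flush, commit, and update so
// that no CheckTx can interleave with the application's state transition.
func commitBlock(ctx context.Context, mp lockableMempool, commit func() error, height int64, txs [][]byte) error {
	mp.Lock()
	defer mp.Unlock()

	// Wait for in-flight async CheckTx callbacks before committing.
	if err := mp.FlushAppConn(ctx); err != nil {
		return err
	}
	if err := commit(); err != nil {
		return err
	}
	// Update is called only after the block has been committed.
	return mp.Update(ctx, height, txs)
}

// recorder is a fake mempool that records the order of calls.
type recorder struct{ calls []string }

func (r *recorder) Lock()   { r.calls = append(r.calls, "Lock") }
func (r *recorder) Unlock() { r.calls = append(r.calls, "Unlock") }
func (r *recorder) FlushAppConn(context.Context) error {
	r.calls = append(r.calls, "FlushAppConn")
	return nil
}
func (r *recorder) Update(context.Context, int64, [][]byte) error {
	r.calls = append(r.calls, "Update")
	return nil
}

// runDemo executes one commit cycle against the fake and returns the call log.
func runDemo() []string {
	r := &recorder{}
	commit := func() error {
		r.calls = append(r.calls, "Commit")
		return nil
	}
	if err := commitBlock(context.Background(), r, commit, 1, nil); err != nil {
		panic(err)
	}
	return r.calls
}

func main() {
	fmt.Println(runDemo()) // [Lock FlushAppConn Commit Update Unlock]
}
```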

type Metrics

type Metrics struct {
	// Number of uncommitted transactions in the mempool.
	Size metrics.Gauge

	// Histogram of transaction sizes in bytes.
	TxSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1,3,7"`

	// Number of failed transactions.
	FailedTxs metrics.Counter

	// RejectedTxs defines the number of rejected transactions. These are
	// transactions that passed CheckTx but failed to make it into the mempool
	// due to resource limits, e.g. mempool is full and no lower priority
	// transactions exist in the mempool.
	//metrics:Number of rejected transactions.
	RejectedTxs metrics.Counter

	// EvictedTxs defines the number of evicted transactions. These are valid
	// transactions that passed CheckTx and existed in the mempool but were later
	// evicted to make room for higher priority valid transactions that passed
	// CheckTx.
	//metrics:Number of evicted transactions.
	EvictedTxs metrics.Counter

	// Number of times transactions are rechecked in the mempool.
	RecheckTimes metrics.Counter
}

Metrics contains metrics exposed by this package. See MetricsProvider for descriptions.

func NopMetrics

func NopMetrics() *Metrics

func PrometheusMetrics

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics

type NopTxCache

type NopTxCache struct{}

NopTxCache defines a no-op raw transaction cache.

func (NopTxCache) Push

func (NopTxCache) Push(types.Tx) bool

func (NopTxCache) Remove

func (NopTxCache) Remove(types.Tx)

func (NopTxCache) Reset

func (NopTxCache) Reset()

type PeerEvictor

type PeerEvictor interface {
	Errored(types.NodeID, error)
}

type PostCheckFunc

type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

PostCheckFunc is an optional filter executed after CheckTx that rejects the transaction if it returns a non-nil error. An example would be to ensure a transaction doesn't require more gas than is available for the block.

func PostCheckMaxGas

func PostCheckMaxGas(maxGas int64) PostCheckFunc

PostCheckMaxGas checks that the wanted gas is smaller than or equal to the passed maxGas. Returns nil if maxGas is -1.

type PreCheckFunc

type PreCheckFunc func(types.Tx) error

PreCheckFunc is an optional filter executed before CheckTx that rejects the transaction if it returns a non-nil error. An example would be to ensure that a transaction doesn't exceed the block size.

func PreCheckMaxBytes

func PreCheckMaxBytes(maxBytes int64) PreCheckFunc

PreCheckMaxBytes checks that the size of the transaction is smaller than or equal to the expected maxBytes.
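
The pre-check and post-check filters are both plain function values that return an error to reject a transaction. A minimal stand-alone sketch of the two constructors follows. The types tx and checkTxResponse are hypothetical stand-ins for types.Tx and *abci.ResponseCheckTx, the error messages are invented, and the size here is the raw byte length, which may differ from how the package measures a transaction.

```go
package main

import "fmt"

// Hypothetical stand-ins for types.Tx and abci.ResponseCheckTx.
type tx []byte

type checkTxResponse struct{ GasWanted int64 }

type preCheckFunc func(tx) error

type postCheckFunc func(tx, *checkTxResponse) error

// preCheckMaxBytes rejects transactions larger than maxBytes.
func preCheckMaxBytes(maxBytes int64) preCheckFunc {
	return func(t tx) error {
		if size := int64(len(t)); size > maxBytes {
			return fmt.Errorf("tx size %d exceeds max %d", size, maxBytes)
		}
		return nil
	}
}

// postCheckMaxGas rejects transactions that want more gas than maxGas.
// A maxGas of -1 disables the check.
func postCheckMaxGas(maxGas int64) postCheckFunc {
	return func(_ tx, res *checkTxResponse) error {
		if maxGas == -1 {
			return nil
		}
		if res.GasWanted > maxGas {
			return fmt.Errorf("gas wanted %d exceeds max %d", res.GasWanted, maxGas)
		}
		return nil
	}
}

func main() {
	pre := preCheckMaxBytes(4)
	fmt.Println(pre(tx("abcd")) == nil)  // true: exactly at the limit
	fmt.Println(pre(tx("abcde")) == nil) // false: one byte over

	post := postCheckMaxGas(10)
	fmt.Println(post(nil, &checkTxResponse{GasWanted: 11}) == nil) // false
}
```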

type Reactor

type Reactor struct {
	service.BaseService
	// contains filtered or unexported fields
}

Reactor implements a service that contains a mempool of txs that are broadcast amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received them from.

func NewReactor

func NewReactor(
	logger log.Logger,
	cfg *config.MempoolConfig,
	txmp *TxMempool,
	peerEvents p2p.PeerEventSubscriber,
) *Reactor

NewReactor returns a reference to a new reactor.

func (*Reactor) MarkReadyToStart

func (r *Reactor) MarkReadyToStart()

func (*Reactor) OnStart

func (r *Reactor) OnStart(ctx context.Context) error

OnStart starts separate goroutines for each p2p Channel and listens for envelopes on each. It also listens for peer updates and handles messages on that p2p channel accordingly. The caller must be sure to execute OnStop to ensure the outbound p2p Channels are closed.

func (*Reactor) OnStop

func (r *Reactor) OnStop()

OnStop stops the reactor by signaling to all spawned goroutines to exit and blocking until they all exit.

func (*Reactor) SetChannel

func (r *Reactor) SetChannel(ch *p2p.Channel)

type TxCache

type TxCache interface {
	// Reset resets the cache to an empty state.
	Reset()

	// Push adds the given raw transaction to the cache and returns true if it was
	// newly added. Otherwise, it returns false.
	Push(tx types.Tx) bool

	// Remove removes the given raw transaction from the cache.
	Remove(tx types.Tx)
}

TxCache defines an interface for raw transaction caching in a mempool. Currently, a TxCache does not allow direct reading or getting of transaction values. A TxCache is used primarily to push and remove transactions. Push returns a boolean telling the caller whether the transaction was newly added to the cache (true) or already existed in it (false).

type TxInfo

type TxInfo struct {
	// SenderID is the internal peer ID used in the mempool to identify the
	// sender, storing two bytes with each transaction instead of 20 bytes for
	// the types.NodeID.
	SenderID uint16

	// SenderNodeID is the actual types.NodeID of the sender.
	SenderNodeID types.NodeID
}

TxInfo are parameters that get passed when attempting to add a tx to the mempool.

type TxMempool

type TxMempool struct {
	// contains filtered or unexported fields
}

TxMempool defines a prioritized mempool data structure used by the v1 mempool reactor. It keeps a thread-safe priority queue of transactions that is used when a block proposer constructs a block and a thread-safe linked-list that is used to gossip transactions to peers in a FIFO manner.

func NewTxMempool

func NewTxMempool(
	logger log.Logger,
	cfg *config.MempoolConfig,
	proxyAppConn abciclient.Client,
	peerManager PeerEvictor,
	options ...TxMempoolOption,
) *TxMempool

func (*TxMempool) CheckTx

func (txmp *TxMempool) CheckTx(
	ctx context.Context,
	tx types.Tx,
	cb func(*abci.ResponseCheckTx),
	txInfo TxInfo,
) error

CheckTx executes the ABCI CheckTx method for a given transaction. It acquires a read-lock and attempts to execute the application's CheckTx ABCI method synchronously. We return an error if any of the following happen:

  • The CheckTx execution fails.
  • The transaction already exists in the cache and we've already received the transaction from the peer. Otherwise, if it solely exists in the cache, we return nil.
  • The transaction size exceeds the maximum transaction size as defined by the configuration provided to the mempool.
  • The transaction fails Pre-Check (if it is defined).
  • The proxyAppConn fails, e.g. the buffer is full.

If the mempool is full, we still execute CheckTx and attempt to find a lower priority transaction to evict. If such a transaction exists, we remove the lower priority transaction and add the new one with higher priority.

NOTE:

  • The application's CheckTx implementation may panic.
  • The caller should not explicitly acquire any locks to execute CheckTx.

func (*TxMempool) EnableTxsAvailable

func (txmp *TxMempool) EnableTxsAvailable()

EnableTxsAvailable enables the mempool to trigger events when transactions are available on a block by block basis.

func (*TxMempool) Flush

func (txmp *TxMempool) Flush()

Flush empties the mempool. It acquires a read-lock, fetches all the transactions currently in the transaction store and removes each transaction from the store and all indexes and finally resets the cache.

NOTE:

  • Flushing the mempool may leave the mempool in an inconsistent state.

func (*TxMempool) FlushAppConn

func (txmp *TxMempool) FlushAppConn(ctx context.Context) error

FlushAppConn executes FlushSync on the mempool's proxyAppConn.

NOTE: The caller must obtain a write-lock prior to execution.

func (*TxMempool) GetPeerFailedCheckTxCount

func (txmp *TxMempool) GetPeerFailedCheckTxCount(nodeID types.NodeID) uint64

func (*TxMempool) GetTxsForKeys

func (txmp *TxMempool) GetTxsForKeys(txKeys []types.TxKey) types.Txs

func (*TxMempool) HasTx

func (txmp *TxMempool) HasTx(txKey types.TxKey) bool

func (*TxMempool) Lock

func (txmp *TxMempool) Lock()

Lock obtains a write-lock on the mempool. A caller must be sure to explicitly release the lock when finished.

func (*TxMempool) NextGossipTx

func (txmp *TxMempool) NextGossipTx() *clist.CElement

NextGossipTx returns the next valid transaction to gossip. A caller must wait for WaitForNextTx to signal a transaction is available to gossip first. It is thread-safe.

func (*TxMempool) ReapMaxBytesMaxGas

func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

ReapMaxBytesMaxGas returns a list of transactions within the provided size and gas constraints. Transactions are retrieved in priority order.

NOTE:

  • Transactions returned are not removed from the mempool transaction store or indexes.
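
A simplified view of reaping by priority under byte and gas limits is sketched below. The wrappedTx struct and the reap function are hypothetical. The sketch measures size as raw byte length, treats each negative limit independently as "no cap", lets a total exactly equal to a limit pass, and stops at the first transaction that does not fit. All four choices are assumptions that may differ from the package's behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// wrappedTx is a hypothetical stand-in for the package's *WrappedTx.
type wrappedTx struct {
	tx        []byte
	priority  int64
	gasWanted int64
}

// reap returns transactions in descending priority order until adding the
// next one would exceed maxBytes or maxGas. A negative limit means "no cap".
// The input slice is copied, so nothing is removed from the pool.
func reap(pool []wrappedTx, maxBytes, maxGas int64) [][]byte {
	sorted := make([]wrappedTx, len(pool))
	copy(sorted, pool)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].priority > sorted[j].priority
	})

	var totalBytes, totalGas int64
	var out [][]byte
	for _, w := range sorted {
		size := int64(len(w.tx)) // assumption: raw length, no encoding overhead
		if maxBytes >= 0 && totalBytes+size > maxBytes {
			break
		}
		if maxGas >= 0 && totalGas+w.gasWanted > maxGas {
			break
		}
		totalBytes += size
		totalGas += w.gasWanted
		out = append(out, w.tx)
	}
	return out
}

func main() {
	pool := []wrappedTx{
		{tx: []byte("aaaa"), priority: 1, gasWanted: 5},
		{tx: []byte("bb"), priority: 3, gasWanted: 5},
		{tx: []byte("cccccc"), priority: 2, gasWanted: 5},
	}
	fmt.Println(len(reap(pool, 8, -1)))  // 2: "bb" and "cccccc" fit in 8 bytes
	fmt.Println(len(reap(pool, -1, 9)))  // 1: a second tx would need 10 gas
	fmt.Println(len(reap(pool, -1, -1))) // 3: no caps
}
```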

func (*TxMempool) ReapMaxTxs

func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs

ReapMaxTxs returns a list of transactions within the provided number of transactions bound. Transactions are retrieved in priority order.

NOTE:

  • Transactions returned are not removed from the mempool transaction store or indexes.

func (*TxMempool) RemoveTxByKey

func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error

func (*TxMempool) Size

func (txmp *TxMempool) Size() int

Size returns the number of valid transactions in the mempool. It is thread-safe.

func (*TxMempool) SizeBytes

func (txmp *TxMempool) SizeBytes() int64

SizeBytes returns the total sum in bytes of all the valid transactions in the mempool. It is thread-safe.

func (*TxMempool) TxStore

func (txmp *TxMempool) TxStore() *TxStore

func (*TxMempool) TxsAvailable

func (txmp *TxMempool) TxsAvailable() <-chan struct{}

TxsAvailable returns a channel which fires once for every height, and only when transactions are available in the mempool. It is thread-safe.
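
One way to obtain "fires once for every height" semantics is a one-slot buffered channel guarded by a per-height flag. The sketch below is an illustration only: availNotifier, notify, and newHeight are hypothetical names, and the package may implement the behavior differently.

```go
package main

import (
	"fmt"
	"sync"
)

// availNotifier is a hypothetical helper that signals at most once per height.
type availNotifier struct {
	mu       sync.Mutex
	ch       chan struct{} // nil until enable is called
	notified bool          // true once the current height has been signaled
}

// enable creates the channel; before this, txsAvailable returns nil.
func (n *availNotifier) enable() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.ch = make(chan struct{}, 1)
}

func (n *availNotifier) txsAvailable() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.ch
}

// notify signals that transactions are available, at most once per height.
func (n *availNotifier) notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch == nil || n.notified {
		return
	}
	n.notified = true
	select {
	case n.ch <- struct{}{}:
	default: // a signal is already pending; never block the caller
	}
}

// newHeight re-arms the notifier, e.g. after a block has been committed.
func (n *availNotifier) newHeight() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.notified = false
}

func main() {
	n := &availNotifier{}
	fmt.Println(n.txsAvailable() == nil) // true: not enabled yet
	n.enable()
	n.notify()
	n.notify()                         // ignored: same height
	fmt.Println(len(n.txsAvailable())) // 1
	<-n.txsAvailable()
	n.newHeight()
	n.notify()
	fmt.Println(len(n.txsAvailable())) // 1
}
```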

func (*TxMempool) Unlock

func (txmp *TxMempool) Unlock()

Unlock releases a write-lock on the mempool.

func (*TxMempool) Update

func (txmp *TxMempool) Update(
	ctx context.Context,
	blockHeight int64,
	blockTxs types.Txs,
	execTxResult []*abci.ExecTxResult,
	newPreFn PreCheckFunc,
	newPostFn PostCheckFunc,
	recheck bool,
) error

Update iterates over all the transactions provided by the block producer, removes them from the cache (if applicable), and removes the transactions from the main transaction store and associated indexes. If there are transactions remaining in the mempool, we initiate a re-CheckTx for them (if applicable), otherwise, we notify the caller more transactions are available.

NOTE:

  • The caller must explicitly acquire a write-lock.

func (*TxMempool) WaitForNextTx

func (txmp *TxMempool) WaitForNextTx() <-chan struct{}

WaitForNextTx returns a blocking channel that will be closed when the next valid transaction is available to gossip. It is thread-safe.
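
The wait-then-read pattern described for WaitForNextTx and NextGossipTx can be approximated with a channel that is closed while the list is non-empty and replaced when it drains. The sketch below is a simplified model: gossipList and its methods are hypothetical, and a slice stands in for the concurrent linked list.

```go
package main

import (
	"fmt"
	"sync"
)

// gossipList is a hypothetical FIFO list with a "next tx available" signal.
type gossipList struct {
	mu     sync.Mutex
	txs    [][]byte
	waitCh chan struct{} // closed while the list is non-empty
}

func newGossipList() *gossipList {
	return &gossipList{waitCh: make(chan struct{})}
}

// pushBack appends tx and wakes all waiters if the list was empty.
func (g *gossipList) pushBack(tx []byte) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.txs = append(g.txs, tx)
	if len(g.txs) == 1 {
		close(g.waitCh)
	}
}

// popFront removes the first tx and re-arms the signal when the list drains.
func (g *gossipList) popFront() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if len(g.txs) == 0 {
		return
	}
	g.txs = g.txs[1:]
	if len(g.txs) == 0 {
		g.waitCh = make(chan struct{})
	}
}

// waitForNextTx returns a channel that is closed once a tx is available.
func (g *gossipList) waitForNextTx() <-chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.waitCh
}

// front returns the next tx to gossip, or nil if the list is empty.
func (g *gossipList) front() []byte {
	g.mu.Lock()
	defer g.mu.Unlock()
	if len(g.txs) == 0 {
		return nil
	}
	return g.txs[0]
}

// ready reports whether ch is closed, without blocking.
func ready(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	g := newGossipList()
	go g.pushBack([]byte("tx1"))
	<-g.waitForNextTx() // blocks until the goroutine has pushed
	fmt.Println(string(g.front())) // tx1
}
```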

type TxMempoolOption

type TxMempoolOption func(*TxMempool)

TxMempoolOption sets an optional parameter on the TxMempool.

func WithMetrics

func WithMetrics(metrics *Metrics) TxMempoolOption

WithMetrics sets the mempool's metrics collector.

func WithPostCheck

func WithPostCheck(f PostCheckFunc) TxMempoolOption

WithPostCheck sets a filter for the mempool to reject a transaction if f(tx, resp) returns an error. This is executed after CheckTx. It only applies to the first created block. After that, Update overwrites the existing value.

func WithPreCheck

func WithPreCheck(f PreCheckFunc) TxMempoolOption

WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) returns an error. This is executed before CheckTx. It only applies to the first created block. After that, Update() overwrites the existing value.
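
The option functions follow Go's functional-options pattern. The stand-alone sketch below shows the pattern and how a later update can replace a filter installed at construction time. The mempool struct, checkFunc, maxBytes, and the rule that a nil argument leaves the filter unchanged are assumptions for illustration and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

type checkFunc func(tx []byte) error

// mempool is a hypothetical, stripped-down mempool holding one filter.
type mempool struct {
	preCheck checkFunc
}

// option mirrors the shape of TxMempoolOption.
type option func(*mempool)

// withPreCheck installs the initial pre-check filter.
func withPreCheck(f checkFunc) option {
	return func(m *mempool) { m.preCheck = f }
}

func newMempool(opts ...option) *mempool {
	m := &mempool{}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

// checkTx runs the pre-check filter, if one is set.
func (m *mempool) checkTx(tx []byte) error {
	if m.preCheck != nil {
		return m.preCheck(tx)
	}
	return nil
}

// update replaces the filter after a block; nil keeps the current one
// (an assumption of this sketch).
func (m *mempool) update(newPre checkFunc) {
	if newPre != nil {
		m.preCheck = newPre
	}
}

// maxBytes builds a filter that rejects transactions longer than n bytes.
func maxBytes(n int) checkFunc {
	return func(tx []byte) error {
		if len(tx) > n {
			return errors.New("tx too large")
		}
		return nil
	}
}

func main() {
	m := newMempool(withPreCheck(maxBytes(2)))
	fmt.Println(m.checkTx([]byte("abc"))) // tx too large
	m.update(maxBytes(8))                 // the new block raises the limit
	fmt.Println(m.checkTx([]byte("abc"))) // <nil>
}
```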

type TxPriorityQueue

type TxPriorityQueue struct {
	// contains filtered or unexported fields
}

TxPriorityQueue defines a thread-safe priority queue for valid transactions.

func NewTxPriorityQueue

func NewTxPriorityQueue() *TxPriorityQueue

func (*TxPriorityQueue) GetEvictableTxs

func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx

GetEvictableTxs attempts to find and return a list of *WrappedTx that can be evicted to make room for another *WrappedTx with higher priority. If no such list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx indicates that these transactions can be removed due to them being of lower priority and that their total sum in size allows room for the incoming transaction according to the mempool's configured limits.
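
The eviction search described above can be sketched as a walk over the queue from lowest priority upward that collects victims until the incoming transaction fits. The function evictableTxs and the wrappedTx fields are hypothetical, and the parameter named cap in the signature is called capacity here to avoid shadowing the builtin.

```go
package main

import (
	"fmt"
	"sort"
)

// wrappedTx is a hypothetical stand-in for the package's *WrappedTx.
type wrappedTx struct {
	name     string
	priority int64
	size     int64
}

// evictableTxs returns the lowest-priority transactions whose removal lets a
// new transaction of the given priority and size fit within capacity. It
// returns nil if evicting every lower-priority transaction is not enough.
func evictableTxs(txs []wrappedTx, priority, txSize, totalSize, capacity int64) []wrappedTx {
	sorted := make([]wrappedTx, len(txs))
	copy(sorted, txs)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].priority < sorted[j].priority
	})

	var victims []wrappedTx
	remaining := totalSize
	for _, w := range sorted {
		if w.priority >= priority {
			break // only strictly lower-priority txs may be evicted
		}
		victims = append(victims, w)
		remaining -= w.size
		if remaining+txSize <= capacity {
			return victims
		}
	}
	return nil
}

func main() {
	pool := []wrappedTx{{"a", 1, 4}, {"b", 2, 4}, {"c", 9, 4}}
	// Pool holds 12 of 12 bytes. A 6-byte tx of priority 5 needs "a" and "b" gone.
	fmt.Println(len(evictableTxs(pool, 5, 6, 12, 12))) // 2
	// A 10-byte tx cannot fit even after evicting both lower-priority txs.
	fmt.Println(evictableTxs(pool, 5, 10, 12, 12) == nil) // true
}
```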

func (*TxPriorityQueue) Len

func (pq *TxPriorityQueue) Len() int

Len implements the Heap interface.

NOTE: A caller should never call Len. Use NumTxs instead.

func (*TxPriorityQueue) Less

func (pq *TxPriorityQueue) Less(i, j int) bool

Less implements the Heap interface. It returns true if the transaction at position i in the queue is of less priority than the transaction at position j.

func (*TxPriorityQueue) NumTxs

func (pq *TxPriorityQueue) NumTxs() int

NumTxs returns the number of transactions in the priority queue. It is thread-safe.

func (*TxPriorityQueue) Pop

func (pq *TxPriorityQueue) Pop() interface{}

Pop implements the Heap interface.

NOTE: A caller should never call Pop. Use PopTx instead.

func (*TxPriorityQueue) PopTx

func (pq *TxPriorityQueue) PopTx() *WrappedTx

PopTx removes the top priority transaction from the queue. It is thread-safe.

func (*TxPriorityQueue) Push

func (pq *TxPriorityQueue) Push(x interface{})

Push implements the Heap interface.

NOTE: A caller should never call Push. Use PushTx instead.

func (*TxPriorityQueue) PushTx

func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)

PushTx adds a valid transaction to the priority queue. It is thread-safe.

func (*TxPriorityQueue) RemoveTx

func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)

RemoveTx removes a specific transaction from the priority queue.

func (*TxPriorityQueue) Swap

func (pq *TxPriorityQueue) Swap(i, j int)

Swap implements the Heap interface. It swaps two transactions in the queue.
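
The split between the heap-interface methods (Len, Less, Swap, Push, Pop) and the thread-safe wrappers (PushTx, PopTx, RemoveTx, NumTxs) follows the usual container/heap idiom. The sketch below illustrates that idiom with hypothetical types. It keeps the raw heap in a separate unexported type, and its Less orders higher priority first so that popping yields the top-priority transaction. Neither choice is claimed to match the package.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

type wrappedTx struct {
	name      string
	priority  int64
	heapIndex int // position in the heap, maintained by Swap/Push/Pop
}

// txHeap implements heap.Interface and is only used via container/heap.
type txHeap []*wrappedTx

func (h txHeap) Len() int { return len(h) }

// Less orders higher priority first, so heap.Pop yields the top-priority tx.
func (h txHeap) Less(i, j int) bool { return h[i].priority > h[j].priority }

func (h txHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *txHeap) Push(x interface{}) {
	w := x.(*wrappedTx)
	w.heapIndex = len(*h)
	*h = append(*h, w)
}

func (h *txHeap) Pop() interface{} {
	old := *h
	n := len(old)
	w := old[n-1]
	old[n-1] = nil // drop the reference so it can be garbage collected
	w.heapIndex = -1
	*h = old[:n-1]
	return w
}

// priorityQueue wraps txHeap with a mutex; callers use only these methods.
type priorityQueue struct {
	mu  sync.Mutex
	txs txHeap
}

func (pq *priorityQueue) pushTx(w *wrappedTx) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	heap.Push(&pq.txs, w)
}

func (pq *priorityQueue) popTx() *wrappedTx {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	if pq.txs.Len() == 0 {
		return nil
	}
	return heap.Pop(&pq.txs).(*wrappedTx)
}

func (pq *priorityQueue) removeTx(w *wrappedTx) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	i := w.heapIndex
	if i >= 0 && i < len(pq.txs) && pq.txs[i] == w {
		heap.Remove(&pq.txs, i)
	}
}

func (pq *priorityQueue) numTxs() int {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	return pq.txs.Len()
}

func main() {
	pq := &priorityQueue{}
	c := &wrappedTx{name: "c", priority: 3}
	pq.pushTx(&wrappedTx{name: "a", priority: 1})
	pq.pushTx(&wrappedTx{name: "b", priority: 5})
	pq.pushTx(c)
	pq.removeTx(c)
	fmt.Println(pq.popTx().name, pq.popTx().name, pq.numTxs()) // b a 0
}
```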

type TxStore

type TxStore struct {
	// contains filtered or unexported fields
}

TxStore implements a thread-safe mapping of valid transaction(s).

NOTE:

  • Concurrent read-only access to a *WrappedTx object is OK. However, mutative access is not allowed. Regardless, it is not expected for the mempool to need mutative access.

func NewTxStore

func NewTxStore() *TxStore

func (*TxStore) GetAllTxs

func (txs *TxStore) GetAllTxs() []*WrappedTx

GetAllTxs returns all the transactions currently in the store.

func (*TxStore) GetOrSetPeerByTxHash

func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool)

GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the given peerID to the WrappedTx's set of peers that sent us this transaction. We return true if we've already recorded the given peer for this transaction and false otherwise. If the transaction does not exist by hash, we return (nil, false).
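
The three possible results of GetOrSetPeerByTxHash can be seen in a small map-based sketch. The txStore, wrappedTx, and txKey types here are hypothetical stand-ins, and SHA-256 is an assumed hash function.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

type txKey [sha256.Size]byte

// wrappedTx is a hypothetical stand-in that tracks which peers sent the tx.
type wrappedTx struct {
	tx    []byte
	peers map[uint16]struct{}
}

type txStore struct {
	mu      sync.Mutex
	hashTxs map[txKey]*wrappedTx
}

func newTxStore() *txStore {
	return &txStore{hashTxs: make(map[txKey]*wrappedTx)}
}

// setTx stores tx under its hash and returns the key.
func (s *txStore) setTx(tx []byte) txKey {
	key := txKey(sha256.Sum256(tx))
	s.mu.Lock()
	defer s.mu.Unlock()
	s.hashTxs[key] = &wrappedTx{tx: tx, peers: make(map[uint16]struct{})}
	return key
}

// getOrSetPeerByTxHash records peerID as a sender of the tx. It returns
// (wtx, true) if the peer was already recorded, (wtx, false) if it was
// recorded just now, and (nil, false) if the tx is unknown.
func (s *txStore) getOrSetPeerByTxHash(key txKey, peerID uint16) (*wrappedTx, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	wtx, ok := s.hashTxs[key]
	if !ok {
		return nil, false
	}
	if _, seen := wtx.peers[peerID]; seen {
		return wtx, true
	}
	wtx.peers[peerID] = struct{}{}
	return wtx, false
}

func main() {
	s := newTxStore()
	key := s.setTx([]byte("tx"))

	_, seen := s.getOrSetPeerByTxHash(key, 7)
	fmt.Println(seen) // false: peer 7 recorded for the first time
	_, seen = s.getOrSetPeerByTxHash(key, 7)
	fmt.Println(seen) // true: peer 7 was already recorded

	wtx, seen := s.getOrSetPeerByTxHash(txKey{}, 7)
	fmt.Println(wtx == nil, seen) // true false: unknown tx
}
```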

func (*TxStore) GetTxByHash

func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx

GetTxByHash returns a *WrappedTx by the transaction's hash.

func (*TxStore) GetTxBySender

func (txs *TxStore) GetTxBySender(sender string) *WrappedTx

GetTxBySender returns a *WrappedTx by the transaction's sender property defined by the ABCI application.

func (*TxStore) IsTxRemoved

func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool

IsTxRemoved returns true if a transaction by hash is marked as removed and false otherwise.

func (*TxStore) RemoveTx

func (txs *TxStore) RemoveTx(wtx *WrappedTx)

RemoveTx removes a *WrappedTx from the transaction store. It deletes all indexes of the transaction.

func (*TxStore) SetTx

func (txs *TxStore) SetTx(wtx *WrappedTx)

SetTx stores a *WrappedTx by its hash. If the transaction also contains a non-empty sender, we additionally store the transaction by the sender as defined by the ABCI application.

func (*TxStore) Size

func (txs *TxStore) Size() int

Size returns the total number of transactions in the store.

func (*TxStore) TxHasPeer

func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool

TxHasPeer returns true if a transaction by hash has a given peer ID and false otherwise. If the transaction does not exist, false is returned.

type WrappedTx

type WrappedTx struct {
	// contains filtered or unexported fields
}

WrappedTx defines a wrapper around a raw transaction with additional metadata that is used for indexing.

func (*WrappedTx) Size

func (wtx *WrappedTx) Size() int

type WrappedTxList

type WrappedTxList struct {
	// contains filtered or unexported fields
}

WrappedTxList implements a thread-safe list of *WrappedTx objects that can be used to build generic transaction indexes in the mempool. It accepts a comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx references which is used during Insert in order to determine sorted order. If less returns true, a <= b.
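
A comparator-driven sorted list of this kind can be sketched with a slice and a binary search for the insertion point. The types and method names below are hypothetical, and the use of sort.Search is an assumption about one reasonable approach, not a description of the package's Insert.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type wrappedTx struct {
	name     string
	priority int64
}

// sortedTxList keeps its elements ordered according to less.
type sortedTxList struct {
	mu   sync.RWMutex
	less func(a, b *wrappedTx) bool
	txs  []*wrappedTx
}

func newSortedTxList(less func(a, b *wrappedTx) bool) *sortedTxList {
	return &sortedTxList{less: less}
}

// insert places w before the first element that w is "less" than.
func (l *sortedTxList) insert(w *wrappedTx) {
	l.mu.Lock()
	defer l.mu.Unlock()
	i := sort.Search(len(l.txs), func(j int) bool { return l.less(w, l.txs[j]) })
	l.txs = append(l.txs, nil)   // grow by one slot
	copy(l.txs[i+1:], l.txs[i:]) // shift the tail right
	l.txs[i] = w
}

// remove deletes w (compared by pointer) if it is present.
func (l *sortedTxList) remove(w *wrappedTx) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for i, cur := range l.txs {
		if cur == w {
			l.txs = append(l.txs[:i], l.txs[i+1:]...)
			return
		}
	}
}

func (l *sortedTxList) size() int {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return len(l.txs)
}

// names returns the element names in list order (for demonstration).
func (l *sortedTxList) names() []string {
	l.mu.RLock()
	defer l.mu.RUnlock()
	out := make([]string, 0, len(l.txs))
	for _, w := range l.txs {
		out = append(out, w.name)
	}
	return out
}

func main() {
	list := newSortedTxList(func(a, b *wrappedTx) bool { return a.priority < b.priority })
	c := &wrappedTx{name: "c", priority: 2}
	list.insert(&wrappedTx{name: "a", priority: 3})
	list.insert(&wrappedTx{name: "b", priority: 1})
	list.insert(c)
	fmt.Println(list.names()) // [b c a]
	list.remove(c)
	fmt.Println(list.names(), list.size()) // [b a] 2
}
```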

func NewWrappedTxList

func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList

func (*WrappedTxList) Insert

func (wtl *WrappedTxList) Insert(wtx *WrappedTx)

Insert inserts a WrappedTx reference into the sorted list based on the list's comparator function.

func (*WrappedTxList) Remove

func (wtl *WrappedTxList) Remove(wtx *WrappedTx)

Remove attempts to remove a WrappedTx from the sorted list.

func (*WrappedTxList) Reset

func (wtl *WrappedTxList) Reset()

Reset resets the list of transactions to an empty list.

func (*WrappedTxList) Size

func (wtl *WrappedTxList) Size() int

Size returns the number of WrappedTx objects in the list.
