Documentation ¶
Index ¶
- Constants
- type IDs
- type LRUTxCache
- type Mempool
- type Metrics
- type NopTxCache
- type PostCheckFunc
- type PreCheckFunc
- type Reactor
- type TxCache
- type TxInfo
- type TxMempool
- func (txmp *TxMempool) CheckTx(ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), ...) error
- func (txmp *TxMempool) EnableTxsAvailable()
- func (txmp *TxMempool) Flush()
- func (txmp *TxMempool) FlushAppConn(ctx context.Context) error
- func (txmp *TxMempool) Lock()
- func (txmp *TxMempool) NextGossipTx() *clist.CElement
- func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
- func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs
- func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error
- func (txmp *TxMempool) Size() int
- func (txmp *TxMempool) SizeBytes() int64
- func (txmp *TxMempool) TxsAvailable() <-chan struct{}
- func (txmp *TxMempool) Unlock()
- func (txmp *TxMempool) Update(ctx context.Context, blockHeight int64, blockTxs types.Txs, ...) error
- func (txmp *TxMempool) WaitForNextTx() <-chan struct{}
- type TxMempoolOption
- type TxPriorityQueue
- func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx
- func (pq *TxPriorityQueue) Len() int
- func (pq *TxPriorityQueue) Less(i, j int) bool
- func (pq *TxPriorityQueue) NumTxs() int
- func (pq *TxPriorityQueue) Pop() interface{}
- func (pq *TxPriorityQueue) PopTx() *WrappedTx
- func (pq *TxPriorityQueue) Push(x interface{})
- func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)
- func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)
- func (pq *TxPriorityQueue) Swap(i, j int)
- type TxStore
- func (txs *TxStore) GetAllTxs() []*WrappedTx
- func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool)
- func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx
- func (txs *TxStore) GetTxBySender(sender string) *WrappedTx
- func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool
- func (txs *TxStore) RemoveTx(wtx *WrappedTx)
- func (txs *TxStore) SetTx(wtx *WrappedTx)
- func (txs *TxStore) Size() int
- func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool
- type WrappedTx
- type WrappedTxList
Constants ¶
const ( MempoolChannel = p2p.ChannelID(0x30) // PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind PeerCatchupSleepIntervalMS = 100 // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 MaxActiveIDs = math.MaxUint16 )
const ( // MetricsSubsystem is a subsystem shared by all metrics exposed by this // package. MetricsSubsystem = "mempool" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type IDs ¶
type IDs struct {
// contains filtered or unexported fields
}
func NewMempoolIDs ¶
func NewMempoolIDs() *IDs
func (*IDs) GetForPeer ¶
GetForPeer returns an ID reserved for the peer.
func (*IDs) ReserveForPeer ¶
ReserveForPeer searches for the next unused ID and assigns it to the provided peer.
type LRUTxCache ¶
type LRUTxCache struct {
// contains filtered or unexported fields
}
LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache only stores the hash of the raw transaction.
func NewLRUTxCache ¶
func NewLRUTxCache(cacheSize int) *LRUTxCache
func (*LRUTxCache) GetList ¶
func (c *LRUTxCache) GetList() *list.List
GetList returns the underlying linked-list that backs the LRU cache. Note, this should be used for testing purposes only!
func (*LRUTxCache) Remove ¶
func (c *LRUTxCache) Remove(tx types.Tx)
func (*LRUTxCache) Reset ¶
func (c *LRUTxCache) Reset()
type Mempool ¶
type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error // RemoveTxByKey removes a transaction, identified by its key, // from the mempool. RemoveTxByKey(txKey types.TxKey) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than // maxGas. // // If both maxes are negative, there is no cap on the size of all returned // transactions (~ all available transactions). ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs // ReapMaxTxs reaps up to max transactions from the mempool. If max is // negative, there is no cap on the size of all returned transactions // (~ all available transactions). ReapMaxTxs(max int) types.Txs // Lock locks the mempool. The consensus must be able to hold lock to safely // update. Lock() // Unlock unlocks the mempool. Unlock() // Update informs the mempool that the given txs were committed and can be // discarded. // // NOTE: // 1. This should be called *after* block is committed by consensus. // 2. Lock/Unlock must be managed by the caller. Update( ctx context.Context, blockHeight int64, blockTxs types.Txs, txResults []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, recheck bool, ) error // FlushAppConn flushes the mempool connection to ensure async callback calls // are done, e.g. from CheckTx. // // NOTE: // 1. Lock/Unlock must be managed by caller. FlushAppConn(context.Context) error // Flush removes all transactions from the mempool and caches. Flush() // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. // // NOTE: // 1. The returned channel may be nil if EnableTxsAvailable was not called. 
TxsAvailable() <-chan struct{} // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will // trigger once every height when transactions are available. EnableTxsAvailable() // Size returns the number of transactions in the mempool. Size() int // SizeBytes returns the total size of all txs in the mempool. SizeBytes() int64 }
Mempool defines the mempool interface.
Updates to the mempool need to be synchronized with committing a block so applications can reset their transient state on Commit.
type Metrics ¶
type Metrics struct { // Number of uncommitted transactions in the mempool. Size metrics.Gauge // Histogram of transaction sizes in bytes. TxSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1,3,7"` // Number of failed transactions. FailedTxs metrics.Counter // RejectedTxs defines the number of rejected transactions. These are // transactions that passed CheckTx but failed to make it into the mempool // due to resource limits, e.g. mempool is full and no lower priority // transactions exist in the mempool. //metrics:Number of rejected transactions. RejectedTxs metrics.Counter // EvictedTxs defines the number of evicted transactions. These are valid // transactions that passed CheckTx and existed in the mempool but were later // evicted to make room for higher priority valid transactions that passed // CheckTx. //metrics:Number of evicted transactions. EvictedTxs metrics.Counter // Number of times transactions are rechecked in the mempool. RecheckTimes metrics.Counter }
Metrics contains metrics exposed by this package. See MetricsProvider for descriptions.
func NopMetrics ¶
func NopMetrics() *Metrics
func PrometheusMetrics ¶
type NopTxCache ¶
type NopTxCache struct{}
NopTxCache defines a no-op raw transaction cache.
func (NopTxCache) Remove ¶
func (NopTxCache) Remove(types.Tx)
func (NopTxCache) Reset ¶
func (NopTxCache) Reset()
type PostCheckFunc ¶
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error
PostCheckFunc is an optional filter executed after CheckTx that rejects the transaction if an error is returned. An example would be to ensure a transaction doesn't require more gas than available for the block.
func PostCheckMaxGas ¶
func PostCheckMaxGas(maxGas int64) PostCheckFunc
PostCheckMaxGas checks that the wanted gas is smaller than or equal to the passed maxGas. Returns nil if maxGas is -1.
type PreCheckFunc ¶
PreCheckFunc is an optional filter executed before CheckTx that rejects the transaction if an error is returned. An example would be to ensure that a transaction doesn't exceed the block size.
func PreCheckMaxBytes ¶
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc
PreCheckMaxBytes checks that the size of the transaction is smaller than or equal to the expected maxBytes.
type Reactor ¶
type Reactor struct { service.BaseService // contains filtered or unexported fields }
Reactor implements a service that contains a mempool of txs that are broadcast among peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers they were received from.
func NewReactor ¶
func NewReactor( logger log.Logger, cfg *config.MempoolConfig, txmp *TxMempool, chCreator p2p.ChannelCreator, peerEvents p2p.PeerEventSubscriber, ) *Reactor
NewReactor returns a reference to a new reactor.
func (*Reactor) OnStart ¶
OnStart starts separate goroutines for each p2p Channel and listens for envelopes on each. In addition, it also listens for peer updates and handles messages on that p2p channel accordingly. The caller must be sure to execute OnStop to ensure the outbound p2p Channels are closed.
type TxCache ¶
type TxCache interface { // Reset resets the cache to an empty state. Reset() // Push adds the given raw transaction to the cache and returns true if it was // newly added. Otherwise, it returns false. Push(tx types.Tx) bool // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) }
TxCache defines an interface for raw transaction caching in a mempool. Currently, a TxCache does not allow direct reading or getting of transaction values. A TxCache is used primarily to push and remove transactions. Pushing via Push returns a boolean telling the caller if the transaction already exists in the cache or not.
type TxInfo ¶
type TxInfo struct { // SenderID is the internal peer ID used in the mempool to identify the // sender, storing two bytes with each transaction instead of 20 bytes for // the types.NodeID. SenderID uint16 // SenderNodeID is the actual types.NodeID of the sender. SenderNodeID types.NodeID }
TxInfo are parameters that get passed when attempting to add a tx to the mempool.
type TxMempool ¶
type TxMempool struct {
// contains filtered or unexported fields
}
TxMempool defines a prioritized mempool data structure used by the v1 mempool reactor. It keeps a thread-safe priority queue of transactions that is used when a block proposer constructs a block and a thread-safe linked-list that is used to gossip transactions to peers in a FIFO manner.
func NewTxMempool ¶
func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, proxyAppConn abciclient.Client, options ...TxMempoolOption, ) *TxMempool
func (*TxMempool) CheckTx ¶
func (txmp *TxMempool) CheckTx( ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error
CheckTx executes the ABCI CheckTx method for a given transaction. It acquires a read-lock and attempts to execute the application's CheckTx ABCI method synchronously. We return an error if any of the following happen:
- The CheckTx execution fails.
- The transaction already exists in the cache and we've already received the transaction from the peer. Otherwise, if it solely exists in the cache, we return nil.
- The transaction size exceeds the maximum transaction size as defined by the configuration provided to the mempool.
- The transaction fails Pre-Check (if it is defined).
- The proxyAppConn fails, e.g. the buffer is full.
If the mempool is full, we still execute CheckTx and attempt to find a lower priority transaction to evict. If such a transaction exists, we remove the lower priority transaction and add the new one with higher priority.
NOTE: - The application's CheckTx implementation may panic. - The caller is not to explicitly acquire any locks for executing CheckTx.
func (*TxMempool) EnableTxsAvailable ¶
func (txmp *TxMempool) EnableTxsAvailable()
EnableTxsAvailable enables the mempool to trigger events when transactions are available on a block by block basis.
func (*TxMempool) Flush ¶
func (txmp *TxMempool) Flush()
Flush empties the mempool. It acquires a read-lock, fetches all the transactions currently in the transaction store and removes each transaction from the store and all indexes and finally resets the cache.
NOTE: - Flushing the mempool may leave the mempool in an inconsistent state.
func (*TxMempool) FlushAppConn ¶
FlushAppConn executes FlushSync on the mempool's proxyAppConn.
NOTE: The caller must obtain a write-lock prior to execution.
func (*TxMempool) Lock ¶
func (txmp *TxMempool) Lock()
Lock obtains a write-lock on the mempool. A caller must be sure to explicitly release the lock when finished.
func (*TxMempool) NextGossipTx ¶
NextGossipTx returns the next valid transaction to gossip. A caller must wait for WaitForNextTx to signal a transaction is available to gossip first. It is thread-safe.
func (*TxMempool) ReapMaxBytesMaxGas ¶
ReapMaxBytesMaxGas returns a list of transactions within the provided size and gas constraints. Transactions are retrieved in priority order.
NOTE:
- Transactions returned are not removed from the mempool transaction store or indexes.
func (*TxMempool) ReapMaxTxs ¶
ReapMaxTxs returns a list of transactions within the provided number of transactions bound. Transactions are retrieved in priority order.
NOTE:
- Transactions returned are not removed from the mempool transaction store or indexes.
func (*TxMempool) Size ¶
Size returns the number of valid transactions in the mempool. It is thread-safe.
func (*TxMempool) SizeBytes ¶
SizeBytes returns the total sum in bytes of all the valid transactions in the mempool. It is thread-safe.
func (*TxMempool) TxsAvailable ¶
func (txmp *TxMempool) TxsAvailable() <-chan struct{}
TxsAvailable returns a channel which fires once for every height, and only when transactions are available in the mempool. It is thread-safe.
func (*TxMempool) Unlock ¶
func (txmp *TxMempool) Unlock()
Unlock releases a write-lock on the mempool.
func (*TxMempool) Update ¶
func (txmp *TxMempool) Update( ctx context.Context, blockHeight int64, blockTxs types.Txs, execTxResult []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, recheck bool, ) error
Update iterates over all the transactions provided by the block producer, removes them from the cache (if applicable), and removes the transactions from the main transaction store and associated indexes. If there are transactions remaining in the mempool, we initiate a re-CheckTx for them (if applicable), otherwise, we notify the caller more transactions are available.
NOTE: - The caller must explicitly acquire a write-lock.
func (*TxMempool) WaitForNextTx ¶
func (txmp *TxMempool) WaitForNextTx() <-chan struct{}
WaitForNextTx returns a blocking channel that will be closed when the next valid transaction is available to gossip. It is thread-safe.
type TxMempoolOption ¶
type TxMempoolOption func(*TxMempool)
TxMempoolOption sets an optional parameter on the TxMempool.
func WithMetrics ¶
func WithMetrics(metrics *Metrics) TxMempoolOption
WithMetrics sets the mempool's metrics collector.
func WithPostCheck ¶
func WithPostCheck(f PostCheckFunc) TxMempoolOption
WithPostCheck sets a filter for the mempool to reject a transaction if f(tx, resp) returns an error. This is executed after CheckTx. It only applies to the first created block. After that, Update overwrites the existing value.
func WithPreCheck ¶
func WithPreCheck(f PreCheckFunc) TxMempoolOption
WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) returns an error. This is executed before CheckTx. It only applies to the first created block. After that, Update() overwrites the existing value.
type TxPriorityQueue ¶
type TxPriorityQueue struct {
// contains filtered or unexported fields
}
TxPriorityQueue defines a thread-safe priority queue for valid transactions.
func NewTxPriorityQueue ¶
func NewTxPriorityQueue() *TxPriorityQueue
func (*TxPriorityQueue) GetEvictableTxs ¶
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx
GetEvictableTxs attempts to find and return a list of *WrappedTx that can be evicted to make room for another *WrappedTx with higher priority. If no such list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx indicates that these transactions can be removed due to them being of lower priority and that their total sum in size allows room for the incoming transaction according to the mempool's configured limits.
func (*TxPriorityQueue) Len ¶
func (pq *TxPriorityQueue) Len() int
Len implements the Heap interface.
NOTE: A caller should never call Len. Use NumTxs instead.
func (*TxPriorityQueue) Less ¶
func (pq *TxPriorityQueue) Less(i, j int) bool
Less implements the Heap interface. It returns true if the transaction at position i in the queue is of less priority than the transaction at position j.
func (*TxPriorityQueue) NumTxs ¶
func (pq *TxPriorityQueue) NumTxs() int
NumTxs returns the number of transactions in the priority queue. It is thread safe.
func (*TxPriorityQueue) Pop ¶
func (pq *TxPriorityQueue) Pop() interface{}
Pop implements the Heap interface.
NOTE: A caller should never call Pop. Use PopTx instead.
func (*TxPriorityQueue) PopTx ¶
func (pq *TxPriorityQueue) PopTx() *WrappedTx
PopTx removes the top priority transaction from the queue. It is thread safe.
func (*TxPriorityQueue) Push ¶
func (pq *TxPriorityQueue) Push(x interface{})
Push implements the Heap interface.
NOTE: A caller should never call Push. Use PushTx instead.
func (*TxPriorityQueue) PushTx ¶
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)
PushTx adds a valid transaction to the priority queue. It is thread safe.
func (*TxPriorityQueue) RemoveTx ¶
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)
RemoveTx removes a specific transaction from the priority queue.
func (*TxPriorityQueue) Swap ¶
func (pq *TxPriorityQueue) Swap(i, j int)
Swap implements the Heap interface. It swaps two transactions in the queue.
type TxStore ¶
type TxStore struct {
// contains filtered or unexported fields
}
TxStore implements a thread-safe mapping of valid transaction(s).
NOTE:
- Concurrent read-only access to a *WrappedTx object is OK. However, mutative access is not allowed. Regardless, it is not expected for the mempool to need mutative access.
func NewTxStore ¶
func NewTxStore() *TxStore
func (*TxStore) GetOrSetPeerByTxHash ¶
GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the given peerID to the WrappedTx's set of peers that sent us this transaction. We return true if we've already recorded the given peer for this transaction and false otherwise. If the transaction does not exist by hash, we return (nil, false).
func (*TxStore) GetTxByHash ¶
GetTxByHash returns a *WrappedTx by the transaction's hash.
func (*TxStore) GetTxBySender ¶
GetTxBySender returns a *WrappedTx by the transaction's sender property defined by the ABCI application.
func (*TxStore) IsTxRemoved ¶
IsTxRemoved returns true if a transaction by hash is marked as removed and false otherwise.
func (*TxStore) RemoveTx ¶
RemoveTx removes a *WrappedTx from the transaction store. It deletes all indexes of the transaction.
func (*TxStore) SetTx ¶
SetTx stores a *WrappedTx by its hash. If the transaction also contains a non-empty sender, we additionally store the transaction by the sender as defined by the ABCI application.
type WrappedTx ¶
type WrappedTx struct {
// contains filtered or unexported fields
}
WrappedTx defines a wrapper around a raw transaction with additional metadata that is used for indexing.
type WrappedTxList ¶
type WrappedTxList struct {
// contains filtered or unexported fields
}
WrappedTxList implements a thread-safe list of *WrappedTx objects that can be used to build generic transaction indexes in the mempool. It accepts a comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx references which is used during Insert in order to determine sorted order. If less returns true, a <= b.
func NewWrappedTxList ¶
func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList
func (*WrappedTxList) Insert ¶
func (wtl *WrappedTxList) Insert(wtx *WrappedTx)
Insert inserts a WrappedTx reference into the sorted list based on the list's comparator function.
func (*WrappedTxList) Remove ¶
func (wtl *WrappedTxList) Remove(wtx *WrappedTx)
Remove attempts to remove a WrappedTx from the sorted list.
func (*WrappedTxList) Reset ¶
func (wtl *WrappedTxList) Reset()
Reset resets the list of transactions to an empty list.
func (*WrappedTxList) Size ¶
func (wtl *WrappedTxList) Size() int
Size returns the number of WrappedTx objects in the list.