Documentation ¶
Index ¶
- type PeerState
- type Reactor
- func (memR *Reactor) AddPeer(peer p2p.Peer)
- func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
- func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer
- func (memR *Reactor) OnStart() error
- func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
- func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{})
- func (memR *Reactor) SetLogger(l log.Logger)
- type TxMempool
- func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo) error
- func (txmp *TxMempool) EnableTxsAvailable()
- func (txmp *TxMempool) Flush()
- func (txmp *TxMempool) FlushAppConn() error
- func (txmp *TxMempool) Lock()
- func (txmp *TxMempool) NextGossipTx() *clist.CElement
- func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
- func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs
- func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error
- func (txmp *TxMempool) Size() int
- func (txmp *TxMempool) SizeBytes() int64
- func (txmp *TxMempool) TxsAvailable() <-chan struct{}
- func (txmp *TxMempool) Unlock()
- func (txmp *TxMempool) Update(blockHeight int64, blockTxs types.Txs, ...) error
- func (txmp *TxMempool) WaitForNextTx() <-chan struct{}
- type TxMempoolOption
- type TxPriorityQueue
- func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx
- func (pq *TxPriorityQueue) Len() int
- func (pq *TxPriorityQueue) Less(i, j int) bool
- func (pq *TxPriorityQueue) NumTxs() int
- func (pq *TxPriorityQueue) Pop() interface{}
- func (pq *TxPriorityQueue) PopTx() *WrappedTx
- func (pq *TxPriorityQueue) Push(x interface{})
- func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)
- func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)
- func (pq *TxPriorityQueue) Swap(i, j int)
- type TxStore
- func (txs *TxStore) GetAllTxs() []*WrappedTx
- func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool)
- func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx
- func (txs *TxStore) GetTxBySender(sender string) *WrappedTx
- func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool
- func (txs *TxStore) RemoveTx(wtx *WrappedTx)
- func (txs *TxStore) SetTx(wtx *WrappedTx)
- func (txs *TxStore) Size() int
- func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool
- type TxsMessage
- type WrappedTx
- type WrappedTxList
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PeerState ¶
type PeerState interface {
GetHeight() int64
}
PeerState describes the state of a peer.
type Reactor ¶
type Reactor struct { p2p.BaseReactor // contains filtered or unexported fields }
Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.
func NewReactor ¶
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor
NewReactor returns a new Reactor with the given config and mempool.
func (*Reactor) AddPeer ¶
AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (*Reactor) GetChannels ¶
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels implements Reactor by returning the list of channels for this reactor.
func (*Reactor) Receive ¶
Receive implements Reactor. It adds any received transactions to the mempool.
func (*Reactor) RemovePeer ¶
RemovePeer implements Reactor.
type TxMempool ¶
type TxMempool struct {
// contains filtered or unexported fields
}
TxMempool defines a prioritized mempool data structure used by the v1 mempool reactor. It keeps a thread-safe priority queue of transactions that is used when a block proposer constructs a block and a thread-safe linked-list that is used to gossip transactions to peers in a FIFO manner.
func NewTxMempool ¶
func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, options ...TxMempoolOption, ) *TxMempool
func (*TxMempool) CheckTx ¶
func (txmp *TxMempool) CheckTx( tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo, ) error
CheckTx executes the ABCI CheckTx method for a given transaction. It acquires a read-lock attempts to execute the application's CheckTx ABCI method via CheckTxAsync. We return an error if any of the following happen:
- The CheckTxAsync execution fails.
- The transaction already exists in the cache and we've already received the transaction from the peer. Otherwise, if it solely exists in the cache, we return nil.
- The transaction size exceeds the maximum transaction size as defined by the configuration provided to the mempool.
- The transaction fails Pre-Check (if it is defined).
- The proxyAppConn fails, e.g. the buffer is full.
If the mempool is full, we still execute CheckTx and attempt to find a lower priority transaction to evict. If such a transaction exists, we remove the lower priority transaction and add the new one with higher priority.
NOTE: - The applications' CheckTx implementation may panic. - The caller is not to explicitly require any locks for executing CheckTx.
func (*TxMempool) EnableTxsAvailable ¶
func (txmp *TxMempool) EnableTxsAvailable()
EnableTxsAvailable enables the mempool to trigger events when transactions are available on a block by block basis.
func (*TxMempool) Flush ¶
func (txmp *TxMempool) Flush()
Flush flushes out the mempool. It acquires a read-lock, fetches all the transactions currently in the transaction store and removes each transaction from the store and all indexes and finally resets the cache.
NOTE: - Flushing the mempool may leave the mempool in an inconsistent state.
func (*TxMempool) FlushAppConn ¶
FlushAppConn executes FlushSync on the mempool's proxyAppConn.
NOTE: The caller must obtain a write-lock via Lock() prior to execution.
func (*TxMempool) Lock ¶
func (txmp *TxMempool) Lock()
Lock obtains a write-lock on the mempool. A caller must be sure to explicitly release the lock when finished.
func (*TxMempool) NextGossipTx ¶
NextGossipTx returns the next valid transaction to gossip. A caller must wait for WaitForNextTx to signal a transaction is available to gossip first. It is thread-safe.
func (*TxMempool) ReapMaxBytesMaxGas ¶
ReapMaxBytesMaxGas returns a list of transactions within the provided size and gas constraints. Transaction are retrieved in priority order.
NOTE:
- A read-lock is acquired.
- Transactions returned are not actually removed from the mempool transaction store or indexes.
func (*TxMempool) ReapMaxTxs ¶
ReapMaxTxs returns a list of transactions within the provided number of transactions bound. Transaction are retrieved in priority order.
NOTE:
- A read-lock is acquired.
- Transactions returned are not actually removed from the mempool transaction store or indexes.
func (*TxMempool) Size ¶
Size returns the number of valid transactions in the mempool. It is thread-safe.
func (*TxMempool) SizeBytes ¶
SizeBytes return the total sum in bytes of all the valid transactions in the mempool. It is thread-safe.
func (*TxMempool) TxsAvailable ¶
func (txmp *TxMempool) TxsAvailable() <-chan struct{}
TxsAvailable returns a channel which fires once for every height, and only when transactions are available in the mempool. It is thread-safe.
func (*TxMempool) Unlock ¶
func (txmp *TxMempool) Unlock()
Unlock releases a write-lock on the mempool.
func (*TxMempool) Update ¶
func (txmp *TxMempool) Update( blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc, ) error
Update iterates over all the transactions provided by the caller, i.e. the block producer, and removes them from the cache (if applicable) and removes the transactions from the main transaction store and associated indexes. Finally, if there are trainsactions remaining in the mempool, we initiate a re-CheckTx for them (if applicable), otherwise, we notify the caller more transactions are available.
NOTE: - The caller must explicitly acquire a write-lock via Lock().
func (*TxMempool) WaitForNextTx ¶
func (txmp *TxMempool) WaitForNextTx() <-chan struct{}
WaitForNextTx returns a blocking channel that will be closed when the next valid transaction is available to gossip. It is thread-safe.
type TxMempoolOption ¶
type TxMempoolOption func(*TxMempool)
TxMempoolOption sets an optional parameter on the TxMempool.
func WithMetrics ¶
func WithMetrics(metrics *mempool.Metrics) TxMempoolOption
WithMetrics sets the mempool's metrics collector.
func WithPostCheck ¶
func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption
WithPostCheck sets a filter for the mempool to reject a transaction if f(tx, resp) returns an error. This is executed after CheckTx. It only applies to the first created block. After that, Update overwrites the existing value.
func WithPreCheck ¶
func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption
WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) returns an error. This is executed before CheckTx. It only applies to the first created block. After that, Update() overwrites the existing value.
type TxPriorityQueue ¶
type TxPriorityQueue struct {
// contains filtered or unexported fields
}
TxPriorityQueue defines a thread-safe priority queue for valid transactions.
func NewTxPriorityQueue ¶
func NewTxPriorityQueue() *TxPriorityQueue
func (*TxPriorityQueue) GetEvictableTxs ¶
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx
GetEvictableTxs attempts to find and return a list of *WrappedTx than can be evicted to make room for another *WrappedTx with higher priority. If no such list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx indicate that these transactions can be removed due to them being of lower priority and that their total sum in size allows room for the incoming transaction according to the mempool's configured limits.
func (*TxPriorityQueue) Len ¶
func (pq *TxPriorityQueue) Len() int
Len implements the Heap interface.
NOTE: A caller should never call Len. Use NumTxs instead.
func (*TxPriorityQueue) Less ¶
func (pq *TxPriorityQueue) Less(i, j int) bool
Less implements the Heap interface. It returns true if the transaction at position i in the queue is of less priority than the transaction at position j.
func (*TxPriorityQueue) NumTxs ¶
func (pq *TxPriorityQueue) NumTxs() int
NumTxs returns the number of transactions in the priority queue. It is thread safe.
func (*TxPriorityQueue) Pop ¶
func (pq *TxPriorityQueue) Pop() interface{}
Pop implements the Heap interface.
NOTE: A caller should never call Pop. Use PopTx instead.
func (*TxPriorityQueue) PopTx ¶
func (pq *TxPriorityQueue) PopTx() *WrappedTx
PopTx removes the top priority transaction from the queue. It is thread safe.
func (*TxPriorityQueue) Push ¶
func (pq *TxPriorityQueue) Push(x interface{})
Push implements the Heap interface.
NOTE: A caller should never call Push. Use PushTx instead.
func (*TxPriorityQueue) PushTx ¶
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)
PushTx adds a valid transaction to the priority queue. It is thread safe.
func (*TxPriorityQueue) RemoveTx ¶
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)
RemoveTx removes a specific transaction from the priority queue.
func (*TxPriorityQueue) Swap ¶
func (pq *TxPriorityQueue) Swap(i, j int)
Swap implements the Heap interface. It swaps two transactions in the queue.
type TxStore ¶
type TxStore struct {
// contains filtered or unexported fields
}
TxStore implements a thread-safe mapping of valid transaction(s).
NOTE:
- Concurrent read-only access to a *WrappedTx object is OK. However, mutative access is not allowed. Regardless, it is not expected for the mempool to need mutative access.
func NewTxStore ¶
func NewTxStore() *TxStore
func (*TxStore) GetOrSetPeerByTxHash ¶
GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the given peerID to the WrappedTx's set of peers that sent us this transaction. We return true if we've already recorded the given peer for this transaction and false otherwise. If the transaction does not exist by hash, we return (nil, false).
func (*TxStore) GetTxByHash ¶
GetTxByHash returns a *WrappedTx by the transaction's hash.
func (*TxStore) GetTxBySender ¶
GetTxBySender returns a *WrappedTx by the transaction's sender property defined by the ABCI application.
func (*TxStore) IsTxRemoved ¶
IsTxRemoved returns true if a transaction by hash is marked as removed and false otherwise.
func (*TxStore) RemoveTx ¶
RemoveTx removes a *WrappedTx from the transaction store. It deletes all indexes of the transaction.
func (*TxStore) SetTx ¶
SetTx stores a *WrappedTx by it's hash. If the transaction also contains a non-empty sender, we additionally store the transaction by the sender as defined by the ABCI application.
type TxsMessage ¶
TxsMessage is a Message containing transactions.
func (*TxsMessage) String ¶
func (m *TxsMessage) String() string
String returns a string representation of the TxsMessage.
type WrappedTx ¶
type WrappedTx struct {
// contains filtered or unexported fields
}
WrappedTx defines a wrapper around a raw transaction with additional metadata that is used for indexing.
type WrappedTxList ¶
type WrappedTxList struct {
// contains filtered or unexported fields
}
WrappedTxList implements a thread-safe list of *WrappedTx objects that can be used to build generic transaction indexes in the mempool. It accepts a comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx references which is used during Insert in order to determine sorted order. If less returns true, a <= b.
func NewWrappedTxList ¶
func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList
func (*WrappedTxList) Insert ¶
func (wtl *WrappedTxList) Insert(wtx *WrappedTx)
Insert inserts a WrappedTx reference into the sorted list based on the list's comparator function.
func (*WrappedTxList) Remove ¶
func (wtl *WrappedTxList) Remove(wtx *WrappedTx)
Remove attempts to remove a WrappedTx from the sorted list.
func (*WrappedTxList) Reset ¶
func (wtl *WrappedTxList) Reset()
Reset resets the list of transactions to an empty list.
func (*WrappedTxList) Size ¶
func (wtl *WrappedTxList) Size() int
Size returns the number of WrappedTx objects in the list.