Documentation ¶
Overview ¶
TODO: Better handle abci client errors. (make it automatically handle connection errors)
Index ¶
- Constants
- Variables
- func IsPreCheckError(err error) bool
- type CListMempool
- func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo) error
- func (mem *CListMempool) EnableTxsAvailable()
- func (mem *CListMempool) Flush()
- func (mem *CListMempool) FlushAppConn() error
- func (mem *CListMempool) Lock()
- func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
- func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
- func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error
- func (mem *CListMempool) SetLogger(l log.Logger)
- func (mem *CListMempool) Size() int
- func (mem *CListMempool) SizeBytes() int64
- func (mem *CListMempool) TxsAvailable() <-chan struct{}
- func (mem *CListMempool) TxsFront() *clist.CElement
- func (mem *CListMempool) TxsWaitChan() <-chan struct{}
- func (mem *CListMempool) Unlock()
- func (mem *CListMempool) Update(height int64, txs types.Txs, txResults []*abci.ExecTxResult, ...) error
- type CListMempoolOption
- type ErrAppConnMempool
- type ErrCheckTxAsync
- type ErrFlushAppConn
- type ErrMempoolIsFull
- type ErrPreCheck
- type ErrTxTooLarge
- type LRUTxCache
- type Mempool
- type Metrics
- type NopMempool
- func (*NopMempool) CheckTx(types.Tx, func(*abci.ResponseCheckTx), TxInfo) error
- func (*NopMempool) EnableTxsAvailable()
- func (*NopMempool) Flush()
- func (*NopMempool) FlushAppConn() error
- func (*NopMempool) Lock()
- func (*NopMempool) ReapMaxBytesMaxGas(int64, int64) types.Txs
- func (*NopMempool) ReapMaxTxs(int) types.Txs
- func (*NopMempool) RemoveTxByKey(types.TxKey) error
- func (*NopMempool) SetTxRemovedCallback(func(txKey types.TxKey))
- func (*NopMempool) Size() int
- func (*NopMempool) SizeBytes() int64
- func (*NopMempool) TxsAvailable() <-chan struct{}
- func (*NopMempool) Unlock()
- func (*NopMempool) Update(int64, types.Txs, []*abci.ExecTxResult, PreCheckFunc, PostCheckFunc) error
- type NopMempoolReactor
- func (*NopMempoolReactor) AddPeer(p2p.Peer)
- func (*NopMempoolReactor) GetChannels() []*p2p.ChannelDescriptor
- func (*NopMempoolReactor) InitPeer(p2p.Peer) p2p.Peer
- func (*NopMempoolReactor) Receive(p2p.Envelope)
- func (*NopMempoolReactor) RemovePeer(p2p.Peer, interface{})
- func (*NopMempoolReactor) SetSwitch(*p2p.Switch)
- type NopTxCache
- type PeerState
- type PostCheckFunc
- type PreCheckFunc
- type Reactor
- func (memR *Reactor) AddPeer(peer p2p.Peer)
- func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
- func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer
- func (memR *Reactor) OnStart() error
- func (memR *Reactor) Receive(e p2p.Envelope)
- func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{})
- func (memR *Reactor) SetLogger(l log.Logger)
- type TxCache
- type TxInfo
- type TxKey
- type TxsMessage
Constants ¶
const ( MempoolChannel = byte(0x30) // PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind PeerCatchupSleepIntervalMS = 100 // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 MaxActiveIDs = math.MaxUint16 )
const ( // MetricsSubsystem is a subsystem shared by all metrics exposed by this // package. MetricsSubsystem = "mempool" )
Variables ¶
var ErrTxInCache = errors.New("tx already exists in cache")
ErrTxInCache is returned to the client if we saw tx earlier
var ErrTxNotFound = errors.New("transaction not found in mempool")
ErrTxNotFound is returned to the client if tx is not found in mempool
Functions ¶
func IsPreCheckError ¶
IsPreCheckError returns true if err is due to pre check failure.
Types ¶
type CListMempool ¶
type CListMempool struct {
// contains filtered or unexported fields
}
CListMempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. The mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.
func NewCListMempool ¶
func NewCListMempool( cfg *config.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, options ...CListMempoolOption, ) *CListMempool
NewCListMempool returns a new mempool with the given configuration and connection to an application.
func (*CListMempool) CheckTx ¶
func (mem *CListMempool) CheckTx( tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error
It blocks if we're waiting on Update() or Reap(). cb: A callback from the CheckTx command.
It gets called from another goroutine.
CONTRACT: Either cb will get called, or err returned.
Safe for concurrent use by multiple goroutines.
func (*CListMempool) EnableTxsAvailable ¶
func (mem *CListMempool) EnableTxsAvailable()
NOTE: not thread safe - should only be called once, on startup
func (*CListMempool) Flush ¶
func (mem *CListMempool) Flush()
XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
func (*CListMempool) FlushAppConn ¶
func (mem *CListMempool) FlushAppConn() error
Lock() must be help by the caller during execution.
func (*CListMempool) Lock ¶
func (mem *CListMempool) Lock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxBytesMaxGas ¶
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxTxs ¶
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) RemoveTxByKey ¶
func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error
RemoveTxByKey removes a transaction from the mempool by its TxKey index. Called from:
- Update (lock held) if tx was committed
- resCbRecheck (lock not held) if tx was invalidated
func (*CListMempool) SetLogger ¶
func (mem *CListMempool) SetLogger(l log.Logger)
SetLogger sets the Logger.
func (*CListMempool) Size ¶
func (mem *CListMempool) Size() int
Safe for concurrent use by multiple goroutines.
func (*CListMempool) SizeBytes ¶
func (mem *CListMempool) SizeBytes() int64
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsAvailable ¶
func (mem *CListMempool) TxsAvailable() <-chan struct{}
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsFront ¶
func (mem *CListMempool) TxsFront() *clist.CElement
TxsFront returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. FIXME: leaking implementation details!
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsWaitChan ¶
func (mem *CListMempool) TxsWaitChan() <-chan struct{}
TxsWaitChan returns a channel to wait on transactions. It will be closed once the mempool is not empty (ie. the internal `mem.txs` has at least one element)
Safe for concurrent use by multiple goroutines.
func (*CListMempool) Unlock ¶
func (mem *CListMempool) Unlock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) Update ¶
func (mem *CListMempool) Update( height int64, txs types.Txs, txResults []*abci.ExecTxResult, preCheck PreCheckFunc, postCheck PostCheckFunc, ) error
Lock() must be help by the caller during execution.
type CListMempoolOption ¶
type CListMempoolOption func(*CListMempool)
CListMempoolOption sets an optional parameter on the mempool.
func WithMetrics ¶
func WithMetrics(metrics *Metrics) CListMempoolOption
WithMetrics sets the metrics.
func WithPostCheck ¶
func WithPostCheck(f PostCheckFunc) CListMempoolOption
WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran after CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
func WithPreCheck ¶
func WithPreCheck(f PreCheckFunc) CListMempoolOption
WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran before CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
type ErrAppConnMempool ¶
type ErrAppConnMempool struct {
Err error
}
func (ErrAppConnMempool) Error ¶
func (e ErrAppConnMempool) Error() string
func (ErrAppConnMempool) Unwrap ¶
func (e ErrAppConnMempool) Unwrap() error
type ErrCheckTxAsync ¶
type ErrCheckTxAsync struct {
Err error
}
func (ErrCheckTxAsync) Error ¶
func (e ErrCheckTxAsync) Error() string
func (ErrCheckTxAsync) Unwrap ¶
func (e ErrCheckTxAsync) Unwrap() error
type ErrFlushAppConn ¶
type ErrFlushAppConn struct {
Err error
}
func (ErrFlushAppConn) Error ¶
func (e ErrFlushAppConn) Error() string
func (ErrFlushAppConn) Unwrap ¶
func (e ErrFlushAppConn) Unwrap() error
type ErrMempoolIsFull ¶
ErrMempoolIsFull defines an error where CometBFT and the application cannot handle that much load.
func (ErrMempoolIsFull) Error ¶
func (e ErrMempoolIsFull) Error() string
type ErrPreCheck ¶
type ErrPreCheck struct {
Err error
}
ErrPreCheck defines an error where a transaction fails a pre-check.
func (ErrPreCheck) Error ¶
func (e ErrPreCheck) Error() string
func (ErrPreCheck) Unwrap ¶
func (e ErrPreCheck) Unwrap() error
type ErrTxTooLarge ¶
ErrTxTooLarge defines an error when a transaction is too big to be sent in a message to other peers.
func (ErrTxTooLarge) Error ¶
func (e ErrTxTooLarge) Error() string
type LRUTxCache ¶
type LRUTxCache struct {
// contains filtered or unexported fields
}
LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache only stores the hash of the raw transaction.
func NewLRUTxCache ¶
func NewLRUTxCache(cacheSize int) *LRUTxCache
func (*LRUTxCache) GetList ¶
func (c *LRUTxCache) GetList() *list.List
GetList returns the underlying linked-list that backs the LRU cache. Note, this should be used for testing purposes only!
func (*LRUTxCache) Remove ¶
func (c *LRUTxCache) Remove(tx types.Tx)
func (*LRUTxCache) Reset ¶
func (c *LRUTxCache) Reset()
type Mempool ¶
type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTx(tx types.Tx, callback func(*abci.ResponseCheckTx), txInfo TxInfo) error // RemoveTxByKey removes a transaction, identified by its key, // from the mempool. RemoveTxByKey(txKey types.TxKey) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than // maxGas. // // If both maxes are negative, there is no cap on the size of all returned // transactions (~ all available transactions). ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs // ReapMaxTxs reaps up to max transactions from the mempool. If max is // negative, there is no cap on the size of all returned transactions // (~ all available transactions). ReapMaxTxs(max int) types.Txs // Lock locks the mempool. The consensus must be able to hold lock to safely // update. Lock() // Unlock unlocks the mempool. Unlock() // Update informs the mempool that the given txs were committed and can be // discarded. // // NOTE: // 1. This should be called *after* block is committed by consensus. // 2. Lock/Unlock must be managed by the caller. Update( blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, ) error // FlushAppConn flushes the mempool connection to ensure async callback calls // are done, e.g. from CheckTx. // // NOTE: // 1. Lock/Unlock must be managed by caller. FlushAppConn() error // Flush removes all transactions from the mempool and caches. Flush() // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. // // NOTE: // 1. The returned channel may be nil if EnableTxsAvailable was not called. TxsAvailable() <-chan struct{} // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will // trigger once every height when transactions are available. EnableTxsAvailable() // Size returns the number of transactions in the mempool. Size() int // SizeBytes returns the total size of all txs in the mempool. SizeBytes() int64 }
Mempool defines the mempool interface.
Updates to the mempool need to be synchronized with committing a block so applications can reset their transient state on Commit.
type Metrics ¶
type Metrics struct { // Number of uncommitted transactions in the mempool. Size metrics.Gauge // Total size of the mempool in bytes. SizeBytes metrics.Gauge // Histogram of transaction sizes in bytes. TxSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1,3,7"` // Number of failed transactions. FailedTxs metrics.Counter // RejectedTxs defines the number of rejected transactions. These are // transactions that passed CheckTx but failed to make it into the mempool // due to resource limits, e.g. mempool is full and no lower priority // transactions exist in the mempool. //metrics:Number of rejected transactions. RejectedTxs metrics.Counter // EvictedTxs defines the number of evicted transactions. These are valid // transactions that passed CheckTx and existed in the mempool but were later // evicted to make room for higher priority valid transactions that passed // CheckTx. //metrics:Number of evicted transactions. EvictedTxs metrics.Counter // Number of times transactions are rechecked in the mempool. RecheckTimes metrics.Counter // Number of connections being actively used for gossiping transactions // (experimental feature). ActiveOutboundConnections metrics.Gauge }
Metrics contains metrics exposed by this package. see MetricsProvider for descriptions.
func NopMetrics ¶
func NopMetrics() *Metrics
func PrometheusMetrics ¶
type NopMempool ¶
type NopMempool struct{}
NopMempool is a mempool that does nothing.
The ABCI app is responsible for storing, disseminating, and proposing transactions. See [ADR-111](../docs/architecture/adr-111-nop-mempool.md).
func (*NopMempool) CheckTx ¶
func (*NopMempool) CheckTx(types.Tx, func(*abci.ResponseCheckTx), TxInfo) error
CheckTx always returns an error.
func (*NopMempool) EnableTxsAvailable ¶
func (*NopMempool) EnableTxsAvailable()
EnableTxsAvailable does nothing.
func (*NopMempool) FlushAppConn ¶
func (*NopMempool) FlushAppConn() error
FlushAppConn does nothing.
func (*NopMempool) ReapMaxBytesMaxGas ¶
func (*NopMempool) ReapMaxBytesMaxGas(int64, int64) types.Txs
ReapMaxBytesMaxGas always returns nil.
func (*NopMempool) ReapMaxTxs ¶
func (*NopMempool) ReapMaxTxs(int) types.Txs
ReapMaxTxs always returns nil.
func (*NopMempool) RemoveTxByKey ¶
func (*NopMempool) RemoveTxByKey(types.TxKey) error
RemoveTxByKey always returns an error.
func (*NopMempool) SetTxRemovedCallback ¶
func (*NopMempool) SetTxRemovedCallback(func(txKey types.TxKey))
SetTxRemovedCallback does nothing.
func (*NopMempool) TxsAvailable ¶
func (*NopMempool) TxsAvailable() <-chan struct{}
TxsAvailable always returns nil.
func (*NopMempool) Update ¶
func (*NopMempool) Update( int64, types.Txs, []*abci.ExecTxResult, PreCheckFunc, PostCheckFunc, ) error
Update does nothing.
type NopMempoolReactor ¶
type NopMempoolReactor struct {
service.BaseService
}
NopMempoolReactor is a mempool reactor that does nothing.
func NewNopMempoolReactor ¶
func NewNopMempoolReactor() *NopMempoolReactor
NewNopMempoolReactor returns a new `nop` reactor.
To be used only in RPC.
func (*NopMempoolReactor) AddPeer ¶
func (*NopMempoolReactor) AddPeer(p2p.Peer)
AddPeer does nothing.
func (*NopMempoolReactor) GetChannels ¶
func (*NopMempoolReactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels always returns nil.
func (*NopMempoolReactor) InitPeer ¶
func (*NopMempoolReactor) InitPeer(p2p.Peer) p2p.Peer
InitPeer always returns nil.
func (*NopMempoolReactor) Receive ¶
func (*NopMempoolReactor) Receive(p2p.Envelope)
Receive does nothing.
func (*NopMempoolReactor) RemovePeer ¶
func (*NopMempoolReactor) RemovePeer(p2p.Peer, interface{})
RemovePeer does nothing.
func (*NopMempoolReactor) SetSwitch ¶
func (*NopMempoolReactor) SetSwitch(*p2p.Switch)
SetSwitch does nothing.
type NopTxCache ¶
type NopTxCache struct{}
NopTxCache defines a no-op raw transaction cache.
func (NopTxCache) Remove ¶
func (NopTxCache) Remove(types.Tx)
func (NopTxCache) Reset ¶
func (NopTxCache) Reset()
type PeerState ¶
type PeerState interface {
GetHeight() int64
}
PeerState describes the state of a peer.
type PostCheckFunc ¶
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error
PostCheckFunc is an optional filter executed after CheckTx and rejects transaction if false is returned. An example would be to ensure a transaction doesn't require more gas than available for the block.
func PostCheckMaxGas ¶
func PostCheckMaxGas(maxGas int64) PostCheckFunc
PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed maxGas. Returns nil if maxGas is -1.
type PreCheckFunc ¶
PreCheckFunc is an optional filter executed before CheckTx and rejects transaction if false is returned. An example would be to ensure that a transaction doesn't exceeded the block size.
func PreCheckMaxBytes ¶
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc
PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes.
type Reactor ¶
type Reactor struct { p2p.BaseReactor // contains filtered or unexported fields }
Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.
func NewReactor ¶
func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor
NewReactor returns a new Reactor with the given config and mempool.
func (*Reactor) AddPeer ¶
AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (*Reactor) GetChannels ¶
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels implements Reactor by returning the list of channels for this reactor.
func (*Reactor) Receive ¶
Receive implements Reactor. It adds any received transactions to the mempool.
func (*Reactor) RemovePeer ¶
RemovePeer implements Reactor.
type TxCache ¶
type TxCache interface { // Reset resets the cache to an empty state. Reset() // Push adds the given raw transaction to the cache and returns true if it was // newly added. Otherwise, it returns false. Push(tx types.Tx) bool // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) // Has reports whether tx is present in the cache. Checking for presence is // not treated as an access of the value. Has(tx types.Tx) bool }
TxCache defines an interface for raw transaction caching in a mempool. Currently, a TxCache does not allow direct reading or getting of transaction values. A TxCache is used primarily to push transactions and removing transactions. Pushing via Push returns a boolean telling the caller if the transaction already exists in the cache or not.
type TxInfo ¶
type TxInfo struct { // SenderID is the internal peer ID used in the mempool to identify the // sender, storing two bytes with each transaction instead of 20 bytes for // the types.NodeID. SenderID uint16 // SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging. SenderP2PID p2p.ID }
TxInfo are parameters that get passed when attempting to add a tx to the mempool.
type TxsMessage ¶
TxsMessage is a Message containing transactions.
func (*TxsMessage) String ¶
func (m *TxsMessage) String() string
String returns a string representation of the TxsMessage.