Documentation
¶
Overview ¶
TODO: Better handle abci client errors. (make it automatically handle connection errors)
Index ¶
- Constants
- Variables
- func IsPreCheckError(err error) bool
- type CListIterator
- type CListMempool
- func (mem *CListMempool) CheckTx(tx types.Tx, sender p2p.ID) (*abcicli.ReqRes, error)
- func (mem *CListMempool) EnableTxsAvailable()
- func (mem *CListMempool) Flush()
- func (mem *CListMempool) FlushAppConn() error
- func (mem *CListMempool) InMempool(txKey types.TxKey) bool
- func (mem *CListMempool) Lock()
- func (mem *CListMempool) NewIterator(ctx context.Context) Iterator
- func (mem *CListMempool) PreUpdate()
- func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
- func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
- func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error
- func (mem *CListMempool) SetLogger(l log.Logger)
- func (mem *CListMempool) Size() int
- func (mem *CListMempool) SizeBytes() int64
- func (mem *CListMempool) TxsAvailable() <-chan struct{}
- func (mem *CListMempool) TxsFront() *clist.CElementdeprecated
- func (mem *CListMempool) TxsWaitChan() <-chan struct{}deprecated
- func (mem *CListMempool) Unlock()
- func (mem *CListMempool) Update(height int64, txs types.Txs, txResults []*abci.ExecTxResult, ...) error
- type CListMempoolOption
- type Entry
- type ErrAppConnMempool
- type ErrFlushAppConn
- type ErrMempoolIsFull
- type ErrPreCheck
- type ErrTxTooLarge
- type Iterator
- type LRUTxCache
- type Mempool
- type Metrics
- type NopMempool
- func (*NopMempool) CheckTx(types.Tx, p2p.ID) (*abcicli.ReqRes, error)
- func (*NopMempool) EnableTxsAvailable()
- func (*NopMempool) Flush()
- func (*NopMempool) FlushAppConn() error
- func (*NopMempool) Lock()
- func (*NopMempool) PreUpdate()
- func (*NopMempool) ReapMaxBytesMaxGas(int64, int64) types.Txs
- func (*NopMempool) ReapMaxTxs(int) types.Txs
- func (*NopMempool) RemoveTxByKey(types.TxKey) error
- func (*NopMempool) Size() int
- func (*NopMempool) SizeBytes() int64
- func (*NopMempool) TxsAvailable() <-chan struct{}
- func (*NopMempool) Unlock()
- func (*NopMempool) Update(int64, types.Txs, []*abci.ExecTxResult, PreCheckFunc, PostCheckFunc) error
- type NopMempoolReactor
- func (*NopMempoolReactor) AddPeer(p2p.Peer)
- func (*NopMempoolReactor) GetChannels() []*p2p.ChannelDescriptor
- func (*NopMempoolReactor) InitPeer(p2p.Peer) p2p.Peer
- func (*NopMempoolReactor) Receive(p2p.Envelope)
- func (*NopMempoolReactor) RemovePeer(p2p.Peer, any)
- func (*NopMempoolReactor) SetSwitch(*p2p.Switch)
- func (*NopMempoolReactor) WaitSync() bool
- type NopTxCache
- type PeerState
- type PostCheckFunc
- type PreCheckFunc
- type Reactor
- func (memR *Reactor) AddPeer(peer p2p.Peer)
- func (memR *Reactor) EnableInOutTxs()
- func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
- func (memR *Reactor) OnStart() error
- func (memR *Reactor) Receive(e p2p.Envelope)
- func (memR *Reactor) SetLogger(l log.Logger)
- func (memR *Reactor) WaitSync() bool
- type TxCache
- type TxKey
Constants ¶
const ( MempoolChannel = byte(0x30) // PeerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind. PeerCatchupSleepIntervalMS = 100 )
const ( // MetricsSubsystem is a subsystem shared by all metrics exposed by this // package. MetricsSubsystem = "mempool" )
Variables ¶
var ErrInvalidTx = errors.New("tx is invalid")
ErrInvalidTx is returned when a transaction that is trying to be added to the mempool is invalid.
var ErrLateRecheckResponse = errors.New("rechecking has finished; discard late recheck response")
ErrLateRecheckResponse is returned when a CheckTx response arrives after the rechecking process has finished.
var ErrRecheckFull = errors.New("mempool is still rechecking after a new committed block, so it is considered as full")
ErrRecheckFull is returned when checking if the mempool is full and rechecking is still in progress after a new block was committed.
var ErrTxAlreadyReceivedFromSender = errors.New("tx already received from the same sender")
ErrTxAlreadyReceivedFromSender is returned if when processing a tx already received from the same sender.
var ErrTxInCache = errors.New("tx already exists in cache")
ErrTxInCache is returned to the client if we saw tx earlier.
var ErrTxInMempool = errors.New("transaction already in mempool, not adding it again")
ErrTxInMempool is returned when a transaction that is trying to be added to the mempool is already there.
var ErrTxNotFound = errors.New("transaction not found in mempool")
ErrTxNotFound is returned to the client if tx is not found in mempool.
Functions ¶
func IsPreCheckError ¶
IsPreCheckError returns true if err is due to pre check failure.
Types ¶
type CListIterator ¶ added in v1.0.0
type CListIterator struct {
// contains filtered or unexported fields
}
CListIterator implements an Iterator that traverses the CList sequentially. When the current entry is removed from the mempool, the iterator starts from the beginning of the CList. When it reaches the end, it waits until a new entry is appended.
func (*CListIterator) WaitNextCh ¶ added in v1.0.0
func (iter *CListIterator) WaitNextCh() <-chan Entry
WaitNextCh returns a channel to wait for the next available entry. The channel will be explicitly closed when the entry gets removed before it is added to the channel, or when reaching the end of the list.
Unsafe for concurrent use by multiple goroutines.
type CListMempool ¶ added in v0.38.0
type CListMempool struct {
// contains filtered or unexported fields
}
CListMempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. The mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.
func NewCListMempool ¶ added in v0.38.0
func NewCListMempool( cfg *config.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, options ...CListMempoolOption, ) *CListMempool
NewCListMempool returns a new mempool with the given configuration and connection to an application.
func (*CListMempool) CheckTx ¶ added in v0.38.0
It blocks if we're waiting on Update() or Reap(). Safe for concurrent use by multiple goroutines.
func (*CListMempool) EnableTxsAvailable ¶ added in v0.38.0
func (mem *CListMempool) EnableTxsAvailable()
NOTE: not thread safe - should only be called once, on startup.
func (*CListMempool) Flush ¶ added in v0.38.0
func (mem *CListMempool) Flush()
XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
func (*CListMempool) FlushAppConn ¶ added in v0.38.0
func (mem *CListMempool) FlushAppConn() error
Lock() must be help by the caller during execution.
func (*CListMempool) InMempool ¶ added in v1.0.0
func (mem *CListMempool) InMempool(txKey types.TxKey) bool
func (*CListMempool) Lock ¶ added in v0.38.0
func (mem *CListMempool) Lock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) NewIterator ¶ added in v1.0.0
func (mem *CListMempool) NewIterator(ctx context.Context) Iterator
func (*CListMempool) PreUpdate ¶ added in v0.38.8
func (mem *CListMempool) PreUpdate()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxBytesMaxGas ¶ added in v0.38.0
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxTxs ¶ added in v0.38.0
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) RemoveTxByKey ¶ added in v0.38.0
func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error
RemoveTxByKey removes a transaction from the mempool by its TxKey index. Called from:
- Update (lock held) if tx was committed
- handleRecheckTxResponse (lock not held) if tx was invalidated
func (*CListMempool) SetLogger ¶ added in v0.38.0
func (mem *CListMempool) SetLogger(l log.Logger)
SetLogger sets the Logger.
func (*CListMempool) Size ¶ added in v0.38.0
func (mem *CListMempool) Size() int
Safe for concurrent use by multiple goroutines.
func (*CListMempool) SizeBytes ¶ added in v0.38.0
func (mem *CListMempool) SizeBytes() int64
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsAvailable ¶ added in v0.38.0
func (mem *CListMempool) TxsAvailable() <-chan struct{}
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsFront
deprecated
added in
v0.38.0
func (mem *CListMempool) TxsFront() *clist.CElement
TxsFront returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. FIXME: leaking implementation details!
Safe for concurrent use by multiple goroutines.
Deprecated: Use CListIterator instead.
func (*CListMempool) TxsWaitChan
deprecated
added in
v0.38.0
func (mem *CListMempool) TxsWaitChan() <-chan struct{}
TxsWaitChan returns a channel to wait on transactions. It will be closed once the mempool is not empty (ie. the internal `mem.txs` has at least one element)
Safe for concurrent use by multiple goroutines.
Deprecated: Use CListIterator instead.
func (*CListMempool) Unlock ¶ added in v0.38.0
func (mem *CListMempool) Unlock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) Update ¶ added in v0.38.0
func (mem *CListMempool) Update( height int64, txs types.Txs, txResults []*abci.ExecTxResult, preCheck PreCheckFunc, postCheck PostCheckFunc, ) error
Lock() must be help by the caller during execution. TODO: this function always returns nil; remove the return value.
type CListMempoolOption ¶ added in v0.38.0
type CListMempoolOption func(*CListMempool)
CListMempoolOption sets an optional parameter on the mempool.
func WithMetrics ¶ added in v0.38.0
func WithMetrics(metrics *Metrics) CListMempoolOption
WithMetrics sets the metrics.
func WithPostCheck ¶ added in v0.38.0
func WithPostCheck(f PostCheckFunc) CListMempoolOption
WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran after CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
func WithPreCheck ¶ added in v0.38.0
func WithPreCheck(f PreCheckFunc) CListMempoolOption
WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran before CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
type Entry ¶ added in v1.0.0
type Entry interface { // Tx returns the transaction stored in the entry. Tx() types.Tx // Height returns the height of the latest block at the moment the entry was created. Height() int64 // GasWanted returns the amount of gas required by the transaction. GasWanted() int64 // IsSender returns whether we received the transaction from the given peer ID. IsSender(peerID p2p.ID) bool }
An entry in the mempool.
type ErrAppConnMempool ¶ added in v0.38.6
type ErrAppConnMempool struct {
Err error
}
func (ErrAppConnMempool) Error ¶ added in v0.38.6
func (e ErrAppConnMempool) Error() string
func (ErrAppConnMempool) Unwrap ¶ added in v0.38.6
func (e ErrAppConnMempool) Unwrap() error
type ErrFlushAppConn ¶ added in v0.38.6
type ErrFlushAppConn struct {
Err error
}
func (ErrFlushAppConn) Error ¶ added in v0.38.6
func (e ErrFlushAppConn) Error() string
func (ErrFlushAppConn) Unwrap ¶ added in v0.38.6
func (e ErrFlushAppConn) Unwrap() error
type ErrMempoolIsFull ¶
ErrMempoolIsFull defines an error where CometBFT and the application cannot handle that much load.
func (ErrMempoolIsFull) Error ¶
func (e ErrMempoolIsFull) Error() string
type ErrPreCheck ¶
type ErrPreCheck struct {
Err error
}
ErrPreCheck defines an error where a transaction fails a pre-check.
func (ErrPreCheck) Error ¶
func (e ErrPreCheck) Error() string
func (ErrPreCheck) Unwrap ¶ added in v0.38.6
func (e ErrPreCheck) Unwrap() error
type ErrTxTooLarge ¶
ErrTxTooLarge defines an error when a transaction is too big to be sent in a message to other peers.
func (ErrTxTooLarge) Error ¶
func (e ErrTxTooLarge) Error() string
type Iterator ¶ added in v1.0.0
type Iterator interface { // WaitNextCh returns a channel on which to wait for the next available entry. WaitNextCh() <-chan Entry }
An iterator is used to iterate through the mempool entries. Multiple iterators should be allowed to run concurrently.
type LRUTxCache ¶
type LRUTxCache struct {
// contains filtered or unexported fields
}
LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache only stores the hash of the raw transaction.
func NewLRUTxCache ¶
func NewLRUTxCache(cacheSize int) *LRUTxCache
func (*LRUTxCache) GetList ¶
func (c *LRUTxCache) GetList() *list.List
GetList returns the underlying linked-list that backs the LRU cache. Note, this should be used for testing purposes only!
func (*LRUTxCache) Remove ¶
func (c *LRUTxCache) Remove(tx types.Tx)
type Mempool ¶
type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTx(tx types.Tx, sender p2p.ID) (*abcicli.ReqRes, error) // RemoveTxByKey removes a transaction, identified by its key, // from the mempool. RemoveTxByKey(txKey types.TxKey) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than // maxGas. // // If both maxes are negative, there is no cap on the size of all returned // transactions (~ all available transactions). ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs // ReapMaxTxs reaps up to max transactions from the mempool. If max is // negative, there is no cap on the size of all returned transactions // (~ all available transactions). ReapMaxTxs(max int) types.Txs // Lock locks the mempool. The consensus must be able to hold lock to safely // update. Lock() // Unlock unlocks the mempool. Unlock() // PreUpdate signals that a new update is coming, before acquiring the mempool lock. // If the mempool is still rechecking at this point, it should be considered full. PreUpdate() // Update informs the mempool that the given txs were committed and can be // discarded. // // NOTE: // 1. This should be called *after* block is committed by consensus. // 2. Lock/Unlock must be managed by the caller. Update( blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, ) error // FlushAppConn flushes the mempool connection to ensure async callback calls // are done, e.g. from CheckTx. // // NOTE: // 1. Lock/Unlock must be managed by caller. FlushAppConn() error // Flush removes all transactions from the mempool and caches. Flush() // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. // // NOTE: // 1. The returned channel may be nil if EnableTxsAvailable was not called. TxsAvailable() <-chan struct{} // EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will // trigger once every height when transactions are available. EnableTxsAvailable() // Size returns the number of transactions in the mempool. Size() int // SizeBytes returns the total size of all txs in the mempool. SizeBytes() int64 }
Mempool defines the mempool interface.
Updates to the mempool need to be synchronized with committing a block so applications can reset their transient state on Commit.
type Metrics ¶
type Metrics struct { // Number of uncommitted transactions in the mempool. Size metrics.Gauge // Total size of the mempool in bytes. SizeBytes metrics.Gauge // Histogram of transaction sizes in bytes. TxSizeBytes metrics.Histogram `metrics_bucketsizes:"1,3,7" metrics_buckettype:"exp"` // FailedTxs defines the number of failed transactions. These are // transactions that failed to make it into the mempool because they were // deemed invalid. // metrics:Number of failed transactions. FailedTxs metrics.Counter // RejectedTxs defines the number of rejected transactions. These are // transactions that failed to make it into the mempool due to resource // limits, e.g. mempool is full. // metrics:Number of rejected transactions. RejectedTxs metrics.Counter // EvictedTxs defines the number of evicted transactions. These are valid // transactions that passed CheckTx and make it into the mempool but later // became invalid. // metrics:Number of evicted transactions. EvictedTxs metrics.Counter // Number of times transactions are rechecked in the mempool. RecheckTimes metrics.Counter // Number of times transactions were received more than once. // metrics:Number of duplicate transaction reception. AlreadyReceivedTxs metrics.Counter // Number of connections being actively used for gossiping transactions // (experimental feature). ActiveOutboundConnections metrics.Gauge }
Metrics contains metrics exposed by this package. see MetricsProvider for descriptions.
func NopMetrics ¶
func NopMetrics() *Metrics
func PrometheusMetrics ¶
type NopMempool ¶ added in v0.37.4
type NopMempool struct{}
NopMempool is a mempool that does nothing.
The ABCI app is responsible for storing, disseminating, and proposing transactions. See [ADR-111](../docs/architecture/adr-111-nop-mempool.md).
func (*NopMempool) EnableTxsAvailable ¶ added in v0.37.4
func (*NopMempool) EnableTxsAvailable()
EnableTxsAvailable does nothing.
func (*NopMempool) FlushAppConn ¶ added in v0.37.4
func (*NopMempool) FlushAppConn() error
FlushAppConn does nothing.
func (*NopMempool) PreUpdate ¶ added in v0.37.7
func (*NopMempool) PreUpdate()
func (*NopMempool) ReapMaxBytesMaxGas ¶ added in v0.37.4
func (*NopMempool) ReapMaxBytesMaxGas(int64, int64) types.Txs
ReapMaxBytesMaxGas always returns nil.
func (*NopMempool) ReapMaxTxs ¶ added in v0.37.4
func (*NopMempool) ReapMaxTxs(int) types.Txs
ReapMaxTxs always returns nil.
func (*NopMempool) RemoveTxByKey ¶ added in v0.37.4
func (*NopMempool) RemoveTxByKey(types.TxKey) error
RemoveTxByKey always returns an error.
func (*NopMempool) SizeBytes ¶ added in v0.37.4
func (*NopMempool) SizeBytes() int64
SizeBytes always returns 0.
func (*NopMempool) TxsAvailable ¶ added in v0.37.4
func (*NopMempool) TxsAvailable() <-chan struct{}
TxsAvailable always returns nil.
func (*NopMempool) Update ¶ added in v0.37.4
func (*NopMempool) Update( int64, types.Txs, []*abci.ExecTxResult, PreCheckFunc, PostCheckFunc, ) error
Update does nothing.
type NopMempoolReactor ¶ added in v0.37.4
type NopMempoolReactor struct {
service.BaseService
}
NopMempoolReactor is a mempool reactor that does nothing.
func NewNopMempoolReactor ¶ added in v0.37.4
func NewNopMempoolReactor() *NopMempoolReactor
NewNopMempoolReactor returns a new `nop` reactor.
To be used only in RPC.
func (*NopMempoolReactor) AddPeer ¶ added in v0.37.4
func (*NopMempoolReactor) AddPeer(p2p.Peer)
AddPeer does nothing.
func (*NopMempoolReactor) GetChannels ¶ added in v0.37.4
func (*NopMempoolReactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels always returns nil.
func (*NopMempoolReactor) InitPeer ¶ added in v0.37.4
func (*NopMempoolReactor) InitPeer(p2p.Peer) p2p.Peer
InitPeer always returns nil.
func (*NopMempoolReactor) Receive ¶ added in v0.38.2
func (*NopMempoolReactor) Receive(p2p.Envelope)
Receive does nothing.
func (*NopMempoolReactor) RemovePeer ¶ added in v0.37.4
func (*NopMempoolReactor) RemovePeer(p2p.Peer, any)
RemovePeer does nothing.
func (*NopMempoolReactor) SetSwitch ¶ added in v0.37.4
func (*NopMempoolReactor) SetSwitch(*p2p.Switch)
SetSwitch does nothing.
func (*NopMempoolReactor) WaitSync ¶ added in v0.37.4
func (*NopMempoolReactor) WaitSync() bool
WaitSync always returns false.
type NopTxCache ¶
type NopTxCache struct{}
NopTxCache defines a no-op raw transaction cache.
func (NopTxCache) Remove ¶
func (NopTxCache) Remove(types.Tx)
func (NopTxCache) Reset ¶
func (NopTxCache) Reset()
type PeerState ¶ added in v0.38.0
type PeerState interface {
GetHeight() int64
}
PeerState describes the state of a peer.
type PostCheckFunc ¶
type PostCheckFunc func(types.Tx, *abci.CheckTxResponse) error
PostCheckFunc is an optional filter executed after CheckTx and rejects transaction if false is returned. An example would be to ensure a transaction doesn't require more gas than available for the block.
func PostCheckMaxGas ¶
func PostCheckMaxGas(maxGas int64) PostCheckFunc
PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed maxGas. Returns nil if maxGas is -1.
type PreCheckFunc ¶
PreCheckFunc is an optional filter executed before CheckTx and rejects transaction if false is returned. An example would be to ensure that a transaction doesn't exceeded the block size.
func PreCheckMaxBytes ¶
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc
PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes.
type Reactor ¶ added in v0.38.0
type Reactor struct { p2p.BaseReactor // contains filtered or unexported fields }
Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.
func NewReactor ¶ added in v0.38.0
func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool, waitSync bool) *Reactor
NewReactor returns a new Reactor with the given config and mempool.
func (*Reactor) AddPeer ¶ added in v0.38.0
AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (*Reactor) EnableInOutTxs ¶ added in v1.0.0
func (memR *Reactor) EnableInOutTxs()
func (*Reactor) GetChannels ¶ added in v0.38.0
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels implements Reactor by returning the list of channels for this reactor.
func (*Reactor) Receive ¶ added in v0.38.0
Receive implements Reactor. It adds any received transactions to the mempool.
type TxCache ¶
type TxCache interface { // Reset resets the cache to an empty state. Reset() // Push adds the given raw transaction to the cache and returns true if it was // newly added. Otherwise, it returns false. Push(tx types.Tx) bool // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) // Has reports whether tx is present in the cache. Checking for presence is // not treated as an access of the value. Has(tx types.Tx) bool }
TxCache defines an interface for raw transaction caching in a mempool. Currently, a TxCache does not allow direct reading or getting of transaction values. A TxCache is used primarily to push transactions and removing transactions. Pushing via Push returns a boolean telling the caller if the transaction already exists in the cache or not.