v1

package
v0.36.0-dev Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2021 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetChannelShims

func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim

GetChannelShims returns a map of ChannelDescriptorShim objects, where each object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding p2p proto.Message the new p2p Channel is responsible for handling.

TODO: Remove once p2p refactor is complete. ref: https://github.com/tendermint/tendermint/issues/5670

Types

type PeerManager

type PeerManager interface {
	GetHeight(types.NodeID) int64
}

PeerManager defines the interface contract required for getting necessary peer information. This should eventually be replaced with a message-oriented approach utilizing the p2p stack.

type Reactor

type Reactor struct {
	service.BaseService
	// contains filtered or unexported fields
}

Reactor implements a service that contains mempool of txs that are broadcasted amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.

func NewReactor

func NewReactor(
	logger log.Logger,
	cfg *config.MempoolConfig,
	peerMgr PeerManager,
	txmp *TxMempool,
	mempoolCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
) *Reactor

NewReactor returns a reference to a new reactor.

func (*Reactor) OnStart

func (r *Reactor) OnStart() error

OnStart starts separate go routines for each p2p Channel and listens for envelopes on each. In addition, it also listens for peer updates and handles messages on that p2p channel accordingly. The caller must be sure to execute OnStop to ensure the outbound p2p Channels are closed.

func (*Reactor) OnStop

func (r *Reactor) OnStop()

OnStop stops the reactor by signaling to all spawned goroutines to exit and blocking until they all exit.

type TxMempool

type TxMempool struct {
	// contains filtered or unexported fields
}

TxMempool defines a prioritized mempool data structure used by the v1 mempool reactor. It keeps a thread-safe priority queue of transactions that is used when a block proposer constructs a block and a thread-safe linked-list that is used to gossip transactions to peers in a FIFO manner.

func NewTxMempool

func NewTxMempool(
	logger log.Logger,
	cfg *config.MempoolConfig,
	proxyAppConn proxy.AppConnMempool,
	height int64,
	options ...TxMempoolOption,
) *TxMempool

func (*TxMempool) CheckTx

func (txmp *TxMempool) CheckTx(
	ctx context.Context,
	tx types.Tx,
	cb func(*abci.Response),
	txInfo mempool.TxInfo,
) error

CheckTx executes the ABCI CheckTx method for a given transaction. It acquires a read-lock attempts to execute the application's CheckTx ABCI method via CheckTxAsync. We return an error if any of the following happen:

  • The CheckTxAsync execution fails.
  • The transaction already exists in the cache and we've already received the transaction from the peer. Otherwise, if it solely exists in the cache, we return nil.
  • The transaction size exceeds the maximum transaction size as defined by the configuration provided to the mempool.
  • The transaction fails Pre-Check (if it is defined).
  • The proxyAppConn fails, e.g. the buffer is full.

If the mempool is full, we still execute CheckTx and attempt to find a lower priority transaction to evict. If such a transaction exists, we remove the lower priority transaction and add the new one with higher priority.

NOTE: - The applications' CheckTx implementation may panic. - The caller is not to explicitly require any locks for executing CheckTx.

func (*TxMempool) EnableTxsAvailable

func (txmp *TxMempool) EnableTxsAvailable()

EnableTxsAvailable enables the mempool to trigger events when transactions are available on a block by block basis.

func (*TxMempool) Flush

func (txmp *TxMempool) Flush()

Flush flushes out the mempool. It acquires a read-lock, fetches all the transactions currently in the transaction store and removes each transaction from the store and all indexes and finally resets the cache.

NOTE: - Flushing the mempool may leave the mempool in an inconsistent state.

func (*TxMempool) FlushAppConn

func (txmp *TxMempool) FlushAppConn() error

FlushAppConn executes FlushSync on the mempool's proxyAppConn.

NOTE: The caller must obtain a write-lock via Lock() prior to execution.

func (*TxMempool) Lock

func (txmp *TxMempool) Lock()

Lock obtains a write-lock on the mempool. A caller must be sure to explicitly release the lock when finished.

func (*TxMempool) NextGossipTx

func (txmp *TxMempool) NextGossipTx() *clist.CElement

NextGossipTx returns the next valid transaction to gossip. A caller must wait for WaitForNextTx to signal a transaction is available to gossip first. It is thread-safe.

func (*TxMempool) ReapMaxBytesMaxGas

func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

ReapMaxBytesMaxGas returns a list of transactions within the provided size and gas constraints. Transaction are retrieved in priority order.

NOTE:

  • A read-lock is acquired.
  • Transactions returned are not actually removed from the mempool transaction store or indexes.

func (*TxMempool) ReapMaxTxs

func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs

ReapMaxTxs returns a list of transactions within the provided number of transactions bound. Transaction are retrieved in priority order.

NOTE:

  • A read-lock is acquired.
  • Transactions returned are not actually removed from the mempool transaction store or indexes.

func (*TxMempool) Size

func (txmp *TxMempool) Size() int

Size returns the number of valid transactions in the mempool. It is thread-safe.

func (*TxMempool) SizeBytes

func (txmp *TxMempool) SizeBytes() int64

SizeBytes return the total sum in bytes of all the valid transactions in the mempool. It is thread-safe.

func (*TxMempool) TxsAvailable

func (txmp *TxMempool) TxsAvailable() <-chan struct{}

TxsAvailable returns a channel which fires once for every height, and only when transactions are available in the mempool. It is thread-safe.

func (*TxMempool) Unlock

func (txmp *TxMempool) Unlock()

Unlock releases a write-lock on the mempool.

func (*TxMempool) Update

func (txmp *TxMempool) Update(
	blockHeight int64,
	blockTxs types.Txs,
	deliverTxResponses []*abci.ResponseDeliverTx,
	newPreFn mempool.PreCheckFunc,
	newPostFn mempool.PostCheckFunc,
) error

Update iterates over all the transactions provided by the caller, i.e. the block producer, and removes them from the cache (if applicable) and removes the transactions from the main transaction store and associated indexes. Finally, if there are trainsactions remaining in the mempool, we initiate a re-CheckTx for them (if applicable), otherwise, we notify the caller more transactions are available.

NOTE: - The caller must explicitly acquire a write-lock via Lock().

func (*TxMempool) WaitForNextTx

func (txmp *TxMempool) WaitForNextTx() <-chan struct{}

WaitForNextTx returns a blocking channel that will be closed when the next valid transaction is available to gossip. It is thread-safe.

type TxMempoolOption

type TxMempoolOption func(*TxMempool)

TxMempoolOption sets an optional parameter on the TxMempool.

func WithMetrics

func WithMetrics(metrics *mempool.Metrics) TxMempoolOption

WithMetrics sets the mempool's metrics collector.

func WithPostCheck

func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption

WithPostCheck sets a filter for the mempool to reject a transaction if f(tx, resp) returns an error. This is executed after CheckTx. It only applies to the first created block. After that, Update overwrites the existing value.

func WithPreCheck

func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption

WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) returns an error. This is executed before CheckTx. It only applies to the first created block. After that, Update() overwrites the existing value.

type TxPriorityQueue

type TxPriorityQueue struct {
	// contains filtered or unexported fields
}

TxPriorityQueue defines a thread-safe priority queue for valid transactions.

func NewTxPriorityQueue

func NewTxPriorityQueue() *TxPriorityQueue

func (*TxPriorityQueue) GetEvictableTxs

func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx

GetEvictableTxs attempts to find and return a list of *WrappedTx than can be evicted to make room for another *WrappedTx with higher priority. If no such list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx indicate that these transactions can be removed due to them being of lower priority and that their total sum in size allows room for the incoming transaction according to the mempool's configured limits.

func (*TxPriorityQueue) Len

func (pq *TxPriorityQueue) Len() int

Len implements the Heap interface.

NOTE: A caller should never call Len. Use NumTxs instead.

func (*TxPriorityQueue) Less

func (pq *TxPriorityQueue) Less(i, j int) bool

Less implements the Heap interface. It returns true if the transaction at position i in the queue is of less priority than the transaction at position j.

func (*TxPriorityQueue) NumTxs

func (pq *TxPriorityQueue) NumTxs() int

NumTxs returns the number of transactions in the priority queue. It is thread safe.

func (*TxPriorityQueue) Pop

func (pq *TxPriorityQueue) Pop() interface{}

Pop implements the Heap interface.

NOTE: A caller should never call Pop. Use PopTx instead.

func (*TxPriorityQueue) PopTx

func (pq *TxPriorityQueue) PopTx() *WrappedTx

PopTx removes the top priority transaction from the queue. It is thread safe.

func (*TxPriorityQueue) Push

func (pq *TxPriorityQueue) Push(x interface{})

Push implements the Heap interface.

NOTE: A caller should never call Push. Use PushTx instead.

func (*TxPriorityQueue) PushTx

func (pq *TxPriorityQueue) PushTx(tx *WrappedTx)

PushTx adds a valid transaction to the priority queue. It is thread safe.

func (*TxPriorityQueue) RemoveTx

func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx)

RemoveTx removes a specific transaction from the priority queue.

func (*TxPriorityQueue) Swap

func (pq *TxPriorityQueue) Swap(i, j int)

Swap implements the Heap interface. It swaps two transactions in the queue.

type TxStore

type TxStore struct {
	// contains filtered or unexported fields
}

TxStore implements a thread-safe mapping of valid transaction(s).

NOTE:

  • Concurrent read-only access to a *WrappedTx object is OK. However, mutative access is not allowed. Regardless, it is not expected for the mempool to need mutative access.

func NewTxStore

func NewTxStore() *TxStore

func (*TxStore) GetAllTxs

func (txs *TxStore) GetAllTxs() []*WrappedTx

GetAllTxs returns all the transactions currently in the store.

func (*TxStore) GetOrSetPeerByTxHash

func (txs *TxStore) GetOrSetPeerByTxHash(hash [mempool.TxKeySize]byte, peerID uint16) (*WrappedTx, bool)

GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the given peerID to the WrappedTx's set of peers that sent us this transaction. We return true if we've already recorded the given peer for this transaction and false otherwise. If the transaction does not exist by hash, we return (nil, false).

func (*TxStore) GetTxByHash

func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx

GetTxByHash returns a *WrappedTx by the transaction's hash.

func (*TxStore) GetTxBySender

func (txs *TxStore) GetTxBySender(sender string) *WrappedTx

GetTxBySender returns a *WrappedTx by the transaction's sender property defined by the ABCI application.

func (*TxStore) IsTxRemoved

func (txs *TxStore) IsTxRemoved(hash [mempool.TxKeySize]byte) bool

IsTxRemoved returns true if a transaction by hash is marked as removed and false otherwise.

func (*TxStore) RemoveTx

func (txs *TxStore) RemoveTx(wtx *WrappedTx)

RemoveTx removes a *WrappedTx from the transaction store. It deletes all indexes of the transaction.

func (*TxStore) SetTx

func (txs *TxStore) SetTx(wtx *WrappedTx)

SetTx stores a *WrappedTx by it's hash. If the transaction also contains a non-empty sender, we additionally store the transaction by the sender as defined by the ABCI application.

func (*TxStore) Size

func (txs *TxStore) Size() int

Size returns the total number of transactions in the store.

func (*TxStore) TxHasPeer

func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool

TxHasPeer returns true if a transaction by hash has a given peer ID and false otherwise. If the transaction does not exist, false is returned.

type WrappedTx

type WrappedTx struct {
	// contains filtered or unexported fields
}

WrappedTx defines a wrapper around a raw transaction with additional metadata that is used for indexing.

func (*WrappedTx) Size

func (wtx *WrappedTx) Size() int

type WrappedTxList

type WrappedTxList struct {
	// contains filtered or unexported fields
}

WrappedTxList implements a thread-safe list of *WrappedTx objects that can be used to build generic transaction indexes in the mempool. It accepts a comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx references which is used during Insert in order to determine sorted order. If less returns true, a <= b.

func NewWrappedTxList

func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList

func (*WrappedTxList) Insert

func (wtl *WrappedTxList) Insert(wtx *WrappedTx)

Insert inserts a WrappedTx reference into the sorted list based on the list's comparator function.

func (*WrappedTxList) Remove

func (wtl *WrappedTxList) Remove(wtx *WrappedTx)

Remove attempts to remove a WrappedTx from the sorted list.

func (*WrappedTxList) Reset

func (wtl *WrappedTxList) Reset()

Reset resets the list of transactions to an empty list.

func (*WrappedTxList) Size

func (wtl *WrappedTxList) Size() int

Size returns the number of WrappedTx objects in the list.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL