mempool

package
v0.0.0-...-69704c5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2023 License: Apache-2.0, Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

TODO: Better handle abci client errors. (make it automatically handle connection errors)

Index

Constants

View Source
const (
	MempoolChannel = byte(0x30)

	// UnknownPeerID is the peer ID to use when running CheckTx when there is
	// no peer (e.g. RPC)
	UnknownPeerID uint16 = 0
)
View Source
const (
	FlagEnablePendingPool = "mempool.enable_pending_pool"
)
View Source
const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "mempool"
)

Variables

View Source
var (
	// ErrTxInCache is returned to the client if we saw tx earlier
	ErrTxInCache = errors.New("tx already exists in cache")
	// ErrNoSuchTx is returned to the client if the target tx is not in the mempool
	ErrNoSuchTx = errors.New("no such tx in mempool")
)

Functions

func IsPreCheckError

func IsPreCheckError(err error) bool

IsPreCheckError returns true if err is due to pre check failure.

func MultiPriceBump

func MultiPriceBump(rawPrice *big.Int, priceBump int64) *big.Int

func RegisterMessages

func RegisterMessages(cdc *amino.Codec)

Types

type AccountRetriever

type AccountRetriever interface {
	GetAccountNonce(address string) uint64
}

type AddressNonce

type AddressNonce struct {
	// contains filtered or unexported fields
}

type AddressRecord

type AddressRecord struct {
	// contains filtered or unexported fields
}

func (*AddressRecord) AddItem

func (ar *AddressRecord) AddItem(address string, cElement *clist.CElement)

func (*AddressRecord) CleanItems

func (ar *AddressRecord) CleanItems(address string, nonce uint64, cb func(element *clist.CElement))

func (*AddressRecord) DeleteItem

func (ar *AddressRecord) DeleteItem(e *clist.CElement)

func (*AddressRecord) GetAddressList

func (ar *AddressRecord) GetAddressList() []string

func (*AddressRecord) GetAddressNonce

func (ar *AddressRecord) GetAddressNonce(address string) (uint64, bool)

func (*AddressRecord) GetAddressTxs

func (ar *AddressRecord) GetAddressTxs(address string, max int) types.Txs

func (*AddressRecord) GetAddressTxsCnt

func (ar *AddressRecord) GetAddressTxsCnt(address string) int

func (*AddressRecord) GetItems

func (ar *AddressRecord) GetItems(address string) []*clist.CElement

type AddressRecorder

type AddressRecorder interface {
	GetAddressList() []string
	GetAddressNonce(address string) (uint64, bool)
	GetAddressTxsCnt(address string) int
	GetAddressTxs(address string, max int) types.Txs
	CleanItems(address string, nonce uint64)
}

type BaseTxQueue

type BaseTxQueue struct {
	*AddressRecord
	// contains filtered or unexported fields
}

func NewBaseTxQueue

func NewBaseTxQueue() *BaseTxQueue

func (*BaseTxQueue) Back

func (q *BaseTxQueue) Back() *clist.CElement

func (*BaseTxQueue) BroadcastFront

func (q *BaseTxQueue) BroadcastFront() *clist.CElement

func (*BaseTxQueue) BroadcastLen

func (q *BaseTxQueue) BroadcastLen() int

func (*BaseTxQueue) CleanItems

func (q *BaseTxQueue) CleanItems(address string, nonce uint64)

func (*BaseTxQueue) Front

func (q *BaseTxQueue) Front() *clist.CElement

func (*BaseTxQueue) Insert

func (q *BaseTxQueue) Insert(tx *mempoolTx) error

func (*BaseTxQueue) Len

func (q *BaseTxQueue) Len() int

func (*BaseTxQueue) Load

func (q *BaseTxQueue) Load(hash [sha256.Size]byte) (*clist.CElement, bool)

func (*BaseTxQueue) Remove

func (q *BaseTxQueue) Remove(element *clist.CElement)

func (*BaseTxQueue) RemoveByKey

func (q *BaseTxQueue) RemoveByKey(key [32]byte) (ele *clist.CElement)

func (*BaseTxQueue) TxsWaitChan

func (q *BaseTxQueue) TxsWaitChan() <-chan struct{}

type CListMempool

type CListMempool struct {
	// contains filtered or unexported fields
}

CListMempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. The mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.

func NewCListMempool

func NewCListMempool(
	config *cfg.MempoolConfig,
	proxyAppConn proxy.AppConnMempool,
	height int64,
	options ...CListMempoolOption,
) *CListMempool

NewCListMempool returns a new mempool with the given configuration and connection to an application.

func (*CListMempool) BroadcastTxsFront

func (mem *CListMempool) BroadcastTxsFront() *clist.CElement

func (*CListMempool) CheckTx

func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error

It blocks if we're waiting on Update() or Reap(). cb: A callback from the CheckTx command.

It gets called from another goroutine.

CONTRACT: Either cb will get called, or err returned.

Safe for concurrent use by multiple goroutines.

func (*CListMempool) EnableTxsAvailable

func (mem *CListMempool) EnableTxsAvailable()

NOTE: not thread safe - should only be called once, on startup

func (*CListMempool) Flush

func (mem *CListMempool) Flush()

XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.

func (*CListMempool) FlushAppConn

func (mem *CListMempool) FlushAppConn() error

Lock() must be held by the caller during execution.

func (*CListMempool) GetAddressList

func (mem *CListMempool) GetAddressList() []string

func (*CListMempool) GetConfig

func (mem *CListMempool) GetConfig() *cfg.MempoolConfig

func (*CListMempool) GetEnableDeleteMinGPTx

func (mem *CListMempool) GetEnableDeleteMinGPTx() bool

func (*CListMempool) GetPendingNonce

func (mem *CListMempool) GetPendingNonce(address string) (uint64, bool)

func (*CListMempool) GetTxByHash

func (mem *CListMempool) GetTxByHash(hash [sha256.Size]byte) (types.Tx, error)

func (*CListMempool) GetTxSimulateGas

func (mem *CListMempool) GetTxSimulateGas(txHash string) int64

func (*CListMempool) GetUserPendingTxsCnt

func (mem *CListMempool) GetUserPendingTxsCnt(address string) int

func (*CListMempool) Height

func (mem *CListMempool) Height() int64

Safe for concurrent use by multiple goroutines.

func (*CListMempool) Lock

func (mem *CListMempool) Lock()

Safe for concurrent use by multiple goroutines.

func (*CListMempool) ReapEssentialTx

func (mem *CListMempool) ReapEssentialTx(tx types.Tx) abci.TxEssentials

func (*CListMempool) ReapMaxBytesMaxGas

func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx

Safe for concurrent use by multiple goroutines.

func (*CListMempool) ReapMaxTxs

func (mem *CListMempool) ReapMaxTxs(max int) types.Txs

Safe for concurrent use by multiple goroutines.

func (*CListMempool) ReapUserTxs

func (mem *CListMempool) ReapUserTxs(address string, max int) types.Txs

func (*CListMempool) ReapUserTxsCnt

func (mem *CListMempool) ReapUserTxsCnt(address string) int

func (*CListMempool) SetAccountRetriever

func (mem *CListMempool) SetAccountRetriever(retriever AccountRetriever)

func (*CListMempool) SetEventBus

func (mem *CListMempool) SetEventBus(eventBus types.TxEventPublisher)

SetEventBus sets the event bus.

func (*CListMempool) SetLogger

func (mem *CListMempool) SetLogger(l log.Logger)

SetLogger sets the Logger.

func (*CListMempool) SetTxInfoParser

func (mem *CListMempool) SetTxInfoParser(parser TxInfoParser)

func (*CListMempool) Size

func (mem *CListMempool) Size() int

Safe for concurrent use by multiple goroutines.

func (*CListMempool) TxsAvailable

func (mem *CListMempool) TxsAvailable() <-chan struct{}

Safe for concurrent use by multiple goroutines.

func (*CListMempool) TxsBytes

func (mem *CListMempool) TxsBytes() int64

Safe for concurrent use by multiple goroutines.

func (*CListMempool) TxsFront

func (mem *CListMempool) TxsFront() *clist.CElement

TxsFront returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. FIXME: leaking implementation details!

Safe for concurrent use by multiple goroutines.

func (*CListMempool) TxsWaitChan

func (mem *CListMempool) TxsWaitChan() <-chan struct{}

TxsWaitChan returns a channel to wait on transactions. It will be closed once the mempool is not empty (ie. the internal `mem.txs` has at least one element)

Safe for concurrent use by multiple goroutines.

func (*CListMempool) Unlock

func (mem *CListMempool) Unlock()

Safe for concurrent use by multiple goroutines.

func (*CListMempool) Update

func (mem *CListMempool) Update(
	height int64,
	txs types.Txs,
	deliverTxResponses []*abci.ResponseDeliverTx,
	preCheck PreCheckFunc,
	postCheck PostCheckFunc,
) error

Lock() must be held by the caller during execution.

type CListMempoolOption

type CListMempoolOption func(*CListMempool)

CListMempoolOption sets an optional parameter on the mempool.

func WithMetrics

func WithMetrics(metrics *Metrics) CListMempoolOption

WithMetrics sets the metrics.

func WithPostCheck

func WithPostCheck(f PostCheckFunc) CListMempoolOption

WithPostCheck sets a filter for the mempool to reject a tx if f returns an error. This is run after CheckTx.

func WithPreCheck

func WithPreCheck(f PreCheckFunc) CListMempoolOption

WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns an error. This is run before CheckTx.

type ErrMempoolIsFull

type ErrMempoolIsFull struct {
	// contains filtered or unexported fields
}

ErrMempoolIsFull means Tendermint & an application can't handle that much load

func (ErrMempoolIsFull) Error

func (e ErrMempoolIsFull) Error() string

type ErrPendingPoolAddressLimit

type ErrPendingPoolAddressLimit struct {
	// contains filtered or unexported fields
}

ErrPendingPoolAddressLimit means an address is sending too many txs to the PendingPool

func (ErrPendingPoolAddressLimit) Error

type ErrPendingPoolIsFull

type ErrPendingPoolIsFull struct {
	// contains filtered or unexported fields
}

ErrPendingPoolIsFull means PendingPool can't handle that much load

func (ErrPendingPoolIsFull) Error

func (e ErrPendingPoolIsFull) Error() string

type ErrPreCheck

type ErrPreCheck struct {
	Reason error
}

ErrPreCheck is returned when a tx fails the pre-check; Reason holds the underlying error (for example, the tx is too big)

func (ErrPreCheck) Error

func (e ErrPreCheck) Error() string

type ErrTxAlreadyInPendingPool

type ErrTxAlreadyInPendingPool struct {
	// contains filtered or unexported fields
}

ErrTxAlreadyInPendingPool means the tx is already in the PendingPool

func (ErrTxAlreadyInPendingPool) Error

type ErrTxTooLarge

type ErrTxTooLarge struct {
	// contains filtered or unexported fields
}

ErrTxTooLarge means the tx is too big to be sent in a message to other peers

func (ErrTxTooLarge) Error

func (e ErrTxTooLarge) Error() string

type ExTxInfo

type ExTxInfo struct {
	Sender      string   `json:"sender"`
	SenderNonce uint64   `json:"sender_nonce"`
	GasPrice    *big.Int `json:"gas_price"`
	Nonce       uint64   `json:"nonce"`
}

--------------------------------------------------------------------------------

type GasInfo

type GasInfo struct {
	// GasWanted is the maximum units of work we allow this tx to perform.
	GasWanted uint64

	// GasUsed is the amount of gas actually consumed.
	GasUsed uint64
}

GasInfo defines tx execution gas context.

type GasTxQueue

type GasTxQueue struct {
	*AddressRecord
	// contains filtered or unexported fields
}

func NewGasTxQueue

func NewGasTxQueue(txPriceBump int64) *GasTxQueue

func (*GasTxQueue) Back

func (q *GasTxQueue) Back() *clist.CElement

func (*GasTxQueue) BroadcastFront

func (q *GasTxQueue) BroadcastFront() *clist.CElement

func (*GasTxQueue) BroadcastLen

func (q *GasTxQueue) BroadcastLen() int

func (*GasTxQueue) CleanItems

func (q *GasTxQueue) CleanItems(address string, nonce uint64)

func (*GasTxQueue) Front

func (q *GasTxQueue) Front() *clist.CElement

func (*GasTxQueue) Insert

func (q *GasTxQueue) Insert(memTx *mempoolTx) error

func (*GasTxQueue) Len

func (q *GasTxQueue) Len() int

func (*GasTxQueue) Load

func (q *GasTxQueue) Load(hash [sha256.Size]byte) (*clist.CElement, bool)

func (*GasTxQueue) Remove

func (q *GasTxQueue) Remove(element *clist.CElement)

func (*GasTxQueue) RemoveByKey

func (q *GasTxQueue) RemoveByKey(key [32]byte) (ele *clist.CElement)

func (*GasTxQueue) TxsWaitChan

func (q *GasTxQueue) TxsWaitChan() <-chan struct{}

type ITransactionQueue

type ITransactionQueue interface {
	Len() int
	Insert(tx *mempoolTx) error
	Remove(element *clist.CElement)
	RemoveByKey(key [sha256.Size]byte) *clist.CElement
	Front() *clist.CElement
	Back() *clist.CElement
	BroadcastFront() *clist.CElement
	BroadcastLen() int
	Load(hash [sha256.Size]byte) (*clist.CElement, bool)
	TxsWaitChan() <-chan struct{}

	AddressRecorder
}

func NewOptimizedTxQueue

func NewOptimizedTxQueue(txPriceBump int64) ITransactionQueue

type Mempool

type Mempool interface {
	// CheckTx executes a new transaction against the application to determine
	// its validity and whether it should be added to the mempool.
	CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error

	// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
	// bytes total with the condition that the total gasWanted must be less than
	// maxGas.
	// If both maxes are negative, there is no cap on the size of all returned
	// transactions (~ all available transactions).
	ReapMaxBytesMaxGas(maxBytes, maxGas int64) []types.Tx
	ReapEssentialTx(tx types.Tx) abci.TxEssentials

	// ReapMaxTxs reaps up to max transactions from the mempool.
	// If max is negative, there is no cap on the size of all returned
	// transactions (~ all available transactions).
	ReapMaxTxs(max int) types.Txs

	ReapUserTxsCnt(address string) int

	// Only used by CheckTx!
	GetUserPendingTxsCnt(address string) int

	ReapUserTxs(address string, max int) types.Txs
	GetPendingNonce(address string) (uint64, bool)

	// Lock locks the mempool. The consensus must be able to hold lock to safely update.
	Lock()

	// Unlock unlocks the mempool.
	Unlock()

	// Update informs the mempool that the given txs were committed and can be discarded.
	// NOTE: this should be called *after* block is committed by consensus.
	// NOTE: Lock/Unlock must be managed by caller
	Update(
		blockHeight int64,
		blockTxs types.Txs,
		deliverTxResponses []*abci.ResponseDeliverTx,
		newPreFn PreCheckFunc,
		newPostFn PostCheckFunc,
	) error

	// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
	// done. E.g. from CheckTx.
	// NOTE: Lock/Unlock must be managed by caller
	FlushAppConn() error

	// Flush removes all transactions from the mempool and cache
	Flush()

	// TxsAvailable returns a channel which fires once for every height,
	// and only when transactions are available in the mempool.
	// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
	TxsAvailable() <-chan struct{}

	// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
	// trigger once every height when transactions are available.
	EnableTxsAvailable()

	// Size returns the number of transactions in the mempool.
	Size() int

	// TxsBytes returns the total size of all txs in the mempool.
	TxsBytes() int64

	SetEventBus(eventBus types.TxEventPublisher)

	GetConfig() *cfg.MempoolConfig
	GetTxByHash(hash [sha256.Size]byte) (types.Tx, error)

	GetAddressList() []string
	SetAccountRetriever(retriever AccountRetriever)

	SetTxInfoParser(parser TxInfoParser)

	GetTxSimulateGas(txHash string) int64

	GetEnableDeleteMinGPTx() bool
}

Mempool defines the mempool interface.

Updates to the mempool need to be synchronized with committing a block so apps can reset their transient state on Commit.

type Message

type Message interface{}

Message is a message sent or received by the Reactor.

type Metrics

type Metrics struct {
	// Size of the mempool.
	Size metrics.Gauge
	// Histogram of transaction sizes, in bytes.
	TxSizeBytes metrics.Histogram
	// Number of failed transactions.
	FailedTxs metrics.Counter
	// Number of times transactions are rechecked in the mempool.
	RecheckTimes metrics.Counter
	// Size of the pending pool
	PendingPoolSize metrics.Gauge
	// Gas used.
	GasUsed metrics.Gauge
}

Metrics contains metrics exposed by this package. see MetricsProvider for descriptions.

func NopMetrics

func NopMetrics() *Metrics

NopMetrics returns no-op Metrics.

func PrometheusMetrics

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics

PrometheusMetrics returns Metrics built using the Prometheus client library. Optionally, labels can be provided along with their values ("foo", "fooValue").

type PeerState

type PeerState interface {
	GetHeight() int64
}

PeerState describes the state of a peer.

type PendingPool

type PendingPool struct {
	// contains filtered or unexported fields
}

func (*PendingPool) Size

func (p *PendingPool) Size() int

type PostCheckFunc

type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

PostCheckFunc is an optional filter executed after CheckTx that rejects the transaction if a non-nil error is returned. An example would be to ensure a transaction doesn't require more gas than available for the block.

func PostCheckMaxGas

func PostCheckMaxGas(maxGas int64) PostCheckFunc

PostCheckMaxGas checks that the wanted gas is smaller than or equal to the passed maxGas. Returns nil if maxGas is -1.

type PreCheckFunc

type PreCheckFunc func(types.Tx) error

PreCheckFunc is an optional filter executed before CheckTx that rejects the transaction if a non-nil error is returned. An example would be to ensure that a transaction doesn't exceed the block size.

func PreCheckAminoMaxBytes

func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc

PreCheckAminoMaxBytes checks that the size of the transaction plus the amino overhead is smaller than or equal to the expected maxBytes.

type Reactor

type Reactor struct {
	p2p.BaseReactor
	// contains filtered or unexported fields
}

Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.

func NewReactor

func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor

NewReactor returns a new Reactor with the given config and mempool.

func (*Reactor) AddPeer

func (memR *Reactor) AddPeer(peer p2p.Peer)

AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.

func (*Reactor) GetChannels

func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor

GetChannels implements Reactor. It returns the list of channels for this reactor.

func (*Reactor) InitPeer

func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer

InitPeer implements Reactor by creating a state for the peer.

func (*Reactor) OnStart

func (memR *Reactor) OnStart() error

OnStart implements p2p.BaseReactor.

func (*Reactor) Receive

func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)

Receive implements Reactor. It adds any received transactions to the mempool.

func (*Reactor) RemovePeer

func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{})

RemovePeer implements Reactor.

func (*Reactor) SetLogger

func (memR *Reactor) SetLogger(l log.Logger)

SetLogger sets the Logger on the reactor and the underlying mempool.

func (*Reactor) SetNodeKey

func (memR *Reactor) SetNodeKey(key *p2p.NodeKey)

type Result

type Result struct {
	// Data is any data returned from message or handler execution. It MUST be length
	// prefixed in order to separate data from multiple message executions.
	Data []byte

	// Log contains the log information from message or handler execution.
	Log string

	// Events contains a slice of Event objects that were emitted during message or
	// handler execution.
	Events []abci.Event
}

Result is the union of ResponseFormat and ResponseCheckTx.

type SimulationResponse

type SimulationResponse struct {
	GasInfo
	Result *Result
}

SimulationResponse defines the response generated when a transaction is successfully simulated by the Baseapp.

type TxInfo

type TxInfo struct {
	// SenderID is the internal peer ID used in the mempool to identify the
	// sender, storing 2 bytes with each tx instead of 20 bytes for the p2p.ID.
	SenderID uint16
	// SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging.
	SenderP2PID p2p.ID
	// contains filtered or unexported fields
}

TxInfo are parameters that get passed when attempting to add a tx to the mempool.

type TxInfoParser

type TxInfoParser interface {
	GetRawTxInfo(tx types.Tx) ExTxInfo
	GetTxHistoryGasUsed(tx types.Tx) int64
	GetRealTxFromRawTx(rawTx types.Tx) abci.TxEssentials
}

type TxMessage

type TxMessage struct {
	Tx   types.Tx
	From string
}

TxMessage is a Message containing a transaction.

func (TxMessage) AminoSize

func (m TxMessage) AminoSize(_ *amino.Codec) int

func (TxMessage) MarshalAminoTo

func (m TxMessage) MarshalAminoTo(_ *amino.Codec, buf *bytes.Buffer) error

func (TxMessage) MarshalToAmino

func (m TxMessage) MarshalToAmino(cdc *amino.Codec) ([]byte, error)

func (*TxMessage) String

func (m *TxMessage) String() string

String returns a string representation of the TxMessage.

func (*TxMessage) UnmarshalFromAmino

func (m *TxMessage) UnmarshalFromAmino(_ *amino.Codec, data []byte) error

type WrappedTx

type WrappedTx struct {
	Payload   []byte `json:"payload"`   // std tx or evm tx
	From      string `json:"from"`      // from address of evm tx or ""
	Signature []byte `json:"signature"` // signature for payload
	NodeKey   []byte `json:"nodeKey"`   // pub key of the node who signs the tx
}

func (*WrappedTx) GetFrom

func (wtx *WrappedTx) GetFrom() string

func (*WrappedTx) GetNodeKey

func (wtx *WrappedTx) GetNodeKey() []byte

func (*WrappedTx) GetPayload

func (wtx *WrappedTx) GetPayload() []byte

func (*WrappedTx) GetSignature

func (wtx *WrappedTx) GetSignature() []byte

type WtxMessage

type WtxMessage struct {
	Wtx *WrappedTx
}

WtxMessage is a Message containing a wrapped transaction (WrappedTx).

func (*WtxMessage) String

func (m *WtxMessage) String() string

String returns a string representation of the WtxMessage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL