mempool

package
v1.0.3-rc.1
Published: Mar 4, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

A mempool performs these functions:

  • Provide a proposed set of requests (refs) for the consensus.
  • Provide a set of requests for a TX as decided by the consensus.
  • Share Off-Ledger requests between the committee and the server nodes.

When the consensus asks for a proposal set, the mempool has to determine whether a reorg or a rollback has happened and adjust the request set accordingly. For this to work, the mempool has to maintain not only the requests but also the latest state for which it has provided a proposal. Let's say the mempool has provided proposals for PrevAO (AO≡AliasOutput).

Upon reception of the proposal query (ConsensusProposalAsync) for NextAO from the consensus, the mempool asks the StateMgr for the virtual state VS(NextAO) corresponding to NextAO and for a list of blocks that have to be reverted. The state manager collects this information by finding the common ancestor of NextAO and PrevAO, say CommonAO = NextAO ⊓ PrevAO. The blocks to be reverted are those in the range (CommonAO, PrevAO].
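The ancestor search described above can be sketched on a toy chain. This is a minimal illustration, not the StateMgr implementation: the `block` type, `commonAncestor`, and `revertBlocks` are hypothetical names, and blocks are assumed to be linked by simple parent pointers.

```go
package main

import "fmt"

// block is a hypothetical stand-in for the state reached at one alias output.
type block struct {
	id     string
	parent *block // nil for the origin block
}

// commonAncestor walks the parent links of both heads and returns the
// newest block they share (NextAO ⊓ PrevAO), or nil if there is none.
func commonAncestor(prev, next *block) *block {
	onNextBranch := map[*block]bool{}
	for b := next; b != nil; b = b.parent {
		onNextBranch[b] = true
	}
	for b := prev; b != nil; b = b.parent {
		if onNextBranch[b] {
			return b
		}
	}
	return nil
}

// revertBlocks returns the IDs of the blocks in (common, prev], newest first.
func revertBlocks(common, prev *block) []string {
	var ids []string
	for b := prev; b != nil && b != common; b = b.parent {
		ids = append(ids, b.id)
	}
	return ids
}

func main() {
	b0 := &block{id: "b0"}
	b1 := &block{id: "b1", parent: b0}
	b2 := &block{id: "b2", parent: b1} // old branch
	b3 := &block{id: "b3", parent: b2} // PrevAO
	c2 := &block{id: "c2", parent: b1} // NextAO after a reorg

	common := commonAncestor(b3, c2)
	fmt.Println(common.id, revertBlocks(common, b3))

	// Normal advance: NextAO extends PrevAO, so nothing is reverted.
	common = commonAncestor(b2, b3)
	fmt.Println(common.id, len(revertBlocks(common, b2)))
}
```

In the reorg case the common ancestor is b1 and the revert range contains b3 and b2. In the normal case PrevAO itself is the common ancestor, so the range is empty.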

When the mempool gets VS(NextAO) and RevertBlocks = (CommonAO, PrevAO], it re-adds the requests from RevertBlocks to the mempool and then drops the requests that are already processed in VS(NextAO). If the RevertBlocks set is not empty, it has to drop all the on-ledger requests and re-read them from the L1. In normal execution, RevertBlocks=∅ and VS(NextAO) differs from VS(PrevAO) by a single block.
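The adjustment steps can be sketched over plain maps. This is a simplified illustration under stated assumptions, not the package's code: `request`, `reconcile`, and `sortedIDs` are hypothetical, requests are identified by strings, and the set of requests processed in VS(NextAO) is passed in as a ready-made map.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical, simplified request record.
type request struct {
	id       string
	onLedger bool
}

// reconcile returns the adjusted pool and reports whether the on-ledger
// requests have to be re-read from L1.
func reconcile(pool map[string]request, revertBlocks [][]request, processed map[string]bool) (map[string]request, bool) {
	out := make(map[string]request, len(pool))
	for id, r := range pool {
		out[id] = r
	}
	// Step 1: re-add the requests of every reverted block.
	for _, blk := range revertBlocks {
		for _, r := range blk {
			out[r.id] = r
		}
	}
	// Step 2: drop what VS(NextAO) has already processed; after a revert
	// also drop all on-ledger requests, since they are re-read from L1.
	rereadL1 := len(revertBlocks) > 0
	for id, r := range out {
		if processed[id] || (rereadL1 && r.onLedger) {
			delete(out, id)
		}
	}
	return out, rereadL1
}

// sortedIDs lists the pool keys in a stable order for printing.
func sortedIDs(pool map[string]request) []string {
	ids := make([]string, 0, len(pool))
	for id := range pool {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	pool := map[string]request{
		"off-1": {id: "off-1"},
		"on-1":  {id: "on-1", onLedger: true},
	}
	reverted := [][]request{{{id: "off-2"}, {id: "on-2", onLedger: true}}}
	processed := map[string]bool{"off-1": true}

	next, reread := reconcile(pool, reverted, processed)
	fmt.Println(sortedIDs(next), reread)
}
```

Here only off-2 survives: off-1 is already processed in the new state, and both on-ledger requests are dropped because the revert set is not empty.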

The response to the requests decided by the consensus (ConsensusRequestsAsync) should be unconditional and should ignore the current state of the requests. This call should modify neither NextAO nor PrevAO. The state will be updated later with the proposal query, because only then will the chain know which branch to work on.

Time-locked requests are maintained in the mempool as well. They are included in a proposal based on the tangle time, which is received from the L1 with the milestones.

NOTE: A node loses its off-ledger requests on restart. The on-ledger requests will be added back to the mempool by reading them from the L1 node.

TODO: Propose subset of the requests. That's for the next release.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChainListener added in v1.0.3

type ChainListener interface {
	// This function is called by the chain when new block is applied to the
	// state. This block might be not confirmed yet, but the chain is going
	// to build the next block on top of this one.
	BlockApplied(chainID isc.ChainID, block state.Block, latestState kv.KVStoreReader)
}

Partial interface for providing chain events to the outside. This interface is in the mempool part only because it tracks the actual state for checking the consumed requests.

type Mempool

type Mempool interface {
	consGR.Mempool
	// Invoked by the chain, when new alias output is considered as a tip/head
	// of the chain. Mempool can reorganize its state by removing/rejecting
	// or re-adding some requests, depending on how the head has changed.
	// It can mean simple advance of the chain, or a rollback or a reorg.
	// This function is guaranteed to be called in the order, which is
	// considered the chain block order by the ChainMgr.
	TrackNewChainHead(st state.State, from, till *isc.AliasOutputWithID, added, removed []state.Block) <-chan bool
	// Invoked by the chain when a new on-ledger request is received from the L1.
	// Inter-node off-ledger dissemination is NOT performed via this function.
	ReceiveOnLedgerRequest(request isc.OnLedgerRequest)
	// This is called when this node receives an off-ledger request from a user directly.
	// I.e. when this node is an entry point of the off-ledger request.
	ReceiveOffLedgerRequest(request isc.OffLedgerRequest) error
	// Invoked by the ChainMgr when the tangle time changes.
	TangleTimeUpdated(tangleTime time.Time)
	// Invoked by the chain when the set of server nodes has changed.
	// These nodes should be used to disseminate the off-ledger requests.
	ServerNodesUpdated(committeePubKeys []*cryptolib.PublicKey, serverNodePubKeys []*cryptolib.PublicKey)
	AccessNodesUpdated(committeePubKeys []*cryptolib.PublicKey, accessNodePubKeys []*cryptolib.PublicKey)
	ConsensusInstancesUpdated(activeConsensusInstances []consGR.ConsensusID)

	GetContents() io.Reader
}

func New

func New(
	ctx context.Context,
	chainID isc.ChainID,
	nodeIdentity *cryptolib.KeyPair,
	net peering.NetworkProvider,
	log *logger.Logger,
	metrics *metrics.ChainMempoolMetrics,
	pipeMetrics *metrics.ChainPipeMetrics,
	listener ChainListener,
	ttl time.Duration,
	broadcastInterval time.Duration,
) Mempool

type OrderedPoolEntry added in v1.0.3

type OrderedPoolEntry[V isc.OffLedgerRequest] struct {
	// contains filtered or unexported fields
}

type RequestPool added in v1.0.3

type RequestPool[V isc.Request] interface {
	Has(reqRef *isc.RequestRef) bool
	Get(reqRef *isc.RequestRef) V
	Add(request V)
	Remove(request V)
	// this removes requests from the pool if predicate returns false
	Filter(predicate func(request V, ts time.Time) bool)
	Iterate(f func(e *typedPoolEntry[V]))
	StatusString() string
	WriteContent(io.Writer)
}
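The Filter contract (an entry is removed when the predicate returns false) can be illustrated with an age-based predicate. This is a sketch only: the `pool` type, `notExpired`, `at`, and `demoTTL` are hypothetical, and whether the package applies its ttl parameter through Filter is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// demoTTL is an illustrative lifetime, not a package default.
const demoTTL = time.Minute

// entry pairs a request (a plain string here) with its arrival time.
type entry struct {
	req string
	ts  time.Time
}

// pool is a hypothetical, non-generic stand-in for a request pool.
type pool struct {
	entries map[string]entry
}

func (p *pool) Add(req string, ts time.Time) {
	p.entries[req] = entry{req: req, ts: ts}
}

// Filter removes every entry for which the predicate returns false.
func (p *pool) Filter(predicate func(req string, ts time.Time) bool) {
	for id, e := range p.entries {
		if !predicate(e.req, e.ts) {
			delete(p.entries, id)
		}
	}
}

// notExpired builds a predicate that keeps entries no older than ttl.
func notExpired(now time.Time, ttl time.Duration) func(string, time.Time) bool {
	return func(_ string, ts time.Time) bool {
		return now.Sub(ts) <= ttl
	}
}

// at converts seconds since the Unix epoch into a time.Time.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	p := &pool{entries: map[string]entry{}}
	p.Add("old", at(0))
	p.Add("fresh", at(100))

	// At t=120s the "old" entry is 120s old and exceeds the 60s TTL.
	p.Filter(notExpired(at(120), demoTTL))
	fmt.Println(len(p.entries))
}
```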

func NewTypedPool added in v1.0.3

func NewTypedPool[V isc.Request](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V]

type TimePool added in v1.0.3

type TimePool interface {
	AddRequest(timestamp time.Time, request isc.Request)
	TakeTill(timestamp time.Time) []isc.Request
	Has(reqID *isc.RequestRef) bool
	Filter(predicate func(request isc.Request, ts time.Time) bool)
}

Maintains a pool of requests that have to be postponed until a specified timestamp.
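A minimal sketch of such a pool, assuming a slice kept sorted by timestamp. The `timePool` type and the `at` helper are hypothetical, requests are plain strings, and treating the TakeTill bound as inclusive is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// entry is a postponed request together with its unlock time.
type entry struct {
	at  time.Time
	req string
}

// timePool keeps its entries sorted by unlock time, earliest first.
type timePool struct {
	entries []entry
}

// AddRequest inserts the request at its sorted position.
func (p *timePool) AddRequest(at time.Time, req string) {
	i := sort.Search(len(p.entries), func(i int) bool {
		return p.entries[i].at.After(at)
	})
	p.entries = append(p.entries, entry{})
	copy(p.entries[i+1:], p.entries[i:])
	p.entries[i] = entry{at: at, req: req}
}

// TakeTill removes and returns all requests whose unlock time is not
// after now (inclusive bound, an assumption of this sketch).
func (p *timePool) TakeTill(now time.Time) []string {
	n := sort.Search(len(p.entries), func(i int) bool {
		return p.entries[i].at.After(now)
	})
	out := make([]string, 0, n)
	for _, e := range p.entries[:n] {
		out = append(out, e.req)
	}
	p.entries = p.entries[n:]
	return out
}

// at converts seconds since the Unix epoch into a time.Time.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	p := &timePool{}
	p.AddRequest(at(60), "late")
	p.AddRequest(at(10), "early")

	fmt.Println(p.TakeTill(at(30))) // tangle time has passed only "early"
	fmt.Println(p.TakeTill(at(60))) // now "late" is released as well
}
```

Each call to TakeTill corresponds to a tangle time update: requests whose time lock has expired leave the time pool and become eligible for proposals.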

func NewTimePool added in v1.0.3

func NewTimePool(sizeMetric func(int), log *logger.Logger) TimePool

type TypedPoolByNonce added in v1.0.3

type TypedPoolByNonce[V isc.OffLedgerRequest] struct {
	// contains filtered or unexported fields
}

Keeps a map of requests ordered by nonce for each account.
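The per-account ordering can be sketched as a map from account to a nonce-sorted slice. The types and the `Nonces` helper below are hypothetical and non-generic; the sketch does not deduplicate equal nonces and makes no claim about how the package resolves them.

```go
package main

import (
	"fmt"
	"sort"
)

// offLedgerReq is a hypothetical, simplified off-ledger request.
type offLedgerReq struct {
	account string
	nonce   uint64
}

// poolByNonce maps each account to its requests in ascending nonce order.
type poolByNonce struct {
	byAccount map[string][]offLedgerReq
}

func newPoolByNonce() *poolByNonce {
	return &poolByNonce{byAccount: map[string][]offLedgerReq{}}
}

// Add inserts the request at its sorted position within the account's list.
func (p *poolByNonce) Add(r offLedgerReq) {
	list := p.byAccount[r.account]
	i := sort.Search(len(list), func(i int) bool {
		return list[i].nonce > r.nonce
	})
	list = append(list, offLedgerReq{})
	copy(list[i+1:], list[i:])
	list[i] = r
	p.byAccount[r.account] = list
}

// Nonces returns the nonces stored for an account, in order.
func (p *poolByNonce) Nonces(account string) []uint64 {
	var out []uint64
	for _, r := range p.byAccount[account] {
		out = append(out, r.nonce)
	}
	return out
}

func main() {
	p := newPoolByNonce()
	p.Add(offLedgerReq{account: "alice", nonce: 2})
	p.Add(offLedgerReq{account: "alice", nonce: 0})
	p.Add(offLedgerReq{account: "bob", nonce: 5})
	p.Add(offLedgerReq{account: "alice", nonce: 1})

	fmt.Println(p.Nonces("alice"), p.Nonces("bob"))
}
```

Keeping each account's requests sorted lets a consumer walk them in nonce order regardless of the order in which they arrived.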

func NewTypedPoolByNonce added in v1.0.3

func NewTypedPoolByNonce[V isc.OffLedgerRequest](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *TypedPoolByNonce[V]

func (*TypedPoolByNonce[V]) Add added in v1.0.3

func (p *TypedPoolByNonce[V]) Add(request V)

func (*TypedPoolByNonce[V]) Filter added in v1.0.3

func (p *TypedPoolByNonce[V]) Filter(predicate func(request V, ts time.Time) bool)

func (*TypedPoolByNonce[V]) Get added in v1.0.3

func (p *TypedPoolByNonce[V]) Get(reqRef *isc.RequestRef) V

func (*TypedPoolByNonce[V]) Has added in v1.0.3

func (p *TypedPoolByNonce[V]) Has(reqRef *isc.RequestRef) bool

func (*TypedPoolByNonce[V]) Iterate added in v1.0.3

func (p *TypedPoolByNonce[V]) Iterate(f func(account string, requests []*OrderedPoolEntry[V]))

func (*TypedPoolByNonce[V]) Remove added in v1.0.3

func (p *TypedPoolByNonce[V]) Remove(request V)

func (*TypedPoolByNonce[V]) StatusString added in v1.0.3

func (p *TypedPoolByNonce[V]) StatusString() string

func (*TypedPoolByNonce[V]) WriteContent added in v1.0.3

func (p *TypedPoolByNonce[V]) WriteContent(w io.Writer)

type WaitReq added in v1.0.3

type WaitReq interface {
	WaitMany(ctx context.Context, reqRefs []*isc.RequestRef, cb func(req isc.Request)) // Called per block.
	WaitAny(ctx context.Context, cb func(req isc.Request))                             // Called per block.
	MarkAvailable(req isc.Request)                                                     // Called often, per request.
}

This object provides the synchronization between the functions asking for requests and the arriving requests. The former have to be notified upon reception of the latter.
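The notification pattern can be sketched with callbacks keyed by request ID. This is an illustration only: the `waitReq` type below is hypothetical, identifies requests by strings, and omits the context cancellation, WaitAny, and periodic cleanup that the real interface implies.

```go
package main

import (
	"fmt"
	"sync"
)

// waitReq remembers which callbacks are waiting for which request IDs.
type waitReq struct {
	mu      sync.Mutex
	waiters map[string][]func(reqID string)
}

func newWaitReq() *waitReq {
	return &waitReq{waiters: map[string][]func(reqID string){}}
}

// WaitMany registers cb to be called once for each listed request
// when that request becomes available.
func (w *waitReq) WaitMany(reqIDs []string, cb func(reqID string)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, id := range reqIDs {
		w.waiters[id] = append(w.waiters[id], cb)
	}
}

// MarkAvailable notifies and forgets every waiter of the given request.
// Callbacks run outside the lock so they may call back into waitReq.
func (w *waitReq) MarkAvailable(reqID string) {
	w.mu.Lock()
	cbs := w.waiters[reqID]
	delete(w.waiters, reqID)
	w.mu.Unlock()
	for _, cb := range cbs {
		cb(reqID)
	}
}

func main() {
	w := newWaitReq()
	var got []string
	w.WaitMany([]string{"r1", "r2"}, func(id string) {
		got = append(got, id)
	})

	w.MarkAvailable("r2") // a waiter exists: callback fires
	w.MarkAvailable("r3") // nobody waits for r3: no callback
	w.MarkAvailable("r1") // callback fires
	w.MarkAvailable("r1") // waiter was already consumed: no callback

	fmt.Println(got)
}
```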

func NewWaitReq added in v1.0.3

func NewWaitReq(cleanupEvery int) WaitReq
