stagedsync

package
v3.0.0-alpha5.0...-c974331
Published: Oct 30, 2024 License: LGPL-3.0 Imports: 105 Imported by: 0

README

Staged Sync

Staged Sync is a version of Go-Ethereum's Full Sync that was rearchitected for better performance.

It is I/O intensive and even though we have a goal of being able to sync the node on an HDD, we still recommend using fast SSDs.

Staged Sync, as its name suggests, consists of multiple stages that are executed in order, one after another.

How The Sync Works

For each peer, Erigon learns what the HEAD block is and executes each stage in order for the missing blocks between the local HEAD block and the peer's HEAD block.

The first stage (downloading headers) sets the local HEAD block.

Each stage is executed in order, and a stage does not stop until it has caught up to the local HEAD.

That means that, in the ideal scenario (no network interruptions, no app restarts, etc.), each stage of the full initial sync will be executed exactly once.

After the last stage finishes, the process starts over from the beginning, looking for new headers to download.

If the app is restarted in between stages, it restarts from the first stage.

If the app is restarted in the middle of a stage's execution, it restarts from that stage, giving it the opportunity to complete.
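Conceptually, the forward part of the cycle looks roughly like the sketch below (illustrative only, not the actual Sync.Run implementation; the stage type here is simplified):

// Illustrative sketch of the staged-sync forward loop (not the real implementation).
type stage struct {
	name    string
	forward func() (reachedHead bool, err error)
}

func runCycle(stages []stage) error {
	// Stages run strictly in order; each stage catches up to the current
	// local HEAD before the next stage starts.
	for _, st := range stages {
		for {
			reachedHead, err := st.forward()
			if err != nil {
				return err // e.g. a failed block may trigger an unwind elsewhere
			}
			if reachedHead {
				break // this stage is done for the current cycle
			}
		}
	}
	// After the last stage, the whole cycle starts again by looking for new headers.
	return nil
}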

How long do the stages take?

Here is a pie chart showing the proportional time spent on each stage (taken from a full sync). It is only a rough estimate, but it gives an idea of the proportions.

Full sync breakdown

Reorgs / Unwinds

Sometimes the chain makes a reorg and we need to "undo" some parts of our sync.

Unwinding happens backward, from the last stage to the first, with one caveat: the txn pool is updated after we have already unwound execution, so that we know the new nonces.

Here is an example of the order in which the stages are unwound (unwinding happens from right to left).

state.unwindOrder = []*Stage{
        // Unwinding of txn pool (reinjecting transactions into the pool needs to happen after unwinding execution)
        stages[0], stages[1], stages[2], stages[9], stages[3], stages[4], stages[5], stages[6], stages[7], stages[8],
}

Preprocessing with ETL

Some stages use our ETL framework to sort data by keys before inserting it into the database.

That allows us to reduce DB write amplification significantly.

So, when we are generating indexes or hashed state, we do a multi-step process.

  1. We write the processed data into a couple of temp files in your data directory;
  2. We then use a heap to insert data from the temp files into the database, in the order that minimizes db write amplification.

This optimization sometimes leads to dramatic (orders of magnitude) write speed improvements.
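As a minimal, self-contained sketch of the idea (this is not the actual ETL framework API), sorting buffered key/value pairs before insertion means the database sees writes in key order:

package main

import (
	"fmt"
	"sort"
)

type pair struct{ key, value string }

// flushSorted sorts buffered pairs by key before handing them to the insert
// callback, so the database receives writes in key order and write
// amplification stays low.
func flushSorted(buf []pair, insert func(key, value string)) {
	sort.Slice(buf, func(i, j int) bool { return buf[i].key < buf[j].key })
	for _, p := range buf {
		insert(p.key, p.value)
	}
}

func main() {
	buf := []pair{{"0x0c", "c"}, {"0x0a", "a"}, {"0x0b", "b"}}
	flushSorted(buf, func(k, v string) { fmt.Println(k, "=>", v) })
}

In the real ETL framework the buffers additionally spill to sorted temp files that are merged with a heap, but the ordering idea is the same.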

What happens after the Merge?

In the Proof-of-Stake world, staged sync becomes somewhat more complicated, as the following diagram shows.

Staged Sync in PoS (diagram)

Stages (for the up-to-date list, see stages.go and stagebuilder.go)

Each stage consists of two functions: ExecFunc, which progresses the stage forward, and UnwindFunc, which unwinds the stage backwards.

Most of the stages could work offline, though this isn't implemented in the current version.

We can add/remove stages, so exact stage numbers may change - but order and names stay the same.
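A minimal sketch of how a stage is wired up, using the Stage, ExecFunc, UnwindFunc and PruneFunc types documented below (the stage bodies here are placeholders, not real stage logic, and the example ID assumes stages.SyncStage is a string-backed identifier):

// Sketch: defining a stage from the documented types; the bodies are placeholders.
myStage := &Stage{
	ID:          stages.SyncStage("com.example.my-stage"),
	Description: "Example stage",
	Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error {
		// progress the stage forward, then record the new progress with s.Update(...)
		return nil
	},
	Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error {
		// undo work down to u.UnwindPoint, then mark the unwind done with u.Done(...)
		return nil
	},
	Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error {
		// prune old data, then record progress with p.Done(...) or p.DoneAt(...)
		return nil
	},
}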

Stage 1: Snapshots

Download Snapshots

Stage 2: Download Headers Stage

During this stage we download all the headers between the local HEAD and our peer's head.

This stage is CPU intensive and can benefit from a multicore processor, because it verifies the PoW of the headers.

Most unwinds are initiated in this stage due to chain reorgs.

This stage advances the local HEAD pointer.

Stage 3: Cumulative Index

Calculate how much gas has been used up to each block.

Stage 4: Block Hashes

Creates an index of blockHash -> blockNumber, extracted from the headers, for faster lookups and to make the sync friendlier for HDDs.

Stage 5: Download Block Bodies Stage

In this stage, we download the bodies for the block headers that we already downloaded.

This is the most network-intensive stage; the vast majority of data is downloaded here.

Stage 6: Recover Senders Stage

This stage recovers and stores senders for each transaction in each downloaded block.

This is also a CPU-intensive stage that benefits from multi-core CPUs.

This stage doesn't use any network connection.

Stage 7: Execute Blocks Stage

During this stage, we execute block-by-block everything that we downloaded before.

One important point: we don't check root hashes during this execution, and we don't even build a Merkle trie here.

This stage is single threaded.

This stage doesn't use a network connection.

This stage is disk intensive.

This stage can spawn unwinds if the block execution fails.

Stage 8: Transpile marked VM contracts to TEVM

Translates each contract marked for translation (from EVM to TEVM).

Stage 9: VerkleTrie

[TODO]

Stage 10: Compute State Root Stage

This stage builds the Merkle trie and checks the root hash for the current state.

It also builds Intermediate Hashes along the way and stores them into the database.

If there were no intermediate hashes stored before (that could happen during the first initial sync), it builds the full Merkle Trie and its root hash.

If there are intermediate hashes in the database, it uses the block history to figure out which ones are outdated and which are still up to date. Then it builds a partial Merkle trie, using the up-to-date hashes and rebuilding only the outdated ones.

If the root hash doesn't match, it initiates an unwind one block backwards.

This stage doesn't use a network connection.

Stage 11: Generate Hashed State Stage

During execution, Erigon uses Plain State storage.

Plain State: instead of the normal layout (which we call "Hashed State"), where accounts and storage items are keyed by keccak256(address), in the Plain State they are keyed by the address itself.

However, to make sure that some APIs work and to keep compatibility with other clients, we generate the Hashed State as well.
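To make the difference concrete, here is a small illustrative sketch of the two keying schemes (it assumes golang.org/x/crypto/sha3 for Keccak-256; the 4-byte "address" is just for illustration, real addresses are 20 bytes):

package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

func main() {
	address := []byte{0xde, 0xad, 0xbe, 0xef} // illustrative "address"

	// Plain State: the account is keyed directly by its address.
	plainKey := address

	// Hashed State: the account is keyed by keccak256(address).
	h := sha3.NewLegacyKeccak256()
	h.Write(address)
	hashedKey := h.Sum(nil)

	fmt.Printf("plain key:  %x\n", plainKey)
	fmt.Printf("hashed key: %x\n", hashedKey)
}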

If the Hashed State is not empty, we look at the History ChangeSets and update only the items that were changed.

This stage doesn't use a network connection.

Stages 12, 13, 14, 15 and 16: Generate Indexes

There are 5 indexes that are generated during sync.

They might be disabled because they aren't used for all the APIs.

These stages do not use a network connection.

Account History Index

This index stores the mapping from the account address to the list of blocks where this account was changed in some way.

Storage History Index

This index stores the mapping from the storage item address to the list of blocks where this storage item was changed in some way.

Log Index

This index sets up a link from the [TODO] to [TODO].

Call traces index

[TODO]

Tx Lookup Index

This index sets up a link from the transaction hash to the block number.

Stage 17: Transaction Pool Stage

During this stage, we start the transaction pool or update its state. For instance, we remove from the pool the transactions included in the blocks we have downloaded.

On unwinds, we add the transactions from the unwound blocks back to the pool.

This stage doesn't use a network connection.

Stage 18: Finish

This stage sets the current block number that is then used by RPC calls, such as eth_blockNumber.

Documentation

Index

Constants

const ShortPoSReorgThresholdBlocks = 10

The number of blocks we should be able to re-org sub-second on commodity hardware. See https://hackmd.io/TdJtNs0dS56q-In8h-ShSg

Variables

var (
	ErrHeaderValidatorsLengthMismatch = errors.New("header validators length mismatch")
	ErrHeaderValidatorsBytesMismatch  = errors.New("header validators bytes mismatch")
)

var ErrInvalidStateRootHash = errors.New("invalid state root hash")

var ErrTooDeepUnwind = errors.New("too deep unwind")

var ExecUnwind = UnwindReason{nil, nil}

var ForkChoice = UnwindReason{nil, nil}

var MiningPruneOrder = PruneOrder{} // nothing to unwind in mining - because mining does not commit db changes

var MiningUnwindOrder = UnwindOrder{} // nothing to unwind in mining - because mining does not commit db changes

var StagedUnwind = UnwindReason{nil, nil}

Functions

func BodiesForward

func BodiesForward(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg BodiesCfg, test bool, logger log.Logger) error

BodiesForward progresses Bodies stage in the forward direction

func BorHeimdallForward

func BorHeimdallForward(
	s *StageState,
	u Unwinder,
	ctx context.Context,
	tx kv.RwTx,
	cfg BorHeimdallCfg,
	logger log.Logger,
) (err error)

func BorHeimdallUnwind

func BorHeimdallUnwind(u *UnwindState, ctx context.Context, _ *StageState, tx kv.RwTx, cfg BorHeimdallCfg) (err error)

func CollectTableSizes

func CollectTableSizes(db kv.RoDB, tx kv.Tx, buckets []string) []interface{}

func DownloadAndIndexSnapshotsIfNeed

func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.RwTx, cfg SnapshotsCfg, logger log.Logger) error

func ExecBlockV3

func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error)

func ExecV3

func ExecV3(ctx context.Context,
	execStage *StageState, u Unwinder, workerCount int, cfg ExecuteBlockCfg, txc wrap.TxContainer,
	parallel bool,
	maxBlockNum uint64,
	logger log.Logger,
	initialCycle bool,
	isMining bool,
) error

func FetchSpanZeroForMiningIfNeeded

func FetchSpanZeroForMiningIfNeeded(
	ctx context.Context,
	db kv.RwDB,
	blockReader services.FullBlockReader,
	heimdallClient heimdall.HeimdallClient,
	heimdallStore heimdall.Store,
	logger log.Logger,
) error

func FillDBFromSnapshots

func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs datadir.Dirs, blockReader services.FullBlockReader, agg *state.Aggregator, logger log.Logger) error

func FinishForward

func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg) error

func ForwardPolygonSyncStage

func ForwardPolygonSyncStage(
	ctx context.Context,
	tx kv.RwTx,
	stageState *StageState,
	unwinder Unwinder,
	cfg PolygonSyncStageCfg,
) error

func HeadersPOW

func HeadersPOW(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, test bool, useExternalTx bool, logger log.Logger) error

HeadersPOW progresses Headers stage for Proof-of-Work headers

func HeadersUnwind

func HeadersUnwind(ctx context.Context, u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, test bool) (err error)

func MiningBorHeimdallForward

func MiningBorHeimdallForward(
	ctx context.Context,
	cfg BorHeimdallCfg,
	stageStage *StageState,
	unwinder Unwinder,
	tx kv.RwTx,
	logger log.Logger,
) error

func NotifyNewHeaders

func NotifyNewHeaders(ctx context.Context, notifyFrom, notifyTo uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger) error

[from,to)

func NotifyPendingLogs

func NotifyPendingLogs(logPrefix string, notifier ChainEventNotifier, logs types.Logs, logger log.Logger)

func PruneBlockHashStage

func PruneBlockHashStage(p *PruneState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error)

func PruneExecutionStage

func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx context.Context) (err error)

func PruneFinish

func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context) (err error)

func PruneSendersStage

func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error)

func PruneTxLookup

func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Context, logger log.Logger) (err error)

func PruneVerkleTries

func PruneVerkleTries(s *PruneState, tx kv.RwTx, cfg TrieCfg, ctx context.Context) (err error)

func RebuildPatriciaTrieBasedOnFiles

func RebuildPatriciaTrieBasedOnFiles(ctx context.Context, cfg TrieCfg) (libcommon.Hash, error)

func SnapshotsPrune

func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.RwTx, logger log.Logger) (err error)
====== PRUNING ======

Snapshots pruning works more as a retiring of blocks: retiring blocks means moving block data from the DB into snapshots.

func SpawnBlockHashStage

func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context, logger log.Logger) (err error)

func SpawnCustomTrace

func SpawnCustomTrace(cfg CustomTraceCfg, ctx context.Context, logger log.Logger) error

func SpawnExecuteBlocksStage

func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error)

func SpawnMiningCreateBlockStage

func SpawnMiningCreateBlockStage(s *StageState, txc wrap.TxContainer, cfg MiningCreateBlockCfg, quit <-chan struct{}, logger log.Logger) (err error)

SpawnMiningCreateBlockStage TODO: - resubmitAdjustCh - variable is not implemented

func SpawnMiningExecStage

func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg, sendersCfg SendersCfg, execCfg ExecuteBlockCfg, ctx context.Context, logger log.Logger, u Unwinder) error

SpawnMiningExecStage TODO: - resubmitAdjustCh - variable is not implemented

func SpawnMiningFinishStage

func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit <-chan struct{}, logger log.Logger) error

func SpawnPostExecStage

func SpawnPostExecStage(s *StageState, tx kv.RwTx, cfg PostExecCfg, ctx context.Context) error

func SpawnRecoverSendersStage

func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.RwTx, toBlock uint64, ctx context.Context, logger log.Logger) error

func SpawnStageHeaders

func SpawnStageHeaders(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, test bool, logger log.Logger) error

func SpawnStageSnapshots

func SpawnStageSnapshots(
	s *StageState,
	ctx context.Context,
	tx kv.RwTx,
	cfg SnapshotsCfg,
	logger log.Logger,
) (err error)

func SpawnTxLookup

func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, ctx context.Context, logger log.Logger) (err error)

func SpawnVerkleTrie

func SpawnVerkleTrie(s *StageState, u Unwinder, tx kv.RwTx, cfg TrieCfg, ctx context.Context, logger log.Logger) (libcommon.Hash, error)

func UnwindBlockHashStage

func UnwindBlockHashStage(u *UnwindState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error)

func UnwindBodiesStage

func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg, ctx context.Context) (err error)

func UnwindExecutionStage

func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error)

func UnwindFinish

func UnwindFinish(u *UnwindState, tx kv.RwTx, cfg FinishCfg, ctx context.Context) (err error)

func UnwindPolygonSyncStage

func UnwindPolygonSyncStage(ctx context.Context, tx kv.RwTx, u *UnwindState, cfg PolygonSyncStageCfg) error

func UnwindPostExecStage

func UnwindPostExecStage(u *UnwindState, s *StageState, tx kv.RwTx, cfg PostExecCfg, ctx context.Context) (err error)

func UnwindSendersStage

func UnwindSendersStage(u *UnwindState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error)

func UnwindTxLookup

func UnwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Context, logger log.Logger) (err error)

func UnwindVerkleTrie

func UnwindVerkleTrie(u *UnwindState, s *StageState, tx kv.RwTx, cfg TrieCfg, ctx context.Context, logger log.Logger) (err error)

Types

type BlockHashesCfg

type BlockHashesCfg struct {
	// contains filtered or unexported fields
}

func StageBlockHashesCfg

func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config, headerWriter *blockio.BlockWriter) BlockHashesCfg

type BodiesCfg

type BodiesCfg struct {
	// contains filtered or unexported fields
}

func StageBodiesCfg

func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload,
	bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem),
	blockPropagator adapter.BlockPropagator, timeout int,
	chanConfig chain.Config,
	blockReader services.FullBlockReader,
	blockWriter *blockio.BlockWriter,
) BodiesCfg

type BorHeimdallCfg

type BorHeimdallCfg struct {
	// contains filtered or unexported fields
}

func StageBorHeimdallCfg

func StageBorHeimdallCfg(
	db kv.RwDB,
	snapDb kv.RwDB,
	miningState MiningState,
	chainConfig chain.Config,
	heimdallClient heimdall.HeimdallClient,
	heimdallStore heimdall.Store,
	bridgeStore bridge.Store,
	blockReader services.FullBlockReader,
	hd *headerdownload.HeaderDownload,
	penalize func(context.Context, []headerdownload.PenaltyItem),
	recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
	signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
	recordWaypoints bool,
	userUnwindTypeOverrides []string,
) BorHeimdallCfg

type ChainEventNotifier

type ChainEventNotifier interface {
	OnNewHeader(newHeadersRlp [][]byte)
	OnNewPendingLogs(types.Logs)
	OnLogs([]*remote.SubscribeLogsReply)
	HasLogSubsriptions() bool
}

type ChainReader

type ChainReader struct {
	Cfg         chain.Config
	Db          kv.Tx
	BlockReader services.FullBlockReader
	Logger      log.Logger
}

ChainReader implements consensus.ChainReader

func (ChainReader) BorEventsByBlock

func (cr ChainReader) BorEventsByBlock(_ libcommon.Hash, _ uint64) []rlp.RawValue

func (ChainReader) BorSpan

func (cr ChainReader) BorSpan(spanId uint64) *heimdall.Span

func (ChainReader) BorStartEventId

func (cr ChainReader) BorStartEventId(_ libcommon.Hash, _ uint64) uint64

func (ChainReader) Config

func (cr ChainReader) Config() *chain.Config

Config retrieves the blockchain's chain configuration.

func (ChainReader) CurrentFinalizedHeader

func (cr ChainReader) CurrentFinalizedHeader() *types.Header

CurrentFinalizedHeader retrieves the current finalized header from the local chain.

func (ChainReader) CurrentHeader

func (cr ChainReader) CurrentHeader() *types.Header

CurrentHeader retrieves the current header from the local chain.

func (ChainReader) CurrentSafeHeader

func (cr ChainReader) CurrentSafeHeader() *types.Header

func (ChainReader) FrozenBlocks

func (cr ChainReader) FrozenBlocks() uint64

func (ChainReader) FrozenBorBlocks

func (cr ChainReader) FrozenBorBlocks() uint64

func (ChainReader) GetBlock

func (cr ChainReader) GetBlock(hash libcommon.Hash, number uint64) *types.Block

GetBlock retrieves a block from the database by hash and number.

func (ChainReader) GetHeader

func (cr ChainReader) GetHeader(hash libcommon.Hash, number uint64) *types.Header

GetHeader retrieves a block header from the database by hash and number.

func (ChainReader) GetHeaderByHash

func (cr ChainReader) GetHeaderByHash(hash libcommon.Hash) *types.Header

GetHeaderByHash retrieves a block header from the database by its hash.

func (ChainReader) GetHeaderByNumber

func (cr ChainReader) GetHeaderByNumber(number uint64) *types.Header

GetHeaderByNumber retrieves a block header from the database by number.

func (ChainReader) GetTd

func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int

GetTd retrieves the total difficulty from the database by hash and number.

func (ChainReader) HasBlock

func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool

HasBlock reports whether a block with the given hash and number exists in the database.

type ChainReaderImpl

type ChainReaderImpl struct {
	// contains filtered or unexported fields
}

func NewChainReaderImpl

func NewChainReaderImpl(config *chain.Config, tx kv.Tx, blockReader services.FullBlockReader, logger log.Logger) *ChainReaderImpl

func (ChainReaderImpl) BorEventsByBlock

func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue

func (ChainReaderImpl) BorSpan

func (cr ChainReaderImpl) BorSpan(spanId uint64) *heimdall.Span

func (ChainReaderImpl) BorStartEventId

func (cr ChainReaderImpl) BorStartEventId(hash libcommon.Hash, blockNum uint64) uint64

func (ChainReaderImpl) Config

func (cr ChainReaderImpl) Config() *chain.Config

func (ChainReaderImpl) CurrentFinalizedHeader

func (cr ChainReaderImpl) CurrentFinalizedHeader() *types.Header

func (ChainReaderImpl) CurrentHeader

func (cr ChainReaderImpl) CurrentHeader() *types.Header

func (ChainReaderImpl) CurrentSafeHeader

func (cr ChainReaderImpl) CurrentSafeHeader() *types.Header

func (ChainReaderImpl) FrozenBlocks

func (cr ChainReaderImpl) FrozenBlocks() uint64

func (ChainReaderImpl) FrozenBorBlocks

func (cr ChainReaderImpl) FrozenBorBlocks() uint64

func (ChainReaderImpl) GetBlock

func (cr ChainReaderImpl) GetBlock(hash libcommon.Hash, number uint64) *types.Block

func (ChainReaderImpl) GetHeader

func (cr ChainReaderImpl) GetHeader(hash libcommon.Hash, number uint64) *types.Header

func (ChainReaderImpl) GetHeaderByHash

func (cr ChainReaderImpl) GetHeaderByHash(hash libcommon.Hash) *types.Header

func (ChainReaderImpl) GetHeaderByNumber

func (cr ChainReaderImpl) GetHeaderByNumber(number uint64) *types.Header

func (ChainReaderImpl) GetTd

func (cr ChainReaderImpl) GetTd(hash libcommon.Hash, number uint64) *big.Int

func (ChainReaderImpl) HasBlock

func (cr ChainReaderImpl) HasBlock(hash libcommon.Hash, number uint64) bool

type CurrentSyncCycleInfo

type CurrentSyncCycleInfo struct {
	IsInitialCycle bool // means: not-on-chain-tip. can be several sync cycle in this mode.
	IsFirstCycle   bool // means: first cycle
}

type CustomTraceCfg

type CustomTraceCfg struct {
	// contains filtered or unexported fields
}

func StageCustomTraceCfg

func StageCustomTraceCfg(db kv.RwDB, prune prune.Mode, dirs datadir.Dirs, br services.FullBlockReader, cc *chain.Config,
	engine consensus.Engine, genesis *types.Genesis, syncCfg *ethconfig.Sync) CustomTraceCfg

type DownloaderGlue

type DownloaderGlue interface {
	SpawnHeaderDownloadStage([]func() error, *StageState, Unwinder) error
	SpawnBodyDownloadStage(string, string, *StageState, Unwinder, *bodydownload.PrefetchedBlocks) (bool, error)
}

type ExecFunc

type ExecFunc func(badBlockUnwind bool, s *StageState, unwinder Unwinder, txc wrap.TxContainer, logger log.Logger) error

ExecFunc is the execution function for the stage to move forward. * state - is the current state of the stage and contains stage data. * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used.

type ExecuteBlockCfg

type ExecuteBlockCfg struct {
	// contains filtered or unexported fields
}

func StageExecuteBlocksCfg

func StageExecuteBlocksCfg(
	db kv.RwDB,
	pm prune.Mode,
	batchSize datasize.ByteSize,
	chainConfig *chain.Config,
	engine consensus.Engine,
	vmConfig *vm.Config,
	notifications *shards.Notifications,
	stateStream bool,
	badBlockHalt bool,
	keepAllChangesets bool,

	dirs datadir.Dirs,
	blockReader services.FullBlockReader,
	hd headerDownloader,
	genesis *types.Genesis,
	syncCfg ethconfig.Sync,
	silkworm *silkworm.Silkworm,
) ExecuteBlockCfg

type FinishCfg

type FinishCfg struct {
	// contains filtered or unexported fields
}

func StageFinishCfg

func StageFinishCfg(db kv.RwDB, tmpDir string, forkValidator *engine_helpers.ForkValidator) FinishCfg

type HeadersCfg

type HeadersCfg struct {
	// contains filtered or unexported fields
}

func StageHeadersCfg

func StageHeadersCfg(
	db kv.RwDB,
	headerDownload *headerdownload.HeaderDownload,
	bodyDownload *bodydownload.BodyDownload,
	chainConfig chain.Config,
	syncConfig ethconfig.Sync,
	headerReqSend func(context.Context, *headerdownload.HeaderRequest) ([64]byte, bool),
	announceNewHashes func(context.Context, []headerdownload.Announce),
	penalize func(context.Context, []headerdownload.PenaltyItem),
	batchSize datasize.ByteSize,
	noP2PDiscovery bool,
	blockReader services.FullBlockReader,
	blockWriter *blockio.BlockWriter,
	tmpdir string,
	notifications *shards.Notifications,
) HeadersCfg

type MiningBlock

type MiningBlock struct {
	ParentHeaderTime uint64
	Header           *types.Header
	Uncles           []*types.Header
	Txs              types.Transactions
	Receipts         types.Receipts
	Withdrawals      []*types.Withdrawal
	PreparedTxs      types.TransactionsStream
	Requests         types.FlatRequests
}

type MiningCreateBlockCfg

type MiningCreateBlockCfg struct {
	// contains filtered or unexported fields
}

func StageMiningCreateBlockCfg

func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig chain.Config, engine consensus.Engine, txPoolDB kv.RoDB, blockBuilderParameters *core.BlockBuilderParameters, tmpdir string, blockReader services.FullBlockReader) MiningCreateBlockCfg

type MiningExecCfg

type MiningExecCfg struct {
	// contains filtered or unexported fields
}

func StageMiningExecCfg

func StageMiningExecCfg(
	db kv.RwDB, miningState MiningState,
	notifier ChainEventNotifier, chainConfig chain.Config,
	engine consensus.Engine, vmConfig *vm.Config,
	tmpdir string, interrupt *int32, payloadId uint64,
	txPool TxPoolForMining, txPoolDB kv.RoDB,
	blockReader services.FullBlockReader,
) MiningExecCfg

type MiningFinishCfg

type MiningFinishCfg struct {
	// contains filtered or unexported fields
}

func StageMiningFinishCfg

func StageMiningFinishCfg(
	db kv.RwDB,
	chainConfig chain.Config,
	engine consensus.Engine,
	miningState MiningState,
	sealCancel chan struct{},
	blockReader services.FullBlockReader,
	latestBlockBuiltStore *builder.LatestBlockBuiltStore,
) MiningFinishCfg

type MiningState

type MiningState struct {
	MiningConfig    *params.MiningConfig
	PendingResultCh chan *types.Block
	MiningResultCh  chan *types.BlockWithReceipts
	MiningBlock     *MiningBlock
}

func NewMiningState

func NewMiningState(cfg *params.MiningConfig) MiningState

type PolygonSyncStageCfg

type PolygonSyncStageCfg struct {
	// contains filtered or unexported fields
}

func NewPolygonSyncStageCfg

func NewPolygonSyncStageCfg(
	logger log.Logger,
	chainConfig *chain.Config,
	db kv.RwDB,
	heimdallClient heimdall.HeimdallClient,
	heimdallStore heimdall.Store,
	bridgeStore bridge.Store,
	sentry sentryproto.SentryClient,
	maxPeers int,
	statusDataProvider *sentry.StatusDataProvider,
	blockReader services.FullBlockReader,
	stopNode func() error,
	blockLimit uint,
	userUnwindTypeOverrides []string,
) PolygonSyncStageCfg

type PostExecCfg

type PostExecCfg struct {
	// contains filtered or unexported fields
}

func StagePostExecCfg

func StagePostExecCfg(db kv.RwDB, borDb kv.RwDB) PostExecCfg

type Progress

type Progress struct {
	// contains filtered or unexported fields
}

func NewProgress

func NewProgress(prevOutputBlockNum, commitThreshold uint64, workersCount int, updateMetrics bool, logPrefix string, logger log.Logger) *Progress

func (*Progress) Log

func (p *Progress) Log(suffix string, rs *state.StateV3, in *state.QueueWithRetry, rws *state.ResultsQueue, txCount uint64, gas uint64, inputBlockNum uint64, outputBlockNum uint64, outTxNum uint64, repeatCount uint64, idxStepsAmountInDB float64, shouldGenerateChangesets bool)

type PruneFunc

type PruneFunc func(p *PruneState, tx kv.RwTx, logger log.Logger) error

PruneFunc is the execution function for the stage to prune old data. * state - is the current state of the stage and contains stage data.

type PruneOrder

type PruneOrder []stages.SyncStage

type PruneState

type PruneState struct {
	ID              stages.SyncStage
	ForwardProgress uint64 // progress of stage forward move
	PruneProgress   uint64 // progress of stage prune move. after sync cycle it become equal to ForwardProgress by Done() method

	CurrentSyncCycle CurrentSyncCycleInfo
	// contains filtered or unexported fields
}

func (*PruneState) Done

func (s *PruneState) Done(db kv.Putter) error

func (*PruneState) DoneAt

func (s *PruneState) DoneAt(db kv.Putter, blockNum uint64) error

func (*PruneState) LogPrefix

func (s *PruneState) LogPrefix() string

type SendersCfg

type SendersCfg struct {
	// contains filtered or unexported fields
}

func StageSendersCfg

func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload) SendersCfg

type SnapshotsCfg

type SnapshotsCfg struct {
	// contains filtered or unexported fields
}

func StageSnapshotsCfg

func StageSnapshotsCfg(db kv.RwDB,
	chainConfig chain.Config,
	syncConfig ethconfig.Sync,
	dirs datadir.Dirs,
	blockRetire services.BlockRetire,
	snapshotDownloader protodownloader.DownloaderClient,
	blockReader services.FullBlockReader,
	notifier *shards.Notifications,
	agg *state.Aggregator,
	caplin bool,
	blobs bool,
	silkworm *silkworm.Silkworm,
	prune prune.Mode,
) SnapshotsCfg

type Stage

type Stage struct {
	// Description is a string that is shown in the logs.
	Description string
	// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the stage.
	DisabledDescription string
	// Forward is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil!
	Forward ExecFunc
	// Unwind is called when the stage should be unwound. The unwind logic should be there. MUST NOT be nil!
	Unwind UnwindFunc
	Prune  PruneFunc
	// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
	ID stages.SyncStage
	// Disabled defines if the stage is disabled. It is set when the stage is built by its `StageBuilder`.
	Disabled bool
}

Stage is a single sync stage in staged sync.

func DefaultStages

func DefaultStages(ctx context.Context,
	snapshots SnapshotsCfg,
	headers HeadersCfg,
	borHeimdallCfg BorHeimdallCfg,
	blockHashCfg BlockHashesCfg,
	bodies BodiesCfg,
	senders SendersCfg,
	exec ExecuteBlockCfg,
	txLookup TxLookupCfg,
	finish FinishCfg,
	test bool) []*Stage

func DownloadSyncStages

func DownloadSyncStages(
	ctx context.Context,
	snapshots SnapshotsCfg,
) []*Stage

func MiningStages

func MiningStages(
	ctx context.Context,
	createBlockCfg MiningCreateBlockCfg,
	borHeimdallCfg BorHeimdallCfg,
	executeBlockCfg ExecuteBlockCfg,
	sendersCfg SendersCfg,
	execCfg MiningExecCfg,
	finish MiningFinishCfg,
) []*Stage

func PipelineStages

func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage

func PolygonSyncStages

func PolygonSyncStages(
	ctx context.Context,
	snapshots SnapshotsCfg,
	polygonSyncStageCfg PolygonSyncStageCfg,
	senders SendersCfg,
	exec ExecuteBlockCfg,
	txLookup TxLookupCfg,
	finish FinishCfg,
) []*Stage

func StateStages

func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg) []*Stage

StateStages are all the stages necessary for basic unwind and stage computation; it is primarily used to process side forks and memory execution.

func UploaderPipelineStages

func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, bodies BodiesCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage

When uploading - potentially from zero - we need to include the headers and bodies stages, otherwise we won't recover the PoW portion of the chain.

type StageState

type StageState struct {
	ID          stages.SyncStage
	BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution.

	CurrentSyncCycle CurrentSyncCycleInfo
	// contains filtered or unexported fields
}

StageState is the state of the stage.

func (*StageState) ExecutionAt

func (s *StageState) ExecutionAt(db kv.Getter) (uint64, error)

ExecutionAt gets the current state of the "Execution" stage, which block is currently executed.

func (*StageState) LogPrefix

func (s *StageState) LogPrefix() string

func (*StageState) Update

func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error

Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.

func (*StageState) UpdatePrune

func (s *StageState) UpdatePrune(db kv.Putter, blockNum uint64) error

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

func New

func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync
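A hedged sketch of how New fits together with DefaultStages and Run (all cfg values, the unwind/prune orders, ctx, db, txc and logger are assumed to be built elsewhere; this is not a complete setup):

// Sketch only: the configs and orders are assumed to exist already.
stagesList := DefaultStages(ctx,
	snapshotsCfg, headersCfg, borHeimdallCfg, blockHashCfg,
	bodiesCfg, sendersCfg, execCfg, txLookupCfg, finishCfg,
	false /* test */)

sync := New(syncCfg, stagesList, unwindOrder, pruneOrder, logger)

// Run one forward cycle; call it again for subsequent cycles.
hasMore, err := sync.Run(db, txc, true /* initialCycle */, true /* firstCycle */)
if err != nil {
	logger.Error("staged sync cycle failed", "err", err)
}
_ = hasMore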

func (*Sync) Cfg

func (s *Sync) Cfg() ethconfig.Sync

func (*Sync) DisableAllStages

func (s *Sync) DisableAllStages() []stages.SyncStage

DisableAllStages - including their unwinds

func (*Sync) DisableStages

func (s *Sync) DisableStages(ids ...stages.SyncStage)

func (*Sync) EnableStages

func (s *Sync) EnableStages(ids ...stages.SyncStage)

func (*Sync) HasUnwindPoint

func (s *Sync) HasUnwindPoint() bool

func (*Sync) IsAfter

func (s *Sync) IsAfter(stage1, stage2 stages.SyncStage) bool

IsAfter returns true if stage1 goes after stage2 in staged sync

func (*Sync) IsBefore

func (s *Sync) IsBefore(stage1, stage2 stages.SyncStage) bool

IsBefore returns true if stage1 goes before stage2 in staged sync

func (*Sync) IsDone

func (s *Sync) IsDone() bool

func (*Sync) Len

func (s *Sync) Len() int

func (*Sync) LogPrefix

func (s *Sync) LogPrefix() string

func (*Sync) MockExecFunc

func (s *Sync) MockExecFunc(id stages.SyncStage, f ExecFunc)

func (*Sync) NewUnwindState

func (s *Sync) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64, initialCycle, firstCycle bool) *UnwindState

func (*Sync) NextStage

func (s *Sync) NextStage()

func (*Sync) PrevUnwindPoint

func (s *Sync) PrevUnwindPoint() *uint64

func (*Sync) PrintTimings

func (s *Sync) PrintTimings() []interface{}

func (*Sync) PruneStageState

func (s *Sync) PruneStageState(id stages.SyncStage, forwardProgress uint64, tx kv.Tx, db kv.RwDB, initialCycle bool) (*PruneState, error)

Get the current prune status from the DB

func (*Sync) Run

func (s *Sync) Run(db kv.RwDB, txc wrap.TxContainer, initialCycle, firstCycle bool) (bool, error)

func (*Sync) RunNoInterrupt

func (s *Sync) RunNoInterrupt(db kv.RwDB, txc wrap.TxContainer) error

func (*Sync) RunPrune

func (s *Sync) RunPrune(db kv.RwDB, tx kv.RwTx, initialCycle bool) error

Run pruning for stages as per the defined pruning order, if enabled for that stage

func (*Sync) RunUnwind

func (s *Sync) RunUnwind(db kv.RwDB, txc wrap.TxContainer) error

func (*Sync) SetCurrentStage

func (s *Sync) SetCurrentStage(id stages.SyncStage) error

func (*Sync) StageState

func (s *Sync) StageState(stage stages.SyncStage, tx kv.Tx, db kv.RoDB, initialCycle, firstCycle bool) (*StageState, error)

func (*Sync) StagesIdsList

func (s *Sync) StagesIdsList() []string

func (*Sync) UnwindPoint

func (s *Sync) UnwindPoint() uint64

func (*Sync) UnwindReason

func (s *Sync) UnwindReason() UnwindReason

func (*Sync) UnwindTo

func (s *Sync) UnwindTo(unwindPoint uint64, reason UnwindReason, tx kv.Tx) error

type Timing

type Timing struct {
	// contains filtered or unexported fields
}

type TrieCfg

type TrieCfg struct {
	// contains filtered or unexported fields
}

func StageTrieCfg

func StageTrieCfg(db kv.RwDB, checkRoot, saveNewHashesToDB, badBlockHalt bool, tmpDir string, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, historyV3 bool, agg *state.Aggregator) TrieCfg

type TxLookupCfg

type TxLookupCfg struct {
	// contains filtered or unexported fields
}

func StageTxLookupCfg

func StageTxLookupCfg(
	db kv.RwDB,
	prune prune.Mode,
	tmpdir string,
	borConfigInterface chain.BorConfig,
	blockReader services.FullBlockReader,
) TxLookupCfg

type TxPoolForMining

type TxPoolForMining interface {
	YieldBest(n uint16, txs *types2.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error)
}

type UnwindFunc

type UnwindFunc func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error

UnwindFunc is the unwinding logic of the stage. * unwindState - contains information about the unwind itself. * stageState - represents the state of this stage at the beginning of unwind.

type UnwindOrder

type UnwindOrder []stages.SyncStage

UnwindOrder represents the order in which the stages need to be unwound. The unwind order is important and is not always just the stages in reverse order. For example, the txn pool can be unwound only after execution. It is OK to remove a stage from here to disable only that stage's unwind.

type UnwindReason

type UnwindReason struct {
	// If we're unwinding due to a fork - we want to unlink blocks but not mark
	// them as bad - as they may get replayed then deselected
	Block *libcommon.Hash
	// If unwind is caused by a bad block, this error is not empty
	Err error
}

func BadBlock

func BadBlock(badBlock libcommon.Hash, err error) UnwindReason

func ForkReset

func ForkReset(badBlock libcommon.Hash) UnwindReason

func (UnwindReason) IsBadBlock

func (u UnwindReason) IsBadBlock() bool

type UnwindState

type UnwindState struct {
	ID stages.SyncStage
	// UnwindPoint is the block to unwind to.
	UnwindPoint        uint64
	CurrentBlockNumber uint64
	Reason             UnwindReason

	CurrentSyncCycle CurrentSyncCycleInfo
	// contains filtered or unexported fields
}

UnwindState contains the information about unwind.

func (*UnwindState) Done

func (u *UnwindState) Done(db kv.Putter) error

Done updates the DB state of the stage.

func (*UnwindState) LogPrefix

func (u *UnwindState) LogPrefix() string

type Unwinder

type Unwinder interface {
	// UnwindTo begins staged sync unwind to the specified block.
	UnwindTo(unwindPoint uint64, reason UnwindReason, tx kv.Tx) error
	HasUnwindPoint() bool
	LogPrefix() string
}

Unwinder allows the stage to cause an unwind.
