stagedsync

package
v0.0.0-...-215623d Latest
Warning

This package is not in the latest version of its module.

Published: May 20, 2021 License: GPL-3.0 Imports: 53 Imported by: 0

README

Staged Sync

Staged Sync is a version of Go-Ethereum's Full Sync that was rearchitected for better performance.

It is I/O intensive. Although our goal is to be able to sync the node on an HDD, we still recommend using fast SSDs.

Staged Sync, as its name suggests, consists of a sequence of stages that are executed in order, one after another.

How The Sync Works

For each peer, Turbo-Geth learns what the peer's HEAD block is and executes each stage in order for the missing blocks between the local HEAD block and the peer's HEAD block.

The first stage (downloading headers) sets the local HEAD block.

Each stage is executed in order, and stage N does not stop until it has reached the local HEAD.

That means that in the ideal scenario (no network interruptions, the app isn't restarted, etc.), each stage will be executed exactly once for the full initial sync.

After the last stage is finished, the process starts from the beginning, by looking for the new headers to download.

If the app is restarted in between stages, it restarts from the first stage.

If the app is restarted in the middle of a stage's execution, it restarts from that stage, giving it the opportunity to complete.
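The cycle described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the package's implementation: the `stage` type, `advanceBy`, `runCycle`, and the in-memory `progress` slice are hypothetical names introduced only for illustration. Each stage keeps its own progress, so a stage that already reached the head does no work when the cycle is run again.

```go
package main

import "fmt"

// stage is a hypothetical, simplified stand-in for a sync stage.
type stage struct {
	name string
	// exec advances the stage from block `from` towards `head`
	// and returns the new progress.
	exec func(from, head uint64) uint64
}

// advanceBy returns an exec function that processes at most `step`
// blocks per call (step must be greater than zero).
func advanceBy(step uint64) func(from, head uint64) uint64 {
	return func(from, head uint64) uint64 {
		if from+step > head {
			return head
		}
		return from + step
	}
}

// runCycle executes every stage in order. A stage is called again and
// again until its progress reaches head; only then does the next stage
// start. It returns the total number of exec calls made.
func runCycle(stages []stage, progress []uint64, head uint64) int {
	calls := 0
	for i, s := range stages {
		for progress[i] < head {
			progress[i] = s.exec(progress[i], head)
			calls++
		}
	}
	return calls
}

func main() {
	stages := []stage{{"headers", advanceBy(4)}, {"bodies", advanceBy(3)}}
	progress := make([]uint64, len(stages))

	fmt.Println(runCycle(stages, progress, 10)) // 7: both stages catch up to block 10
	fmt.Println(runCycle(stages, progress, 10)) // 0: nothing is missing, no work is done
	fmt.Println(runCycle(stages, progress, 12)) // 2: each stage processes the two new blocks
}
```

Because progress is tracked per stage, restarting the loop from the first stage is cheap in this model: completed stages are skipped immediately.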

How long do the stages take?

Here is a pie chart showing the proportional time spent on each stage (it was taken from the full sync). It is only an estimate, but it gives an idea.

Reorgs / Unwinds

Sometimes the chain makes a reorg and we need to "undo" some parts of our sync.

This happens backward, from the last stage to the first one, with one caveat: the tx pool is updated only after the execution has been unwound, so that we know the new nonces.

Here is an example of the order in which stages are unwound (unwinding happens from right to left).

state.unwindOrder = []*Stage{
		// Unwinding of tx pool (reinjecting transactions into the pool needs to happen after unwinding execution)
		stages[0], stages[1], stages[2], stages[9], stages[3], stages[4], stages[5], stages[6], stages[7], stages[8],
	}
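To make the right-to-left reading concrete, here is a minimal sketch that turns such an unwind order into the sequence in which stages are actually unwound. It assumes, based on the comment in the snippet above, that index 9 is the tx pool stage; `unwindSequence` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// unwindSequence returns the stage indexes in the order they are
// actually unwound, given an unwind order that is read right to left.
func unwindSequence(unwindOrder []int) []int {
	seq := make([]int, 0, len(unwindOrder))
	for i := len(unwindOrder) - 1; i >= 0; i-- {
		seq = append(seq, unwindOrder[i])
	}
	return seq
}

func main() {
	// Same index layout as in the snippet above.
	order := []int{0, 1, 2, 9, 3, 4, 5, 6, 7, 8}
	fmt.Println(unwindSequence(order)) // [8 7 6 5 4 3 9 2 1 0]
}
```

Stage 9 is unwound right after stages 8 through 3 and before stages 2, 1, and 0, which matches the caveat that the tx pool is handled once execution has been unwound.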

Preprocessing with ETL

Some stages use our ETL framework to sort data by keys before inserting it into the database.

This significantly reduces database write amplification.

So, when we are generating indexes or hashed state, we do a multi-step process.

  1. We write the processed data into a couple of temp files in your data directory;
  2. We then use a heap to insert data from the temp files into the database, in the order that minimizes db write amplification.

This optimization sometimes leads to dramatic (orders of magnitude) write speed improvements.
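The two steps above can be sketched as a sort-then-merge pipeline. This is only an illustration of the general technique (sorted runs merged with a heap), not the ETL framework's code: in-memory slices stand in for the temp files, and `sortedRuns`, `mergeRuns`, `entry`, and `minHeap` are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// entry is one key taken from a sorted run, tagged with its run index.
type entry struct {
	key string
	run int
}

// minHeap orders entries by key; it implements heap.Interface.
type minHeap []entry

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].key < h[j].key }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// sortedRuns splits keys into chunks of at most bufSize (must be > 0)
// and sorts each chunk. Each chunk stands in for one sorted temp file.
func sortedRuns(keys []string, bufSize int) [][]string {
	var runs [][]string
	for start := 0; start < len(keys); start += bufSize {
		end := start + bufSize
		if end > len(keys) {
			end = len(keys)
		}
		run := append([]string(nil), keys[start:end]...)
		sort.Strings(run)
		runs = append(runs, run)
	}
	return runs
}

// mergeRuns performs a k-way merge with a heap and returns the keys in
// globally sorted order, the order in which they would be inserted.
func mergeRuns(runs [][]string) []string {
	h := &minHeap{}
	next := make([]int, len(runs)) // next unread position in each run
	for i, run := range runs {
		if len(run) > 0 {
			heap.Push(h, entry{run[0], i})
			next[i] = 1
		}
	}
	var out []string
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		out = append(out, e.key)
		if next[e.run] < len(runs[e.run]) {
			heap.Push(h, entry{runs[e.run][next[e.run]], e.run})
			next[e.run]++
		}
	}
	return out
}

func main() {
	runs := sortedRuns([]string{"d", "a", "f", "c", "b", "e"}, 2)
	fmt.Println(mergeRuns(runs)) // [a b c d e f]
}
```

Inserting keys in sorted order lets the database append to adjacent pages instead of touching random ones, which is where the reduction in write amplification comes from.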

Stages (for the up-to-date list see stagedsync.go):

Each stage consists of 2 functions: ExecFunc, which progresses the stage forward, and UnwindFunc, which unwinds the stage backwards.

Some of the stages can theoretically work offline, though this isn't implemented in the current version.

Stage 1: Download Headers Stage

During this stage we download all the headers between the local HEAD and our peer's head.

This stage is CPU intensive and can benefit from a multicore processor because it verifies the PoW of the headers.

Most unwinds are initiated at this stage because of chain reorgs.

This stage promotes the local HEAD pointer.

Stage 2: Block Hashes

Creates an index of blockHash -> blockNumber extracted from the headers for faster lookups and making the sync friendlier for HDDs.

Stage 3: Download Block Bodies Stage

At this stage, we download bodies for the block headers that we have already downloaded.

This is the most network-intensive stage: the vast majority of data is downloaded here.

Stage 4: Recover Senders Stage

This stage recovers and stores senders for each transaction in each downloaded block.

This stage is also CPU intensive and benefits from multi-core CPUs.

This stage doesn't use any network connection.
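The reason this stage scales with cores is that each transaction can be processed independently. Below is a minimal worker-pool sketch of that idea. It is not the package's code: `recoverSender` is a placeholder that hashes the payload with SHA-256 and is not real signature recovery, and `recoverAll` is a hypothetical helper.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"runtime"
	"sync"
)

// recoverSender is a placeholder for signature recovery. It is NOT real
// ECDSA recovery; it only derives 20 bytes from the payload so that
// there is some CPU work to distribute.
func recoverSender(txPayload []byte) [20]byte {
	sum := sha256.Sum256(txPayload)
	var addr [20]byte
	copy(addr[:], sum[:20])
	return addr
}

// recoverAll spreads the work over `workers` goroutines and keeps the
// results in input order.
func recoverAll(txs [][]byte, workers int) [][20]byte {
	if workers < 1 {
		workers = 1
	}
	out := make([][20]byte, len(txs))
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				// Each index is written by exactly one worker.
				out[i] = recoverSender(txs[i])
			}
		}()
	}
	for i := range txs {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	txs := [][]byte{[]byte("tx-a"), []byte("tx-b"), []byte("tx-c")}
	parallel := recoverAll(txs, runtime.NumCPU())
	same := true
	for i, tx := range txs {
		if parallel[i] != recoverSender(tx) {
			same = false
		}
	}
	fmt.Println(same) // true: same results as the sequential computation
}
```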

Stage 5: Execute Blocks Stage

During this stage, we execute block-by-block everything that we downloaded before.

One important point here is that we don't check root hashes during this execution; we don't even build a Merkle trie.

This stage is single threaded.

This stage doesn't use a network connection.

This stage is disk intensive.

This stage can spawn unwinds if the block execution fails.

Stage 6: Compute State Root Stage

This stage builds the Merkle trie and checks the root hash for the current state.

It also builds Intermediate Hashes along the way and stores them into the database.

If there were no intermediate hashes stored before (that could happen during the first initial sync), it builds the full Merkle trie and its root hash.

If there are intermediate hashes in the database, it uses the block history to figure out which ones are outdated and which ones are still up to date. Then it builds a partial Merkle trie, reusing the up-to-date hashes and rebuilding only the outdated ones.

If the root hash doesn't match, it initiates an unwind one block backwards.

This stage doesn't use a network connection.
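The idea of reusing stored intermediate hashes can be illustrated with a much simpler structure than a Merkle trie. In the sketch below, keys are grouped into buckets by their first byte, one hash is cached per bucket, and only the buckets marked as changed are rehashed. Everything here is an assumption made for illustration: the bucket layout, the use of SHA-256, and the names `bucketHash`, `updateRoot`, and `fullRoot` do not come from the package.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// bucketHash hashes every key/value pair whose key starts with prefix,
// in sorted key order. It reports false when the bucket is empty.
func bucketHash(state map[string]string, prefix byte) ([32]byte, bool) {
	var keys []string
	for k := range state {
		if len(k) > 0 && k[0] == prefix {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	h := sha256.New()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(state[k]))
		h.Write([]byte{0})
	}
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out, len(keys) > 0
}

// updateRoot rehashes only the dirty buckets, reuses the cached hashes
// for the rest, and returns the root plus the number of rehashed buckets.
func updateRoot(state map[string]string, cache map[byte][32]byte, dirty []byte) ([32]byte, int) {
	for _, p := range dirty {
		if h, ok := bucketHash(state, p); ok {
			cache[p] = h
		} else {
			delete(cache, p)
		}
	}
	prefixes := make([]int, 0, len(cache))
	for p := range cache {
		prefixes = append(prefixes, int(p))
	}
	sort.Ints(prefixes)
	h := sha256.New()
	for _, p := range prefixes {
		sum := cache[byte(p)]
		h.Write([]byte{byte(p)})
		h.Write(sum[:])
	}
	var root [32]byte
	copy(root[:], h.Sum(nil))
	return root, len(dirty)
}

// fullRoot builds everything from scratch, as on the first sync.
func fullRoot(state map[string]string) [32]byte {
	seen := map[byte]bool{}
	var all []byte
	for k := range state {
		if len(k) > 0 && !seen[k[0]] {
			seen[k[0]] = true
			all = append(all, k[0])
		}
	}
	root, _ := updateRoot(state, map[byte][32]byte{}, all)
	return root
}

func main() {
	state := map[string]string{"a1": "1", "a2": "2", "b1": "3"}
	cache := map[byte][32]byte{}
	updateRoot(state, cache, []byte{'a', 'b'}) // first run: hash everything

	state["b1"] = "4" // a later block changes one item
	root, rehashed := updateRoot(state, cache, []byte{'b'})
	fmt.Println(rehashed, root == fullRoot(state)) // 1 true
}
```

The incremental root matches the one computed from scratch, while only one of the two buckets had to be rehashed.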

Stage 7: Generate Hashed State Stage

Turbo-Geth during execution uses Plain state storage.

Plain State: instead of the normal state (we call it "Hashed State"), where accounts and storage items are addressed as keccak256(address), in the plain state they are addressed by the address itself.

However, to make sure that some APIs work and to keep compatibility with the other clients, we generate the Hashed State as well.

If the hashed state is not empty, we look at the History ChangeSets and update only the items that were changed.

This stage doesn't use a network connection.
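As a rough sketch of the difference between the two layouts and of the incremental update, consider the following model. It makes several simplifying assumptions: SHA-256 stands in for keccak256 only because it is in the Go standard library, state is a plain map, and `hashKey`, `promoteAll`, `promoteChanged`, and `sameMap` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashKey stands in for keccak256(address). SHA-256 is used here only
// because it ships with the standard library.
func hashKey(address string) string {
	sum := sha256.Sum256([]byte(address))
	return hex.EncodeToString(sum[:])
}

// promoteAll builds the hashed state from scratch out of the plain state.
func promoteAll(plain map[string]string) map[string]string {
	hashed := make(map[string]string, len(plain))
	for addr, v := range plain {
		hashed[hashKey(addr)] = v
	}
	return hashed
}

// promoteChanged updates only the addresses listed in changed, which is
// the kind of information a change set provides.
func promoteChanged(plain, hashed map[string]string, changed []string) {
	for _, addr := range changed {
		if v, ok := plain[addr]; ok {
			hashed[hashKey(addr)] = v
		} else {
			delete(hashed, hashKey(addr))
		}
	}
}

// sameMap reports whether two maps hold exactly the same entries.
func sameMap(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

func main() {
	plain := map[string]string{"0xaa": "1", "0xbb": "2"}
	hashed := promoteAll(plain) // hashed state was empty: full build

	// A later block modifies, deletes, and creates items.
	plain["0xaa"] = "10"
	delete(plain, "0xbb")
	plain["0xcc"] = "3"
	promoteChanged(plain, hashed, []string{"0xaa", "0xbb", "0xcc"})

	fmt.Println(sameMap(hashed, promoteAll(plain))) // true
}
```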

Stages 8, 9, 10, 11: Generate Indexes Stages

There are 4 indexes that are generated during sync.

They can be disabled, because not all APIs need them.

These stages do not use a network connection.

Account History Index

This index stores the mapping from the account address to the list of blocks where this account was changed in some way.

Storage History Index

This index stores the mapping from the storage item address to the list of blocks where this storage item was changed in some way.
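Both history indexes share the same shape: a key mapped to the list of blocks in which it changed. A minimal in-memory sketch of that shape, including what an unwind would have to do, might look like this. The `historyIndex` type and its `add` and `unwind` methods are hypothetical and say nothing about how the package actually encodes the index in the database.

```go
package main

import (
	"fmt"
	"sort"
)

// historyIndex maps a key (an account address or a storage item
// address) to the ascending list of blocks where it changed.
type historyIndex map[string][]uint64

// add records that every key in changed was modified at block.
// Blocks are expected to be added in ascending order.
func (idx historyIndex) add(block uint64, changed []string) {
	for _, key := range changed {
		idx[key] = append(idx[key], block)
	}
}

// unwind drops every entry above block `to`.
func (idx historyIndex) unwind(to uint64) {
	for key, blocks := range idx {
		// Position of the first block strictly greater than `to`.
		keep := sort.Search(len(blocks), func(i int) bool { return blocks[i] > to })
		if keep == 0 {
			delete(idx, key)
		} else {
			idx[key] = blocks[:keep]
		}
	}
}

func main() {
	idx := historyIndex{}
	idx.add(1, []string{"alice", "bob"})
	idx.add(2, []string{"alice"})
	idx.add(5, []string{"bob"})

	idx.unwind(2)
	fmt.Println(idx["alice"], idx["bob"]) // [1 2] [1]
}
```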

Log Index

This index sets up a link from the [TODO] to [TODO].

Tx Lookup Index

This index sets up a link from the transaction hash to the block number.

Stage 12: Transaction Pool Stage

During this stage we start the transaction pool or update its state. For instance, we remove the transactions from the blocks we have downloaded from the pool.

On unwinds, we add the transactions from the blocks we unwind back to the pool.

This stage doesn't use a network connection.
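The forward and unwind behaviour of this stage can be modelled with a set of pending transaction hashes. This is a deliberately tiny sketch: the `pool` type and the `applyBlock` and `unwindBlock` methods are hypothetical, and real pools also track nonces, prices, and validity.

```go
package main

import "fmt"

// pool is a hypothetical set of pending transaction hashes.
type pool map[string]bool

// applyBlock removes the transactions that were included in a block.
func (p pool) applyBlock(blockTxs []string) {
	for _, tx := range blockTxs {
		delete(p, tx)
	}
}

// unwindBlock puts the transactions of an unwound block back.
func (p pool) unwindBlock(blockTxs []string) {
	for _, tx := range blockTxs {
		p[tx] = true
	}
}

func main() {
	p := pool{"tx1": true, "tx2": true, "tx3": true}
	block := []string{"tx1", "tx2"}

	p.applyBlock(block)
	fmt.Println(len(p)) // 1

	p.unwindBlock(block)
	fmt.Println(len(p)) // 3
}
```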

Stage 13: Finish

This stage sets the current block number that is then used by RPC calls, such as eth_blockNumber.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BodiesForward

func BodiesForward(
	s *StageState,
	ctx context.Context,
	tx ethdb.RwTx,
	cfg BodiesCfg) error

BodiesForward progresses Bodies stage in the forward direction

func FinishForward

func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg, btClient *snapshotsync.Client, snBuilder *snapshotsync.SnapshotMigrator) error

func HeadersForward

func HeadersForward(
	s *StageState,
	u Unwinder,
	ctx context.Context,
	tx ethdb.RwTx,
	cfg HeadersCfg,
	initialCycle bool,
) error

HeadersForward progresses Headers stage in the forward direction

func HeadersUnwind

func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg) error

func InsertBlockInStages

func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, vmConfig *vm.Config, engine consensus.Engine, block *types.Block, checkRoot bool) (bool, error)

func InsertBlocksInStages

func InsertBlocksInStages(db ethdb.Database, storageMode ethdb.StorageMode, config *params.ChainConfig, vmConfig *vm.Config, engine consensus.Engine, blocks []*types.Block, checkRoot bool) (bool, error)

func InsertHeaderChain

func InsertHeaderChain(logPrefix string, db ethdb.Database, headers []*types.Header, verifyDuration time.Duration) (bool, bool, uint64, error)

func InsertHeadersInStages

func InsertHeadersInStages(db ethdb.Database, config *params.ChainConfig, engine consensus.Engine, headers []*types.Header) (bool, bool, uint64, error)

func MigrateSnapshot

func MigrateSnapshot(to uint64, tx ethdb.RwTx, db ethdb.RwKV, btClient *snapshotsync.Client, mg *snapshotsync.SnapshotMigrator) error

func NotifyNewHeaders

func NotifyNewHeaders(from, to uint64, notifier ChainEventNotifier, db ethdb.Database) error

func NotifyNewHeaders2

func NotifyNewHeaders2(finishStageBeforeSync, unwindTo uint64, notifier ChainEventNotifier, db ethdb.Database) error

func NotifyPendingBlock

func NotifyPendingBlock(logPrefix string, notifier ChainEventNotifier, block *types.Block)

func NotifyPendingLogs

func NotifyPendingLogs(logPrefix string, notifier ChainEventNotifier, logs types.Logs)

func PromoteHashedStateCleanly

func PromoteHashedStateCleanly(logPrefix string, db ethdb.RwTx, cfg HashStateCfg, quit <-chan struct{}) error

func RegenerateIntermediateHashes

func RegenerateIntermediateHashes(logPrefix string, db ethdb.RwTx, cfg TrieCfg, expectedRootHash common.Hash, quit <-chan struct{}) (common.Hash, error)

func ResetHashState

func ResetHashState(tx ethdb.RwTx) error

func ResetIH

func ResetIH(tx ethdb.RwTx) error

func SpawnAccountHistoryIndex

func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, quitCh <-chan struct{}) error

func SpawnBlockHashStage

func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, quit <-chan struct{}) error

func SpawnBodiesSnapshotGenerationStage

func SpawnBodiesSnapshotGenerationStage(s *StageState, db ethdb.RwKV, tx ethdb.RwTx, snapshotDir string, torrentClient *snapshotsync.Client, quit <-chan struct{}) error

func SpawnCallTraces

func SpawnCallTraces(s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg CallTracesCfg) error

func SpawnExecuteBlocksStage

func SpawnExecuteBlocksStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit <-chan struct{}, cfg ExecuteBlockCfg) error

func SpawnHashStateStage

func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, quit <-chan struct{}) error

func SpawnHeaderDownloadStage

func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, headersFetchers []func() error) error

func SpawnHeadersSnapshotGenerationStage

func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg HeadersSnapshotGenCfg, sm *snapshotsync.SnapshotMigrator, torrentClient *snapshotsync.Client, quit <-chan struct{}) error

func SpawnIntermediateHashesStage

func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg TrieCfg, quit <-chan struct{}) (common.Hash, error)

func SpawnLogIndex

func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, quit <-chan struct{}) error

func SpawnMiningCreateBlockStage

func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, current *miningBlock, chainConfig params.ChainConfig, engine consensus.Engine, extra hexutil.Bytes, gasFloor, gasCeil uint64, coinbase common.Address, txPool *core.TxPool, quit <-chan struct{}) error

SpawnMiningCreateBlockStage TODO: - resubmitAdjustCh - variable is not implemented

func SpawnMiningExecStage

func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, current *miningBlock, chainConfig *params.ChainConfig, vmConfig *vm.Config, engine consensus.Engine, localTxs, remoteTxs types.TransactionsStream, coinbase common.Address, noempty bool, notifier ChainEventNotifier, quit <-chan struct{}) error

SpawnMiningExecStage TODO: - resubmitAdjustCh - variable is not implemented

func SpawnMiningFinishStage

func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, current *miningBlock, engine consensus.Engine, chainConfig params.ChainConfig, pendingBlocksCh chan<- *types.Block, minedBlocksCh chan<- *types.Block, sealCancel <-chan struct{}, quit <-chan struct{}) error

func SpawnRecoverSendersStage

func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, tx ethdb.RwTx, toBlock uint64, quitCh <-chan struct{}) error

func SpawnStateSnapshotGenerationStage

func SpawnStateSnapshotGenerationStage(s *StageState, db ethdb.RwKV, tx ethdb.RwTx, snapshotDir string, torrentClient *snapshotsync.Client, quit <-chan struct{}) error

func SpawnStorageHistoryIndex

func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, quitCh <-chan struct{}) error

func SpawnTxLookup

func SpawnTxLookup(s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, quitCh <-chan struct{}) error

func SpawnTxPool

func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, quitCh <-chan struct{}) error

func TxLookupTransform

func TxLookupTransform(logPrefix string, tx ethdb.RwTx, startKey, endKey []byte, quitCh <-chan struct{}, cfg TxLookupCfg) error

func UnwindAccountHistoryIndex

func UnwindAccountHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, quitCh <-chan struct{}) error

func UnwindBlockHashStage

func UnwindBlockHashStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg) error

func UnwindBodiesStage

func UnwindBodiesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg BodiesCfg) error

func UnwindCallTraces

func UnwindCallTraces(u *UnwindState, s *StageState, tx ethdb.RwTx, quitCh <-chan struct{}, cfg CallTracesCfg) error

func UnwindExecutionStage

func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg) error

func UnwindFinish

func UnwindFinish(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg FinishCfg) error

func UnwindHashStateStage

func UnwindHashStateStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HashStateCfg, quit <-chan struct{}) error

func UnwindIntermediateHashesStage

func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TrieCfg, quit <-chan struct{}) error

func UnwindLogIndex

func UnwindLogIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, quitCh <-chan struct{}) error

func UnwindSendersStage

func UnwindSendersStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SendersCfg) error

func UnwindStorageHistoryIndex

func UnwindStorageHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, quitCh <-chan struct{}) error

func UnwindTxLookup

func UnwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, quitCh <-chan struct{}) error

func UnwindTxPool

func UnwindTxPool(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, quitCh <-chan struct{}) error

func UpdateMetrics

func UpdateMetrics(db ethdb.Getter) error

UpdateMetrics - metrics need to be updated manually because the current "metrics" package doesn't support labels. This needs to be fixed in the future.

func VerifyHeaders

func VerifyHeaders(db ethdb.Getter, headers []*types.Header, config *params.ChainConfig, engine consensus.Engine, checkFreq int) error

Types

type BlockHashesCfg

type BlockHashesCfg struct {
	// contains filtered or unexported fields
}

func StageBlockHashesCfg

func StageBlockHashesCfg(db ethdb.RwKV, tmpDir string) BlockHashesCfg

type BodiesCfg

type BodiesCfg struct {
	// contains filtered or unexported fields
}

func StageBodiesCfg

func StageBodiesCfg(
	db ethdb.RwKV,
	bd *bodydownload.BodyDownload,
	bodyReqSend func(context.Context, *bodydownload.BodyRequest) []byte,
	penalise func(context.Context, []headerdownload.PenaltyItem),
	updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int),
	blockPropagator adapter.BlockPropagator,
	timeout int,
	chanConfig params.ChainConfig,
	batchSize datasize.ByteSize,
) BodiesCfg

type CallTracer

type CallTracer struct {
	// contains filtered or unexported fields
}

func NewCallTracer

func NewCallTracer() *CallTracer

func (*CallTracer) CaptureAccountRead

func (ct *CallTracer) CaptureAccountRead(account common.Address) error

func (*CallTracer) CaptureAccountWrite

func (ct *CallTracer) CaptureAccountWrite(account common.Address) error

func (*CallTracer) CaptureEnd

func (ct *CallTracer) CaptureEnd(depth int, output []byte, gasUsed uint64, t time.Duration, err error) error

func (*CallTracer) CaptureFault

func (ct *CallTracer) CaptureFault(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *stack.Stack, contract *vm.Contract, depth int, err error) error

func (*CallTracer) CaptureSelfDestruct

func (ct *CallTracer) CaptureSelfDestruct(from common.Address, to common.Address, value *big.Int)

func (*CallTracer) CaptureStart

func (ct *CallTracer) CaptureStart(depth int, from common.Address, to common.Address, precompile bool, create bool, calltype vm.CallType, input []byte, gas uint64, value *big.Int) error

func (*CallTracer) CaptureState

func (ct *CallTracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *stack.Stack, rData []byte, contract *vm.Contract, depth int, err error) error

type CallTracesCfg

type CallTracesCfg struct {
	ToBlock   uint64 // not setting this params means no limit
	BatchSize datasize.ByteSize
	// contains filtered or unexported fields
}

func StageCallTracesCfg

func StageCallTracesCfg(
	db ethdb.RwKV,
	ToBlock uint64,
	BatchSize datasize.ByteSize,
	tmpdir string,
	chainConfig *params.ChainConfig,
	engine consensus.Engine,
) CallTracesCfg

type ChainEventNotifier

type ChainEventNotifier interface {
	OnNewHeader(*types.Header)
	OnNewPendingLogs(types.Logs)
	OnNewPendingBlock(*types.Block)
	OnNewPendingTxs([]types.Transaction)
}

type ChainReader

type ChainReader struct {
	Cfg params.ChainConfig
	Db  ethdb.Getter
}

Implements consensus.ChainReader

func (ChainReader) Config

func (cr ChainReader) Config() *params.ChainConfig

Config retrieves the blockchain's chain configuration.

func (ChainReader) CurrentHeader

func (cr ChainReader) CurrentHeader() *types.Header

CurrentHeader retrieves the current header from the local chain.

func (ChainReader) GetBlock

func (cr ChainReader) GetBlock(hash common.Hash, number uint64) *types.Block

GetBlock retrieves a block from the database by hash and number.

func (ChainReader) GetHeader

func (cr ChainReader) GetHeader(hash common.Hash, number uint64) *types.Header

GetHeader retrieves a block header from the database by hash and number.

func (ChainReader) GetHeaderByHash

func (cr ChainReader) GetHeaderByHash(hash common.Hash) *types.Header

GetHeaderByHash retrieves a block header from the database by its hash.

func (ChainReader) GetHeaderByNumber

func (cr ChainReader) GetHeaderByNumber(number uint64) *types.Header

GetHeaderByNumber retrieves a block header from the database by number.

func (ChainReader) HasBlock

func (cr ChainReader) HasBlock(hash common.Hash, number uint64) bool

HasBlock checks whether a block with the given hash and number is present in the database.

type ChangeSetHook

type ChangeSetHook func(blockNum uint64, wr *state.ChangeSetWriter)

type DownloaderGlue

type DownloaderGlue interface {
	SpawnHeaderDownloadStage([]func() error, *StageState, Unwinder) error
	SpawnBodyDownloadStage(string, string, *StageState, Unwinder, *bodydownload.PrefetchedBlocks) (bool, error)
}

type ExecFunc

type ExecFunc func(state *StageState, unwinder Unwinder, tx ethdb.RwTx) error

ExecFunc is the execution function for the stage to move forward.

  * state - is the current state of the stage and contains stage data.
  * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used.

type ExecuteBlockCfg

type ExecuteBlockCfg struct {
	// contains filtered or unexported fields
}

func StageExecuteBlocksCfg

func StageExecuteBlocksCfg(
	kv ethdb.RwKV,
	WriteReceipts bool,
	WriteCallTraces bool,
	BatchSize datasize.ByteSize,
	ReaderBuilder StateReaderBuilder,
	WriterBuilder StateWriterBuilder,
	SilkwormExecutionFunc unsafe.Pointer,
	ChangeSetHook ChangeSetHook,
	chainConfig *params.ChainConfig,
	engine consensus.Engine,
	vmConfig *vm.Config,
	tmpdir string,
) ExecuteBlockCfg

type FinishCfg

type FinishCfg struct {
	// contains filtered or unexported fields
}

func StageFinishCfg

func StageFinishCfg(db ethdb.RwKV, tmpDir string) FinishCfg

type HasChangeSetWriter

type HasChangeSetWriter interface {
	ChangeSetWriter() *state.ChangeSetWriter
}

type HashPromoter

type HashPromoter struct {
	ChangeSetBufSize uint64
	TempDir          string
	// contains filtered or unexported fields
}

func NewHashPromoter

func NewHashPromoter(db ethdb.RwTx, quitCh <-chan struct{}) *HashPromoter

func (*HashPromoter) Promote

func (p *HashPromoter) Promote(logPrefix string, s *StageState, from, to uint64, storage bool, load etl.LoadFunc) error

func (*HashPromoter) Unwind

func (p *HashPromoter) Unwind(logPrefix string, s *StageState, u *UnwindState, storage bool, load etl.LoadFunc) error

type HashStateCfg

type HashStateCfg struct {
	// contains filtered or unexported fields
}

func StageHashStateCfg

func StageHashStateCfg(db ethdb.RwKV, tmpDir string) HashStateCfg

type HeadersCfg

type HeadersCfg struct {
	// contains filtered or unexported fields
}

func StageHeadersCfg

func StageHeadersCfg(
	db ethdb.RwKV,
	headerDownload *headerdownload.HeaderDownload,
	chainConfig params.ChainConfig,
	headerReqSend func(context.Context, *headerdownload.HeaderRequest) []byte,
	announceNewHashes func(context.Context, []headerdownload.Announce),
	penalize func(context.Context, []headerdownload.PenaltyItem),
	batchSize datasize.ByteSize,
	increment uint64,
) HeadersCfg

type HeadersSnapshotGenCfg

type HeadersSnapshotGenCfg struct {
	// contains filtered or unexported fields
}

func StageHeadersSnapshotGenCfg

func StageHeadersSnapshotGenCfg(db ethdb.RwKV, snapshotDir string) HeadersSnapshotGenCfg

type HistoryCfg

type HistoryCfg struct {
	// contains filtered or unexported fields
}

func StageHistoryCfg

func StageHistoryCfg(db ethdb.RwKV, tmpDir string) HistoryCfg

type LogIndexCfg

type LogIndexCfg struct {
	// contains filtered or unexported fields
}

func StageLogIndexCfg

func StageLogIndexCfg(db ethdb.RwKV, tmpDir string) LogIndexCfg

type MiningCfg

type MiningCfg struct {
	// configs
	params.MiningConfig

	// runtime dat
	Block *miningBlock
	// contains filtered or unexported fields
}

func StageMiningCfg

func StageMiningCfg(cfg params.MiningConfig, noempty bool, pendingBlocks chan<- *types.Block, resultCh chan<- *types.Block, sealCancel <-chan struct{}) *MiningCfg

type OldestAppearedLoad

type OldestAppearedLoad struct {
	// contains filtered or unexported fields
}

func (*OldestAppearedLoad) LoadFunc

func (l *OldestAppearedLoad) LoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error

type OptionalParameters

type OptionalParameters struct {
	// StateReaderBuilder is a function that returns state reader for the block execution stage.
	// It can be used to add something like bloom filters to figure out non-existing accounts and similar experiments.
	StateReaderBuilder StateReaderBuilder

	// StateWriterBuilder is a function that returns state writer for the block execution stage.
	// It can be used to update bloom or other types of filters between block execution.
	StateWriterBuilder StateWriterBuilder

	// Notifier allows sending some data when new headers or new blocks are added
	Notifier ChainEventNotifier

	SilkwormExecutionFunc unsafe.Pointer

	SnapshotDir      string
	TorrnetClient    *snapshotsync.Client
	SnapshotMigrator *snapshotsync.SnapshotMigrator
}

OptionalParameters contains any optional parameters you can specify to fine-tune and experiment with StagedSync.

type PersistentUnwindStack

type PersistentUnwindStack struct {
	// contains filtered or unexported fields
}

func NewPersistentUnwindStack

func NewPersistentUnwindStack() *PersistentUnwindStack

func (*PersistentUnwindStack) Add

func (*PersistentUnwindStack) AddFromDB

func (s *PersistentUnwindStack) AddFromDB(db ethdb.KVGetter, stageID stages.SyncStage) error

func (*PersistentUnwindStack) Empty

func (s *PersistentUnwindStack) Empty() bool

func (*PersistentUnwindStack) LoadFromDB

func (s *PersistentUnwindStack) LoadFromDB(db ethdb.KVGetter, stageID stages.SyncStage) (*UnwindState, error)

func (*PersistentUnwindStack) Pop

type Promoter

type Promoter struct {
	ChangeSetBufSize uint64
	TempDir          string
	// contains filtered or unexported fields
}

func NewPromoter

func NewPromoter(db ethdb.RwTx, quitCh <-chan struct{}) *Promoter

func (*Promoter) Promote

func (p *Promoter) Promote(logPrefix string, s *StageState, from, to uint64, storage bool, codes bool) error

func (*Promoter) Unwind

func (p *Promoter) Unwind(logPrefix string, s *StageState, u *UnwindState, storage bool, codes bool) error

type SendersCfg

type SendersCfg struct {
	// contains filtered or unexported fields
}

func StageSendersCfg

func StageSendersCfg(db ethdb.RwKV, chainCfg *params.ChainConfig, tmpdir string) SendersCfg

type Stage

type Stage struct {
	// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
	ID stages.SyncStage
	// Description is a string that is shown in the logs.
	Description string
	// Disabled defines if the stage is disabled. It is set when the stage is built by its `StageBuilder`.
	Disabled bool
	// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the stage.
	DisabledDescription string
	// ExecFunc is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil!
	ExecFunc ExecFunc
	// UnwindFunc is called when the stage should be unwound. The unwind logic should be there. MUST NOT be nil!
	UnwindFunc UnwindFunc
}

Stage is a single sync stage in staged sync.

type StageBuilder

type StageBuilder struct {
	// ID is the stage identifier. Should be unique. It is recommended to prefix it with reverse domain `com.example.my-stage` to avoid conflicts.
	ID stages.SyncStage
	// Build is a factory function that initializes the sync stage based on the `StageParameters` provided.
	Build func(StageParameters) *Stage
}

StageBuilder represents an object that creates a single stage for staged sync.

type StageBuilders

type StageBuilders []StageBuilder

StageBuilders represents an ordered list of builders to build different stages. It also contains helper methods to change the list of stages.

func DefaultStages

func DefaultStages() StageBuilders

DefaultStages contains the list of default stage builders that are used by turbo-geth.

func MiningStages

func MiningStages() StageBuilders

func ReplacementStages

func ReplacementStages(ctx context.Context,
	sm ethdb.StorageMode,
	headers HeadersCfg,
	blockHashCfg BlockHashesCfg,
	bodies BodiesCfg,
	senders SendersCfg,
	exec ExecuteBlockCfg,
	hashState HashStateCfg,
	trieCfg TrieCfg,
	history HistoryCfg,
	logIndex LogIndexCfg,
	callTraces CallTracesCfg,
	txLookup TxLookupCfg,
	txPool TxPoolCfg,
	finish FinishCfg,
) StageBuilders

func (StageBuilders) Build

func (bb StageBuilders) Build(world StageParameters) []*Stage

Build creates sync stages out of builders.

func (StageBuilders) MustReplace

func (bb StageBuilders) MustReplace(id stages.SyncStage, newBuilder StageBuilder) StageBuilders

MustReplace finds the stage with the given ID and replaces it with the new one. It is chainable, but panics if it can't find the stage to replace.
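The "chainable but panics" contract can be sketched with simplified types. The code below is a model, not the package's implementation: `builder`, `builders`, and `mustReplace` are hypothetical lowercase stand-ins, and copying the list before replacing is a design choice of this sketch rather than a statement about what `MustReplace` does internally.

```go
package main

import "fmt"

// builder is a hypothetical, simplified stand-in for a stage builder.
type builder struct {
	id   string
	desc string
}

// builders is an ordered list of builders.
type builders []builder

// mustReplace returns a copy of the list in which the builder with the
// given id is replaced by nb. It panics if no such builder exists, so
// calls can be chained without error checks in between.
func (bb builders) mustReplace(id string, nb builder) builders {
	out := make(builders, len(bb))
	copy(out, bb)
	for i := range out {
		if out[i].id == id {
			out[i] = nb
			return out
		}
	}
	panic("stage not found: " + id)
}

func main() {
	defaults := builders{{"headers", "default"}, {"bodies", "default"}}
	custom := defaults.
		mustReplace("headers", builder{"headers", "custom"}).
		mustReplace("bodies", builder{"bodies", "custom"})
	fmt.Println(custom[0].desc, custom[1].desc, defaults[0].desc) // custom custom default
}
```

Panicking instead of returning an error keeps the chain readable; it fits configuration code that runs once at startup, where a missing stage ID is a programming mistake.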

type StageParameters

type StageParameters struct {
	ChainConfig *params.ChainConfig

	Engine consensus.Engine
	DB     ethdb.Database

	BatchSize datasize.ByteSize // Batch size for the execution stage

	TmpDir string
	// QuitCh is a channel that is closed on shutdown. A stage that can take significant time
	// should listen to it so that it can shut down gracefully on Ctrl+C.
	QuitCh <-chan struct{}

	InitialCycle bool

	SnapshotBuilder *snapshotsync.SnapshotMigrator
	// contains filtered or unexported fields
}

StageParameters contains the parameters that a stage receives at runtime when it is initialized. The stage can then use them to access various useful functions.

type StageState

type StageState struct {

	// Stage is the ID of this stage.
	Stage stages.SyncStage
	// BlockNumber is the current block number of the stage at the beginning of the stage execution.
	BlockNumber uint64
	// contains filtered or unexported fields
}

StageState is the state of the stage.

func (*StageState) Done

func (s *StageState) Done()

Done marks the stage execution as complete, so that the sync proceeds to the next stage. If Done() is not called and the stage `ExecFunc` exits, then the same stage will be called again. This side effect is useful for something like block body download.
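The re-invocation behaviour described above can be modelled as a loop that keeps calling the stage until it signals completion. This is a sketch with hypothetical lowercase types (`stageState`, `runStage`), not the package's scheduler; the batch download in `main` is made up for illustration.

```go
package main

import "fmt"

// stageState is a hypothetical, simplified stand-in for StageState.
type stageState struct {
	done bool
}

// Done marks the stage as complete.
func (s *stageState) Done() { s.done = true }

// runStage keeps calling exec until exec calls Done or returns an
// error. It returns how many times exec was invoked.
func runStage(exec func(s *stageState) error) (int, error) {
	s := &stageState{}
	calls := 0
	for !s.done {
		calls++
		if err := exec(s); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	pending := 5 // bodies that still need to be downloaded
	calls, err := runStage(func(s *stageState) error {
		pending -= 2 // each invocation handles a batch of up to two bodies
		if pending <= 0 {
			pending = 0
			s.Done() // nothing left: allow the next stage to run
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```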

func (*StageState) DoneAndUpdate

func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error

DoneAndUpdate is a convenience method that combines the `Done()` and `Update()` calls.

func (*StageState) ExecutionAt

func (s *StageState) ExecutionAt(db ethdb.KVGetter) (uint64, error)

ExecutionAt gets the current state of the "Execution" stage, that is, the block up to which execution has progressed.

func (*StageState) LogPrefix

func (s *StageState) LogPrefix() string

func (*StageState) Update

func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error

Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.

type StagedSync

type StagedSync struct {
	PrefetchedBlocks *bodydownload.PrefetchedBlocks

	Notifier ChainEventNotifier
	// contains filtered or unexported fields
}

func New

func New(stages StageBuilders, unwindOrder UnwindOrder, params OptionalParameters) *StagedSync

func (*StagedSync) Prepare

func (stagedSync *StagedSync) Prepare(
	d DownloaderGlue,
	chainConfig *params.ChainConfig,
	engine consensus.Engine,
	vmConfig *vm.Config,
	db ethdb.Database,
	tx ethdb.Tx,
	pid string,
	storageMode ethdb.StorageMode,
	tmpdir string,
	batchSize datasize.ByteSize,
	quitCh <-chan struct{},
	headersFetchers []func() error,
	txPool *core.TxPool,
	initialCycle bool,
	miningConfig *MiningCfg,
) (*State, error)

func (*StagedSync) SetTorrentParams

func (stagedSync *StagedSync) SetTorrentParams(client *snapshotsync.Client, snapshotsDir string, snapshotMigrator *snapshotsync.SnapshotMigrator)

type State

type State struct {
	// contains filtered or unexported fields
}

func NewState

func NewState(stagesList []*Stage) *State

func (*State) BeforeStageRun

func (s *State) BeforeStageRun(id stages.SyncStage, f func(tx ethdb.RwTx) (ethdb.RwTx, error))

func (*State) BeforeStageUnwind

func (s *State) BeforeStageUnwind(id stages.SyncStage, f func(tx ethdb.RwTx) (ethdb.RwTx, error))

func (*State) CurrentStage

func (s *State) CurrentStage() (uint, *Stage)

func (*State) DisableAllStages

func (s *State) DisableAllStages()

func (*State) DisableStages

func (s *State) DisableStages(ids ...stages.SyncStage)

func (*State) EnableStages

func (s *State) EnableStages(ids ...stages.SyncStage)

func (*State) GetLocalHeight

func (s *State) GetLocalHeight(db ethdb.KVGetter) (uint64, error)

func (*State) GetUnwindTo

func (s *State) GetUnwindTo(db ethdb.KVGetter) (uint64, error)

func (*State) IsAfter

func (s *State) IsAfter(stage1, stage2 stages.SyncStage) bool

IsAfter returns true if stage1 goes after stage2 in staged sync

func (*State) IsBefore

func (s *State) IsBefore(stage1, stage2 stages.SyncStage) bool

IsBefore returns true if stage1 goes before stage2 in staged sync

func (*State) IsDone

func (s *State) IsDone() bool

func (*State) Len

func (s *State) Len() int

func (*State) LoadUnwindInfo

func (s *State) LoadUnwindInfo(db ethdb.KVGetter) error

func (*State) LogPrefix

func (s *State) LogPrefix() string

func (*State) MockExecFunc

func (s *State) MockExecFunc(id stages.SyncStage, f ExecFunc)

func (*State) NextStage

func (s *State) NextStage()

func (*State) OnBeforeUnwind

func (s *State) OnBeforeUnwind(f func(id stages.SyncStage, tx ethdb.RwTx) (ethdb.RwTx, error))

func (*State) Run

func (s *State) Run(db ethdb.GetterPutter, tx ethdb.RwTx) error

func (*State) SetCurrentStage

func (s *State) SetCurrentStage(id stages.SyncStage) error

func (*State) StageByID

func (s *State) StageByID(id stages.SyncStage) (*Stage, error)

func (*State) StageState

func (s *State) StageState(stage stages.SyncStage, db ethdb.KVGetter) (*StageState, error)

func (*State) UnwindStage

func (s *State) UnwindStage(unwind *UnwindState, db TxOrDb, tx ethdb.RwTx) error

func (*State) UnwindTo

func (s *State) UnwindTo(blockNumber uint64, tx TxOrDb) error

type StateReaderBuilder

type StateReaderBuilder func(ethdb.Database) state.StateReader

type StateWriterBuilder

type StateWriterBuilder func(db ethdb.Database, changeSetsDB ethdb.RwTx, blockNumber uint64) state.WriterWithChangeSets

type TrieCfg

type TrieCfg struct {
	// contains filtered or unexported fields
}

func StageTrieCfg

func StageTrieCfg(db ethdb.RwKV, checkRoot, saveNewHashesToDB bool, tmpDir string) TrieCfg

type TxLookupCfg

type TxLookupCfg struct {
	// contains filtered or unexported fields
}

func StageTxLookupCfg

func StageTxLookupCfg(
	db ethdb.RwKV,
	tmpdir string,
) TxLookupCfg

type TxOrDb

type TxOrDb interface {
	ethdb.KVGetter
	ethdb.Putter
}

type TxPoolCfg

type TxPoolCfg struct {
	// contains filtered or unexported fields
}

func StageTxPoolCfg

func StageTxPoolCfg(db ethdb.RwKV, pool *core.TxPool, startFunc func()) TxPoolCfg

type UnwindFunc

type UnwindFunc func(unwindState *UnwindState, state *StageState, tx ethdb.RwTx) error

UnwindFunc is the unwinding logic of the stage.

  * unwindState - contains information about the unwind itself.
  * state - represents the state of this stage at the beginning of unwind.

type UnwindOrder

type UnwindOrder []int

UnwindOrder represents the order in which the stages need to be unwound. Currently it uses 0-based indexes of stages. The unwind order is important and is not always just the stages in reverse. For example, the tx pool (stage 10) can be unwound only after execution is fully unwound (stages 9...3).

func DefaultUnwindOrder

func DefaultUnwindOrder() UnwindOrder

DefaultUnwindOrder contains the default unwind order for `DefaultStages()`. Adding stages that don't do any unwinding doesn't require altering the default order.

func MiningUnwindOrder

func MiningUnwindOrder() UnwindOrder

func ReplacementUnwindOrder

func ReplacementUnwindOrder() UnwindOrder

type UnwindState

type UnwindState struct {
	// Stage is the ID of the stage
	Stage stages.SyncStage
	// UnwindPoint is the block to unwind to.
	UnwindPoint uint64
}

UnwindState contains the information about unwind.

func (*UnwindState) Done

func (u *UnwindState) Done(db ethdb.Putter) error

Done() updates the DB state of the stage.

func (*UnwindState) Skip

func (u *UnwindState) Skip(db ethdb.Putter) error

Skip() ignores the unwind

type Unwinder

type Unwinder interface {
	// UnwindTo begins staged sync unwind to the specified block.
	UnwindTo(uint64, TxOrDb) error
}

Unwinder allows the stage to cause an unwind.
