stagedstreamsync

package
v1.10.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2024 License: LGPL-3.0 Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BlocksPerRequest      int = 10 // number of blocks for each request
	BlocksPerInsertion    int = 50 // number of blocks for each insert batch
	BlockHashesPerRequest int = 20 // number of get block hashes for short range sync
	BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap
	BlockByHashesLowerCap int = 3  // number of get blocks by hashes lower cap

	LastMileBlocksThreshold int    = 10
	SyncLoopBatchSize       uint32 = 30  // maximum size for one query of block hashes
	VerifyHeaderBatchSize   uint64 = 100 // block chain header verification batch size (not used for now)
	LastMileBlocksSize             = 50

	// SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit,
	// no more request will be assigned to workers to wait for InsertChain to finish.
	SoftQueueCap int = 100

	// number of get nodes by hashes for each request
	StatesPerRequest int = 100

	// maximum number of blocks for get receipts request
	ReceiptsPerRequest int = 10

	// DefaultConcurrency is the default settings for concurrency
	DefaultConcurrency int = 4

	// MaxTriesToFetchNodeData is the maximum number of tries to fetch node data
	MaxTriesToFetchNodeData int = 5

	// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
	// to restart automatically when stuck in `getBlockHashes`
	ShortRangeTimeout time.Duration = 1 * time.Minute

	// pivot block distance ranges
	MinPivotDistanceToHead uint64 = 1024
	MaxPivotDistanceToHead uint64 = 2048
)
View Source
const (
	BlocksBucket          = "BlockBodies"
	BlockSignaturesBucket = "BlockSignatures"
	StageProgressBucket   = "StageProgress"

	// cache db keys
	LastBlockHeight = "LastBlockHeight"
	LastBlockHash   = "LastBlockHash"
)

Variables

View Source
var (
	StagesForwardOrder ForwardOrder
	StagesRevertOrder  RevertOrder
	StagesCleanUpOrder CleanUpOrder
)
View Source
var (
	ErrSavingBodiesProgressFail      = WrapStagedSyncError("saving progress for block bodies stage failed")
	ErrSaveStateProgressFail         = WrapStagedSyncError("saving progress for block States stage failed")
	ErrInvalidBlockNumber            = WrapStagedSyncError("invalid block number")
	ErrInvalidBlockBytes             = WrapStagedSyncError("invalid block bytes to insert into chain")
	ErrStageNotFound                 = WrapStagedSyncError("stage not found")
	ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result")
	ErrUnexpectedBlockHashes         = WrapStagedSyncError("unexpected get block hashes result delivered")
	ErrNilBlock                      = WrapStagedSyncError("nil block found")
	ErrNotEnoughStreams              = WrapStagedSyncError("number of streams smaller than minimum required")
	ErrParseCommitSigAndBitmapFail   = WrapStagedSyncError("parse commitSigAndBitmap failed")
	ErrVerifyHeaderFail              = WrapStagedSyncError("verify header failed")
	ErrInsertChainFail               = WrapStagedSyncError("insert to chain failed")
	ErrZeroBlockResponse             = WrapStagedSyncError("zero block number response from remote nodes")
	ErrEmptyWhitelist                = WrapStagedSyncError("empty white list")
	ErrWrongGetBlockNumberType       = WrapStagedSyncError("wrong type of getBlockNumber interface")
	ErrSaveBlocksToDbFailed          = WrapStagedSyncError("saving downloaded blocks to db failed")
)

Errors ...

View Source
var (

	// MaxHash represents the maximum possible hash value.
	MaxHash = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
)

Functions

func ByteCount

func ByteCount(b uint64) string

func CreateView

func CreateView(ctx context.Context, db kv.RwDB, tx kv.Tx, f func(tx kv.Tx) error) error

CreateView creates a view for a given db

func FullAccount

func FullAccount(data []byte) (*types.StateAccount, error)

FullAccount decodes the data on the 'slim RLP' format and returns the consensus format account.

func FullAccountRLP

func FullAccountRLP(data []byte) ([]byte, error)

FullAccountRLP converts data on the 'slim RLP' format into the full RLP-format.

func GetStageCleanUpProgress

func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error)

GetStageCleanUpProgress retrieves saved progress of given sync stage from the database

func GetStageID

func GetStageID(stage SyncStageID, isBeacon bool, prune bool) []byte

GetStageID returns the stage name in bytes

func GetStageName

func GetStageName(stage string, isBeacon bool, prune bool) string

GetStageName returns the stage name in string

func GetStageProgress

func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error)

GetStageProgress retrieves saved progress of a given sync stage from the database

func SaveStageCleanUpProgress

func SaveStageCleanUpProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error

SaveStageCleanUpProgress stores the progress of the clean up for a given sync stage to the database

func SaveStageProgress

func SaveStageProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error

SaveStageProgress saves progress of given sync stage

func WrapStagedSyncError

func WrapStagedSyncError(context string) error

WrapStagedSyncError wraps errors for staged sync and returns error object

func WrapStagedSyncMsg

func WrapStagedSyncMsg(context string) string

WrapStagedSyncMsg wraps message for staged sync and returns string

Types

type BeaconHelperConfig

type BeaconHelperConfig struct {
	BlockC     <-chan *types.Block
	InsertHook func()
}

BeaconHelperConfig is the extra config used for beaconHelper which uses pub-sub block message to do sync.

type BlockDownloadDetails

type BlockDownloadDetails struct {
	// contains filtered or unexported fields
}

type ChainExecution

type ChainExecution uint32
const (
	AllChains                 ChainExecution = iota // Can execute for any shard
	AllChainsExceptEpochChain                       // Can execute for any shard except epoch chain
	OnlyBeaconNode                                  // only for beacon node
	OnlyEpochChain                                  // only for epoch chain
	OnlyShardChain                                  // only for shard node (exclude beacon node and epoch chain)
)

type CleanUpOrder

type CleanUpOrder []SyncStageID

type CleanUpState

type CleanUpState struct {
	ID              SyncStageID
	ForwardProgress uint64 // progress of stage forward move
	CleanUpProgress uint64 // progress of stage prune move. after sync cycle it become equal to ForwardProgress by Done() method
	// contains filtered or unexported fields
}

CleanUpState contains states of cleanup process for a specific stage

func (*CleanUpState) Done

func (s *CleanUpState) Done(db kv.Putter) error

func (*CleanUpState) DoneAt

func (s *CleanUpState) DoneAt(db kv.Putter, blockNum uint64) error

func (*CleanUpState) LogPrefix

func (s *CleanUpState) LogPrefix() string

type Config

type Config struct {
	// Only run stream sync protocol as a server.
	// TODO: remove this when stream sync is fully up.
	ServerOnly bool

	// Synchronization mode of the downloader
	SyncMode SyncMode

	// parameters
	Network              nodeconfig.NetworkType
	Concurrency          int // Number of concurrent sync requests
	MinStreams           int // Minimum number of streams to do sync
	InitStreams          int // Number of streams requirement for initial bootstrap
	MaxAdvertiseWaitTime int // maximum time duration between protocol advertisements
	// stream manager config
	SmSoftLowCap int
	SmHardLowCap int
	SmHiCap      int
	SmDiscBatch  int

	// config for beacon config
	BHConfig *BeaconHelperConfig

	// use memory db
	UseMemDB bool

	// log the stage progress
	LogProgress bool

	// logs every single process and error to help debugging stream sync
	// DebugMode is not accessible to the end user and is only an aid for development
	DebugMode bool
}

Config is the downloader config

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

Downloader is responsible for sync task of one shard

func NewDownloader

func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader

NewDownloader creates a new downloader

func (*Downloader) Close

func (d *Downloader) Close()

Close closes the downloader

func (*Downloader) DownloadAsync

func (d *Downloader) DownloadAsync()

DownloadAsync triggers the download async.

func (*Downloader) NumPeers

func (d *Downloader) NumPeers() int

NumPeers returns the number of peers connected of a specific shard.

func (*Downloader) Start

func (d *Downloader) Start()

Start starts the downloader

func (*Downloader) SubscribeDownloadFinished

func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscription

SubscribeDownloadFinished subscribes the download finished

func (*Downloader) SubscribeDownloadStarted

func (d *Downloader) SubscribeDownloadStarted(ch chan struct{}) event.Subscription

SubscribeDownloadStarted subscribes download started

func (*Downloader) SyncStatus

func (d *Downloader) SyncStatus() (bool, uint64, uint64)

SyncStatus returns the current sync status

type Downloaders

type Downloaders struct {
	// contains filtered or unexported fields
}

Downloaders is the set of downloaders

func NewDownloaders

func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders

NewDownloaders creates Downloaders for sync of multiple blockchains

func (*Downloaders) Close

func (ds *Downloaders) Close()

Close closes the downloaders

func (*Downloaders) DownloadAsync

func (ds *Downloaders) DownloadAsync(shardID uint32)

DownloadAsync triggers a download

func (*Downloaders) GetShardDownloader

func (ds *Downloaders) GetShardDownloader(shardID uint32) *Downloader

GetShardDownloader returns the downloader with the given shard ID

func (*Downloaders) IsActive

func (ds *Downloaders) IsActive() bool

IsActive returns whether the downloader is active

func (*Downloaders) NumPeers

func (ds *Downloaders) NumPeers() map[uint32]int

NumPeers returns the connected peers for each shard

func (*Downloaders) Start

func (ds *Downloaders) Start()

Start starts the downloaders

func (*Downloaders) SyncStatus

func (ds *Downloaders) SyncStatus(shardID uint32) (bool, uint64, uint64)

SyncStatus returns whether the given shard is doing syncing task and the target block number

type ExecFunc

type ExecFunc func(firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

type ForwardOrder

type ForwardOrder []SyncStageID

type FullStateDownloadManager

type FullStateDownloadManager struct {
	// contains filtered or unexported fields
}

FullStateDownloadManager is the helper structure for get blocks request management

func (*FullStateDownloadManager) GetNextBatch

func (s *FullStateDownloadManager) GetNextBatch() (accounts []*accountTask,
	codes []*byteCodeTasksBundle,
	storages *storageTaskBundle,
	healtask *healTask,
	codetask *healTask,
	nItems int,
	err error)

getNextBatch returns objects with a maximum of n state download tasks to send to the remote peer.

func (*FullStateDownloadManager) HandleAccountRequestResult

func (s *FullStateDownloadManager) HandleAccountRequestResult(task *accountTask,
	retAccounts []*message.AccountData,
	proof [][]byte,
	origin []byte,
	last []byte,
	loopID int,
	streamID sttypes.StreamID) error

HandleAccountRequestResult handles get account ranges result

func (*FullStateDownloadManager) HandleByteCodeHealRequestResult

func (s *FullStateDownloadManager) HandleByteCodeHealRequestResult(task *healTask,
	hashes []common.Hash,
	codes [][]byte,
	loopID int,
	streamID sttypes.StreamID) error

HandleByteCodeHealRequestResult handles get byte codes heal result

func (*FullStateDownloadManager) HandleBytecodeRequestResult

func (s *FullStateDownloadManager) HandleBytecodeRequestResult(task interface{},
	reqHashes []common.Hash,
	bytecodes [][]byte,
	loopID int,
	streamID sttypes.StreamID) error

HandleBytecodeRequestResult handles get bytecode result it is a callback method to invoke when a batch of contract bytes codes are received from a remote peer.

func (*FullStateDownloadManager) HandleRequestError

func (s *FullStateDownloadManager) HandleRequestError(accounts []*accountTask,
	codes []*byteCodeTasksBundle,
	storages *storageTaskBundle,
	healtask *healTask,
	codetask *healTask,
	streamID sttypes.StreamID, err error)

HandleRequestError handles the error result

func (*FullStateDownloadManager) HandleStorageRequestResult

func (s *FullStateDownloadManager) HandleStorageRequestResult(mainTask *accountTask,
	subTask *storageTask,
	reqAccounts []common.Hash,
	roots []common.Hash,
	origin common.Hash,
	limit common.Hash,
	receivedSlots [][]*message.StorageData,
	proof [][]byte,
	loopID int,
	streamID sttypes.StreamID) error

HandleStorageRequestResult handles get storages result when ranges of storage slots are received from a remote peer.

func (*FullStateDownloadManager) HandleTrieNodeHealRequestResult

func (s *FullStateDownloadManager) HandleTrieNodeHealRequestResult(task *healTask,
	reqPaths []string,
	reqHashes []common.Hash,
	trienodes [][]byte,
	loopID int,
	streamID sttypes.StreamID) error

HandleTrieNodeHealRequestResult handles get trie nodes heal result when a batch of trie nodes are received from a remote peer.

func (*FullStateDownloadManager) SlimAccountRLP

func (s *FullStateDownloadManager) SlimAccountRLP(account types.StateAccount) []byte

SlimAccountRLP encodes the state account in 'slim RLP' format.

func (*FullStateDownloadManager) SyncCompleted

func (s *FullStateDownloadManager) SyncCompleted()

func (*FullStateDownloadManager) SyncStarted

func (s *FullStateDownloadManager) SyncStarted()

func (*FullStateDownloadManager) UnpackAccountRanges

func (s *FullStateDownloadManager) UnpackAccountRanges(retAccounts []*message.AccountData) ([]common.Hash, [][]byte, error)

UnpackAccountRanges retrieves the accounts from the range packet and converts from slim wire representation to consensus format. The returned data is RLP encoded since it's expected to be serialized to disk without further interpretation.

Note, this method does a round of RLP decoding and re-encoding, so only use it once and cache the results if need be. Ideally discard the packet afterwards to not double the memory use.

func (*FullStateDownloadManager) UnpackStorages

func (s *FullStateDownloadManager) UnpackStorages(slots [][]*message.StorageData) ([][]common.Hash, [][][]byte)

Unpack retrieves the storage slots from the range packet and returns them in a split flat format that's more consistent with the internal data structures.

type InvalidBlock

type InvalidBlock struct {
	Active   bool
	Number   uint64
	Hash     common.Hash
	IsLogged bool
	StreamID []sttypes.StreamID
}

type ProofList

type ProofList []rlp.RawValue

ProofList stores an ordered list of trie nodes. It implements ethdb.KeyValueWriter.

func (ProofList) DataSize

func (n ProofList) DataSize() int

DataSize returns the aggregated data size of nodes in the list

func (*ProofList) Delete

func (n *ProofList) Delete(key []byte) error

Delete panics as there's no reason to remove a node from the list.

func (*ProofList) Put

func (n *ProofList) Put(key []byte, value []byte) error

Put stores a new node at the end of the list

func (ProofList) Set

func (n ProofList) Set() *ProofSet

Set converts the node list to a ProofSet

func (ProofList) Store

func (n ProofList) Store(db ethdb.KeyValueWriter)

Store writes the contents of the list to the given database

type ProofSet

type ProofSet struct {
	// contains filtered or unexported fields
}

ProofSet stores a set of trie nodes. It implements trie.Database and can also act as a cache for another trie.Database.

func NewProofSet

func NewProofSet() *ProofSet

NewProofSet creates an empty node set

func (*ProofSet) DataSize

func (db *ProofSet) DataSize() int

DataSize returns the aggregated data size of nodes in the set

func (*ProofSet) Delete

func (db *ProofSet) Delete(key []byte) error

Delete removes a node from the set

func (*ProofSet) Get

func (db *ProofSet) Get(key []byte) ([]byte, error)

Get returns a stored node

func (*ProofSet) Has

func (db *ProofSet) Has(key []byte) (bool, error)

Has returns true if the node set contains the given key

func (*ProofSet) KeyCount

func (db *ProofSet) KeyCount() int

KeyCount returns the number of nodes in the set

func (*ProofSet) List

func (db *ProofSet) List() ProofList

List converts the node set to a ProofList

func (*ProofSet) Put

func (db *ProofSet) Put(key []byte, value []byte) error

Put stores a new node in the set

func (*ProofSet) Store

func (db *ProofSet) Store(target ethdb.KeyValueWriter)

Store writes the contents of the set to the given database

type RangeExecution

type RangeExecution uint32
const (
	LongRangeAndShortRange RangeExecution = iota // Both short range and long range
	OnlyShortRange                               // only short range
	OnlyLongRange                                // only long range

)

type ReceiptDownloadDetails

type ReceiptDownloadDetails struct {
	// contains filtered or unexported fields
}

type Received

type Received struct {
	// contains filtered or unexported fields
}

type RevertOrder

type RevertOrder []SyncStageID

type RevertState

type RevertState struct {
	ID          SyncStageID
	RevertPoint uint64 // RevertPoint is the block to revert to.
	// contains filtered or unexported fields
}

RevertState contains the information about revert.

func (*RevertState) Done

func (u *RevertState) Done(db kv.Putter) error

Done updates the DB state of the stage.

func (*RevertState) LogPrefix

func (u *RevertState) LogPrefix() string

type Reverter

type Reverter interface {
	// RevertTo begins staged sync revert to the specified block.
	RevertTo(revertPoint uint64, invalidBlockNumber uint64, invalidBlockHash common.Hash, invalidBlockStreamID sttypes.StreamID)
}

Reverter allows the stage to cause an revert.

type SlimAccount

type SlimAccount struct {
	Nonce    uint64
	Balance  *big.Int
	Root     []byte // Nil if root equals to types.EmptyRootHash
	CodeHash []byte // Nil if hash equals to types.EmptyCodeHash
}

SlimAccount is a modified version of an Account, where the root is replaced with a byte slice. This format can be used to represent full-consensus format or slim format which replaces the empty root and code hash as nil byte slice.

type Stage

type Stage struct {
	// ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`).
	ID SyncStageID
	// Handler handles the logic for the stage
	Handler StageHandler
	// Description is a string that is shown in the logs.
	Description string
	// DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the page.
	DisabledDescription string
	// Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`.
	Disabled bool
	// Range defines whether stage has to be executed for either long range or short range
	RangeMode RangeExecution
	// ShardExecution defines this stage has to be executed for which shards
	ChainExecutionMode ChainExecution
}

Stage is a single sync stage in staged sync.

func DefaultStages

func DefaultStages(ctx context.Context,
	headsCfg StageHeadsCfg,
	seCfg StageEpochCfg,
	srCfg StageShortRangeCfg,
	bodiesCfg StageBodiesCfg,
	stateSyncCfg StageStateSyncCfg,
	fullStateSyncCfg StageFullStateSyncCfg,
	statesCfg StageStatesCfg,
	receiptsCfg StageReceiptsCfg,
	lastMileCfg StageLastMileCfg,
	finishCfg StageFinishCfg,
) []*Stage

type StageBodies

type StageBodies struct {
	// contains filtered or unexported fields
}

func NewStageBodies

func NewStageBodies(cfg StageBodiesCfg) *StageBodies

func (*StageBodies) CleanUp

func (b *StageBodies) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageBodies) Exec

func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

Exec progresses Bodies stage in the forward direction

func (*StageBodies) Revert

func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageBodiesCfg

type StageBodiesCfg struct {
	// contains filtered or unexported fields
}

func NewStageBodiesCfg

func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, extractReceiptHashes bool, logProgress bool) StageBodiesCfg

type StageEpoch

type StageEpoch struct {
	// contains filtered or unexported fields
}

func NewStageEpoch

func NewStageEpoch(cfg StageEpochCfg) *StageEpoch

func (*StageEpoch) CleanUp

func (sr *StageEpoch) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageEpoch) Exec

func (sr *StageEpoch) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

func (*StageEpoch) Revert

func (sr *StageEpoch) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageEpochCfg

type StageEpochCfg struct {
	// contains filtered or unexported fields
}

func NewStageEpochCfg

func NewStageEpochCfg(bc core.BlockChain, db kv.RwDB) StageEpochCfg

type StageFinish

type StageFinish struct {
	// contains filtered or unexported fields
}

func NewStageFinish

func NewStageFinish(cfg StageFinishCfg) *StageFinish

func (*StageFinish) CleanUp

func (finish *StageFinish) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageFinish) Exec

func (finish *StageFinish) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

func (*StageFinish) Revert

func (finish *StageFinish) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageFinishCfg

type StageFinishCfg struct {
	// contains filtered or unexported fields
}

func NewStageFinishCfg

func NewStageFinishCfg(db kv.RwDB) StageFinishCfg

type StageFullStateSync

type StageFullStateSync struct {
	// contains filtered or unexported fields
}

func NewStageFullStateSync

func NewStageFullStateSync(cfg StageFullStateSyncCfg) *StageFullStateSync

func (*StageFullStateSync) CleanUp

func (stg *StageFullStateSync) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageFullStateSync) Exec

func (sss *StageFullStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

Exec progresses States stage in the forward direction

func (*StageFullStateSync) Revert

func (stg *StageFullStateSync) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageFullStateSyncCfg

type StageFullStateSyncCfg struct {
	// contains filtered or unexported fields
}

func NewStageFullStateSyncCfg

func NewStageFullStateSyncCfg(bc core.BlockChain,
	db kv.RwDB,
	concurrency int,
	protocol syncProtocol,
	logger zerolog.Logger,
	logProgress bool) StageFullStateSyncCfg

type StageHandler

type StageHandler interface {
	// Exec is the execution function for the stage to move forward.
	// * firstCycle - is it the first cycle of syncing.
	// * invalidBlockRevert - whether the execution is to solve the invalid block
	// * s - is the current state of the stage and contains stage data.
	// * reverter - if the stage needs to cause reverting, `reverter` methods can be used.
	Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

	// Revert is the reverting logic of the stage.
	// * firstCycle - is it the first cycle of syncing.
	// * u - contains information about the revert itself.
	// * s - represents the state of this stage at the beginning of revert.
	Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) error

	// CleanUp is the execution function for the stage to prune old data.
	// * firstCycle - is it the first cycle of syncing.
	// * p - is the current state of the stage and contains stage data.
	CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) error
}

type StageHeads

type StageHeads struct {
	// contains filtered or unexported fields
}

func NewStageHeads

func NewStageHeads(cfg StageHeadsCfg) *StageHeads

func (*StageHeads) CleanUp

func (heads *StageHeads) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageHeads) Exec

func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

func (*StageHeads) Revert

func (heads *StageHeads) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageHeadsCfg

type StageHeadsCfg struct {
	// contains filtered or unexported fields
}

func NewStageHeadersCfg

func NewStageHeadersCfg(bc core.BlockChain, db kv.RwDB) StageHeadsCfg

type StageLastMile

type StageLastMile struct {
	// contains filtered or unexported fields
}

func NewStageLastMile

func NewStageLastMile(cfg StageLastMileCfg) *StageLastMile

func (*StageLastMile) CleanUp

func (lm *StageLastMile) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageLastMile) Exec

func (lm *StageLastMile) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

func (*StageLastMile) Revert

func (lm *StageLastMile) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageLastMileCfg

type StageLastMileCfg struct {
	// contains filtered or unexported fields
}

func NewStageLastMileCfg

func NewStageLastMileCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageLastMileCfg

type StageReceipts

type StageReceipts struct {
	// contains filtered or unexported fields
}

func NewStageReceipts

func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts

func (*StageReceipts) CleanUp

func (r *StageReceipts) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageReceipts) Exec

func (r *StageReceipts) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

Exec progresses receipts stage in the forward direction

func (*StageReceipts) Revert

func (r *StageReceipts) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageReceiptsCfg

type StageReceiptsCfg struct {
	// contains filtered or unexported fields
}

func NewStageReceiptsCfg

func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeacon bool, logProgress bool) StageReceiptsCfg

type StageShortRange

type StageShortRange struct {
	// contains filtered or unexported fields
}

func NewStageShortRange

func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange

func (*StageShortRange) CleanUp

func (sr *StageShortRange) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageShortRange) Exec

func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) error

func (*StageShortRange) Revert

func (sr *StageShortRange) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageShortRangeCfg

type StageShortRangeCfg struct {
	// contains filtered or unexported fields
}

func NewStageShortRangeCfg

func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg

type StageState

type StageState struct {
	ID          SyncStageID
	BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution.
	// contains filtered or unexported fields
}

StageState is the state of the stage.

func (*StageState) CurrentStageProgress

func (s *StageState) CurrentStageProgress(db kv.Getter) (uint64, error)

func (*StageState) LogPrefix

func (s *StageState) LogPrefix() string

func (*StageState) StageProgress

func (s *StageState) StageProgress(db kv.Getter, id SyncStageID) (uint64, error)

func (*StageState) Update

func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error

Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.

func (*StageState) UpdateCleanUp

func (s *StageState) UpdateCleanUp(db kv.Putter, blockNum uint64) error

type StageStateSync

type StageStateSync struct {
	// contains filtered or unexported fields
}

func NewStageStateSync

func NewStageStateSync(cfg StageStateSyncCfg) *StageStateSync

func (*StageStateSync) CleanUp

func (stg *StageStateSync) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageStateSync) Exec

func (sss *StageStateSync) Exec(ctx context.Context, bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

Exec progresses States stage in the forward direction

func (*StageStateSync) Revert

func (stg *StageStateSync) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageStateSyncCfg

type StageStateSyncCfg struct {
	// contains filtered or unexported fields
}

func NewStageStateSyncCfg

func NewStageStateSyncCfg(bc core.BlockChain,
	db kv.RwDB,
	concurrency int,
	protocol syncProtocol,
	logger zerolog.Logger,
	logProgress bool) StageStateSyncCfg

type StageStates

type StageStates struct {
	// contains filtered or unexported fields
}

func NewStageStates

func NewStageStates(cfg StageStatesCfg) *StageStates

func (*StageStates) CleanUp

func (stg *StageStates) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error)

func (*StageStates) Exec

func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error)

Exec progresses States stage in the forward direction

func (*StageStates) Revert

func (stg *StageStates) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error)

type StageStatesCfg

type StageStatesCfg struct {
	// contains filtered or unexported fields
}

func NewStageStatesCfg

func NewStageStatesCfg(
	bc core.BlockChain,
	db kv.RwDB,
	blockDBs []kv.RwDB,
	concurrency int,
	logger zerolog.Logger,
	logProgress bool) StageStatesCfg

type StagedStreamSync

type StagedStreamSync struct {
	UseMemDB bool

	LogProgress bool
	// contains filtered or unexported fields
}

func CreateStagedSync

func CreateStagedSync(ctx context.Context,
	bc core.BlockChain,
	consensus *consensus.Consensus,
	dbDir string,
	isBeaconNode bool,
	protocol syncProtocol,
	config Config,
	logger zerolog.Logger,
) (*StagedStreamSync, error)

CreateStagedSync creates an instance of staged sync

func New

func New(
	bc core.BlockChain,
	consensus *consensus.Consensus,
	db kv.RwDB,
	stagesList []*Stage,
	isBeacon bool,
	protocol syncProtocol,
	isBeaconNode bool,
	config Config,
	logger zerolog.Logger,
) *StagedStreamSync

New creates a new StagedStreamSync instance

func (*StagedStreamSync) AddLastMileBlock

func (ss *StagedStreamSync) AddLastMileBlock(block *types.Block)

AddLastMileBlock adds the latest a few block into queue for syncing only keep the latest blocks with size capped by LastMileBlocksSize

func (*StagedStreamSync) Blockchain

func (s *StagedStreamSync) Blockchain() core.BlockChain

func (*StagedStreamSync) CleanUpStageState

func (s *StagedStreamSync) CleanUpStageState(ctx context.Context, id SyncStageID, forwardProgress uint64, tx kv.Tx, db kv.RwDB) (*CleanUpState, error)

func (*StagedStreamSync) CurrentBlockNumber

func (s *StagedStreamSync) CurrentBlockNumber() uint64

func (*StagedStreamSync) DB

func (s *StagedStreamSync) DB() kv.RwDB

func (*StagedStreamSync) Debug

func (s *StagedStreamSync) Debug(source string, msg interface{})

func (*StagedStreamSync) DisableAllStages

func (s *StagedStreamSync) DisableAllStages() []SyncStageID

DisableAllStages disables all stages including their reverts

func (*StagedStreamSync) DisableStages

func (s *StagedStreamSync) DisableStages(ids ...SyncStageID)

DisableStages disables stages by a set of given stage IDs

func (*StagedStreamSync) Done

func (s *StagedStreamSync) Done()

func (*StagedStreamSync) EnableStages

func (s *StagedStreamSync) EnableStages(ids ...SyncStageID)

EnableStages enables stages by a set of given stage IDs

func (*StagedStreamSync) IsAfter

func (s *StagedStreamSync) IsAfter(stage1, stage2 SyncStageID) bool

IsAfter returns true if stage1 goes after stage2 in staged sync

func (*StagedStreamSync) IsBeacon

func (s *StagedStreamSync) IsBeacon() bool

func (*StagedStreamSync) IsBefore

func (s *StagedStreamSync) IsBefore(stage1, stage2 SyncStageID) bool

IsBefore returns true if stage1 goes before stage2 in staged sync

func (*StagedStreamSync) IsDone

func (s *StagedStreamSync) IsDone() bool

IsDone returns true if last stage have been done

func (*StagedStreamSync) IsExplorer

func (s *StagedStreamSync) IsExplorer() bool

func (*StagedStreamSync) Len

func (s *StagedStreamSync) Len() int

func (*StagedStreamSync) LogPrefix

func (s *StagedStreamSync) LogPrefix() string

func (*StagedStreamSync) NewRevertState

func (s *StagedStreamSync) NewRevertState(id SyncStageID, revertPoint uint64) *RevertState

func (*StagedStreamSync) NextStage

func (s *StagedStreamSync) NextStage()

func (*StagedStreamSync) PrevRevertPoint

func (s *StagedStreamSync) PrevRevertPoint() *uint64

func (*StagedStreamSync) RevertTo

func (s *StagedStreamSync) RevertTo(revertPoint uint64, invalidBlockNumber uint64, invalidBlockHash common.Hash, invalidBlockStreamID sttypes.StreamID)

RevertTo sets the revert point

func (*StagedStreamSync) RollbackLastMileBlocks

func (ss *StagedStreamSync) RollbackLastMileBlocks(ctx context.Context, hashes []common.Hash) error

func (*StagedStreamSync) Run

func (s *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, firstCycle bool) error

Run runs a full cycle of stages

func (*StagedStreamSync) SetCurrentStage

func (s *StagedStreamSync) SetCurrentStage(id SyncStageID) error

SetCurrentStage sets the current stage to a given stage id

func (*StagedStreamSync) StageState

func (s *StagedStreamSync) StageState(ctx context.Context, stage SyncStageID, tx kv.Tx, db kv.RwDB) (*StageState, error)

StageState retrieves the latest stage state from db

func (*StagedStreamSync) UpdateBlockAndStatus

func (ss *StagedStreamSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error

UpdateBlockAndStatus updates block and its status in db

type StagedStreamSyncService

type StagedStreamSyncService struct {
	Downloaders *Downloaders
}

StagedStreamSyncService is simply a adapter of downloaders, which support block synchronization

func NewService

func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService

NewService creates a new downloader service

func (*StagedStreamSyncService) Start

func (s *StagedStreamSyncService) Start() error

Start starts the service

func (*StagedStreamSyncService) Stop

func (s *StagedStreamSyncService) Stop() error

Stop stops the service

type StateDownloadManager

type StateDownloadManager struct {
	// contains filtered or unexported fields
}

StateDownloadManager is the helper structure for get blocks request management

func (*StateDownloadManager) GetNextBatch

func (s *StateDownloadManager) GetNextBatch() (nodes []common.Hash, paths []string, codes []common.Hash, err error)

getNextBatch returns objects with a maximum of n state download tasks to send to the remote peer.

func (*StateDownloadManager) HandleRequestError

func (s *StateDownloadManager) HandleRequestError(codeHashes []common.Hash, triePaths []string, streamID sttypes.StreamID, err error)

HandleRequestError handles the error result

func (*StateDownloadManager) HandleRequestResult

func (s *StateDownloadManager) HandleRequestResult(codeHashes []common.Hash, triePaths []string, response [][]byte, loopID int, streamID sttypes.StreamID) error

HandleRequestResult handles get trie paths and code hashes result

type SyncCycle

type SyncCycle struct {
	Number       uint64
	TargetHeight uint64
	// contains filtered or unexported fields
}

type SyncMode

type SyncMode uint32

SyncMode represents the synchronization mode of the downloader. It is a uint32 as it is used with atomic operations.

const (
	FullSync SyncMode = iota // Synchronize the entire blockchain history from full blocks
	FastSync                 // Download all blocks and states
	SnapSync                 // Download the chain and the state via compact snapshots
)

type SyncProgress

type SyncProgress struct {
	Tasks map[uint64]*accountTask // The suspended account tasks (contract tasks within)

	// Status report during syncing phase
	AccountSynced  uint64             // Number of accounts downloaded
	AccountBytes   common.StorageSize // Number of account trie bytes persisted to disk
	BytecodeSynced uint64             // Number of bytecodes downloaded
	BytecodeBytes  common.StorageSize // Number of bytecode bytes downloaded
	StorageSynced  uint64             // Number of storage slots downloaded
	StorageBytes   common.StorageSize // Number of storage trie bytes persisted to disk

	// Status report during healing phase
	TrienodeHealSynced uint64             // Number of state trie nodes downloaded
	TrienodeHealBytes  common.StorageSize // Number of state trie bytes persisted to disk
	BytecodeHealSynced uint64             // Number of bytecodes downloaded
	BytecodeHealBytes  common.StorageSize // Number of bytecodes persisted to disk
}

SyncProgress is a database entry to allow suspending and resuming a snapshot state sync. Opposed to full and fast sync, there is no way to restart a suspended snap sync without prior knowledge of the suspension point.

type SyncStageID

type SyncStageID string

SyncStageID represents the stages in the Mode.StagedSync mode

const (
	Heads         SyncStageID = "Heads"         // Heads are downloaded
	ShortRange    SyncStageID = "ShortRange"    // short range
	SyncEpoch     SyncStageID = "SyncEpoch"     // epoch sync
	BlockBodies   SyncStageID = "BlockBodies"   // Block bodies are downloaded, TxHash and UncleHash are getting verified
	States        SyncStageID = "States"        // will construct most recent state from downloaded blocks
	StateSync     SyncStageID = "StateSync"     // State sync
	FullStateSync SyncStageID = "FullStateSync" // Full State Sync
	Receipts      SyncStageID = "Receipts"      // Receipts
	LastMile      SyncStageID = "LastMile"      // update blocks after sync and update last mile blocks as well
	Finish        SyncStageID = "Finish"        // Nominal stage after all other stages
)

type Timing

type Timing struct {
	// contains filtered or unexported fields
}

type TrieNodePathSet

type TrieNodePathSet [][]byte

of only the account path. There's no need to be able to address both an account node and a storage node in the same request as it cannot happen that a slot is accessed before the account path is fully expanded.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL