Documentation ¶
Overview ¶
Package downloader handles downloading data from other nodes for sync. Sync refers to the process of catching up with other nodes; once caught up, nodes use a different process to maintain their synchronisation.
There are a few different modes for syncing ¶
Full: Get all the blocks and apply them all to build the chain state
Fast: Get all the blocks and block receipts and insert them into the db without processing them, and at the same time download the state for a block near the head block. Once the state has been downloaded, process subsequent blocks as a full sync would in order to reach the tip of the chain.
Fast sync introduces the concept of the pivot, which is a block at some point behind the head block that the node attempts to sync state for. In geth the pivot was chosen to be 64 blocks behind the head block; the reason for choosing a point behind the head was to ensure that the block you are syncing state for is on the main chain and won't get reorged out (see https://github.com/ethereum/go-ethereum/issues/25100). It is called the pivot because before the pivot the fast sync approach is used but after the pivot full sync is used, so you could imagine the syncing strategy pivoting around that point.
In celo we don't have the problem of reorgs, but we still retain the pivot point because validator uptime scores historically had to be calculated by processing blocks from an epoch boundary. However, since https://github.com/celo-org/celo-blockchain/pull/1833, which removes the requirement to process blocks from an epoch boundary, we could in fact drop the concept of the pivot.
Snap: Not currently working, but in theory works like fast sync except that nodes download a flat file to get the state, as opposed to making hundreds of thousands of individual requests for it. This should significantly speed up sync.
Light: Downloads only headers during sync and then downloads other data on demand in order to service rpc requests.
Lightest: Like light but downloads only one header per epoch, which on mainnet means one header out of every 17280 headers. This is particularly fast, taking only 20 seconds or so to get synced.
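As a rough illustration (not the downloader's actual code), the header numbers requested in lightest mode can be derived from the epoch size; the sketch below assumes mainnet's epoch size of 17280 and that the epoch header is the last block of each completed epoch.

	package main

	import "fmt"

	// epochSize is assumed to be the mainnet value quoted above; the real code
	// reads it from the chain config.
	const epochSize = 17280

	// epochHeaderNumbers lists the block numbers lightest sync would request up
	// to head: one header per epoch, taken here as the last block of each
	// completed epoch (an assumption for this sketch).
	func epochHeaderNumbers(head uint64) []uint64 {
		var nums []uint64
		for n := uint64(epochSize); n <= head; n += epochSize {
			nums = append(nums, n)
		}
		return nums
	}

	func main() {
		fmt.Println(epochHeaderNumbers(40000)) // [17280 34560]
	}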
Sync process detail ¶
Syncing is initiated with one peer (see eth.loop); the peer selected to sync with is the one with the highest total difficulty of all peers (see eth.nextSyncOp). Syncing may be cancelled and restarted with a different peer if a peer with a higher total difficulty becomes available.
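As a sketch of that selection rule (the real logic lives in eth.nextSyncOp and works over the eth protocol's peer set), picking the candidate with the highest total difficulty might look like this; peerInfo is a hypothetical stand-in type.

	package main

	import (
		"fmt"
		"math/big"
	)

	// peerInfo is a hypothetical stand-in for an eth protocol peer that
	// advertises the total difficulty of its head block.
	type peerInfo struct {
		id string
		td *big.Int
	}

	// bestPeer picks the peer advertising the highest total difficulty, i.e.
	// the peer the downloader would be asked to sync against.
	func bestPeer(peers []peerInfo) *peerInfo {
		var best *peerInfo
		for i := range peers {
			if best == nil || peers[i].td.Cmp(best.td) > 0 {
				best = &peers[i]
			}
		}
		return best
	}

	func main() {
		peers := []peerInfo{
			{id: "a", td: big.NewInt(100)},
			{id: "b", td: big.NewInt(250)},
		}
		fmt.Println(bestPeer(peers).id) // b
	}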
Syncing introduces the concept of a checkpoint (see params.TrustedCheckpoint). The checkpoint is a hard coded set of trie roots that allow state sync to start before the whole header chain has been downloaded.
The pivot point is the block that fast sync syncs state for. It is calculated as the first block of the epoch containing the block that is fsMinFullBlocks behind the current head block of the peer we are syncing against (fsMinFullBlocks is hardcoded to 64, chosen by the geth team to make re-orgs of the synced state unlikely).
The first step in syncing with a peer is to fetch the latest block header and the pivot header. The geth implementation simply calculates the pivot as being 64 blocks (fsMinFullBlocks) before the head, so if the head is currently < 64 there is no valid pivot; in that case the geth code uses the head as the pivot (they say to avoid nil pointer exceptions, but it's not clear what this does to the sync). On the celo side there should never be a case without a pivot block, because we instead choose the pivot to be zero if the head is currently < 64.
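A minimal sketch of the celo pivot-number calculation described above, assuming fsMinFullBlocks = 64, a mainnet epoch size of 17280 and the epoch layout noted in the comments; the real code takes the epoch size from the chain config.

	package main

	import "fmt"

	const (
		fsMinFullBlocks = 64    // blocks to fully process near the head
		epochSize       = 17280 // assumed mainnet epoch size
	)

	// pivotBlockNumber sketches the celo pivot choice described above: the
	// first block of the epoch containing (head - fsMinFullBlocks), or 0 when
	// the head is within fsMinFullBlocks of genesis. The epoch layout (epoch i
	// covering blocks (i-1)*epochSize+1 .. i*epochSize) is an assumption of
	// this sketch.
	func pivotBlockNumber(head uint64) uint64 {
		if head < fsMinFullBlocks {
			return 0
		}
		target := head - fsMinFullBlocks
		if target == 0 {
			return 0
		}
		return ((target-1)/epochSize)*epochSize + 1
	}

	func main() {
		fmt.Println(pivotBlockNumber(40))    // 0
		fmt.Println(pivotBlockNumber(40000)) // 34561
	}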
Next the sync finds the common ancestor (aka origin) between the node and the peer it is syncing against.
If fast syncing {
The pivot is written to a file. If the origin turns out to be after the pivot then the origin is set to be just before the pivot. The ancient limit is set on the downloader (it would be much nicer if the concept of ancient could be encapsulated in the database rather than leaking here). The ancient limit defines the boundary between freezer blocks and current blocks. Setting the ancient limit here enables "direct-ancient mode", which I guess bypasses putting blocks into the main chain and then having them moved to the freezer later. I guess in full sync mode, since all blocks need to be processed, all blocks need to go into the main database first and only after they have been processed can they be moved to the freezer, but since fast sync does not process all blocks that step can be skipped. Then, and I'm not really clear why, if the origin is greater than the last frozen block (i.e. there is stuff in the current database beyond what's in the freezer) direct-ancient mode is disabled, maybe because it is only applicable to nodes that are starting from scratch or have never reached the pivot. If the origin turns out to be lower than the most recent frozen block then the blockchain is rewound to the origin. Finally the pivotHeader on the downloader is set to the pivot.
}
Then a number of goroutines are started to fetch data from the origin to the head: fetchHeaders, fetchBodies and fetchReceipts.
And one is started to process the headers (processHeaders).
If fast sync { start a routine to process the fast sync content (processFastSyncContent) }
If full syncing { start a routine to process the full sync content (processFullSyncContent) }
These goroutines form a pipeline where the downloaded data flows as follows.
                               -> fetchBodies -> processFullSyncContent
                              /               \
fetchHeaders -> processHeaders                 \
                              \                 \
                               -> fetchReceipts --> processFastSyncContent
fetchHeaders
fetchHeaders introduces the skeleton concept. The idea is that the node requests a set of headers from the peer that are spaced out at regular intervals, and then uses all peers to request headers to fill the gaps. The skeleton header hashes allow the node to easily verify that the received filler headers match the chain of the main peer it is syncing against. Whether fetching skeleton headers or not, requests for headers are made in batches of up to 192 (MaxHeaderFetch) headers.
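To make the spacing concrete, here is a small sketch (modelled on the upstream geth behaviour, not copied from the downloader) of which header numbers a single skeleton request covers when filling starts at a given block.

	package main

	import "fmt"

	const (
		maxHeaderFetch  = 192 // headers per fill request (MaxHeaderFetch)
		maxSkeletonSize = 128 // skeleton headers per request (MaxSkeletonSize)
	)

	// skeletonNumbers returns the block numbers of the skeleton headers
	// requested from the master peer when filling starts at `from`: every
	// MaxHeaderFetch-th header, so each gap can be filled by one
	// MaxHeaderFetch-sized batch whose last header must hash-match the
	// corresponding skeleton entry.
	func skeletonNumbers(from uint64) []uint64 {
		nums := make([]uint64, 0, maxSkeletonSize)
		for i := 0; i < maxSkeletonSize; i++ {
			nums = append(nums, from+uint64((i+1)*maxHeaderFetch)-1)
		}
		return nums
	}

	func main() {
		nums := skeletonNumbers(1)
		fmt.Println(nums[0], nums[1], nums[len(nums)-1]) // 192 384 24576
	}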
If lightest sync { fetch just epoch headers until the current epoch, then fetch all subsequent headers (no skeleton) } else { fetch headers using the skeleton approach until no more skeleton headers are returned, then switch to requesting all subsequent headers from the peer. }
Wait for headers to be received.
Pass the received headers to the processHeaders routine.
If no more headers are returned and the pivot state has been fully synced then exit. (The pivot being synced is communicated via an atomic from processFastSyncContent)
Fetch more headers as done to start with.
processHeaders
Waits to receive headers from fetchHeaders and inserts the received headers into the header chain.
If full sync { request blocks for inserted headers. (fetchBodies) } If fast sync { request blocks and receipts for inserted headers. (fetchBodies & fetchReceipts) }
processFastSyncContent
Reads fetch results (each fetch result has all the data required for a block: header, txs, receipts, randomness & epochSnarkData) from the downloader queue.
Updates the pivot point if it has fallen sufficiently far behind the head.
Splits the fetch results around the pivot.
Results before the pivot are inserted with BlockChain.InsertReceiptChain (which also inserts receipts, because in fast sync most blocks are not processed); those after the pivot are handled as follows.
If the pivot has completed syncing { Insert the results after the pivot with BlockChain.InsertChain and exit. } else { Start the process again, prepending the results after the pivot point to the newly fetched results. (Note that if the pivot point is subsequently updated those results will be processed as fast sync results and inserted via BlockChain.InsertReceiptChain, but there seems to be a problem with our current implementation that means the pivot would have to be 2 days old before it would be updated, so it looks like the list of results will grow a lot during this time, which could be an OOM consideration.) }
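A simplified sketch of the split described above, using a hypothetical result type in place of the downloader's internal fetchResult.

	package main

	import "fmt"

	// result is a hypothetical stand-in for the downloader's fetchResult; only
	// the block number matters for the split.
	type result struct{ number uint64 }

	// splitAroundPivot divides fetch results into those at or before the pivot
	// (inserted with InsertReceiptChain, no processing) and those after it
	// (processed with InsertChain once the pivot state has been synced).
	func splitAroundPivot(pivot uint64, results []result) (before, after []result) {
		for _, r := range results {
			if r.number <= pivot {
				before = append(before, r)
			} else {
				after = append(after, r)
			}
		}
		return before, after
	}

	func main() {
		results := []result{{10}, {11}, {12}, {13}}
		b, a := splitAroundPivot(11, results)
		fmt.Println(len(b), len(a)) // 2 2
	}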
fetchBodies
A routine that gets notified of bodies to fetch and calls into a beast of a function (fetchParts) to fetch batches of block bodies from different peers. Those bodies are delivered to the queue, which collates them along with other delivered data into fetch results that are then retrieved by either processFastSyncContent or processFullSyncContent.
fetchReceipts
Like fetchBodies but for receipts.
Package downloader contains the manual full chain synchronisation.
Index ¶
- Variables
- type BlockChain
- type DoneEvent
- type Downloader
- func (d *Downloader) Cancel()
- func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, randomness []*types.Randomness, ...) (err error)
- func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error
- func (d *Downloader) DeliverNodeData(id string, data [][]byte) error
- func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error
- func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error
- func (d *Downloader) Progress() ethereum.SyncProgress
- func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error
- func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error
- func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error
- func (d *Downloader) Synchronising() bool
- func (d *Downloader) Terminate()
- func (d *Downloader) UnregisterPeer(id string) error
- type FailedEvent
- type LightChain
- type LightPeer
- type Peer
- type PublicDownloaderAPI
- type StartEvent
- type SyncMode
- type SyncStatusSubscription
- type SyncingResult
Constants ¶
This section is empty.
Variables ¶
var (
	MaxBlockFetch       = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch      = 192 // Amount of block headers to be fetched per retrieval request
	MaxEpochHeaderFetch = 192 // Number of epoch block headers to fetch (only used in IBFT consensus + Lightest sync mode)
	MaxSkeletonSize     = 128 // Number of header fetches to need for a skeleton assembly
	MaxReceiptFetch     = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch       = 384 // Amount of node state values to allow fetching per request
)
Functions ¶
This section is empty.
Types ¶
type BlockChain ¶
type BlockChain interface {
	LightChain

	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool

	// HasFastBlock verifies a fast block's presence in the local chain.
	HasFastBlock(common.Hash, uint64) bool

	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)

	// GetBlockByNumber retrieves a block from the database by number.
	GetBlockByNumber(uint64) *types.Block

	// Snapshots returns the blockchain snapshot tree to pause it during sync.
	Snapshots() *snapshot.Tree
}
BlockChain encapsulates functions required to sync a (full or fast) blockchain.
type Downloader ¶
type Downloader struct {
	SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now

	// contains filtered or unexported fields
}
func New ¶
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader
New creates a new downloader to fetch hashes and blocks from remote peers.
func (*Downloader) Cancel ¶
func (d *Downloader) Cancel()
Cancel aborts all of the operations and waits for all download goroutines to finish before returning.
func (*Downloader) DeliverBodies ¶
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, randomness []*types.Randomness, epochSnarkData []*types.EpochSnarkData) (err error)
DeliverBodies injects a new batch of block bodies received from a remote node.
func (*Downloader) DeliverHeaders ¶
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error
DeliverHeaders injects a new batch of block headers received from a remote node into the download schedule.
func (*Downloader) DeliverNodeData ¶
func (d *Downloader) DeliverNodeData(id string, data [][]byte) error
DeliverNodeData injects a new batch of node state data received from a remote node.
func (*Downloader) DeliverReceipts ¶
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error
DeliverReceipts injects a new batch of receipts received from a remote node.
func (*Downloader) DeliverSnapPacket ¶ added in v1.5.0
func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error
DeliverSnapPacket is invoked from a peer's message handler when it transmits a data packet for the local node to consume.
func (*Downloader) Progress ¶
func (d *Downloader) Progress() ethereum.SyncProgress
Progress retrieves the synchronisation boundaries, specifically the origin block where synchronisation started at (may have failed/suspended); the block or header sync is currently at; and the latest known block which the sync targets.
In addition, during the state download phase of fast synchronisation the number of processed and the total number of known states are also returned. Otherwise these are zero.
func (*Downloader) RegisterLightPeer ¶
func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error
RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
func (*Downloader) RegisterPeer ¶
func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error
RegisterPeer injects a new download peer into the set of block sources to be used for fetching hashes and blocks from.
func (*Downloader) Synchronise ¶
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error
Synchronise tries to sync up our local block chain with a remote peer, both adding various sanity checks as well as wrapping it with various log entries.
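A hedged usage sketch of Synchronise; the package name, the syncWithPeer helper and the way the peer id, head hash and total difficulty are obtained are all illustrative, since in the real code path eth.loop drives this call.

	package ethsync

	import (
		"log"
		"math/big"

		"github.com/celo-org/celo-blockchain/common"
		"github.com/celo-org/celo-blockchain/eth/downloader"
	)

	// syncWithPeer kicks off a fast sync against a single peer. In the real
	// code path this is driven by eth.loop / eth.nextSyncOp rather than called
	// directly; the peer id, head hash and total difficulty come from the eth
	// peer set.
	func syncWithPeer(d *downloader.Downloader, peerID string, head common.Hash, td *big.Int) {
		if err := d.Synchronise(peerID, head, td, downloader.FastSync); err != nil {
			// Errors (e.g. a sync already in progress or a misbehaving peer)
			// are left to the caller's retry loop in the real implementation.
			log.Printf("sync with peer %s failed: %v", peerID, err)
		}
	}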
func (*Downloader) Synchronising ¶
func (d *Downloader) Synchronising() bool
Synchronising returns whether the downloader is currently retrieving blocks.
func (*Downloader) Terminate ¶
func (d *Downloader) Terminate()
Terminate interrupts the downloader, canceling all pending operations. The downloader cannot be reused after calling Terminate.
func (*Downloader) UnregisterPeer ¶
func (d *Downloader) UnregisterPeer(id string) error
UnregisterPeer removes a peer from the known list, preventing any action from the specified peer. An effort is also made to return any pending fetches into the queue.
type FailedEvent ¶
type FailedEvent struct{ Err error }
type LightChain ¶
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
	HasHeader(common.Hash, uint64) bool

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// GetHeaderByNumber retrieves a header from the local chain by number.
	GetHeaderByNumber(uint64) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int, bool) (int, error)

	Config() *params.ChainConfig

	// SetHead rewinds the local chain to a new head.
	SetHead(uint64) error
}
LightChain encapsulates functions required to synchronise a light chain.
type LightPeer ¶
type LightPeer interface {
	Head() (common.Hash, *big.Int)
	RequestHeadersByHash(common.Hash, int, int, bool) error
	RequestHeadersByNumber(uint64, int, int, bool) error
}
LightPeer encapsulates the methods required to synchronise with a remote light peer.
type Peer ¶
type Peer interface {
	LightPeer
	RequestBodies([]common.Hash) error
	RequestReceipts([]common.Hash) error
	RequestNodeData([]common.Hash) error
}
Peer encapsulates the methods required to synchronise with a remote full peer.
type PublicDownloaderAPI ¶
type PublicDownloaderAPI struct {
// contains filtered or unexported fields
}
PublicDownloaderAPI provides an API which gives information about the current synchronisation status. It offers only methods that operate on data that can be available to anyone without security risks.
func NewPublicDownloaderAPI ¶
func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI
NewPublicDownloaderAPI creates a new PublicDownloaderAPI. The API has an internal event loop that listens for events from the downloader through the global event mux. In case it receives one of these events it broadcasts it to all syncing subscriptions that are installed through the installSyncSubscription channel.
func (*PublicDownloaderAPI) SubscribeSyncStatus ¶
func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription
SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates. The given channel must receive interface values; the result can either be a SyncingResult or false (when the synchronisation is completed).
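A hedged usage sketch of the subscription API; the package name and the watchSyncStatus helper are illustrative, and how the PublicDownloaderAPI value is obtained depends on how the node wires up its services.

	package ethsync

	import (
		"fmt"

		"github.com/celo-org/celo-blockchain/eth/downloader"
	)

	// watchSyncStatus subscribes to sync status updates and prints them until
	// the downloader reports that syncing has finished (signalled by a false
	// value on the channel).
	func watchSyncStatus(api *downloader.PublicDownloaderAPI) {
		statuses := make(chan interface{})
		sub := api.SubscribeSyncStatus(statuses)
		defer sub.Unsubscribe()

		for status := range statuses {
			if done, ok := status.(bool); ok && !done {
				fmt.Println("sync finished")
				return
			}
			// Otherwise the value describes sync progress (a SyncingResult).
			fmt.Printf("sync update: %+v\n", status)
		}
	}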
func (*PublicDownloaderAPI) Syncing ¶
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error)
Syncing provides information when this node starts synchronising with the Ethereum network and when it's finished.
type StartEvent ¶
type StartEvent struct{}
type SyncMode ¶
type SyncMode uint32
SyncMode represents the synchronisation mode of the downloader. It is a uint32 as it is used with atomic operations.
const (
	FullSync     SyncMode = iota // Synchronise the entire blockchain history from full blocks
	FastSync                     // Quickly download the headers, full sync only at the chain head
	SnapSync                     // Download the chain and the state via compact snapshots
	LightSync                    // Download only the headers and terminate afterwards
	LightestSync                 // Synchronise one block per Epoch (Celo-specific mode)
)
func (SyncMode) MarshalText ¶
func (SyncMode) SyncFullBlockChain ¶
Returns true if full blocks (and not just headers) are fetched. If a mode returns true here then it will return true for `SyncFullHeaderChain` as well.
func (SyncMode) SyncFullHeaderChain ¶
Returns true if all headers, and not just a small, discontinuous set of headers, are fetched.
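As an interpretation of the documented behaviour of these two predicates (not the package's own implementation), the relationship could be expressed as follows.

	package ethsync

	import "github.com/celo-org/celo-blockchain/eth/downloader"

	// fetchesFullBlocks mirrors what SyncFullBlockChain is documented to
	// report: Full, Fast and Snap sync all download complete blocks.
	func fetchesFullBlocks(mode downloader.SyncMode) bool {
		switch mode {
		case downloader.FullSync, downloader.FastSync, downloader.SnapSync:
			return true
		default:
			return false
		}
	}

	// fetchesFullHeaderChain mirrors SyncFullHeaderChain: every mode except
	// Lightest downloads a contiguous header chain.
	func fetchesFullHeaderChain(mode downloader.SyncMode) bool {
		return fetchesFullBlocks(mode) || mode == downloader.LightSync
	}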
func (*SyncMode) UnmarshalText ¶
type SyncStatusSubscription ¶
type SyncStatusSubscription struct {
// contains filtered or unexported fields
}
SyncStatusSubscription represents a syncing subscription.
func (*SyncStatusSubscription) Unsubscribe ¶
func (s *SyncStatusSubscription) Unsubscribe()
Unsubscribe uninstalls the subscription from the DownloadAPI event loop. The status channel that was passed to subscribeSyncStatus isn't used anymore after this method returns.
type SyncingResult ¶
type SyncingResult struct {
	Syncing bool                  `json:"syncing"`
	Status  ethereum.SyncProgress `json:"status"`
}
SyncingResult provides information about the current synchronisation status for this node.