blocksync

package
v1.1.0-dev.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package blocksync implements two versions of a reactor Service that are responsible for block propagation and gossip between peers. This mechanism was formerly known as fast-sync.

In order for a full node to successfully participate in consensus, it must have the latest view of state. The blocksync protocol is a mechanism in which peers may exchange and gossip entire blocks with one another, in a request/response type model, until they've successfully synced to the latest head block. Once succussfully synced, the full node can switch to an active role in consensus and will no longer blocksync and thus no longer run the blocksync process.

Note, the blocksync reactor Service gossips entire block and relevant data such that each receiving peer may construct the entire view of the blocksync state.

There is currently only one version of the blocksync reactor Service that is battle-tested, but whose test coverage is lacking and is not formally verified.

The v0 blocksync reactor Service has one p2p channel, BlockchainChannel. This channel is responsible for handling messages that both request blocks and respond to block requests from peers. For every block request from a peer, the reactor will execute respondToPeer which will fetch the block from the node's state store and respond to the peer. For every block response, the node will add the block to its synchronizer.

Internally, v0 runs a poolRoutine that constantly checks for what blocks it needs and requests them. The poolRoutine is also responsible for taking blocks from the synchronizer, saving and executing each block.

Index

Constants

Variables

This section is empty.

Functions

func AddNumPending

func AddNumPending(val int32) store.UpdateFunc[types.NodeID, PeerData]

AddNumPending adds a value to the numPending field

func ResetMonitor

func ResetMonitor() store.UpdateFunc[types.NodeID, PeerData]

ResetMonitor replaces a peer monitor on a new one if numPending is zero

func UpdateMonitor

func UpdateMonitor(recvSize int) store.UpdateFunc[types.NodeID, PeerData]

UpdateMonitor adds a block size value to the peer monitor if numPending is greater than zero

Types

type BlockResponse

type BlockResponse struct {
	PeerID types.NodeID
	Block  *types.Block
	Commit *types.Commit
}

BlockResponse ...

func BlockResponseFromProto

func BlockResponseFromProto(resp *bcproto.BlockResponse, peerID types.NodeID) (*BlockResponse, error)

func (*BlockResponse) Validate

func (r *BlockResponse) Validate() error

type InMemPeerStore

type InMemPeerStore struct {
	// contains filtered or unexported fields
}

InMemPeerStore in-memory peer store

func NewInMemPeerStore

func NewInMemPeerStore(peers ...PeerData) *InMemPeerStore

NewInMemPeerStore creates a new in-memory peer store

func (*InMemPeerStore) All

func (p *InMemPeerStore) All() []PeerData

All returns all stored peers in the store

func (*InMemPeerStore) Delete

func (p *InMemPeerStore) Delete(peerID types.NodeID)

Delete deletes the peer data from the store

func (*InMemPeerStore) FindPeer

func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool)

FindPeer finds a peer for the request criteria by which the peer is looked up: 1. the number of pending requests must be less allowed (maxPendingRequestsPerPeer) 2. the height must be between two values base and height otherwise return the empty peer data and false

func (*InMemPeerStore) FindTimedoutPeers

func (p *InMemPeerStore) FindTimedoutPeers() []PeerData

FindTimedoutPeers finds and returns the timed out peers

func (*InMemPeerStore) Get

func (p *InMemPeerStore) Get(peerID types.NodeID) (PeerData, bool)

Get returns peer's data and true if the peer is found otherwise empty structure and false

func (*InMemPeerStore) GetAndDelete

func (p *InMemPeerStore) GetAndDelete(peerID types.NodeID) (PeerData, bool)

GetAndDelete combines Get operation and Delete in one call

func (*InMemPeerStore) IsZero

func (p *InMemPeerStore) IsZero() bool

IsZero returns true if the store doesn't have a peer yet otherwise false

func (*InMemPeerStore) Len

func (p *InMemPeerStore) Len() int

Len returns the count of all stored peers

func (*InMemPeerStore) MaxHeight

func (p *InMemPeerStore) MaxHeight() int64

MaxHeight looks at all the peers in the store to get the maximum peer height.

func (*InMemPeerStore) Put

func (p *InMemPeerStore) Put(peerID types.NodeID, newPeer PeerData)

Put adds the peer data to the store if the peer does not exist, otherwise update the current value

func (*InMemPeerStore) Query

func (p *InMemPeerStore) Query(spec store.QueryFunc[types.NodeID, PeerData], limit int) []PeerData

Query finds and returns the copy of peers by specification conditions

func (*InMemPeerStore) Update

func (p *InMemPeerStore) Update(peerID types.NodeID, updates ...store.UpdateFunc[types.NodeID, PeerData])

Update applies update functions to the peer if it exists

type OptionFunc

type OptionFunc func(v *Synchronizer)

func WithClock

func WithClock(clock clockwork.Clock) OptionFunc

func WithLogger

func WithLogger(logger log.Logger) OptionFunc

func WithWorkerPool

func WithWorkerPool(wp *workerpool.WorkerPool) OptionFunc

type PeerAdder

type PeerAdder interface {
	AddPeer(peer PeerData)
}

type PeerData

type PeerData struct {
	// contains filtered or unexported fields
}

PeerData uses to keep peer related data like base height and the current height etc

type PeerRemover

type PeerRemover interface {
	RemovePeer(peerID types.NodeID)
}

type Reactor

type Reactor struct {
	service.BaseService
	// contains filtered or unexported fields
}

Reactor handles long-term catchup syncing.

func NewReactor

func NewReactor(
	logger log.Logger,
	stateStore sm.Store,
	blockExec *sm.BlockExecutor,
	store *store.BlockStore,
	nodeProTxHash crypto.ProTxHash,
	consReactor consensusReactor,
	p2pClient *client.Client,
	peerEvents p2p.PeerEventSubscriber,
	blockSync bool,
	metrics *consensus.Metrics,
	eventBus *eventbus.EventBus,
) *Reactor

NewReactor returns new reactor instance.

func (*Reactor) GetMaxPeerBlockHeight

func (r *Reactor) GetMaxPeerBlockHeight() int64

func (*Reactor) GetRemainingSyncTime

func (r *Reactor) GetRemainingSyncTime() time.Duration

func (*Reactor) GetTotalSyncedTime

func (r *Reactor) GetTotalSyncedTime() time.Duration

func (*Reactor) OnStart

func (r *Reactor) OnStart(ctx context.Context) error

OnStart starts separate go routines for each p2p Channel and listens for envelopes on each. In addition, it also listens for peer updates and handles messages on that p2p channel accordingly. The caller must be sure to execute OnStop to ensure the outbound p2p Channels are closed.

If blockSyncFlag is enabled, we also start the synchronizer If the synchronizer fails to start, an error is returned.

func (*Reactor) OnStop

func (r *Reactor) OnStop()

OnStop stops the reactor by signaling to all spawned goroutines to exit and blocking until they all exit.

func (*Reactor) PublishStatus

func (r *Reactor) PublishStatus(event types.EventDataBlockSyncStatus) error

func (*Reactor) SwitchToBlockSync

func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error

SwitchToBlockSync is called by the state sync reactor when switching to fast sync.

type Synchronizer

type Synchronizer struct {
	service.BaseService
	// contains filtered or unexported fields
}

Synchronizer keeps track of the block sync peers, block requests and block responses.

func NewSynchronizer

func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApplier, opts ...OptionFunc) *Synchronizer

NewSynchronizer returns a new Synchronizer with the height equal to start

func (*Synchronizer) AddPeer

func (s *Synchronizer) AddPeer(peer PeerData)

AddPeer adds the peer's alleged blockchain base and height

func (*Synchronizer) GetStatus

func (s *Synchronizer) GetStatus() (int64, int32)

GetStatus returns synchronizer's height, count of in progress requests

func (*Synchronizer) IsCaughtUp

func (s *Synchronizer) IsCaughtUp() bool

IsCaughtUp returns true if this node is caught up, false - otherwise.

func (*Synchronizer) LastAdvance

func (s *Synchronizer) LastAdvance() time.Time

LastAdvance returns the time when the last block was processed (or start time if no blocks were processed).

func (*Synchronizer) MaxPeerHeight

func (s *Synchronizer) MaxPeerHeight() int64

MaxPeerHeight returns the highest reported height.

func (*Synchronizer) OnStart

func (s *Synchronizer) OnStart(ctx context.Context) error

OnStart implements service.Service by spawning requesters routine and recording synchronizer's start time.

func (*Synchronizer) OnStop

func (s *Synchronizer) OnStop()

func (*Synchronizer) RemovePeer

func (s *Synchronizer) RemovePeer(peerID types.NodeID)

RemovePeer removes the peer with peerID from the synchronizer. If there's no peer with peerID, function is a no-op.

func (*Synchronizer) WaitForSync

func (s *Synchronizer) WaitForSync(ctx context.Context)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL