Documentation ¶
Overview ¶
Package blocksync implements a reactor Service that is responsible for block propagation and gossip between peers. This mechanism was formerly known as fast-sync.
In order for a full node to successfully participate in consensus, it must have the latest view of state. The blocksync protocol is a mechanism in which peers may exchange and gossip entire blocks with one another, in a request/response type model, until they've successfully synced to the latest head block. Once successfully synced, the full node switches to an active role in consensus and no longer runs the blocksync process.
Note, the blocksync reactor Service gossips entire blocks and relevant data such that each receiving peer may construct the entire view of the blocksync state.
There is currently only one version of the blocksync reactor Service. It is battle-tested, but its test coverage is lacking and it is not formally verified.
The v0 blocksync reactor Service has one p2p channel, BlockchainChannel. This channel is responsible for handling messages that both request blocks and respond to block requests from peers. For every block request from a peer, the reactor will execute respondToPeer which will fetch the block from the node's state store and respond to the peer. For every block response, the node will add the block to its synchronizer.
Internally, v0 runs a poolRoutine that continuously checks which blocks it still needs and requests them. The poolRoutine is also responsible for taking blocks from the synchronizer, then saving and executing each block.
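The request/response flow described above can be illustrated with a minimal sketch. It is not the package's implementation: Block, peer and syncTo are hypothetical names, the peer's state store is modeled as a plain map, and blocks are requested sequentially rather than concurrently.

```go
package main

import "fmt"

// Block is a hypothetical stand-in for a full block.
type Block struct {
	Height int64
	Data   string
}

// peer is a hypothetical remote node that serves blocks from its store.
type peer struct {
	store map[int64]Block
}

// respondToPeer mimics the response side of the channel:
// it looks the requested block up and returns it if present.
func (p *peer) respondToPeer(height int64) (Block, bool) {
	b, ok := p.store[height]
	return b, ok
}

// syncTo mimics the requesting side: it asks for every height from
// start to target in order and "applies" each block it receives.
// It returns the heights that were applied.
func syncTo(p *peer, start, target int64) []int64 {
	var applied []int64
	for h := start; h <= target; h++ {
		b, ok := p.respondToPeer(h)
		if !ok {
			break // the peer does not have this block; stop syncing
		}
		applied = append(applied, b.Height)
	}
	return applied
}

func main() {
	p := &peer{store: map[int64]Block{
		1: {Height: 1, Data: "a"},
		2: {Height: 2, Data: "b"},
		3: {Height: 3, Data: "c"},
	}}
	fmt.Println(syncTo(p, 1, 3))
}
```

A real reactor would issue requests to many peers in parallel and validate each block before executing it; the sketch only shows the ordering of request, response and apply.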
Index ¶
- Constants
- func AddNumPending(val int32) store.UpdateFunc[types.NodeID, PeerData]
- func ResetMonitor() store.UpdateFunc[types.NodeID, PeerData]
- func UpdateMonitor(recvSize int) store.UpdateFunc[types.NodeID, PeerData]
- type BlockResponse
- type InMemPeerStore
- func (p *InMemPeerStore) All() []PeerData
- func (p *InMemPeerStore) Delete(peerID types.NodeID)
- func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool)
- func (p *InMemPeerStore) FindTimedoutPeers() []PeerData
- func (p *InMemPeerStore) Get(peerID types.NodeID) (PeerData, bool)
- func (p *InMemPeerStore) GetAndDelete(peerID types.NodeID) (PeerData, bool)
- func (p *InMemPeerStore) IsZero() bool
- func (p *InMemPeerStore) Len() int
- func (p *InMemPeerStore) MaxHeight() int64
- func (p *InMemPeerStore) Put(peerID types.NodeID, newPeer PeerData)
- func (p *InMemPeerStore) Query(spec store.QueryFunc[types.NodeID, PeerData], limit int) []PeerData
- func (p *InMemPeerStore) Update(peerID types.NodeID, updates ...store.UpdateFunc[types.NodeID, PeerData])
- type OptionFunc
- type PeerAdder
- type PeerData
- type PeerRemover
- type Reactor
- func (r *Reactor) GetMaxPeerBlockHeight() int64
- func (r *Reactor) GetRemainingSyncTime() time.Duration
- func (r *Reactor) GetTotalSyncedTime() time.Duration
- func (r *Reactor) OnStart(ctx context.Context) error
- func (r *Reactor) OnStop()
- func (r *Reactor) PublishStatus(event types.EventDataBlockSyncStatus) error
- func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error
- type Synchronizer
- func (s *Synchronizer) AddPeer(peer PeerData)
- func (s *Synchronizer) GetStatus() (int64, int32)
- func (s *Synchronizer) IsCaughtUp() bool
- func (s *Synchronizer) LastAdvance() time.Time
- func (s *Synchronizer) MaxPeerHeight() int64
- func (s *Synchronizer) OnStart(ctx context.Context) error
- func (s *Synchronizer) OnStop()
- func (s *Synchronizer) RemovePeer(peerID types.NodeID)
- func (s *Synchronizer) WaitForSync(ctx context.Context)
Constants ¶
const (
	MaxMsgSize = types.MaxBlockSizeBytes +
		p2pproto.BlockResponseMessagePrefixSize +
		p2pproto.BlockResponseMessageFieldKeySize
)
Variables ¶
This section is empty.
Functions ¶
func AddNumPending ¶
func AddNumPending(val int32) store.UpdateFunc[types.NodeID, PeerData]
AddNumPending adds a value to the numPending field.
func ResetMonitor ¶
func ResetMonitor() store.UpdateFunc[types.NodeID, PeerData]
ResetMonitor replaces a peer monitor with a new one if numPending is zero.
func UpdateMonitor ¶
func UpdateMonitor(recvSize int) store.UpdateFunc[types.NodeID, PeerData]
UpdateMonitor adds a block size value to the peer monitor if numPending is greater than zero.
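The three functions above return update closures that are later applied to a stored peer. The sketch below shows how such closures might compose. The shape of UpdateFunc, the peerData fields and the apply helper are assumptions made for illustration, and the peer monitor is reduced to a byte counter.

```go
package main

import "fmt"

// UpdateFunc is an assumed shape for store.UpdateFunc; the real
// signature may differ.
type UpdateFunc[K comparable, V any] func(key K, val *V)

// peerData is a hypothetical stand-in for PeerData.
type peerData struct {
	numPending int32
	recvBytes  int // stand-in for the peer monitor
}

// addNumPending adds val (possibly negative) to numPending.
func addNumPending(val int32) UpdateFunc[string, peerData] {
	return func(_ string, p *peerData) { p.numPending += val }
}

// resetMonitor resets the monitor only when nothing is pending.
func resetMonitor() UpdateFunc[string, peerData] {
	return func(_ string, p *peerData) {
		if p.numPending == 0 {
			p.recvBytes = 0
		}
	}
}

// updateMonitor records recvSize only while requests are pending.
func updateMonitor(recvSize int) UpdateFunc[string, peerData] {
	return func(_ string, p *peerData) {
		if p.numPending > 0 {
			p.recvBytes += recvSize
		}
	}
}

// apply runs the updates in order against a single peer.
func apply(key string, p *peerData, updates ...UpdateFunc[string, peerData]) {
	for _, u := range updates {
		u(key, p)
	}
}

func main() {
	p := peerData{}
	apply("peer-1", &p, addNumPending(1), updateMonitor(100))
	fmt.Println(p.numPending, p.recvBytes)
	apply("peer-1", &p, addNumPending(-1), resetMonitor())
	fmt.Println(p.numPending, p.recvBytes)
}
```

Because each closure guards itself on numPending, the order in which updates are passed matters: decrementing before resetMonitor lets the reset take effect in the same call.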
Types ¶
type BlockResponse ¶
BlockResponse ...
func BlockResponseFromProto ¶
func BlockResponseFromProto(resp *bcproto.BlockResponse, peerID types.NodeID) (*BlockResponse, error)
func (*BlockResponse) Validate ¶
func (r *BlockResponse) Validate() error
type InMemPeerStore ¶
type InMemPeerStore struct {
// contains filtered or unexported fields
}
InMemPeerStore is an in-memory peer store.
func NewInMemPeerStore ¶
func NewInMemPeerStore(peers ...PeerData) *InMemPeerStore
NewInMemPeerStore creates a new in-memory peer store
func (*InMemPeerStore) All ¶
func (p *InMemPeerStore) All() []PeerData
All returns all stored peers in the store
func (*InMemPeerStore) Delete ¶
func (p *InMemPeerStore) Delete(peerID types.NodeID)
Delete deletes the peer data from the store
func (*InMemPeerStore) FindPeer ¶
func (p *InMemPeerStore) FindPeer(height int64) (PeerData, bool)
FindPeer finds a peer that satisfies the following criteria:
1. the number of pending requests must be less than the allowed maximum (maxPendingRequestsPerPeer)
2. the requested height must be between the peer's base and height
Otherwise, it returns empty peer data and false.
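The lookup criteria can be sketched as a simple filter. This is an illustration under stated assumptions: the value of maxPendingRequestsPerPeer, the peerData fields, the slice-based storage and the inclusive height bounds are all hypothetical.

```go
package main

import "fmt"

// maxPendingRequestsPerPeer is given a hypothetical value here.
const maxPendingRequestsPerPeer = 2

// peerData is a hypothetical stand-in for PeerData.
type peerData struct {
	id         string
	base       int64
	height     int64
	numPending int32
}

// findPeer returns the first peer that has spare request capacity and
// whose [base, height] range (assumed inclusive) covers the height.
func findPeer(peers []peerData, height int64) (peerData, bool) {
	for _, p := range peers {
		if p.numPending >= maxPendingRequestsPerPeer {
			continue // peer is already at its request limit
		}
		if height < p.base || height > p.height {
			continue // peer does not have this block
		}
		return p, true
	}
	return peerData{}, false
}

func main() {
	peers := []peerData{
		{id: "busy", base: 1, height: 100, numPending: 2},
		{id: "short", base: 1, height: 10},
		{id: "good", base: 5, height: 100},
	}
	p, ok := findPeer(peers, 50)
	fmt.Println(p.id, ok)
}
```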
func (*InMemPeerStore) FindTimedoutPeers ¶
func (p *InMemPeerStore) FindTimedoutPeers() []PeerData
FindTimedoutPeers finds and returns the timed out peers
func (*InMemPeerStore) Get ¶
func (p *InMemPeerStore) Get(peerID types.NodeID) (PeerData, bool)
Get returns the peer's data and true if the peer is found, otherwise an empty structure and false.
func (*InMemPeerStore) GetAndDelete ¶
func (p *InMemPeerStore) GetAndDelete(peerID types.NodeID) (PeerData, bool)
GetAndDelete combines Get operation and Delete in one call
func (*InMemPeerStore) IsZero ¶
func (p *InMemPeerStore) IsZero() bool
IsZero returns true if the store doesn't have any peers yet, otherwise false.
func (*InMemPeerStore) Len ¶
func (p *InMemPeerStore) Len() int
Len returns the count of all stored peers
func (*InMemPeerStore) MaxHeight ¶
func (p *InMemPeerStore) MaxHeight() int64
MaxHeight looks at all the peers in the store to get the maximum peer height.
func (*InMemPeerStore) Put ¶
func (p *InMemPeerStore) Put(peerID types.NodeID, newPeer PeerData)
Put adds the peer data to the store if the peer does not exist, otherwise it updates the current value.
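func (*InMemPeerStore) Query ¶
func (p *InMemPeerStore) Query(spec store.QueryFunc[types.NodeID, PeerData], limit int) []PeerData
Query finds and returns copies of the peers that match the given specification, up to limit.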
func (*InMemPeerStore) Update ¶
func (p *InMemPeerStore) Update(peerID types.NodeID, updates ...store.UpdateFunc[types.NodeID, PeerData])
Update applies update functions to the peer if it exists
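The store operations above can be pictured with a small mutex-guarded map. The sketch below is not the package's implementation: memStore, peerData, updateFunc and queryFunc are hypothetical names, keys are plain strings instead of types.NodeID, and treating a non-positive limit as "no limit" is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// peerData is a hypothetical stand-in for PeerData.
type peerData struct {
	height     int64
	numPending int32
}

// updateFunc and queryFunc are assumed shapes for the store callbacks.
type updateFunc func(id string, p *peerData)
type queryFunc func(id string, p peerData) bool

// memStore is a minimal in-memory peer store guarded by a mutex.
type memStore struct {
	mtx   sync.RWMutex
	peers map[string]peerData
}

func newMemStore() *memStore {
	return &memStore{peers: make(map[string]peerData)}
}

// Put inserts the peer or overwrites the existing value.
func (s *memStore) Put(id string, p peerData) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.peers[id] = p
}

// GetAndDelete reads and removes the peer under a single lock.
func (s *memStore) GetAndDelete(id string) (peerData, bool) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	p, ok := s.peers[id]
	delete(s.peers, id)
	return p, ok
}

// Update applies the updates in order, but only if the peer exists.
func (s *memStore) Update(id string, updates ...updateFunc) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	p, ok := s.peers[id]
	if !ok {
		return
	}
	for _, u := range updates {
		u(id, &p)
	}
	s.peers[id] = p
}

// Query returns copies of matching peers; limit <= 0 means no limit
// (an assumption of this sketch).
func (s *memStore) Query(spec queryFunc, limit int) []peerData {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	var out []peerData
	for id, p := range s.peers {
		if limit > 0 && len(out) >= limit {
			break
		}
		if spec(id, p) {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	s := newMemStore()
	s.Put("a", peerData{height: 10})
	s.Put("b", peerData{height: 20})
	s.Update("a", func(_ string, p *peerData) { p.numPending++ })
	tall := s.Query(func(_ string, p peerData) bool { return p.height >= 10 }, 1)
	fmt.Println(len(tall))
}
```

Holding one lock for the whole of GetAndDelete and Update is what makes these operations atomic with respect to concurrent callers, which is the usual reason to offer them instead of separate Get and Put calls.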
type OptionFunc ¶
type OptionFunc func(v *Synchronizer)
func WithClock ¶
func WithClock(clock clockwork.Clock) OptionFunc
func WithLogger ¶
func WithLogger(logger log.Logger) OptionFunc
func WithWorkerPool ¶
func WithWorkerPool(wp *workerpool.WorkerPool) OptionFunc
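OptionFunc and the With* constructors follow the functional options pattern. The sketch below illustrates that pattern with hypothetical fields and option names; it does not reproduce the real Synchronizer, clockwork.Clock, log.Logger or workerpool.WorkerPool types.

```go
package main

import "fmt"

// synchronizer is a hypothetical stand-in with two configurable fields.
type synchronizer struct {
	start      int64
	loggerName string
	workers    int
}

// optionFunc mutates a synchronizer during construction.
type optionFunc func(s *synchronizer)

// withLoggerName is a hypothetical option that overrides the logger.
func withLoggerName(name string) optionFunc {
	return func(s *synchronizer) { s.loggerName = name }
}

// withWorkers is a hypothetical option that overrides the pool size.
func withWorkers(n int) optionFunc {
	return func(s *synchronizer) { s.workers = n }
}

// newSynchronizer sets defaults first, then applies options in order,
// so later options win over earlier ones.
func newSynchronizer(start int64, opts ...optionFunc) *synchronizer {
	s := &synchronizer{start: start, loggerName: "nop", workers: 1}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSynchronizer(100, withWorkers(4))
	fmt.Println(s.start, s.loggerName, s.workers)
}
```

The pattern keeps the constructor signature stable: required arguments stay positional, and optional dependencies such as a fake clock for tests can be added without changing existing call sites.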
type PeerData ¶
type PeerData struct {
// contains filtered or unexported fields
}
PeerData is used to keep peer-related data, such as the base height and the current height.
type PeerRemover ¶
type Reactor ¶
type Reactor struct {
	service.BaseService
	// contains filtered or unexported fields
}
Reactor handles long-term catchup syncing.
func NewReactor ¶
func NewReactor(
	logger log.Logger,
	stateStore sm.Store,
	blockExec *sm.BlockExecutor,
	store *store.BlockStore,
	nodeProTxHash crypto.ProTxHash,
	consReactor consensusReactor,
	p2pClient *client.Client,
	peerEvents p2p.PeerEventSubscriber,
	blockSync bool,
	metrics *consensus.Metrics,
	eventBus *eventbus.EventBus,
) *Reactor
NewReactor returns a new reactor instance.
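func (*Reactor) GetMaxPeerBlockHeight ¶
func (r *Reactor) GetMaxPeerBlockHeight() int64
func (*Reactor) GetRemainingSyncTime ¶
func (r *Reactor) GetRemainingSyncTime() time.Duration
func (*Reactor) GetTotalSyncedTime ¶
func (r *Reactor) GetTotalSyncedTime() time.Duration
func (*Reactor) OnStart ¶
func (r *Reactor) OnStart(ctx context.Context) error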
OnStart starts separate goroutines for each p2p Channel and listens for envelopes on each. In addition, it also listens for peer updates and handles messages on that p2p channel accordingly. The caller must be sure to execute OnStop to ensure the outbound p2p Channels are closed.
If blockSyncFlag is enabled, the synchronizer is also started. If the synchronizer fails to start, an error is returned.
func (*Reactor) OnStop ¶
func (r *Reactor) OnStop()
OnStop stops the reactor by signaling to all spawned goroutines to exit and blocking until they all exit.
func (*Reactor) PublishStatus ¶
func (r *Reactor) PublishStatus(event types.EventDataBlockSyncStatus) error
type Synchronizer ¶
type Synchronizer struct {
	service.BaseService
	// contains filtered or unexported fields
}
Synchronizer keeps track of the block sync peers, block requests and block responses.
func NewSynchronizer ¶
func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApplier, opts ...OptionFunc) *Synchronizer
NewSynchronizer returns a new Synchronizer with the height equal to start
func (*Synchronizer) AddPeer ¶
func (s *Synchronizer) AddPeer(peer PeerData)
AddPeer adds the peer's alleged blockchain base and height
func (*Synchronizer) GetStatus ¶
func (s *Synchronizer) GetStatus() (int64, int32)
GetStatus returns the synchronizer's height and the count of in-progress requests.
func (*Synchronizer) IsCaughtUp ¶
func (s *Synchronizer) IsCaughtUp() bool
IsCaughtUp returns true if this node is caught up, false otherwise.
func (*Synchronizer) LastAdvance ¶
func (s *Synchronizer) LastAdvance() time.Time
LastAdvance returns the time when the last block was processed (or start time if no blocks were processed).
func (*Synchronizer) MaxPeerHeight ¶
func (s *Synchronizer) MaxPeerHeight() int64
MaxPeerHeight returns the highest reported height.
func (*Synchronizer) OnStart ¶
func (s *Synchronizer) OnStart(ctx context.Context) error
OnStart implements service.Service by spawning the requesters routine and recording the synchronizer's start time.
func (*Synchronizer) OnStop ¶
func (s *Synchronizer) OnStop()
func (*Synchronizer) RemovePeer ¶
func (s *Synchronizer) RemovePeer(peerID types.NodeID)
RemovePeer removes the peer with peerID from the synchronizer. If there's no peer with peerID, function is a no-op.
func (*Synchronizer) WaitForSync ¶
func (s *Synchronizer) WaitForSync(ctx context.Context)