Documentation ¶
Index ¶
- Constants
- type BlockExecutor
- type BlockIDHeight
- type BlockLoader
- type BlockThrottle
- func (c *BlockThrottle) Done() error
- func (c *BlockThrottle) Init(processables chan<- BlockIDHeight, threshold int) error
- func (c *BlockThrottle) OnBlock(blockID flow.Identifier, height uint64) error
- func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) error
- func (c *BlockThrottle) OnBlockFinalized(finalizedHeight uint64)
- type CollectionFetcher
- type CollectionRequester
- type Core
- type Engine
- func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate)
- func (e *Engine) Done() <-chan struct{}
- func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity)
- func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error
- func (e *Engine) ProcessLocal(event interface{}) error
- func (e *Engine) Ready() <-chan struct{}
- func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{})
- func (e *Engine) SubmitLocal(event interface{})
- type EventConsumer
- type Machine
- func (e *Machine) BeforeComputationResultSaved(ctx context.Context, result *execution.ComputationResult)
- func (e *Machine) BlockFinalized(b *flow.Header)
- func (e *Machine) BlockProcessable(header *flow.Header, qc *flow.QuorumCertificate)
- func (e *Machine) ExecuteBlock(ctx context.Context, executableBlock *entity.ExecutableBlock) (*execution.ComputationResult, error)
- func (e *Machine) OnComputationResultSaved(ctx context.Context, result *execution.ComputationResult) string
- type Mempool
- type Throttle
Constants ¶
const DefaultCatchUpThreshold = 500
DefaultCatchUpThreshold is the number of blocks by which execution may lag behind finalization before catch-up behavior applies. When execution is further behind than this, only the next unexecuted finalized blocks are lazily loaded until execution has caught up.
const MaxConcurrentBlockExecutor = 5
MaxConcurrentBlockExecutor is the maximum number of concurrent block executors
const MaxProcessableBlocks = 10000
MaxProcessableBlocks is the maximum number of blocks that can be queued for processing
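To make the role of DefaultCatchUpThreshold concrete, the following minimal sketch compares an executed height with a finalized height. The helper name `isFarBehind` and the exact comparison are assumptions made for illustration; the package's actual check may differ.

```go
package main

import "fmt"

// DefaultCatchUpThreshold mirrors the documented constant.
const DefaultCatchUpThreshold = 500

// isFarBehind is a hypothetical helper (not part of the package). It reports
// whether execution lags finalization by more than threshold blocks.
func isFarBehind(executed, finalized uint64, threshold int) bool {
	if finalized <= executed {
		return false // execution is at or ahead of finalization
	}
	return finalized-executed > uint64(threshold)
}

func main() {
	fmt.Println(isFarBehind(1000, 1200, DefaultCatchUpThreshold)) // lag of 200: false
	fmt.Println(isFarBehind(1000, 1600, DefaultCatchUpThreshold)) // lag of 600: true
}
```

Under this assumption, a node that is 600 blocks behind would stay in catch-up behavior until the gap shrinks to 500 or fewer.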
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BlockExecutor ¶ added in v0.33.19
type BlockExecutor interface {
	ExecuteBlock(ctx context.Context, block *entity.ExecutableBlock) (*execution.ComputationResult, error)
}
type BlockIDHeight ¶ added in v0.33.21
type BlockIDHeight struct {
	ID     flow.Identifier
	Height uint64
}
BlockIDHeight is a helper struct that holds the block ID and height
func HeaderToBlockIDHeight ¶ added in v0.33.21
func HeaderToBlockIDHeight(header *flow.Header) BlockIDHeight
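HeaderToBlockIDHeight has no description, but its signature suggests it copies a header's ID and height into a BlockIDHeight. The sketch below illustrates that presumed behavior. The local `Identifier` and `Header` types are simplified stand-ins for `flow.Identifier` and `flow.Header`, and the function body is an assumption rather than the package's code.

```go
package main

import "fmt"

// Identifier is a simplified stand-in for flow.Identifier.
type Identifier [32]byte

// Header is a simplified stand-in for flow.Header. The stored id field is an
// assumption; it only serves to make the example self-contained.
type Header struct {
	id     Identifier
	Height uint64
}

// ID returns the header's identifier.
func (h *Header) ID() Identifier { return h.id }

// BlockIDHeight holds a block ID and height, as in the documented struct.
type BlockIDHeight struct {
	ID     Identifier
	Height uint64
}

// HeaderToBlockIDHeight sketches the presumed conversion.
func HeaderToBlockIDHeight(header *Header) BlockIDHeight {
	return BlockIDHeight{ID: header.ID(), Height: header.Height}
}

func main() {
	h := &Header{id: Identifier{0x01}, Height: 42}
	b := HeaderToBlockIDHeight(h)
	fmt.Println(b.Height, b.ID[0]) // prints "42 1"
}
```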
type BlockLoader ¶ added in v0.32.0
type BlockLoader interface {
	LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error)
}
type BlockThrottle ¶ added in v0.33.19
type BlockThrottle struct {
	// contains filtered or unexported fields
}
BlockThrottle is a helper struct that throttles the unexecuted blocks sent to the block queue for execution. It is useful when execution falls far behind finalization: in that case, the flow of blocks into the block queue, which fetches the data needed to execute them, should be limited. Without a throttle, the block queue would be flooded with blocks, the network would be flooded with collection-fetch requests, and the execution node (EN) might quickly run out of memory.
func NewBlockThrottle ¶ added in v0.33.19
func NewBlockThrottle(
	log zerolog.Logger,
	state protocol.State,
	execState state.ExecutionState,
	headers storage.Headers,
) (*BlockThrottle, error)
func (*BlockThrottle) Done ¶ added in v0.33.19
func (c *BlockThrottle) Done() error
Done marks the throttle as done, and no more blocks will be processed
func (*BlockThrottle) Init ¶ added in v0.33.19
func (c *BlockThrottle) Init(processables chan<- BlockIDHeight, threshold int) error
func (*BlockThrottle) OnBlock ¶ added in v0.33.19
func (c *BlockThrottle) OnBlock(blockID flow.Identifier, height uint64) error
func (*BlockThrottle) OnBlockExecuted ¶ added in v0.33.19
func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) error
func (*BlockThrottle) OnBlockFinalized ¶ added in v0.33.19
func (c *BlockThrottle) OnBlockFinalized(finalizedHeight uint64)
type CollectionFetcher ¶ added in v0.32.0
type CollectionFetcher interface {
	// FetchCollection decides which collection nodes to fetch the collection from
	// No error is expected during normal operation
	FetchCollection(blockID flow.Identifier, height uint64, guarantee *flow.CollectionGuarantee) error

	// Force forces the requests to be sent immediately
	Force()
}
CollectionFetcher abstracts the details of how to fetch collection
type CollectionRequester ¶ added in v0.33.19
type CollectionRequester interface {
	module.ReadyDoneAware
	WithHandle(requester.HandleFunc)
}
type Core ¶ added in v0.33.19
type Core struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}
Core connects the execution components. When it receives blocks and collections, it forwards them to the block queue. When the block queue decides to execute blocks, Core forwards them to the executor for execution. When the block queue decides to fetch missing collections, Core forwards the request to the collection fetcher. When a block is executed, Core notifies the block queue and forwards the result to the execution state to be saved.
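The interplay between blocks, collections, and the block queue can be sketched with a toy queue. The type `toyQueue`, its methods, and the string IDs are hypothetical and exist only for illustration. The real block queue is not documented on this page and presumably tracks more, such as whether a block's parent has been executed.

```go
package main

import "fmt"

// toyQueue is a hypothetical stand-in for the block queue. It tracks which
// collections each block is still waiting for.
type toyQueue struct {
	missing map[string]map[string]bool // block ID -> set of missing collection IDs
	have    map[string]bool            // collection IDs already received
}

func newToyQueue() *toyQueue {
	return &toyQueue{
		missing: map[string]map[string]bool{},
		have:    map[string]bool{},
	}
}

// OnBlock registers a block. It returns the collections that still need to be
// fetched and whether the block can be executed right away.
func (q *toyQueue) OnBlock(blockID string, collections []string) (toFetch []string, executable bool) {
	need := map[string]bool{}
	for _, c := range collections {
		if !q.have[c] {
			need[c] = true
			toFetch = append(toFetch, c)
		}
	}
	if len(need) == 0 {
		return nil, true
	}
	q.missing[blockID] = need
	return toFetch, false
}

// OnCollection records a received collection and returns the blocks that
// became executable because of it.
func (q *toyQueue) OnCollection(colID string) []string {
	q.have[colID] = true
	var ready []string
	for blockID, need := range q.missing {
		delete(need, colID)
		if len(need) == 0 {
			ready = append(ready, blockID)
			delete(q.missing, blockID) // deleting during range is safe in Go
		}
	}
	return ready
}

func main() {
	q := newToyQueue()
	toFetch, ok := q.OnBlock("B1", []string{"c1", "c2"})
	fmt.Println(toFetch, ok)          // [c1 c2] false
	fmt.Println(q.OnCollection("c1")) // []
	fmt.Println(q.OnCollection("c2")) // [B1]
}
```

In this sketch, a caller playing the role of Core would pass `toFetch` to a collection fetcher and hand the blocks returned by `OnCollection` to an executor.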
func NewCore ¶ added in v0.33.19
func NewCore(
	logger zerolog.Logger,
	throttle Throttle,
	execState state.ExecutionState,
	stopControl *stop.StopControl,
	blocks storage.Blocks,
	collections storage.Collections,
	executor BlockExecutor,
	collectionFetcher CollectionFetcher,
	eventConsumer EventConsumer,
	metrics module.ExecutionMetrics,
) (*Core, error)
func (*Core) OnCollection ¶ added in v0.33.19
func (e *Core) OnCollection(col *flow.Collection)
type Engine ¶
type Engine struct {
	psEvents.Noop // satisfy protocol events consumer interface
	// contains filtered or unexported fields
}
An Engine receives and saves incoming blocks.
func New ¶
func New(
	unit *engine.Unit,
	logger zerolog.Logger,
	net network.EngineRegistry,
	me module.Local,
	collectionFetcher CollectionFetcher,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	executionEngine computation.ComputationManager,
	providerEngine provider.ProviderEngine,
	execState state.ExecutionState,
	metrics module.ExecutionMetrics,
	tracer module.Tracer,
	extLog bool,
	pruner *pruner.Pruner,
	uploader *uploader.Manager,
	stopControl *stop.StopControl,
	loader BlockLoader,
) (*Engine, error)
func (*Engine) BlockProcessable ¶ added in v0.10.0
func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate)
BlockProcessable handles new verified blocks (blocks that have passed consensus validation) received from the consensus nodes.
NOTE: BlockProcessable might be called multiple times for the same block.
NOTE: Ready calls reloadUnexecutedBlocks during initialization, which handles dropped protocol events.
func (*Engine) Done ¶
func (e *Engine) Done() <-chan struct{}
Done returns a channel that will close when the engine has successfully stopped.
func (*Engine) OnCollection ¶
func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity)
OnCollection is a callback for handling the collections requested by the collection requester.
func (*Engine) ProcessLocal ¶
func (e *Engine) ProcessLocal(event interface{}) error
ProcessLocal processes an event originating on the local node.
func (*Engine) Ready ¶
func (e *Engine) Ready() <-chan struct{}
Ready returns a channel that will close when the engine has successfully started.
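Ready and Done both signal lifecycle transitions by closing a channel. The sketch below shows that pattern with a hypothetical `toyComponent`. The `Stop` method and the constructor are assumptions added to keep the example self-contained; they do not describe how Engine is started or stopped.

```go
package main

import "fmt"

// toyComponent is a hypothetical component exposing Ready and Done channels.
type toyComponent struct {
	ready chan struct{}
	done  chan struct{}
	stop  chan struct{}
}

// startToyComponent launches the component's lifecycle goroutine.
func startToyComponent() *toyComponent {
	c := &toyComponent{
		ready: make(chan struct{}),
		done:  make(chan struct{}),
		stop:  make(chan struct{}),
	}
	go func() {
		close(c.ready) // startup finished: Ready() is now closed
		<-c.stop       // wait for a stop request
		close(c.done)  // shutdown finished: Done() is now closed
	}()
	return c
}

// Ready returns a channel that is closed once startup has finished.
func (c *toyComponent) Ready() <-chan struct{} { return c.ready }

// Done returns a channel that is closed once shutdown has finished.
func (c *toyComponent) Done() <-chan struct{} { return c.done }

// Stop requests shutdown. It must be called at most once.
func (c *toyComponent) Stop() { close(c.stop) }

func main() {
	c := startToyComponent()
	<-c.Ready()
	fmt.Println("started")
	c.Stop()
	<-c.Done()
	fmt.Println("stopped")
}
```

Because a closed channel can be received from any number of times, several callers can wait on the same Ready or Done channel without coordinating with each other.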
func (*Engine) Submit ¶
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{})
Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
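The non-blocking behavior described for Submit can be sketched as follows. The `toyEngine` type, its `process` method, and the in-memory `logged` slice are hypothetical stand-ins; the sketch makes no claim about how Engine schedules work or which logger it uses.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyEngine is a hypothetical engine that processes events asynchronously.
type toyEngine struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	logged []string // stands in for a logger
}

// process handles one event and returns an error for unsupported types.
func (e *toyEngine) process(event interface{}) error {
	if _, ok := event.(string); !ok {
		return errors.New("unsupported event type")
	}
	return nil
}

// Submit returns immediately. The event is processed in a separate goroutine,
// and any processing error is recorded internally instead of being returned.
func (e *toyEngine) Submit(event interface{}) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		if err := e.process(event); err != nil {
			e.mu.Lock()
			e.logged = append(e.logged, err.Error())
			e.mu.Unlock()
		}
	}()
}

func main() {
	e := &toyEngine{}
	e.Submit("block") // processed without error
	e.Submit(42)      // error is logged, not returned
	e.wg.Wait()       // wait only so the example can print the log
	fmt.Println(e.logged) // [unsupported event type]
}
```

The trade-off of this style is that callers never see the error, so they cannot retry or react to it.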
func (*Engine) SubmitLocal ¶
func (e *Engine) SubmitLocal(event interface{})
SubmitLocal submits an event originating on the local node.
type EventConsumer ¶ added in v0.33.19
type EventConsumer interface {
	BeforeComputationResultSaved(ctx context.Context, result *execution.ComputationResult)
	OnComputationResultSaved(ctx context.Context, result *execution.ComputationResult) string
}
type Machine ¶ added in v0.33.19
type Machine struct {
	events.Noop // satisfy protocol events consumer interface
	// contains filtered or unexported fields
}
Machine forwards blocks and collections to the core for processing.
func NewMachine ¶ added in v0.33.19
func NewMachine(
	logger zerolog.Logger,
	protocolEvents *events.Distributor,
	collectionRequester CollectionRequester,
	collectionFetcher CollectionFetcher,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	execState state.ExecutionState,
	state protocol.State,
	metrics module.ExecutionMetrics,
	computationManager computation.ComputationManager,
	broadcaster provider.ProviderEngine,
	uploader *uploader.Manager,
	stopControl *stop.StopControl,
) (*Machine, module.ReadyDoneAware, error)
func (*Machine) BeforeComputationResultSaved ¶ added in v0.33.19
func (e *Machine) BeforeComputationResultSaved(ctx context.Context, result *execution.ComputationResult)
func (*Machine) BlockFinalized ¶ added in v0.33.21
func (e *Machine) BlockFinalized(b *flow.Header)
func (*Machine) BlockProcessable ¶ added in v0.33.19
func (e *Machine) BlockProcessable(header *flow.Header, qc *flow.QuorumCertificate)
Protocol Events implementation
func (*Machine) ExecuteBlock ¶ added in v0.33.19
func (e *Machine) ExecuteBlock(ctx context.Context, executableBlock *entity.ExecutableBlock) (*execution.ComputationResult, error)
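func (*Machine) OnComputationResultSaved ¶ added in v0.33.19
func (e *Machine) OnComputationResultSaved(ctx context.Context, result *execution.ComputationResult) string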
type Mempool ¶
type Mempool struct {
	ExecutionQueue    *stdmap.Queues
	BlockByCollection *stdmap.BlockByCollections
}
func (*Mempool) Run ¶
func (m *Mempool) Run(f func(blockByCollection *stdmap.BlockByCollectionBackdata, executionQueue *stdmap.QueuesBackdata) error) error
type Throttle ¶ added in v0.33.19
type Throttle interface {
	// Init initializes the throttle with the processables channel to forward the blocks
	Init(processables chan<- BlockIDHeight, threshold int) error

	// OnBlock is called when a block is received, the throttle will check if the execution
	// is falling far behind the finalization, and add the block to the processables channel
	// if it's not falling far behind.
	OnBlock(blockID flow.Identifier, height uint64) error

	// OnBlockExecuted is called when a block is executed, the throttle will check whether
	// the execution is caught up with the finalization, and allow all the remaining blocks
	// to be added to the processables channel.
	OnBlockExecuted(blockID flow.Identifier, height uint64) error

	// OnBlockFinalized is called when a block is finalized, the throttle will update the
	// finalized height.
	OnBlockFinalized(height uint64)

	// Done stops the throttle, and stop sending new blocks to the processables channel
	Done() error
}
Throttle is used to throttle the blocks to be added to the processables channel