ingestion

package
v0.37.24
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2025 License: AGPL-3.0 Imports: 31 Imported by: 4

Documentation

Constants

const DefaultCatchUpThreshold = 500

DefaultCatchUpThreshold is the number of blocks by which execution may lag behind finalization. When execution is further behind than this, only the next unexecuted finalized blocks are lazily loaded until execution has caught up.

const MaxConcurrentBlockExecutor = 5

MaxConcurrentBlockExecutor is the maximum number of concurrent block executors

const MaxProcessableBlocks = 10000

MaxProcessableBlocks is the maximum number of blocks that can be queued for processing

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockExecutor added in v0.33.19

type BlockExecutor interface {
	ExecuteBlock(ctx context.Context, block *entity.ExecutableBlock) (*execution.ComputationResult, error)
}

type BlockIDHeight added in v0.33.21

type BlockIDHeight struct {
	ID     flow.Identifier
	Height uint64
}

BlockIDHeight is a helper struct that holds the block ID and height

func HeaderToBlockIDHeight added in v0.33.21

func HeaderToBlockIDHeight(header *flow.Header) BlockIDHeight
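The page gives no description for HeaderToBlockIDHeight, but its signature suggests a plain field-by-field conversion. The sketch below shows that shape under stated assumptions: `Identifier`, `Header`, and `headerToBlockIDHeight` are simplified stand-ins written for this example, not the flow-go types, and the stand-in `Header` stores its ID instead of deriving it from the header contents.

```go
package main

import "fmt"

// Identifier is a simplified stand-in for flow.Identifier.
type Identifier [32]byte

// Header is a simplified stand-in for flow.Header (assumption: the real type
// exposes an ID and a Height; here the ID is simply stored).
type Header struct {
	id     Identifier
	Height uint64
}

// ID returns the stored identifier of the stand-in header.
func (h *Header) ID() Identifier { return h.id }

// BlockIDHeight mirrors the struct documented above.
type BlockIDHeight struct {
	ID     Identifier
	Height uint64
}

// headerToBlockIDHeight is a hypothetical re-creation of the conversion:
// it copies the header's ID and height into a BlockIDHeight.
func headerToBlockIDHeight(h *Header) BlockIDHeight {
	return BlockIDHeight{ID: h.ID(), Height: h.Height}
}

func main() {
	h := &Header{id: Identifier{0x01}, Height: 42}
	b := headerToBlockIDHeight(h)
	fmt.Println(b.Height, b.ID == h.ID()) // 42 true
}
```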

type BlockLoader added in v0.32.0

type BlockLoader interface {
	LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error)
}

type BlockThrottle added in v0.33.19

type BlockThrottle struct {
	// contains filtered or unexported fields
}

BlockThrottle is a helper struct that throttles the unexecuted blocks sent to the block queue for execution. It is useful when execution falls far behind finalization: in that case the blocks sent to the block queue, which fetches the data needed to execute them, must be rate-limited. Without the throttle, the block queue would be flooded with blocks, the network would be flooded with collection requests, and the execution node (EN) might quickly run out of memory.
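The catch-up decision described here can be pictured as a comparison between the executed height and the finalized height. The following is a minimal sketch of that idea only: `caughtUp` is a hypothetical helper, and the exact comparison (including whether the boundary is inclusive) is an assumption, not the package's implementation.

```go
package main

import "fmt"

// defaultCatchUpThreshold mirrors the DefaultCatchUpThreshold constant above.
const defaultCatchUpThreshold = 500

// caughtUp is a hypothetical helper, not part of the package API. It reports
// whether execution is within threshold blocks of finalization. Treating a
// lag equal to the threshold as "caught up" is an assumption of this sketch.
func caughtUp(executed, finalized uint64, threshold int) bool {
	if executed >= finalized {
		return true // nothing finalized is waiting to be executed
	}
	return finalized-executed <= uint64(threshold)
}

func main() {
	// Lag of 200 blocks: below the threshold, so blocks can flow freely.
	fmt.Println(caughtUp(1000, 1200, defaultCatchUpThreshold)) // true
	// Lag of 1000 blocks: above the threshold, so the throttle would hold back.
	fmt.Println(caughtUp(1000, 2000, defaultCatchUpThreshold)) // false
}
```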

func NewBlockThrottle added in v0.33.19

func NewBlockThrottle(
	log zerolog.Logger,
	state protocol.State,
	execState state.ExecutionState,
	headers storage.Headers,
) (*BlockThrottle, error)

func (*BlockThrottle) Done added in v0.33.19

func (c *BlockThrottle) Done() error

Done marks the throttle as done, and no more blocks will be processed

func (*BlockThrottle) Init added in v0.33.19

func (c *BlockThrottle) Init(processables chan<- BlockIDHeight, threshold int) error

func (*BlockThrottle) OnBlock added in v0.33.19

func (c *BlockThrottle) OnBlock(blockID flow.Identifier, height uint64) error

func (*BlockThrottle) OnBlockExecuted added in v0.33.19

func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) error

func (*BlockThrottle) OnBlockFinalized added in v0.33.19

func (c *BlockThrottle) OnBlockFinalized(finalizedHeight uint64)

type CollectionFetcher added in v0.32.0

type CollectionFetcher interface {
	// FetchCollection decides which collection nodes to fetch the collection from
	// No error is expected during normal operation
	FetchCollection(blockID flow.Identifier, height uint64, guarantee *flow.CollectionGuarantee) error

	// Force forces the requests to be sent immediately
	Force()
}

CollectionFetcher abstracts the details of how to fetch collection
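Because CollectionFetcher is a two-method interface, a test double is easy to write. The sketch below records what was requested instead of contacting any node. `Identifier`, `CollectionGuarantee`, and `recordingFetcher` are stand-ins invented for this example, and the nil check is an illustrative choice, not documented behavior.

```go
package main

import "fmt"

// Identifier and CollectionGuarantee are simplified stand-ins for the
// flow.Identifier and flow.CollectionGuarantee types.
type Identifier [32]byte

type CollectionGuarantee struct {
	CollectionID Identifier
}

// recordingFetcher is a hypothetical test double with the same method set as
// CollectionFetcher. It only records requests; it sends nothing.
type recordingFetcher struct {
	requested []Identifier // collection IDs passed to FetchCollection
	forced    int          // number of Force calls
}

// FetchCollection records the collection ID carried by the guarantee.
func (f *recordingFetcher) FetchCollection(blockID Identifier, height uint64, guarantee *CollectionGuarantee) error {
	if guarantee == nil {
		return fmt.Errorf("nil guarantee for block at height %d", height)
	}
	f.requested = append(f.requested, guarantee.CollectionID)
	return nil
}

// Force counts how often an immediate dispatch was requested.
func (f *recordingFetcher) Force() { f.forced++ }

func main() {
	f := &recordingFetcher{}
	g := &CollectionGuarantee{CollectionID: Identifier{0x02}}
	if err := f.FetchCollection(Identifier{0x01}, 10, g); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	f.Force()
	fmt.Println(len(f.requested), f.forced) // 1 1
}
```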

type CollectionRequester added in v0.33.19

type CollectionRequester interface {
	module.ReadyDoneAware
	WithHandle(requester.HandleFunc)
}

type Core added in v0.33.19

type Core struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Core connects the execution components. When it receives blocks and collections, it forwards them to the block queue. When the block queue decides to execute blocks, Core forwards them to the executor for execution. When the block queue decides to fetch missing collections, Core forwards the requests to the collection fetcher. When a block is executed, Core notifies the block queue and forwards the result to the execution state to be saved.

func NewCore added in v0.33.19

func NewCore(
	logger zerolog.Logger,
	throttle Throttle,
	execState state.ExecutionState,
	stopControl *stop.StopControl,
	blocks storage.Blocks,
	collections storage.Collections,
	executor BlockExecutor,
	collectionFetcher CollectionFetcher,
	eventConsumer EventConsumer,
	metrics module.ExecutionMetrics,
) (*Core, error)

func (*Core) OnCollection added in v0.33.19

func (e *Core) OnCollection(col *flow.Collection)

type Engine

type Engine struct {
	psEvents.Noop // satisfy protocol events consumer interface
	execution_data.ProcessedHeightRecorder
	// contains filtered or unexported fields
}

An Engine receives and saves incoming blocks.

func New

func New(
	unit *engine.Unit,
	logger zerolog.Logger,
	net network.EngineRegistry,
	collectionFetcher CollectionFetcher,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	executionEngine computation.ComputationManager,
	providerEngine provider.ProviderEngine,
	execState state.ExecutionState,
	metrics module.ExecutionMetrics,
	tracer module.Tracer,
	extLog bool,
	pruner *pruner.Pruner,
	uploader *uploader.Manager,
	stopControl *stop.StopControl,
	loader BlockLoader,
) (*Engine, error)

func (*Engine) BlockProcessable added in v0.10.0

func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate)

BlockProcessable handles new verified blocks (blocks that have passed consensus validation) received from the consensus nodes.

NOTE: BlockProcessable might be called multiple times for the same block.

NOTE: Ready calls reloadUnexecutedBlocks during initialization, which handles dropped protocol events.

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a channel that will close when the engine has successfully stopped.

func (*Engine) OnCollection

func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity)

OnCollection is a callback for handling the collections requested by the collection requester.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a channel that will close when the engine has successfully started.

type EventConsumer added in v0.33.19

type EventConsumer interface {
	BeforeComputationResultSaved(ctx context.Context, result *execution.ComputationResult)
	OnComputationResultSaved(ctx context.Context, result *execution.ComputationResult) string
}

type Machine added in v0.33.19

type Machine struct {
	events.Noop // satisfy protocol events consumer interface
	// contains filtered or unexported fields
}

Machine forwards blocks and collections to the core for processing.

func NewMachine added in v0.33.19

func NewMachine(
	logger zerolog.Logger,
	protocolEvents *events.Distributor,
	collectionRequester CollectionRequester,

	collectionFetcher CollectionFetcher,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	execState state.ExecutionState,
	state protocol.State,
	metrics module.ExecutionMetrics,
	computationManager computation.ComputationManager,
	broadcaster provider.ProviderEngine,
	uploader *uploader.Manager,
	stopControl *stop.StopControl,
) (*Machine, module.ReadyDoneAware, error)

func (*Machine) BeforeComputationResultSaved added in v0.33.19

func (e *Machine) BeforeComputationResultSaved(
	ctx context.Context,
	result *execution.ComputationResult,
)

func (*Machine) BlockFinalized added in v0.33.21

func (e *Machine) BlockFinalized(b *flow.Header)

func (*Machine) BlockProcessable added in v0.33.19

func (e *Machine) BlockProcessable(header *flow.Header, qc *flow.QuorumCertificate)

Protocol Events implementation

func (*Machine) ExecuteBlock added in v0.33.19

func (e *Machine) ExecuteBlock(ctx context.Context, executableBlock *entity.ExecutableBlock) (*execution.ComputationResult, error)

func (*Machine) OnComputationResultSaved added in v0.33.19

func (e *Machine) OnComputationResultSaved(
	ctx context.Context,
	result *execution.ComputationResult,
) string

type Mempool

type Mempool struct {
	ExecutionQueue    *stdmap.Queues
	BlockByCollection *stdmap.BlockByCollections
}

func (*Mempool) Run

func (m *Mempool) Run(f func(blockByCollection *stdmap.BlockByCollectionBackdata, executionQueue *stdmap.QueuesBackdata) error) error

type Throttle added in v0.33.19

type Throttle interface {
	// Init initializes the throttle with the processables channel to forward the blocks
	Init(processables chan<- BlockIDHeight, threshold int) error
	// OnBlock is called when a block is received, the throttle will check if the execution
	// is falling far behind the finalization, and add the block to the processables channel
	// if it's not falling far behind.
	OnBlock(blockID flow.Identifier, height uint64) error
	// OnBlockExecuted is called when a block is executed, the throttle will check whether
	// the execution is caught up with the finalization, and allow all the remaining blocks
	// to be added to the processables channel.
	OnBlockExecuted(blockID flow.Identifier, height uint64) error
	// OnBlockFinalized is called when a block is finalized, the throttle will update the
	// finalized height.
	OnBlockFinalized(height uint64)
	// Done stops the throttle, and stop sending new blocks to the processables channel
	Done() error
}

Throttle is used to throttle the blocks to be added to the processables channel
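To show how the processables channel ties the methods together, here is a minimal sketch of a Throttle-shaped type that never throttles: every block passed to OnBlock is forwarded straight to the channel given to Init. `passThrough`, `Identifier`, and the local `BlockIDHeight` are stand-ins written for this example. The type is not safe for concurrent use and is not how BlockThrottle is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// Identifier is a simplified stand-in for flow.Identifier.
type Identifier [32]byte

// BlockIDHeight mirrors the struct documented earlier on this page.
type BlockIDHeight struct {
	ID     Identifier
	Height uint64
}

// passThrough is a hypothetical type with the same method set as Throttle.
// It ignores the threshold and forwards every block it is given.
type passThrough struct {
	processables chan<- BlockIDHeight
	stopped      bool
}

// Init stores the channel; the threshold is unused in this sketch.
func (p *passThrough) Init(processables chan<- BlockIDHeight, threshold int) error {
	p.processables = processables
	return nil
}

// OnBlock forwards the block unless the throttle is stopped or uninitialized.
func (p *passThrough) OnBlock(blockID Identifier, height uint64) error {
	if p.processables == nil {
		return errors.New("throttle not initialized")
	}
	if p.stopped {
		return nil // after Done, new blocks are dropped
	}
	p.processables <- BlockIDHeight{ID: blockID, Height: height}
	return nil
}

func (p *passThrough) OnBlockExecuted(blockID Identifier, height uint64) error { return nil }
func (p *passThrough) OnBlockFinalized(height uint64)                         {}

// Done stops forwarding.
func (p *passThrough) Done() error {
	p.stopped = true
	return nil
}

func main() {
	processables := make(chan BlockIDHeight, 1)
	th := &passThrough{}
	if err := th.Init(processables, 500); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	if err := th.OnBlock(Identifier{0x01}, 100); err != nil {
		fmt.Println("on block failed:", err)
		return
	}
	fmt.Println((<-processables).Height) // 100
}
```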
