ingestion

package
v0.35.3-crescendo-prev...
Warning

This package is not in the latest version of its module.

Published: May 9, 2024 License: AGPL-3.0 Imports: 27 Imported by: 4

Documentation

Constants

const DefaultCatchUpThreshold = 500

DefaultCatchUpThreshold is the default lag, in blocks, between execution and finalization. When execution falls this far behind finalization, only the next unexecuted finalized blocks are lazily loaded until execution has caught up.
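The threshold check can be illustrated with a minimal sketch. The helper `shouldLazyLoad` is hypothetical and not part of this package, and the strict `>` comparison is an assumption, since the documentation does not say whether the boundary is inclusive.

```go
package main

import "fmt"

// defaultCatchUpThreshold mirrors the documented value of DefaultCatchUpThreshold.
const defaultCatchUpThreshold = 500

// shouldLazyLoad is a hypothetical helper, not part of the ingestion package.
// It reports whether execution lags finalization by more than threshold blocks.
// Assumption: the comparison is strict and threshold is non-negative.
func shouldLazyLoad(executedHeight, finalizedHeight uint64, threshold int) bool {
	if finalizedHeight <= executedHeight {
		return false // execution is not behind at all
	}
	return finalizedHeight-executedHeight > uint64(threshold)
}

func main() {
	fmt.Println(shouldLazyLoad(1000, 1200, defaultCatchUpThreshold)) // false: lag is 200
	fmt.Println(shouldLazyLoad(1000, 1600, defaultCatchUpThreshold)) // true: lag is 600
}
```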

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockExecutor added in v0.33.19

type BlockExecutor interface {
	ExecuteBlock(ctx context.Context, block *entity.ExecutableBlock) (*execution.ComputationResult, error)
}

type BlockLoader added in v0.32.0

type BlockLoader interface {
	LoadUnexecuted(ctx context.Context) ([]flow.Identifier, error)
}

type BlockThrottle added in v0.33.19

type BlockThrottle struct {
	// contains filtered or unexported fields
}

BlockThrottle throttles the unexecuted blocks that are sent to the block queue for execution. It is useful when execution falls far behind finalization: sending a block to the block queue triggers fetching the data needed to execute it, so the rate at which blocks are sent must be limited. Without throttling, the block queue would be flooded with blocks, the network would be flooded with collection-fetch requests, and the execution node (EN) might quickly run out of memory.
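The throttling idea can be sketched with a toy model that tracks block heights only. Everything here (`toyThrottle`, its fields and methods) is hypothetical and much simpler than the real BlockThrottle; in particular, releasing exactly one block per executed block while catching up is an assumption made for illustration.

```go
package main

import "fmt"

// toyThrottle is a hypothetical stand-in for BlockThrottle that tracks block
// heights only. It is not safe for concurrent use.
type toyThrottle struct {
	threshold uint64        // lag above which the throttle enters catch-up mode
	executed  uint64        // highest executed height
	finalized uint64        // highest finalized height
	loaded    uint64        // highest height already sent to out
	out       chan<- uint64 // receives heights that may be queued for execution
}

// catchingUp reports whether execution lags finalization by more than threshold.
func (t *toyThrottle) catchingUp() bool {
	return t.finalized > t.executed && t.finalized-t.executed > t.threshold
}

// release sends every height in (loaded, upTo] to out, never past finalized.
func (t *toyThrottle) release(upTo uint64) {
	if upTo > t.finalized {
		upTo = t.finalized
	}
	for t.loaded < upTo {
		t.loaded++
		t.out <- t.loaded
	}
}

// onFinalized records a finalized height and forwards it unless catching up.
func (t *toyThrottle) onFinalized(height uint64) {
	t.finalized = height
	if !t.catchingUp() {
		t.release(height)
	}
}

// onExecuted records an executed height. While catching up it releases one
// more block; once caught up it releases everything finalized so far.
func (t *toyThrottle) onExecuted(height uint64) {
	t.executed = height
	if t.catchingUp() {
		t.release(t.loaded + 1)
		return
	}
	t.release(t.finalized)
}

func main() {
	out := make(chan uint64, 16) // buffered so the sketch never blocks
	th := &toyThrottle{threshold: 2, executed: 10, finalized: 10, loaded: 10, out: out}
	th.onFinalized(11) // lag 1: forwarded immediately
	th.onFinalized(15) // lag 5: held back
	th.onExecuted(11)  // lag 4: releases 12 only
	th.onExecuted(12)  // lag 3: releases 13 only
	th.onExecuted(13)  // lag 2: caught up, releases 14 and 15
	close(out)
	for h := range out {
		fmt.Println(h)
	}
}
```

The point of the design is that the consumer of `out` never sees more than a bounded number of pending blocks while the node is far behind, which is what keeps the block queue and the collection requests from growing without limit.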

func NewBlockThrottle added in v0.33.19

func NewBlockThrottle(
	log zerolog.Logger,
	state protocol.State,
	execState state.ExecutionState,
	headers storage.Headers,
	catchupThreshold int,
) (*BlockThrottle, error)

func (*BlockThrottle) Init added in v0.33.19

func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error

func (*BlockThrottle) OnBlock added in v0.33.19

func (c *BlockThrottle) OnBlock(blockID flow.Identifier) error

func (*BlockThrottle) OnBlockExecuted added in v0.33.19

func (c *BlockThrottle) OnBlockExecuted(_ flow.Identifier, executed uint64) error

func (*BlockThrottle) OnBlockFinalized added in v0.33.19

func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header)

type CollectionFetcher added in v0.32.0

type CollectionFetcher interface {
	// FetchCollection decides which collection nodes to fetch the collection from
	// No error is expected during normal operation
	FetchCollection(blockID flow.Identifier, height uint64, guarantee *flow.CollectionGuarantee) error

	// Force forces the requests to be sent immediately
	Force()
}

CollectionFetcher abstracts the details of how collections are fetched.
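One way to picture the relationship between FetchCollection and Force is a fetcher that queues requests and sends them as a batch. The sketch below is an assumption about a possible implementation, not the package's own: `batchingFetcher`, `fetchRequest`, and the `send` callback are hypothetical, and plain strings stand in for `flow.Identifier` and `*flow.CollectionGuarantee`.

```go
package main

import "fmt"

// fetchRequest and batchingFetcher are hypothetical; plain strings stand in
// for flow.Identifier and *flow.CollectionGuarantee.
type fetchRequest struct {
	blockID      string
	height       uint64
	collectionID string
}

type batchingFetcher struct {
	pending []fetchRequest
	send    func(batch []fetchRequest) // transport callback supplied by the caller
}

// FetchCollection queues a request; nothing is sent until Force is called.
func (f *batchingFetcher) FetchCollection(blockID string, height uint64, collectionID string) error {
	f.pending = append(f.pending, fetchRequest{blockID, height, collectionID})
	return nil
}

// Force sends all queued requests immediately and clears the queue.
func (f *batchingFetcher) Force() {
	if len(f.pending) == 0 {
		return
	}
	batch := f.pending
	f.pending = nil
	f.send(batch)
}

func main() {
	f := &batchingFetcher{send: func(batch []fetchRequest) {
		fmt.Println("sending", len(batch), "requests")
	}}
	_ = f.FetchCollection("block-a", 42, "col-1")
	_ = f.FetchCollection("block-a", 42, "col-2")
	f.Force() // prints "sending 2 requests"
}
```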

type Core added in v0.33.19

type Core struct {
	// contains filtered or unexported fields
}

Core connects the execution components. When it receives blocks and collections, it forwards them to the block queue. When the block queue decides to execute blocks, Core forwards them to the executor. When the block queue decides to fetch missing collections, Core forwards the requests to the collection fetcher. When a block has been executed, Core notifies the block queue and forwards the result to the execution state to be saved.
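The forwarding described for Core can be sketched with a toy model. All names below (`toyCore`, `onBlock`, `onCollection`, and the `fetch` and `execute` callbacks) are hypothetical; strings replace flow identifiers, and parent/child execution ordering and result persistence are left out to keep the example short.

```go
package main

import "fmt"

// toyCore is a hypothetical stand-in for Core. Strings replace flow
// identifiers, and parent/child execution ordering is ignored for brevity.
type toyCore struct {
	missing map[string]map[string]bool // block ID -> collection IDs still missing
	waiting map[string][]string        // collection ID -> blocks waiting for it
	fetch   func(blockID, collectionID string)
	execute func(blockID string)
}

func newToyCore(fetch func(blockID, collectionID string), execute func(blockID string)) *toyCore {
	return &toyCore{
		missing: map[string]map[string]bool{},
		waiting: map[string][]string{},
		fetch:   fetch,
		execute: execute,
	}
}

// onBlock executes a block with no collections right away; otherwise it
// requests every collection the block references.
func (c *toyCore) onBlock(blockID string, collectionIDs []string) {
	if len(collectionIDs) == 0 {
		c.execute(blockID)
		return
	}
	c.missing[blockID] = map[string]bool{}
	for _, col := range collectionIDs {
		c.missing[blockID][col] = true
		c.waiting[col] = append(c.waiting[col], blockID)
		c.fetch(blockID, col)
	}
}

// onCollection marks a collection as received and executes every block that
// is no longer missing any collection.
func (c *toyCore) onCollection(collectionID string) {
	for _, blockID := range c.waiting[collectionID] {
		pending, ok := c.missing[blockID]
		if !ok {
			continue // block was already executed
		}
		delete(pending, collectionID)
		if len(pending) == 0 {
			delete(c.missing, blockID)
			c.execute(blockID)
		}
	}
	delete(c.waiting, collectionID)
}

func main() {
	c := newToyCore(
		func(blockID, col string) { fmt.Println("fetch", col, "for", blockID) },
		func(blockID string) { fmt.Println("execute", blockID) },
	)
	c.onBlock("A", []string{"c1", "c2"})
	c.onCollection("c1")
	c.onCollection("c2") // last missing collection arrives, so A executes
}
```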

func NewCore added in v0.33.19

func NewCore(
	logger zerolog.Logger,
	throttle Throttle,
	execState state.ExecutionState,
	stopControl *stop.StopControl,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	executor BlockExecutor,
	collectionFetcher CollectionFetcher,
	eventConsumer EventConsumer,
) *Core

func (*Core) Done

func (e *Core) Done() <-chan struct{}

func (*Core) OnBlock added in v0.33.19

func (e *Core) OnBlock(header *flow.Header, qc *flow.QuorumCertificate)

func (*Core) OnCollection added in v0.33.19

func (e *Core) OnCollection(col *flow.Collection)

func (*Core) Ready

func (e *Core) Ready() <-chan struct{}

type Engine

type Engine struct {
	psEvents.Noop // satisfy protocol events consumer interface
	// contains filtered or unexported fields
}

An Engine receives and saves incoming blocks.

func New

func New(
	unit *engine.Unit,
	logger zerolog.Logger,
	net network.EngineRegistry,
	collectionFetcher CollectionFetcher,
	headers storage.Headers,
	blocks storage.Blocks,
	collections storage.Collections,
	executionEngine computation.ComputationManager,
	providerEngine provider.ProviderEngine,
	execState state.ExecutionState,
	metrics module.ExecutionMetrics,
	tracer module.Tracer,
	extLog bool,
	pruner *pruner.Pruner,
	uploader *uploader.Manager,
	stopControl *stop.StopControl,
	loader BlockLoader,
) (*Engine, error)

func (*Engine) BlockProcessable added in v0.10.0

func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate)

BlockProcessable handles new verified blocks (blocks that have passed consensus validation) received from the consensus nodes.

NOTE: BlockProcessable might be called multiple times for the same block.

NOTE: Ready calls reloadUnexecutedBlocks during initialization, which handles dropped protocol events.
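Because the same block may be delivered more than once, a handler has to tolerate repeats. The sketch below shows one generic way to do that with a mutex-guarded set; `blockDeduper` is hypothetical, uses string IDs in place of `flow.Identifier`, and is not a description of how Engine itself handles duplicates.

```go
package main

import (
	"fmt"
	"sync"
)

// blockDeduper is a hypothetical helper keyed by string IDs.
type blockDeduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newBlockDeduper() *blockDeduper {
	return &blockDeduper{seen: make(map[string]struct{})}
}

// firstSeen records blockID and reports whether this is its first occurrence.
// It is safe to call from multiple goroutines.
func (d *blockDeduper) firstSeen(blockID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[blockID]; ok {
		return false
	}
	d.seen[blockID] = struct{}{}
	return true
}

func main() {
	d := newBlockDeduper()
	for _, id := range []string{"A", "B", "A"} {
		if d.firstSeen(id) {
			fmt.Println("processing", id) // the repeated "A" is skipped
		}
	}
}
```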

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a channel that will close when the engine has successfully stopped.

func (*Engine) OnCollection

func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity)

OnCollection is a callback for handling the collections requested by the collection requester.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a channel that will close when the engine has successfully started.
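Ready and Done both return channels that close once the corresponding transition has completed, so a caller waits by receiving from them. The sketch below shows that pattern with a toy component. `toyEngine`, `closedWithin`, and `toyTimeout` are hypothetical, and starting the work lazily on the first call is an assumption rather than a statement about Engine.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyEngine is a hypothetical component exposing the same Ready/Done shape.
type toyEngine struct {
	startOnce sync.Once
	stopOnce  sync.Once
	ready     chan struct{}
	done      chan struct{}
}

func newToyEngine() *toyEngine {
	return &toyEngine{ready: make(chan struct{}), done: make(chan struct{})}
}

// Ready starts the component on first call and returns a channel that is
// closed once startup has finished.
func (e *toyEngine) Ready() <-chan struct{} {
	e.startOnce.Do(func() {
		go func() {
			// startup work would go here
			close(e.ready)
		}()
	})
	return e.ready
}

// Done stops the component on first call and returns a channel that is
// closed once shutdown has finished.
func (e *toyEngine) Done() <-chan struct{} {
	e.stopOnce.Do(func() {
		go func() {
			// shutdown work would go here
			close(e.done)
		}()
	})
	return e.done
}

// toyTimeout bounds how long the example waits for a transition.
const toyTimeout = time.Second

// closedWithin reports whether ch was closed before the timeout elapsed.
func closedWithin(ch <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-ch:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	e := newToyEngine()
	fmt.Println("started:", closedWithin(e.Ready(), toyTimeout))
	fmt.Println("stopped:", closedWithin(e.Done(), toyTimeout))
}
```

Guarding each transition with `sync.Once` makes repeated calls safe: every caller receives the same channel, and the channel is closed exactly once.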

type EventConsumer added in v0.33.19

type EventConsumer interface {
	BeforeComputationResultSaved(ctx context.Context, result *execution.ComputationResult)
	OnComputationResultSaved(ctx context.Context, result *execution.ComputationResult) string
}

type Mempool

type Mempool struct {
	ExecutionQueue    *stdmap.Queues
	BlockByCollection *stdmap.BlockByCollections
}

func (*Mempool) Run

func (m *Mempool) Run(f func(blockByCollection *stdmap.BlockByCollectionBackdata, executionQueue *stdmap.QueuesBackdata) error) error

type Throttle added in v0.33.19

type Throttle interface {
	Init(processables chan<- flow.Identifier) error
	OnBlock(blockID flow.Identifier) error
	OnBlockExecuted(blockID flow.Identifier, height uint64) error
}
