ingestion

package
v0.29.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2023 License: AGPL-3.0 Imports: 34 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateExecutionReceipt added in v0.26.1

func GenerateExecutionReceipt(
	me module.Local,
	receiptHasher hash.Hasher,
	result *flow.ExecutionResult,
	spockSignatures []crypto.Signature,
) (
	*flow.ExecutionReceipt,
	error,
)

Types

type Deltas added in v0.10.0

type Deltas struct {
	*stdmap.Backend
}

func NewDeltas added in v0.10.0

func NewDeltas(limit uint, opts ...stdmap.OptionFunc) (*Deltas, error)

NewDeltas creates a new memory pool for state deltas

func (*Deltas) Add added in v0.10.0

func (s *Deltas) Add(delta *messages.ExecutionStateDelta) bool

Add adds an state deltas to the mempool.

func (*Deltas) All added in v0.10.0

func (s *Deltas) All() []*messages.ExecutionStateDelta

All returns all block Deltass from the pool.

func (*Deltas) ByBlockID added in v0.10.0

func (s *Deltas) ByBlockID(blockID flow.Identifier) (*messages.ExecutionStateDelta, bool)

ByBlockID returns the state deltas for a block from the mempool.

func (*Deltas) Remove added in v0.27.0

func (s *Deltas) Remove(blockID flow.Identifier) bool

Remove will remove a deltas by block ID.

type Engine

type Engine struct {
	psEvents.Noop // satisfy protocol events consumer interface
	// contains filtered or unexported fields
}

An Engine receives and saves incoming blocks.

func New

func New(
	logger zerolog.Logger,
	net network.Network,
	me module.Local,
	request module.Requester,
	state protocol.State,
	blocks storage.Blocks,
	collections storage.Collections,
	events storage.Events,
	serviceEvents storage.ServiceEvents,
	transactionResults storage.TransactionResults,
	executionEngine computation.ComputationManager,
	providerEngine provider.ProviderEngine,
	execState state.ExecutionState,
	metrics module.ExecutionMetrics,
	tracer module.Tracer,
	extLog bool,
	syncFilter flow.IdentityFilter,
	syncDeltas mempool.Deltas,
	syncThreshold int,
	syncFast bool,
	checkAuthorizedAtBlock func(blockID flow.Identifier) (bool, error),
	pruner *pruner.Pruner,
	uploaders []uploader.Uploader,
	stopControl *StopControl,
) (*Engine, error)

func (*Engine) BlockFinalized added in v0.28.0

func (e *Engine) BlockFinalized(h *flow.Header)

BlockFinalized implements part of state.protocol.Consumer interface. Method gets called for every finalized block

func (*Engine) BlockProcessable added in v0.10.0

func (e *Engine) BlockProcessable(b *flow.Header)

BlockProcessable handles the new verified blocks (blocks that have passed consensus validation) received from the consensus nodes NOTE: BlockProcessable might be called multiple times for the same block. NOTE: Ready calls reloadUnexecutedBlocks during initialization, which handles dropped protocol events.

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a channel that will close when the engine has successfully stopped.

func (*Engine) ExecuteScriptAtBlockID

func (e *Engine) ExecuteScriptAtBlockID(ctx context.Context, script []byte, arguments [][]byte, blockID flow.Identifier) ([]byte, error)

func (*Engine) GetAccount

func (e *Engine) GetAccount(ctx context.Context, addr flow.Address, blockID flow.Identifier) (*flow.Account, error)

func (*Engine) GetRegisterAtBlockID added in v0.23.1

func (e *Engine) GetRegisterAtBlockID(ctx context.Context, owner, key []byte, blockID flow.Identifier) ([]byte, error)

func (*Engine) OnCollection

func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity)

OnCollection is a callback for handling the collections requested by the collection requester.

func (*Engine) Process

func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a channel that will close when the engine has successfully started.

func (*Engine) Submit

func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{})

Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(event interface{})

SubmitLocal submits an event originating on the local node.

type IngestRPC

type IngestRPC interface {

	// ExecuteScriptAtBlockID executes a script at the given Block id
	ExecuteScriptAtBlockID(ctx context.Context, script []byte, arguments [][]byte, blockID flow.Identifier) ([]byte, error)

	// GetAccount returns the Account details at the given Block id
	GetAccount(ctx context.Context, address flow.Address, blockID flow.Identifier) (*flow.Account, error)

	// GetRegisterAtBlockID returns the value of a register at the given Block id (if available)
	GetRegisterAtBlockID(ctx context.Context, owner, key []byte, blockID flow.Identifier) ([]byte, error)
}

IngestRPC represents the RPC calls that the execution ingest engine exposes to support the Access Node API calls

type Mempool

type Mempool struct {
	ExecutionQueue    *stdmap.Queues
	BlockByCollection *stdmap.BlockByCollections
}

func (*Mempool) Run

func (m *Mempool) Run(f func(blockByCollection *stdmap.BlockByCollectionBackdata, executionQueue *stdmap.QueuesBackdata) error) error

type StopControl added in v0.28.0

type StopControl struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

StopControl is a specialized component used by ingestion.Engine to encapsulate control of pausing/stopping blocks execution. It is intended to work tightly with the Engine, not as a general mechanism or interface. StopControl follows states described in StopState

func NewStopControl added in v0.28.0

func NewStopControl(log zerolog.Logger, paused bool, lastExecutedHeight uint64) *StopControl

NewStopControl creates new empty NewStopControl

func (*StopControl) GetState added in v0.28.0

func (s *StopControl) GetState() StopControlState

GetState returns current state of StopControl module

func (*StopControl) GetStopHeight added in v0.28.0

func (s *StopControl) GetStopHeight() (uint64, bool)

GetStopHeight returns:

  • height
  • crash

Values are undefined if they were not previously set

func (*StopControl) IsPaused added in v0.28.0

func (s *StopControl) IsPaused() bool

IsPaused returns true is block execution has been paused

func (*StopControl) SetStopHeight added in v0.28.0

func (s *StopControl) SetStopHeight(height uint64, crash bool) (uint64, bool, error)

SetStopHeight sets new stop height and crash mode, and return old values:

  • height
  • crash

Returns error if the stopping process has already commenced, new values will be rejected.

type StopControlState added in v0.28.0

type StopControlState byte
const (
	// StopControlOff default state, envisioned to be used most of the time. Stopping module is simply off,
	// blocks will be processed "as usual".
	StopControlOff StopControlState = iota

	// StopControlSet means stop height is set but not reached yet, and nothing related to stopping happened yet.
	// We could still go back to StopControlOff or progress to StopControlCommenced.
	StopControlSet

	// StopControlCommenced indicates that stopping process has commenced and no parameters can be changed anymore.
	// For example, blocks at or above stop height has been received, but finalization didn't reach stop height yet.
	// It can only progress to StopControlPaused
	StopControlCommenced

	// StopControlPaused means EN has stopped processing blocks. It can happen by reaching the set stopping `height`, or
	// if the node was started in pause mode.
	// It is a final state and cannot be changed
	StopControlPaused
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL