fetcher

package
v0.16.0
Published: Apr 14, 2021 License: AGPL-3.0 Imports: 19 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CanTry

func CanTry(maxAttempt int, chunk *ChunkStatus) bool

CanTry checks the attempt history of a chunk and determines whether the chunk request can be tried again.
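The retry check can be pictured with a minimal sketch. The page does not show the body of CanTry, so the comparison below (attempt count strictly below maxAttempt) is an assumption, and chunkStatus and canTry are hypothetical stand-ins for ChunkStatus and CanTry that keep the example self-contained.

```go
package main

import "fmt"

// chunkStatus is a hypothetical stand-in for fetcher.ChunkStatus that keeps
// only the field this sketch needs.
type chunkStatus struct {
	Attempt int // number of requests made so far for this chunk
}

// canTry is an assumed reading of CanTry: a chunk may be requested again
// while its attempt count is below maxAttempt.
func canTry(maxAttempt int, chunk *chunkStatus) bool {
	return chunk.Attempt < maxAttempt
}

func main() {
	status := &chunkStatus{Attempt: 2}
	fmt.Println(canTry(3, status)) // true: 2 attempts so far, 3 allowed

	status.Attempt++
	fmt.Println(canTry(3, status)) // false: the limit has been reached
}
```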

func IsSystemChunk

func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool

IsSystemChunk returns true if `chunkIndex` points to a system chunk in `result`. Otherwise, it returns false. In the current version, a chunk is a system chunk if it is the last chunk of the execution result.
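The "last chunk" rule can be sketched as an index comparison. executionResult and isSystemChunk are hypothetical stand-ins, not the flow-go types, and the handling of an empty result is an assumption made only so the example stays well defined.

```go
package main

import "fmt"

// executionResult is a hypothetical stand-in for flow.ExecutionResult,
// reduced to the number of chunks it contains.
type executionResult struct {
	ChunkCount int
}

// isSystemChunk reports whether chunkIndex is the last chunk of result,
// which is the rule the documentation describes for the current version.
func isSystemChunk(chunkIndex uint64, result *executionResult) bool {
	if result.ChunkCount == 0 {
		return false // assumption: an empty result has no system chunk
	}
	return chunkIndex == uint64(result.ChunkCount-1)
}

func main() {
	result := &executionResult{ChunkCount: 4}
	fmt.Println(isSystemChunk(3, result)) // true: index 3 is the last of 4 chunks
	fmt.Println(isSystemChunk(0, result)) // false
}
```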

Types

type AssignedChunkProcessor added in v0.15.4

type AssignedChunkProcessor interface {
	// ProcessAssignedChunk receives an assigned chunk locator and processes its corresponding chunk.
	// A chunk processor is expected to shape a verifiable chunk out of the assigned chunk, and pass it to
	// the verifier Engine.
	// Note: it should be implemented in a non-blocking way.
	ProcessAssignedChunk(locator *chunks.Locator)

	// WithChunkConsumerNotifier sets the notifier of this chunk processor.
	// The processor calls the notifier to let the consumer know that it has finished
	// processing a chunk, so that the consumer may pass the next chunk to the processor
	// by invoking ProcessAssignedChunk.
	WithChunkConsumerNotifier(notifier module.ProcessingNotifier)
}

AssignedChunkProcessor should be implemented by the verification node engine responsible for processing assigned chunk locators to this node.

In the current version, the fetcher engine is responsible for processing the assigned chunk locators. Architecturally, AssignedChunkProcessor sits in the verification pipeline as follows:

	---------------                                  ------------------                        ----------------------------
	| Chunk Queue | ---> assigned chunk locators --> | Chunk Consumer | ---> chunk workers --> | Assigned Chunk Processor |
	---------------                                  ------------------                        ----------------------------
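The hand-off between consumer and processor can be sketched as follows. locator, notifier, processor, and recorder are hypothetical stand-ins (string IDs replace flow.Identifier, and the Notify method on the notifier is an assumption); the sketch only illustrates the non-blocking call and the notification that follows it.

```go
package main

import (
	"fmt"
	"sync"
)

// locator and notifier are hypothetical stand-ins for chunks.Locator and
// module.ProcessingNotifier.
type locator struct {
	ResultID string
	Index    uint64
}

type notifier interface {
	Notify(id string)
}

// processor sketches an AssignedChunkProcessor: each call returns at once and
// the work continues on its own goroutine.
type processor struct {
	notifier notifier
	wg       sync.WaitGroup
}

func (p *processor) WithChunkConsumerNotifier(n notifier) { p.notifier = n }

func (p *processor) ProcessAssignedChunk(l *locator) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// Fetching and verifying the chunk would happen here.
		p.notifier.Notify(fmt.Sprintf("%s/%d", l.ResultID, l.Index))
	}()
}

// recorder collects notifications so the example can inspect them.
type recorder struct {
	mu   sync.Mutex
	done []string
}

func (r *recorder) Notify(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.done = append(r.done, id)
}

func main() {
	rec := &recorder{}
	p := &processor{}
	p.WithChunkConsumerNotifier(rec)

	p.ProcessAssignedChunk(&locator{ResultID: "result-a", Index: 0})
	p.ProcessAssignedChunk(&locator{ResultID: "result-a", Index: 1})

	p.wg.Wait() // the example waits only so it can print a stable count
	fmt.Println(len(rec.done)) // 2
}
```

In a real consumer, each notification would free a worker to pull the next locator from the chunk queue.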

type ChunkDataPackHandler added in v0.15.4

type ChunkDataPackHandler interface {
	// HandleChunkDataPack is called by the ChunkDataPackRequester anytime a new requested chunk arrives.
	// It contains the logic of handling the chunk data pack.
	HandleChunkDataPack(originID flow.Identifier, chunkDataPack *flow.ChunkDataPack, collection *flow.Collection)

	// NotifyChunkDataPackSealed is called by the ChunkDataPackRequester to notify the ChunkDataPackHandler that the chunk ID has been sealed and
	// hence the requester will no longer request it.
	//
	// Once the requester has called this callback method, it will never return a chunk data pack for this chunk ID to the handler (i.e.,
	// through HandleChunkDataPack).
	NotifyChunkDataPackSealed(chunkID flow.Identifier)
}

ChunkDataPackHandler encapsulates the logic of handling a requested chunk data pack upon its arrival.

type ChunkDataPackRequester added in v0.15.4

type ChunkDataPackRequester interface {
	// Request requests the chunk data pack of the specified chunk ID.
	Request(chunkID flow.Identifier, executorID flow.Identifier) error
}

ChunkDataPackRequester encapsulates the logic of requesting a chunk data pack from an execution node.
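The contract between requester and handler can be sketched as follows: a response is delivered at most once per requested chunk, and never after the chunk was reported sealed. Everything here is a simplified, hypothetical stand-in (string chunk IDs, no data pack payload, no network); onResponse and onSealed are illustrative names and not part of the package.

```go
package main

import "fmt"

// handler is a hypothetical, simplified version of ChunkDataPackHandler that
// uses string chunk IDs and omits the data pack payload.
type handler interface {
	HandleChunkDataPack(chunkID string)
	NotifyChunkDataPackSealed(chunkID string)
}

// requester sketches the requester side of the contract.
type requester struct {
	pending map[string]bool
	handler handler
}

// Request registers a chunk ID as pending.
func (r *requester) Request(chunkID string) { r.pending[chunkID] = true }

// onSealed stops tracking a sealed chunk and tells the handler about it.
func (r *requester) onSealed(chunkID string) {
	if r.pending[chunkID] {
		delete(r.pending, chunkID)
		r.handler.NotifyChunkDataPackSealed(chunkID)
	}
}

// onResponse delivers a data pack only while the chunk is still pending, so
// duplicates and responses for sealed chunks are dropped.
func (r *requester) onResponse(chunkID string) {
	if r.pending[chunkID] {
		delete(r.pending, chunkID)
		r.handler.HandleChunkDataPack(chunkID)
	}
}

// logHandler records the callbacks it receives.
type logHandler struct{ events []string }

func (h *logHandler) HandleChunkDataPack(id string) {
	h.events = append(h.events, "handled "+id)
}

func (h *logHandler) NotifyChunkDataPackSealed(id string) {
	h.events = append(h.events, "sealed "+id)
}

func main() {
	h := &logHandler{}
	r := &requester{pending: map[string]bool{}, handler: h}

	r.Request("chunk-1")
	r.Request("chunk-2")
	r.onResponse("chunk-1")
	r.onSealed("chunk-2")
	r.onResponse("chunk-2") // arrives after sealing, so it is dropped

	fmt.Println(h.events) // [handled chunk-1 sealed chunk-2]
}
```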

type ChunkStatus

type ChunkStatus struct {
	Chunk             *flow.Chunk
	ExecutionResultID flow.Identifier
	Height            uint64
	Agrees            []flow.Identifier
	Disagrees         []flow.Identifier
	LastAttempt       time.Time
	Attempt           int
}

ChunkStatus is a data struct that represents the current status of fetching the chunk data pack for a chunk.

func NewChunkStatus

func NewChunkStatus(
	chunk *flow.Chunk,
	resultID flow.Identifier,
	height uint64,
	agrees []flow.Identifier,
	disagrees []flow.Identifier,
) *ChunkStatus

func (ChunkStatus) Checksum

func (s ChunkStatus) Checksum() flow.Identifier

func (ChunkStatus) ID

func (s ChunkStatus) ID() flow.Identifier

type Chunks

type Chunks struct {
	*stdmap.Backend
}

func NewChunks

func NewChunks(limit uint) *Chunks

func (*Chunks) Add

func (cs *Chunks) Add(chunk *ChunkStatus) bool

func (*Chunks) All

func (cs *Chunks) All() []*ChunkStatus

func (*Chunks) ByID

func (cs *Chunks) ByID(chunkID flow.Identifier) (*ChunkStatus, bool)

func (*Chunks) IncrementAttempt

func (cs *Chunks) IncrementAttempt(chunkID flow.Identifier) bool

func (*Chunks) Rem

func (cs *Chunks) Rem(chunkID flow.Identifier) bool

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Fetch engine processes each chunk in the chunk job queue and fetches its chunk data pack from the execution nodes that produced the receipts. Once the chunk data pack is received, it passes the verifiable chunk data to the verifier engine to verify the chunk.

func New

func New(
	log zerolog.Logger,
	metrics module.VerificationMetrics,
	tracer module.Tracer,
	net module.Network,
	me module.Local,
	verifier network.Engine,
	state protocol.State,
	chunks *Chunks,
	headers storage.Headers,
	retryInterval time.Duration,
	maxAttempt int,
) (*Engine, error)

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done terminates the engine and returns a channel that is closed when the termination is done.

func (*Engine) HandleChunkDataPack added in v0.15.4

func (e *Engine) HandleChunkDataPack(originID flow.Identifier, chunkDataPack *flow.ChunkDataPack, collection *flow.Collection) error

HandleChunkDataPack is called by the chunk requester module every time a new requested chunk arrives. The requester is expected to deduplicate chunks, so an invocation of this method indicates the arrival of a distinct requested chunk.

func (*Engine) Process

func (e *Engine) Process(originID flow.Identifier, event interface{}) error

Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node. Note: this method is required by the Engine interface; however, it should not be invoked, because the match engine requires the origin ID of the events it receives. Use the Process method instead.

func (*Engine) ProcessMyChunk

func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)

ProcessMyChunk processes a chunk that is assigned to this node. It should not block, since multiple workers might call it concurrently. It skips chunks of sealed blocks. Otherwise, it fetches the chunk data pack; once the data pack is received, the verifier engine verifies the chunk. Once a chunk has been processed, it calls the processing notifier callback to notify the chunk consumer so that the next chunk can be processed.
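The skip-or-fetch decision can be sketched as follows. The page does not say how sealing is detected or whether a skipped chunk triggers the notifier, so both the height comparison and the notification on skip are assumptions; chunk, fetcher, and processMyChunk are hypothetical stand-ins.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for flow.Chunk plus the height of its block.
type chunk struct {
	ID     string
	Height uint64
}

// fetcher sketches the decision made before any data is requested.
type fetcher struct {
	lastSealedHeight uint64
	requested        []string        // chunk IDs whose data pack was requested
	notify           func(id string) // stands in for the processing notifier
}

func (f *fetcher) processMyChunk(c *chunk) {
	// Assumption: a block is treated as sealed when its height is at or
	// below the last sealed height.
	if c.Height <= f.lastSealedHeight {
		f.notify(c.ID) // nothing to verify, so the consumer may move on
		return
	}
	// The data pack request would be sent here; the notifier is called later,
	// after the verifier has received the verifiable chunk.
	f.requested = append(f.requested, c.ID)
}

func main() {
	var skipped []string
	f := &fetcher{
		lastSealedHeight: 100,
		notify:           func(id string) { skipped = append(skipped, id) },
	}

	f.processMyChunk(&chunk{ID: "chunk-a", Height: 90})  // sealed: skipped
	f.processMyChunk(&chunk{ID: "chunk-b", Height: 101}) // unsealed: requested

	fmt.Println(f.requested, skipped) // [chunk-b] [chunk-a]
}
```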

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready initializes the engine and returns a channel that is closed when the initialization is done.
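Callers typically block on the channels returned by Ready and Done. The following is a minimal sketch of that pattern with a hypothetical engine type; the use of sync.Once to make repeated calls safe is an illustrative choice and not taken from this package.

```go
package main

import (
	"fmt"
	"sync"
)

// engine is a hypothetical stand-in that only models the lifecycle channels.
type engine struct {
	startOnce sync.Once
	stopOnce  sync.Once
	ready     chan struct{}
	done      chan struct{}
}

func newEngine() *engine {
	return &engine{ready: make(chan struct{}), done: make(chan struct{})}
}

// Ready starts initialization once and returns a channel that is closed when
// it has finished.
func (e *engine) Ready() <-chan struct{} {
	e.startOnce.Do(func() {
		go func() {
			// Start-up work would run here.
			close(e.ready)
		}()
	})
	return e.ready
}

// Done starts termination once and returns a channel that is closed when it
// has finished.
func (e *engine) Done() <-chan struct{} {
	e.stopOnce.Do(func() {
		go func() {
			// Shutdown work would run here.
			close(e.done)
		}()
	})
	return e.done
}

func main() {
	e := newEngine()

	<-e.Ready() // blocks until initialization is complete
	fmt.Println("engine ready")

	<-e.Done() // blocks until termination is complete
	fmt.Println("engine done")
}
```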

func (*Engine) Submit

func (e *Engine) Submit(originID flow.Identifier, event interface{})

Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
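The difference between Process and Submit can be sketched as follows. The engine type, its string-only event check, and the logged slice that stands in for a log line are all hypothetical; only the blocking versus non-blocking behaviour mirrors the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// engine is a hypothetical stand-in that accepts only string events.
type engine struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	logged []error // stands in for the engine's log output
}

// process holds the shared event handling logic.
func (e *engine) process(originID string, event interface{}) error {
	if _, ok := event.(string); !ok {
		return fmt.Errorf("unexpected event type %T from %s", event, originID)
	}
	return nil
}

// Process handles the event before returning and hands the error to the caller.
func (e *engine) Process(originID string, event interface{}) error {
	return e.process(originID, event)
}

// Submit returns immediately; a processing error is only logged.
func (e *engine) Submit(originID string, event interface{}) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		if err := e.process(originID, event); err != nil {
			e.mu.Lock()
			e.logged = append(e.logged, err)
			e.mu.Unlock()
		}
	}()
}

func main() {
	e := &engine{}

	err := e.Process("node-1", 42)
	fmt.Println(err != nil) // true: the caller sees the error directly

	e.Submit("node-1", 42)
	e.wg.Wait() // the example waits only to inspect the log afterwards
	fmt.Println(len(e.logged)) // 1
}
```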

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(event interface{})

SubmitLocal submits an event originating on the local node.

func (*Engine) WithChunkConsumerNotifier added in v0.15.4

func (e *Engine) WithChunkConsumerNotifier(notifier module.ProcessingNotifier)
