Documentation ¶
Index ¶
- func CanTry(maxAttempt int, chunk *ChunkStatus) bool
- func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool
- type AssignedChunkProcessor
- type ChunkDataPackHandler
- type ChunkDataPackRequester
- type ChunkStatus
- type Chunks
- type Engine
- func (e *Engine) Done() <-chan struct{}
- func (e *Engine) HandleChunkDataPack(originID flow.Identifier, chunkDataPack *flow.ChunkDataPack, ...) error
- func (e *Engine) Process(originID flow.Identifier, event interface{}) error
- func (e *Engine) ProcessLocal(event interface{}) error
- func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)
- func (e *Engine) Ready() <-chan struct{}
- func (e *Engine) Submit(originID flow.Identifier, event interface{})
- func (e *Engine) SubmitLocal(event interface{})
- func (e *Engine) WithChunkConsumerNotifier(notifier module.ProcessingNotifier)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CanTry ¶
func CanTry(maxAttempt int, chunk *ChunkStatus) bool
CanTry returns checks the history attempts and determine whether a chunk request can be tried again.
func IsSystemChunk ¶
func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool
IsSystemChunk returns true if `chunkIndex` points to a system chunk in `result`. Otherwise, it returns false. In the current version, a chunk is a system chunk if it is the last chunk of the execution result.
Types ¶
type AssignedChunkProcessor ¶ added in v0.15.4
type AssignedChunkProcessor interface { // ProcessAssignedChunk receives an assigned chunk locator and processes its corresponding chunk. // A chunk processor is expected to shape a verifiable chunk out of the assigned chunk, and pass it to // the verifier Engine. // Note: it should be implemented in a non-blocking way. ProcessAssignedChunk(locator *chunks.Locator) // WithChunkConsumerNotifier sets the notifier of this chunk processor. // The notifier is called by the internal logic of the processor to let the consumer know that // the processor is done by processing a chunk so that the next chunk may be passed to the processor // by the consumer through invoking ProcessAssignedChunk of this processor. WithChunkConsumerNotifier(notifier module.ProcessingNotifier) }
AssignedChunkProcessor should be implemented by the verification node engine responsible for processing assigned chunk locators to this node.
In the current version, the fetcher engine is responsible of processing the assigned chunk locators. From the architectural perspective, AssignedChunkProcessor aligns as following on the verification pipeline: ---------------- ------------------ --------------------------- | Chunk Queue | ---> assigned chunk locators --> | Chunk Consumer | ---> chunk workers --> | Assigned Chunk Processor| ---------------- ------------------ ---------------------------
type ChunkDataPackHandler ¶ added in v0.15.4
type ChunkDataPackHandler interface { // HandleChunkDataPack is called by the ChunkDataPackRequester anytime a new requested chunk arrives. // It contains the logic of handling the chunk data pack. HandleChunkDataPack(originID flow.Identifier, chunkDataPack *flow.ChunkDataPack, collection *flow.Collection) // NotifyChunkDataPackSealed is called by the ChunkDataPackRequester to notify the ChunkDataPackHandler that the chunk ID has been sealed and // hence the requester will no longer request it. // // When the requester calls this callback method, it will never returns a chunk data pack for this chunk ID to the handler (i.e., // through HandleChunkDataPack). NotifyChunkDataPackSealed(chunkID flow.Identifier) }
ChunkDataPackHandler encapsulates the logic of handling a requested chunk data pack upon its arrival.
type ChunkDataPackRequester ¶ added in v0.15.4
type ChunkDataPackRequester interface { // Request makes the request of chunk data pack for the specified chunk id. Request(chunkID flow.Identifier, executorID flow.Identifier) error }
ChunkDataPackRequester encapsulates the logic of requesting a chunk data pack from an execution node.
type ChunkStatus ¶
type ChunkStatus struct { Chunk *flow.Chunk ExecutionResultID flow.Identifier Height uint64 Agrees []flow.Identifier Disagrees []flow.Identifier LastAttempt time.Time Attempt int }
ChunkStatus is a data struct represents the current status of fetching chunk data pack for the chunk.
func NewChunkStatus ¶
func NewChunkStatus( chunk *flow.Chunk, resultID flow.Identifier, height uint64, agrees []flow.Identifier, disagrees []flow.Identifier, ) *ChunkStatus
func (ChunkStatus) Checksum ¶
func (s ChunkStatus) Checksum() flow.Identifier
func (ChunkStatus) ID ¶
func (s ChunkStatus) ID() flow.Identifier
type Chunks ¶
func (*Chunks) Add ¶
func (cs *Chunks) Add(chunk *ChunkStatus) bool
func (*Chunks) All ¶
func (cs *Chunks) All() []*ChunkStatus
func (*Chunks) ByID ¶
func (cs *Chunks) ByID(chunkID flow.Identifier) (*ChunkStatus, bool)
func (*Chunks) IncrementAttempt ¶
func (cs *Chunks) IncrementAttempt(chunkID flow.Identifier) bool
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Fetch engine processes each chunk in the chunk job queue, fetches its chunk data pack from the execution nodes who produced the receipts, and when the chunk data pack are received, it passes the verifiable chunk data to verifier engine to verify the chunk.
func (*Engine) Done ¶
func (e *Engine) Done() <-chan struct{}
Done terminates the engine and returns a channel that is closed when the termination is done
func (*Engine) HandleChunkDataPack ¶ added in v0.15.4
func (e *Engine) HandleChunkDataPack(originID flow.Identifier, chunkDataPack *flow.ChunkDataPack, collection *flow.Collection) error
HandleChunkDataPack is called by the chunk requester module everytime a new request chunk arrives. The chunks are supposed to be deduplicated by the requester. So invocation of this method indicates arrival of a distinct requested chunk.
func (*Engine) Process ¶
func (e *Engine) Process(originID flow.Identifier, event interface{}) error
Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.
func (*Engine) ProcessLocal ¶
ProcessLocal processes an event originating on the local node. Note: this method is required as an Engine implementation, however it should not be invoked as match engine requires origin ID of events it receives. Use Process method instead.
func (*Engine) ProcessMyChunk ¶
func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)
ProcessMyChunk processes the chunk that assigned to me. It should not be blocking since multiple workers might be calling it concurrently. It skips chunks for sealed blocks. It fetches the chunk data pack, once received, verifier engine will be verifying Once a chunk has been processed, it will call the processing notifier callback to notify the chunk consumer in order to process the next chunk.
func (*Engine) Ready ¶
func (e *Engine) Ready() <-chan struct{}
Ready initializes the engine and returns a channel that is closed when the initialization is done
func (*Engine) Submit ¶
func (e *Engine) Submit(originID flow.Identifier, event interface{})
Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
func (*Engine) SubmitLocal ¶
func (e *Engine) SubmitLocal(event interface{})
SubmitLocal submits an event originating on the local node.
func (*Engine) WithChunkConsumerNotifier ¶ added in v0.15.4
func (e *Engine) WithChunkConsumerNotifier(notifier module.ProcessingNotifier)