Documentation ¶
Index ¶
- Constants
- func CanTry(maxAttempt int, chunk *ChunkStatus) bool
- func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool
- func JobToChunkLocator(job module.Job) *chunks.Locator
- type ChunkConsumer
- type ChunkJob
- type ChunkJobs
- type ChunkStatus
- type Chunks
- type Engine
- func (e *Engine) Done() <-chan struct{}
- func (e *Engine) Process(originID flow.Identifier, event interface{}) error
- func (e *Engine) ProcessLocal(event interface{}) error
- func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)
- func (e *Engine) Ready() <-chan struct{}
- func (e *Engine) Submit(originID flow.Identifier, event interface{})
- func (e *Engine) SubmitLocal(event interface{})
- func (e *Engine) WithFinishProcessing(finishProcessing FinishProcessing)
- type EngineWorker
- type FinishProcessing
- type Worker
Constants ¶
const (
DefaultJobIndex = int64(0)
)
Variables ¶
This section is empty.
Functions ¶
func CanTry ¶
func CanTry(maxAttempt int, chunk *ChunkStatus) bool
CanTry checks the attempt history of a chunk and determines whether its chunk request can be tried again.
func IsSystemChunk ¶
func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool
IsSystemChunk returns true if `chunkIndex` points to a system chunk in `result`. Otherwise, it returns false. In the current version, a chunk is a system chunk if it is the last chunk of the execution result.
Types ¶
type ChunkConsumer ¶
type ChunkConsumer struct {
// contains filtered or unexported fields
}
ChunkConsumer consumes jobs from the job queue and passes them to the Worker for processing. It wraps the generic job consumer so that it can be used as a ReadyDoneAware on startup.
func NewChunkConsumer ¶
func NewChunkConsumer( log zerolog.Logger, processedIndex storage.ConsumerProgress, chunksQueue storage.ChunksQueue, engine EngineWorker, maxProcessing int64, ) *ChunkConsumer
func (ChunkConsumer) Check ¶
func (c ChunkConsumer) Check()
func (*ChunkConsumer) Done ¶
func (c *ChunkConsumer) Done() <-chan struct{}
func (*ChunkConsumer) NotifyJobIsDone ¶
func (c *ChunkConsumer) NotifyJobIsDone(jobID module.JobID)
func (*ChunkConsumer) Ready ¶
func (c *ChunkConsumer) Ready() <-chan struct{}
type ChunkJob ¶
ChunkJob converts a chunk locator into a Job to be used by the job queue.
func ChunkLocatorToJob ¶
type ChunkJobs ¶
type ChunkJobs struct {
// contains filtered or unexported fields
}
ChunkJobs wraps the storage layer to provide an abstraction for consumers to read jobs.
type ChunkStatus ¶
type ChunkStatus struct { Chunk *flow.Chunk ExecutionResultID flow.Identifier Height uint64 Agrees []flow.Identifier Disagrees []flow.Identifier LastAttempt time.Time Attempt int }
ChunkStatus is a data struct that represents the current status of fetching the chunk data pack for a chunk.
func NewChunkStatus ¶
func NewChunkStatus( chunk *flow.Chunk, resultID flow.Identifier, height uint64, agrees []flow.Identifier, disagrees []flow.Identifier, ) *ChunkStatus
func (ChunkStatus) Checksum ¶
func (s ChunkStatus) Checksum() flow.Identifier
func (ChunkStatus) ID ¶
func (s ChunkStatus) ID() flow.Identifier
type Chunks ¶
func (*Chunks) Add ¶
func (cs *Chunks) Add(chunk *ChunkStatus) bool
func (*Chunks) All ¶
func (cs *Chunks) All() []*ChunkStatus
func (*Chunks) ByID ¶
func (cs *Chunks) ByID(chunkID flow.Identifier) (*ChunkStatus, bool)
func (*Chunks) IncrementAttempt ¶
func (cs *Chunks) IncrementAttempt(chunkID flow.Identifier) bool
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
The fetch Engine processes each chunk in the chunk job queue and fetches its chunk data pack from the execution nodes that produced the receipts. When the chunk data pack is received, it passes the verifiable chunk data to the verifier engine to verify the chunk.
func (*Engine) Done ¶
func (e *Engine) Done() <-chan struct{}
Done terminates the engine and returns a channel that is closed when the termination is done
func (*Engine) Process ¶
func (e *Engine) Process(originID flow.Identifier, event interface{}) error
Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.
func (*Engine) ProcessLocal ¶
ProcessLocal processes an event originating on the local node. Note: this method is required as part of the Engine implementation; however, it should not be invoked, as the match engine requires the origin ID of the events it receives. Use the Process method instead.
func (*Engine) ProcessMyChunk ¶
func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)
ProcessMyChunk processes a chunk that is assigned to this node. It should not block, since multiple workers might call it concurrently. It skips chunks for sealed blocks. It fetches the chunk data pack; once the data pack is received, the verifier engine verifies the chunk. Once a chunk has been processed, it calls the FinishProcessing callback to notify the chunk consumer so that the next chunk can be processed.
func (*Engine) Ready ¶
func (e *Engine) Ready() <-chan struct{}
Ready initializes the engine and returns a channel that is closed when the initialization is done
func (*Engine) Submit ¶
func (e *Engine) Submit(originID flow.Identifier, event interface{})
Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
func (*Engine) SubmitLocal ¶
func (e *Engine) SubmitLocal(event interface{})
SubmitLocal submits an event originating on the local node.
func (*Engine) WithFinishProcessing ¶
func (e *Engine) WithFinishProcessing(finishProcessing FinishProcessing)
type EngineWorker ¶
type EngineWorker interface { ProcessMyChunk(locator *chunks.Locator) WithFinishProcessing(finishProcessing FinishProcessing) }
type FinishProcessing ¶
type FinishProcessing interface {
Notify(chunkID flow.Identifier)
}
FinishProcessing lets the worker's underlying engine report that a chunk has been processed without knowing about the job queue. It is a callback, so that the worker can convert the chunk ID into a job ID and notify the consumer about a finished job with that job ID.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker receives a job from the job consumer and converts it back to a Chunk for the engine to process.
func NewWorker ¶
func NewWorker(engine EngineWorker) *Worker
func (*Worker) Notify ¶
func (w *Worker) Notify(chunkID flow.Identifier)