Documentation ¶
Index ¶
- func JobID(blockID flow.Identifier) module.JobID
- func JobToBlock(job module.Job) (*flow.Block, error)
- func JobToBlockHeader(job module.Job) (*flow.Header, error)
- type BlockHeaderJob
- type BlockJob
- type ComponentConsumer
- func (c *ComponentConsumer) Head() (uint64, error)
- func (c *ComponentConsumer) LastProcessedIndex() uint64
- func (c *ComponentConsumer) NotifyJobIsDone(jobID module.JobID) uint64
- func (c *ComponentConsumer) SetPostNotifier(fn NotifyDone)
- func (c *ComponentConsumer) SetPreNotifier(fn NotifyDone)
- func (c *ComponentConsumer) Size() uint
- type Consumer
- type FinalizedBlockReader
- type JobProcessor
- type NotifyDone
- type SealedBlockHeaderReader
- type Worker
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func JobID ¶
func JobID(blockID flow.Identifier) module.JobID
JobID returns the corresponding unique job id of the BlockJob for this job.
func JobToBlock ¶
JobToBlock converts a block job into its corresponding block.
Types ¶
type BlockHeaderJob ¶
BlockHeaderJob implements the Job interface. It converts a Block Header into a Job to be used by job queue.
In current architecture, BlockHeaderJob represents a finalized block enqueued to be processed by a consumer that implements the JobQueue interface.
func BlockHeaderToJob ¶
func BlockHeaderToJob(header *flow.Header) *BlockHeaderJob
BlockHeaderToJob converts the block to a BlockHeaderJob.
func (BlockHeaderJob) ID ¶
func (j BlockHeaderJob) ID() module.JobID
ID converts block id into job id, which guarantees uniqueness.
type BlockJob ¶
BlockJob implements the Job interface. It converts a Block into a Job to be used by job queue.
In current architecture, BlockJob represents a finalized block enqueued to be processed by the BlockConsumer that implements the JobQueue interface.
func BlockToJob ¶
BlockToJob converts the block to a BlockJob.
type ComponentConsumer ¶
func NewComponentConsumer ¶
func NewComponentConsumer( log zerolog.Logger, workSignal <-chan struct{}, progress storage.ConsumerProgress, jobs module.Jobs, defaultIndex uint64, processor JobProcessor, maxProcessing uint64, maxSearchAhead uint64, ) *ComponentConsumer
NewComponentConsumer creates a new ComponentConsumer consumer
func (*ComponentConsumer) Head ¶
func (c *ComponentConsumer) Head() (uint64, error)
Head returns the highest job index available
func (*ComponentConsumer) LastProcessedIndex ¶
func (c *ComponentConsumer) LastProcessedIndex() uint64
LastProcessedIndex returns the last processed job index
func (*ComponentConsumer) NotifyJobIsDone ¶
func (c *ComponentConsumer) NotifyJobIsDone(jobID module.JobID) uint64
NotifyJobIsDone is invoked by the worker to let the consumer know that it is done processing a (block) job.
func (*ComponentConsumer) SetPostNotifier ¶
func (c *ComponentConsumer) SetPostNotifier(fn NotifyDone)
SetPostNotifier sets a notification function that is invoked after marking a job as done in the consumer.
Note: This guarantees that the function is executed after consumer updates the last processed index, but notifications may be missed in the event of a crash.
func (*ComponentConsumer) SetPreNotifier ¶
func (c *ComponentConsumer) SetPreNotifier(fn NotifyDone)
SetPreNotifier sets a notification function that is invoked before marking a job as done in the consumer.
Note: This guarantees that the function is called at least once for each job, but may be executed before consumer updates the last processed index.
func (*ComponentConsumer) Size ¶
func (c *ComponentConsumer) Size() uint
Size returns number of in-memory block jobs that block consumer is processing.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) Check ¶
func (c *Consumer) Check()
Check allows the job publisher to notify the consumer that a new job has been added, so that the consumer can check if the job is processable since multiple checks at the same time are unnecessary, we could only keep one check by checking. an atomic isChecking value.
func (*Consumer) LastProcessedIndex ¶
LastProcessedIndex returns the last processed job index
func (*Consumer) NotifyJobIsDone ¶
NotifyJobIsDone let the consumer know a job has been finished, so that consumer will take the next job from the job queue if there are workers available. It returns the last processed job index.
type FinalizedBlockReader ¶
type FinalizedBlockReader struct {
// contains filtered or unexported fields
}
FinalizedBlockReader provides an abstraction for consumers to read blocks as job.
func NewFinalizedBlockReader ¶
func NewFinalizedBlockReader(state protocol.State, blocks storage.Blocks) *FinalizedBlockReader
NewFinalizedBlockReader creates and returns a FinalizedBlockReader.
func (FinalizedBlockReader) AtIndex ¶
func (r FinalizedBlockReader) AtIndex(index uint64) (module.Job, error)
AtIndex returns the block job at the given index. The block job at an index is just the finalized block at that index (i.e., height).
func (FinalizedBlockReader) Head ¶
func (r FinalizedBlockReader) Head() (uint64, error)
Head returns the last finalized height as job index.
type JobProcessor ¶
type JobProcessor func(irrecoverable.SignalerContext, module.Job, func())
JobProcessor is called by the worker to execute each job. It should only return when the job has completed, either successfully or after performing any failure handling. It takes 3 arguments:
- irrecoverable.SignalerContext: this is used to signal shutdown to the worker and throw any irrecoverable errors back to the parent. The signaller context is passed in from consumer's Start method
- module.Job: the job to be processed. The processor is responsible for decoding into the expected format.
- func(): Call this closure after the job is considered complete. This is a convenience method that avoid needing to a separate ProcessingNotifier for simple usecases. If a different method is used to signal jobs are done to the consumer, this function can be ignored.
type NotifyDone ¶
NotifyDone should be the consumer's NotifyJobIsDone method, or a wrapper for that method. It is wrapped in a closure and added as an argument to the JobProcessor to notify the consumer that the job is done.
type SealedBlockHeaderReader ¶
type SealedBlockHeaderReader struct {
// contains filtered or unexported fields
}
SealedBlockHeaderReader provides an abstraction for consumers to read blocks as job.
func NewSealedBlockHeaderReader ¶
func NewSealedBlockHeaderReader(state protocol.State, headers storage.Headers) *SealedBlockHeaderReader
NewSealedBlockHeaderReader creates and returns a SealedBlockHeaderReader.
func (SealedBlockHeaderReader) AtIndex ¶
func (r SealedBlockHeaderReader) AtIndex(index uint64) (module.Job, error)
AtIndex returns the block header job at the given index. The block header job at an index is just the finalized block header at that index (i.e., height). Error returns:
- storage.ErrNotFound if the provided index is not sealed
func (SealedBlockHeaderReader) Head ¶
func (r SealedBlockHeaderReader) Head() (uint64, error)
Head returns the last sealed height as job index.
type WorkerPool ¶
WorkerPool implements the jobqueue.Worker interface, and wraps the processing to make it compatible with the Component interface.
func NewWorkerPool ¶
func NewWorkerPool(processor JobProcessor, notify NotifyDone, workers uint64) *WorkerPool
NewWorkerPool returns a new WorkerPool