chunkconsumer

package
v0.28.8
Published: Nov 2, 2022 License: AGPL-3.0 Imports: 8 Imported by: 3

Documentation

Constants

const (
	DefaultJobIndex     = uint64(0)
	DefaultChunkWorkers = uint64(5)
)

Variables

This section is empty.

Functions

func JobToChunkLocator

func JobToChunkLocator(job module.Job) (*chunks.Locator, error)

Types

type ChunkConsumer

type ChunkConsumer struct {
	// contains filtered or unexported fields
}

ChunkConsumer consumes jobs from the job queue and passes them to the Worker for processing. It wraps the generic job consumer so that it can be used as a ReadyDoneAware component on startup.
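
The wrapping idea can be illustrated with a minimal sketch. It is not the package's implementation: innerConsumer, readyDoneConsumer, and newReadyDoneConsumer are hypothetical names, and the inner consumer is assumed to expose blocking Start and Stop methods. The wrapper only adds Ready and Done channels around that lifecycle.

```go
package main

import (
	"fmt"
	"sync"
)

// innerConsumer is a hypothetical stand-in for the generic job consumer.
// It is assumed (for this sketch only) to expose blocking Start and Stop methods.
type innerConsumer struct {
	started bool
	stopped bool
}

func (c *innerConsumer) Start() error { c.started = true; return nil }
func (c *innerConsumer) Stop()        { c.stopped = true }

// readyDoneConsumer adapts innerConsumer to a Ready/Done lifecycle.
type readyDoneConsumer struct {
	inner     *innerConsumer
	startOnce sync.Once
	stopOnce  sync.Once
	ready     chan struct{}
	done      chan struct{}
}

func newReadyDoneConsumer(inner *innerConsumer) *readyDoneConsumer {
	return &readyDoneConsumer{
		inner: inner,
		ready: make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// Ready starts the inner consumer at most once and returns a channel
// that is closed when startup has finished.
func (c *readyDoneConsumer) Ready() <-chan struct{} {
	c.startOnce.Do(func() {
		go func() {
			if err := c.inner.Start(); err != nil {
				panic(fmt.Sprintf("could not start consumer: %v", err))
			}
			close(c.ready)
		}()
	})
	return c.ready
}

// Done stops the inner consumer at most once and returns a channel
// that is closed when shutdown has finished.
func (c *readyDoneConsumer) Done() <-chan struct{} {
	c.stopOnce.Do(func() {
		go func() {
			c.inner.Stop()
			close(c.done)
		}()
	})
	return c.done
}

func main() {
	c := newReadyDoneConsumer(&innerConsumer{})
	<-c.Ready()
	fmt.Println("started:", c.inner.started)
	<-c.Done()
	fmt.Println("stopped:", c.inner.stopped)
}
```

Guarding both methods with sync.Once lets callers invoke Ready or Done repeatedly and always receive the same channel.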

func NewChunkConsumer

func NewChunkConsumer(
	log zerolog.Logger,
	metrics module.VerificationMetrics,
	processedIndex storage.ConsumerProgress,
	chunksQueue storage.ChunksQueue,
	chunkProcessor fetcher.AssignedChunkProcessor,
	maxProcessing uint64,
) *ChunkConsumer

func (ChunkConsumer) Check

func (c ChunkConsumer) Check()

func (*ChunkConsumer) Done

func (c *ChunkConsumer) Done() <-chan struct{}

func (*ChunkConsumer) NotifyJobIsDone

func (c *ChunkConsumer) NotifyJobIsDone(jobID module.JobID)

func (*ChunkConsumer) Ready

func (c *ChunkConsumer) Ready() <-chan struct{}

func (*ChunkConsumer) Size added in v0.17.6

func (c *ChunkConsumer) Size() uint

Size returns the number of in-memory chunk jobs that the chunk consumer is processing.

type ChunkJob

type ChunkJob struct {
	ChunkLocator *chunks.Locator
}

ChunkJob converts a chunk locator into a Job to be used by the job queue.

func ChunkLocatorToJob

func ChunkLocatorToJob(locator *chunks.Locator) *ChunkJob

func (ChunkJob) ID

func (j ChunkJob) ID() module.JobID

ID converts the chunk locator identifier into a job ID, which guarantees uniqueness.
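
The relationship between ChunkLocatorToJob, ChunkJob.ID, and JobToChunkLocator can be sketched with local stand-in types. Everything below is an assumption made for illustration: Identifier, JobID, Job, and Locator only mimic the shape of the real flow-go types, the hashing scheme in Locator.ID is invented, and otherJob is a hypothetical foreign job type used to show the error path.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// Stand-ins for flow.Identifier, module.JobID and module.Job (assumed shapes).
type Identifier [32]byte
type JobID string
type Job interface{ ID() JobID }

// Locator is a simplified stand-in for chunks.Locator.
type Locator struct {
	ResultID Identifier
	Index    uint64
}

// ID derives a deterministic identifier from the locator's fields.
// The hashing scheme is an assumption of this sketch.
func (l *Locator) ID() Identifier {
	var idx [8]byte
	binary.BigEndian.PutUint64(idx[:], l.Index)
	return sha256.Sum256(append(l.ResultID[:], idx[:]...))
}

// ChunkJob wraps a locator so that it satisfies the Job interface.
type ChunkJob struct {
	ChunkLocator *Locator
}

// ID reuses the locator identifier, so distinct locators yield distinct job IDs.
func (j ChunkJob) ID() JobID {
	id := j.ChunkLocator.ID()
	return JobID(hex.EncodeToString(id[:]))
}

func ChunkLocatorToJob(locator *Locator) *ChunkJob {
	return &ChunkJob{ChunkLocator: locator}
}

// JobToChunkLocator is the inverse of ChunkLocatorToJob.
// It fails only when the job did not originate from a chunk locator.
func JobToChunkLocator(job Job) (*Locator, error) {
	chunkJob, ok := job.(*ChunkJob)
	if !ok {
		return nil, fmt.Errorf("could not convert job of type %T to a chunk job", job)
	}
	return chunkJob.ChunkLocator, nil
}

// otherJob is a hypothetical job type that is not a chunk job.
type otherJob struct{}

func (otherJob) ID() JobID { return "other" }

func main() {
	locator := &Locator{Index: 3}
	job := ChunkLocatorToJob(locator)

	recovered, err := JobToChunkLocator(job)
	fmt.Println(recovered == locator, err) // prints "true <nil>"

	_, err = JobToChunkLocator(otherJob{})
	fmt.Println(err != nil) // prints "true"
}
```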

type ChunkJobs

type ChunkJobs struct {
	// contains filtered or unexported fields
}

ChunkJobs wraps the storage layer to provide an abstraction for consumers to read jobs.

func (*ChunkJobs) AtIndex

func (j *ChunkJobs) AtIndex(index uint64) (module.Job, error)

func (*ChunkJobs) Head

func (j *ChunkJobs) Head() (uint64, error)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker receives a job from the job consumer and converts it back to a Chunk for the engine to process.

func NewWorker

func NewWorker(engine fetcher.AssignedChunkProcessor) *Worker

func (*Worker) Notify

func (w *Worker) Notify(chunkLocatorID flow.Identifier)

func (*Worker) Run

func (w *Worker) Run(job module.Job) error

Run converts the job back to a Chunk. The conversion is guaranteed to work because ChunkJobs converts each chunk into a job symmetrically.
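
The Run and Notify flow might look roughly like the sketch below. It assumes simplified stand-in types: chunkProcessor and doneNotifier are hypothetical interfaces, the recording types exist only for the demonstration, and Notify takes a string identifier here rather than flow.Identifier.

```go
package main

import "fmt"

// Simplified stand-ins for the locator and job types.
type Locator struct{ Index uint64 }

// ID is a hypothetical string identifier for the locator.
func (l *Locator) ID() string { return fmt.Sprintf("locator-%d", l.Index) }

type JobID string
type Job interface{ ID() JobID }

type ChunkJob struct{ ChunkLocator *Locator }

func (j ChunkJob) ID() JobID { return JobID(j.ChunkLocator.ID()) }

// chunkProcessor is a hypothetical stand-in for the engine that processes chunks.
type chunkProcessor interface {
	ProcessAssignedChunk(locator *Locator)
}

// doneNotifier is the part of the consumer that the worker reports back to.
type doneNotifier interface {
	NotifyJobIsDone(jobID JobID)
}

type Worker struct {
	processor chunkProcessor
	consumer  doneNotifier
}

// Run unwraps the job and hands the locator to the processor.
func (w *Worker) Run(job Job) error {
	chunkJob, ok := job.(*ChunkJob)
	if !ok {
		return fmt.Errorf("unexpected job type %T", job)
	}
	w.processor.ProcessAssignedChunk(chunkJob.ChunkLocator)
	return nil
}

// Notify maps a finished locator back to its job ID and tells the consumer.
func (w *Worker) Notify(locatorID string) {
	w.consumer.NotifyJobIsDone(JobID(locatorID))
}

// Recording implementations used only for this demonstration.
type recordingProcessor struct{ seen []*Locator }

func (p *recordingProcessor) ProcessAssignedChunk(l *Locator) { p.seen = append(p.seen, l) }

type recordingConsumer struct{ done []JobID }

func (c *recordingConsumer) NotifyJobIsDone(id JobID) { c.done = append(c.done, id) }

func main() {
	processor := &recordingProcessor{}
	consumer := &recordingConsumer{}
	worker := &Worker{processor: processor, consumer: consumer}

	locator := &Locator{Index: 7}
	if err := worker.Run(&ChunkJob{ChunkLocator: locator}); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	// The processor would normally call Notify once it finishes the chunk.
	worker.Notify(locator.ID())

	fmt.Println(len(processor.seen), consumer.done) // prints "1 [locator-7]"
}
```

Because the job ID is derived from the locator identifier, Notify can recover the job ID without keeping a separate lookup table.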
