fetcher

package
v0.15.1
Warning

This package is not in the latest version of its module.

Published: Mar 24, 2021 License: AGPL-3.0 Imports: 20 Imported by: 6

Documentation

Constants

const (
	DefaultJobIndex = uint64(0)
)

Variables

This section is empty.

Functions

func CanTry

func CanTry(maxAttempt int, chunk *ChunkStatus) bool

CanTry checks the attempt history of a chunk and determines whether its chunk request can be tried again.
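The retry check can be illustrated with a minimal sketch. The `chunkStatus` type and the `canTry` policy below are hypothetical stand-ins: the documentation does not state the exact rule, so the sketch assumes a chunk may be retried while its attempt counter is below `maxAttempt`.

```go
package main

import "fmt"

// chunkStatus is a hypothetical stand-in for fetcher.ChunkStatus that
// keeps only the attempt counter.
type chunkStatus struct {
	Attempt int
}

// canTry is an assumed policy, not the package's implementation: a chunk
// request may be retried while its attempt count is below maxAttempt.
func canTry(maxAttempt int, chunk *chunkStatus) bool {
	return chunk.Attempt < maxAttempt
}

func main() {
	fmt.Println(canTry(3, &chunkStatus{Attempt: 2})) // true
	fmt.Println(canTry(3, &chunkStatus{Attempt: 3})) // false
}
```

The real CanTry may also take other fields of ChunkStatus into account, such as LastAttempt.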

func IsSystemChunk

func IsSystemChunk(chunkIndex uint64, result *flow.ExecutionResult) bool

IsSystemChunk returns true if `chunkIndex` points to a system chunk in `result`. Otherwise, it returns false. In the current version, a chunk is a system chunk if it is the last chunk of the execution result.
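The "last chunk" rule can be sketched as follows. The `executionResult` type is a hypothetical stand-in for flow.ExecutionResult that carries only a list of chunks, and the guard for an empty result is an assumption added for safety.

```go
package main

import "fmt"

// executionResult is a hypothetical stand-in for flow.ExecutionResult;
// only the number of chunks matters for this check.
type executionResult struct {
	Chunks []string
}

// isSystemChunk reports whether chunkIndex refers to the last chunk of
// the result. An empty result has no system chunk (assumed behavior).
func isSystemChunk(chunkIndex uint64, result *executionResult) bool {
	n := uint64(len(result.Chunks))
	return n > 0 && chunkIndex == n-1
}

func main() {
	r := &executionResult{Chunks: []string{"c0", "c1", "system"}}
	fmt.Println(isSystemChunk(2, r)) // true
	fmt.Println(isSystemChunk(0, r)) // false
}
```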

func JobToChunkLocator

func JobToChunkLocator(job module.Job) (*chunks.Locator, error)

Types

type ChunkConsumer

type ChunkConsumer struct {
	// contains filtered or unexported fields
}

ChunkConsumer consumes jobs from the job queue and passes them to the Worker for processing. It wraps the generic job consumer so that it can be used as a ReadyDoneAware component on startup.

func NewChunkConsumer

func NewChunkConsumer(
	log zerolog.Logger,
	processedIndex storage.ConsumerProgress,
	chunksQueue storage.ChunksQueue,
	engine EngineWorker,
	maxProcessing int64,
) *ChunkConsumer

func (ChunkConsumer) Check

func (c ChunkConsumer) Check()

func (*ChunkConsumer) Done

func (c *ChunkConsumer) Done() <-chan struct{}

func (*ChunkConsumer) NotifyJobIsDone

func (c *ChunkConsumer) NotifyJobIsDone(jobID module.JobID)

func (*ChunkConsumer) Ready

func (c *ChunkConsumer) Ready() <-chan struct{}

type ChunkJob

type ChunkJob struct {
	ChunkLocator *chunks.Locator
}

ChunkJob converts a chunk locator into a Job to be used by the job queue.

func ChunkLocatorToJob

func ChunkLocatorToJob(locator *chunks.Locator) *ChunkJob

func (ChunkJob) ID

func (j ChunkJob) ID() module.JobID

ID converts the chunk locator identifier into a job ID, which guarantees uniqueness.
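The relationship between ChunkLocatorToJob, ChunkJob.ID, and JobToChunkLocator can be sketched as a symmetric conversion. Everything below is a hypothetical stand-in: `locator` replaces chunks.Locator, `job` replaces module.Job, and the string ID scheme is an assumption made for illustration only.

```go
package main

import "fmt"

// locator is a hypothetical stand-in for chunks.Locator.
type locator struct {
	ResultID string
	Index    uint64
}

// job mirrors the role of module.Job: anything that exposes an ID.
type job interface {
	ID() string
}

// chunkJob wraps a locator so that it can travel through a job queue.
type chunkJob struct {
	ChunkLocator *locator
}

// ID derives the job ID from the locator fields (assumed scheme).
func (j chunkJob) ID() string {
	return fmt.Sprintf("%s-%d", j.ChunkLocator.ResultID, j.ChunkLocator.Index)
}

// otherJob is an unrelated job type used to show the failure path.
type otherJob struct{}

func (otherJob) ID() string { return "other" }

// locatorToJob wraps a locator in a chunkJob.
func locatorToJob(l *locator) *chunkJob { return &chunkJob{ChunkLocator: l} }

// jobToLocator reverses locatorToJob and fails for any other job type.
func jobToLocator(j job) (*locator, error) {
	cj, ok := j.(*chunkJob)
	if !ok {
		return nil, fmt.Errorf("unexpected job type %T", j)
	}
	return cj.ChunkLocator, nil
}

func main() {
	l := &locator{ResultID: "r1", Index: 4}
	j := locatorToJob(l)
	fmt.Println(j.ID()) // r1-4

	back, err := jobToLocator(j)
	fmt.Println(back == l, err) // true <nil>

	_, err = jobToLocator(otherJob{})
	fmt.Println(err != nil) // true
}
```

Because the job only wraps the locator, converting back is a type assertion, which is why a job produced by the forward conversion always converts back successfully.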

type ChunkJobs

type ChunkJobs struct {
	// contains filtered or unexported fields
}

ChunkJobs wraps the storage layer to provide an abstraction for consumers to read jobs.

func (*ChunkJobs) AtIndex

func (j *ChunkJobs) AtIndex(index uint64) (module.Job, error)

func (*ChunkJobs) Head

func (j *ChunkJobs) Head() (uint64, error)

type ChunkStatus

type ChunkStatus struct {
	Chunk             *flow.Chunk
	ExecutionResultID flow.Identifier
	Height            uint64
	Agrees            []flow.Identifier
	Disagrees         []flow.Identifier
	LastAttempt       time.Time
	Attempt           int
}

ChunkStatus is a data struct that represents the current status of fetching the chunk data pack for a chunk.

func NewChunkStatus

func NewChunkStatus(
	chunk *flow.Chunk,
	resultID flow.Identifier,
	height uint64,
	agrees []flow.Identifier,
	disagrees []flow.Identifier,
) *ChunkStatus

func (ChunkStatus) Checksum

func (s ChunkStatus) Checksum() flow.Identifier

func (ChunkStatus) ID

func (s ChunkStatus) ID() flow.Identifier

type Chunks

type Chunks struct {
	*stdmap.Backend
}

func NewChunks

func NewChunks(limit uint) *Chunks

func (*Chunks) Add

func (cs *Chunks) Add(chunk *ChunkStatus) bool

func (*Chunks) All

func (cs *Chunks) All() []*ChunkStatus

func (*Chunks) ByID

func (cs *Chunks) ByID(chunkID flow.Identifier) (*ChunkStatus, bool)

func (*Chunks) IncrementAttempt

func (cs *Chunks) IncrementAttempt(chunkID flow.Identifier) bool

func (*Chunks) Rem

func (cs *Chunks) Rem(chunkID flow.Identifier) bool

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

The fetch engine processes each chunk in the chunk job queue and fetches its chunk data pack from the execution nodes that produced the receipts. When the chunk data pack is received, it passes the verifiable chunk data to the verifier engine to verify the chunk.

func New

func New(
	log zerolog.Logger,
	metrics module.VerificationMetrics,
	tracer module.Tracer,
	net module.Network,
	me module.Local,
	verifier network.Engine,
	state protocol.State,
	chunks *Chunks,
	headers storage.Headers,
	retryInterval time.Duration,
	maxAttempt int,
) (*Engine, error)

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done terminates the engine and returns a channel that is closed when the termination is done.

func (*Engine) Process

func (e *Engine) Process(originID flow.Identifier, event interface{}) error

Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node. Note: this method is required to implement the Engine interface; however, it should not be invoked, because the match engine requires the origin ID of the events it receives. Use the Process method instead.

func (*Engine) ProcessMyChunk

func (e *Engine) ProcessMyChunk(c *flow.Chunk, resultID flow.Identifier)

ProcessMyChunk processes a chunk that is assigned to this node. It must not block, since multiple workers might call it concurrently. It skips chunks of sealed blocks. It fetches the chunk data pack and, once the data pack is received, the verifier engine verifies the chunk. Once a chunk has been processed, it calls the FinishProcessing callback to notify the ChunkConsumer so that the next chunk can be processed.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready initializes the engine and returns a channel that is closed when the initialization is done.
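The Ready and Done methods follow a common channel-closing pattern, which a minimal sketch can illustrate. The `component` type is hypothetical and does no real startup or shutdown work; it only shows how a caller waits on the returned channels.

```go
package main

import "fmt"

// component is a hypothetical stand-in for a ReadyDoneAware engine.
type component struct{}

// Ready starts initialization in the background and returns a channel
// that is closed once initialization has finished.
func (c *component) Ready() <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		// Initialization work would go here.
		close(ready)
	}()
	return ready
}

// Done starts termination in the background and returns a channel
// that is closed once termination has finished.
func (c *component) Done() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		// Shutdown work would go here.
		close(done)
	}()
	return done
}

func main() {
	c := &component{}
	<-c.Ready() // blocks until initialization is complete
	fmt.Println("ready")
	<-c.Done() // blocks until termination is complete
	fmt.Println("done")
}
```

Returning a channel rather than blocking lets the caller start several components at once and wait for all of them afterwards.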

func (*Engine) Submit

func (e *Engine) Submit(originID flow.Identifier, event interface{})

Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.
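The difference between the blocking Process and the non-blocking Submit can be sketched as below. The `engine` type here is a hypothetical stand-in, not the fetcher Engine: it uses a string origin ID, prints instead of using a logger, and the `sync.WaitGroup` exists only so the example can wait for the background goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// engine is a hypothetical stand-in, not the fetcher Engine.
type engine struct {
	wg sync.WaitGroup
}

// Process handles the event in the caller's goroutine and returns the
// processing error to the caller.
func (e *engine) Process(originID string, event interface{}) error {
	if event == nil {
		return errors.New("nil event")
	}
	return nil
}

// Submit runs Process in a new goroutine, returns immediately, and
// logs any processing error instead of returning it.
func (e *engine) Submit(originID string, event interface{}) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		if err := e.Process(originID, event); err != nil {
			fmt.Println("could not process event:", err)
		}
	}()
}

func main() {
	e := &engine{}
	fmt.Println(e.Process("node-1", "chunk data response")) // <nil>

	e.Submit("node-1", nil) // returns immediately; the error is logged
	e.wg.Wait()
}
```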

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(event interface{})

SubmitLocal submits an event originating on the local node.

func (*Engine) WithFinishProcessing

func (e *Engine) WithFinishProcessing(finishProcessing FinishProcessing)

type EngineWorker

type EngineWorker interface {
	ProcessMyChunk(locator *chunks.Locator)
	WithFinishProcessing(finishProcessing FinishProcessing)
}

type FinishProcessing

type FinishProcessing interface {
	Notify(chunkID flow.Identifier)
}

FinishProcessing lets the worker's underlying engine report that a chunk has been processed without knowing about the job queue. It is a callback, so that the worker can convert the chunk ID into a job ID and notify the consumer about the finished job.
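The callback wiring can be sketched as follows. All types are hypothetical stand-ins that use string IDs instead of flow.Identifier and module.JobID, and the `"job-"` prefix is an assumed conversion chosen only to make the chunk ID to job ID step visible.

```go
package main

import "fmt"

// finishProcessing mirrors the shape of the FinishProcessing interface.
type finishProcessing interface {
	Notify(chunkID string)
}

// consumer is a hypothetical job consumer that records finished jobs.
type consumer struct {
	done []string
}

// NotifyJobIsDone records that the job with the given ID has finished.
func (c *consumer) NotifyJobIsDone(jobID string) {
	c.done = append(c.done, jobID)
}

// worker sits between the engine and the consumer.
type worker struct {
	consumer *consumer
}

// Notify converts the chunk ID into a job ID (assumed scheme) and
// forwards it to the consumer.
func (w *worker) Notify(chunkID string) {
	w.consumer.NotifyJobIsDone("job-" + chunkID)
}

// engine knows only the callback, not the job queue behind it.
type engine struct {
	finish finishProcessing
}

// WithFinishProcessing injects the callback into the engine.
func (e *engine) WithFinishProcessing(f finishProcessing) { e.finish = f }

// ProcessMyChunk pretends to process a chunk, then reports completion.
func (e *engine) ProcessMyChunk(chunkID string) {
	// Fetching and verification would happen here.
	e.finish.Notify(chunkID)
}

func main() {
	c := &consumer{}
	e := &engine{}
	e.WithFinishProcessing(&worker{consumer: c})

	e.ProcessMyChunk("abc")
	fmt.Println(c.done) // [job-abc]
}
```

Injecting the callback keeps the engine free of any dependency on the job queue, so it can be tested with a simple recording implementation.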

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker receives a job from the job consumer and converts it back to a chunk for the engine to process.

func NewWorker

func NewWorker(engine EngineWorker) *Worker

func (*Worker) Notify

func (w *Worker) Notify(chunkID flow.Identifier)

func (*Worker) Run

func (w *Worker) Run(job module.Job) error

Run converts the job to a chunk. The conversion is guaranteed to work, because ChunkJobs converted the chunk into a job symmetrically.
