orchestrator

package
v0.0.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job added in v0.0.13

type Job struct {
	ModuleName string // target
	// contains filtered or unexported fields
}

func NewJob added in v0.0.14

func NewJob(storeName string, requestRange *block.Range, ancestorStoreModules []*pbsubstreams.Module, totalJobs, myJobIndex int) *Job

func (*Job) MarshalLogObject added in v0.0.13

func (j *Job) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Job) String added in v0.0.14

func (j *Job) String() string

type JobStat added in v0.0.19

type JobStat struct {
	ModuleName string
	StartAt    time.Time

	RequestRange *block.Range

	CurrentBlock   uint64
	RemainingBlock uint64
	BlockCount     uint64
	BlockSec       float64

	RemoteHost string
}

func (*JobStat) MarshalLogObject added in v0.0.19

func (j *JobStat) MarshalLogObject(enc zapcore.ObjectEncoder) error

type JobsPlanner added in v0.0.14

type JobsPlanner struct {
	sync.Mutex

	AvailableJobs chan *Job
	// contains filtered or unexported fields
}

func NewJobsPlanner added in v0.0.14

func NewJobsPlanner(
	ctx context.Context,
	workPlan WorkPlan,
	subrequestSplitSize uint64,
	stores map[string]*state.Store,
	graph *manifest.ModuleGraph,
) (*JobsPlanner, error)

func (*JobsPlanner) JobCount added in v0.0.14

func (p *JobsPlanner) JobCount() int

func (*JobsPlanner) MarshalLogObject added in v0.0.14

func (p *JobsPlanner) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*JobsPlanner) SignalCompletionUpUntil added in v0.0.14

func (p *JobsPlanner) SignalCompletionUpUntil(storeName string, blockNum uint64)

type RetryableErr added in v0.0.19

type RetryableErr struct {
	// contains filtered or unexported fields
}

func (*RetryableErr) Error added in v0.0.19

func (r *RetryableErr) Error() string

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, availableJobs chan *Job, squasher *Squasher, workerPool *WorkerPool, respFunc substreams.ResponseFunc) (*Scheduler, error)

func (*Scheduler) Launch

func (s *Scheduler) Launch(ctx context.Context, requestModules *pbsubstreams.Modules, result chan error)

type Snapshot added in v0.0.14

type Snapshot struct {
	block.Range
	Path string
}

type Snapshots added in v0.0.14

type Snapshots struct {
	Completes block.Ranges // Shortest completes first, largest last.
	Partials  block.Ranges // First partials first, last last
}

func (*Snapshots) ContainsPartial added in v0.0.14

func (s *Snapshots) ContainsPartial(r *block.Range) bool

func (*Snapshots) LastCompleteSnapshotBefore added in v0.0.14

func (s *Snapshots) LastCompleteSnapshotBefore(blockNum uint64) *block.Range

func (*Snapshots) LastCompletedBlock added in v0.0.14

func (s *Snapshots) LastCompletedBlock() uint64

func (*Snapshots) Sort added in v0.0.14

func (s *Snapshots) Sort()

func (*Snapshots) String added in v0.0.14

func (s *Snapshots) String() string

type Squasher

type Squasher struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

Squasher produces _complete_ stores, by merging backing partial stores.

func NewSquasher

func NewSquasher(ctx context.Context, workPlan WorkPlan, stores map[string]*state.Store, reqStartBlock uint64, jobsPlanner *JobsPlanner) (*Squasher, error)

NewSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveEndBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actual data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..

func (*Squasher) Squash

func (s *Squasher) Squash(moduleName string, partialsRanges block.Ranges) error

func (*Squasher) ValidateStoresReady added in v0.0.14

func (s *Squasher) ValidateStoresReady() (out map[string]*state.Store, err error)

type StorageState

type StorageState struct {
	sync.Mutex
	Snapshots map[string]*Snapshots
}

func FetchStorageState

func FetchStorageState(ctx context.Context, stores map[string]*state.Store) (out *StorageState, err error)

func NewStorageState

func NewStorageState() *StorageState

func (*StorageState) String added in v0.0.14

func (s *StorageState) String() string

type StoreSquasher added in v0.0.14

type StoreSquasher struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewStoreSquasher added in v0.0.14

func NewStoreSquasher(initialStore *state.Store, targetExclusiveBlock, nextExpectedStartBlock uint64, jobsPlanner *JobsPlanner) *StoreSquasher

func (*StoreSquasher) IsEmpty added in v0.0.14

func (s *StoreSquasher) IsEmpty() bool

func (*StoreSquasher) String added in v0.0.14

func (s *StoreSquasher) String() string

type WorkPlan added in v0.0.14

type WorkPlan map[string]*WorkUnit

func (WorkPlan) ProgressMessages added in v0.0.14

func (p WorkPlan) ProgressMessages() (out []*pbsubstreams.ModuleProgress)

func (WorkPlan) SquashPartialsPresent added in v0.0.14

func (p WorkPlan) SquashPartialsPresent(squasher *Squasher) error

func (WorkPlan) String added in v0.0.14

func (p WorkPlan) String() string

type WorkUnit added in v0.0.14

type WorkUnit struct {
	// contains filtered or unexported fields
}

func SplitWork added in v0.0.13

func SplitWork(modName string, storeSaveInterval, modInitBlock, incomingReqStartBlock uint64, snapshots *Snapshots) *WorkUnit

type Worker added in v0.0.13

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run added in v0.0.13

func (w *Worker) Run(ctx context.Context, job *Job, jobStats map[*Job]*JobStat, requestModules *pbsubstreams.Modules, respFunc substreams.ResponseFunc) ([]*block.Range, error)

type WorkerPool added in v0.0.13

type WorkerPool struct {
	JobStats map[*Job]*JobStat
	// contains filtered or unexported fields
}

func NewWorkerPool added in v0.0.13

func NewWorkerPool(workerCount int, grpcClientFactory substreams.GrpcClientFactory) *WorkerPool

func (*WorkerPool) Borrow added in v0.0.13

func (p *WorkerPool) Borrow() *Worker

func (*WorkerPool) ReturnWorker added in v0.0.13

func (p *WorkerPool) ReturnWorker(worker *Worker)

func (*WorkerPool) StartPeriodicLogger added in v0.0.19

func (p *WorkerPool) StartPeriodicLogger()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL