orchestrator

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var PartialsChannelClosed = errors.New("partial chunks done")
View Source
var SkipRange = errors.New("skip range")

Functions

This section is empty.

Types

type MultiSquasher added in v0.0.21

type MultiSquasher struct {
	// contains filtered or unexported fields
}

MultiSquasher produces _complete_ stores, by merging backing partial stores.

func NewMultiSquasher added in v0.0.21

func NewMultiSquasher(
	ctx context.Context,
	runtimeConfig config.RuntimeConfig,
	modulesStorageStateMap storage.ModuleStorageStateMap,
	storeConfigs store.ConfigMap,
	upToBlock uint64,
	onStoreCompletedUntilBlock func(storeName string, blockNum uint64),
) (*MultiSquasher, error)

NewMultiSquasher receives stores, initializes them and fetches them from the existing storage. It prepares itself to receive Squash() requests that should correspond to what is missing for those stores to reach `targetExclusiveEndBlock`. This is managed externally by the Scheduler/Strategy. Eventually, ideally, all components are synchronizes around the actual data: the state of storages present, the requests needed to fill in those stores up to the target block, etc..

func (*MultiSquasher) Launch added in v0.0.21

func (s *MultiSquasher) Launch(ctx context.Context)

func (*MultiSquasher) Squash added in v0.0.21

func (s *MultiSquasher) Squash(ctx context.Context, moduleName string, partialsRanges block.Ranges) error

func (*MultiSquasher) Wait added in v0.0.21

func (s *MultiSquasher) Wait(ctx context.Context) (out store.Map, err error)

type NoopMapSquasher added in v0.1.0

type NoopMapSquasher struct {
	// contains filtered or unexported fields
}

type ParallelProcessor added in v0.1.0

type ParallelProcessor struct {
	// contains filtered or unexported fields
}

func BuildParallelProcessor added in v0.1.0

func BuildParallelProcessor(
	ctx context.Context,
	reqDetails *reqctx.RequestDetails,
	runtimeConfig config.RuntimeConfig,
	outputGraph *outputmodules.Graph,
	execoutStorage *execout.Configs,
	respFunc func(resp substreams.ResponseFromAnyTier) error,
	storeConfigs store.ConfigMap,
	pendingUndoMessage *pbsubstreamsrpc.Response,
) (*ParallelProcessor, error)

BuildParallelProcessor is only called on tier1

func (*ParallelProcessor) Run added in v0.1.0

func (b *ParallelProcessor) Run(ctx context.Context) (storeMap store.Map, err error)

type Scheduler

type Scheduler struct {
	OnStoreJobTerminated func(ctx context.Context, moduleName string, partialsWritten block.Ranges) error
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(workPlan *work.Plan, respFunc substreams.ResponseFunc, upstreamRequestModules *pbsubstreams.Modules) *Scheduler

func (*Scheduler) OnStoreCompletedUntilBlock added in v0.0.21

func (s *Scheduler) OnStoreCompletedUntilBlock(storeName string, blockNum uint64)

OnStoreCompletedUntilBlock is called to indicate that the given storeName has snapshots at the `storeSaveIntervals` up to `blockNum` here.

This should unlock all jobs that were dependent

func (*Scheduler) Schedule added in v0.0.21

func (s *Scheduler) Schedule(ctx context.Context, pool work.WorkerPool) (err error)

type StoreSquasher added in v0.0.14

type StoreSquasher struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewStoreSquasher added in v0.0.14

func NewStoreSquasher(
	initialStore *store.FullKV,
	targetExclusiveBlock,
	nextExpectedStartBlock uint64,
	storeSaveInterval uint64,
	onStoreCompletedUntilBlock func(storeName string, blockNum uint64),
) *StoreSquasher

func (*StoreSquasher) IsEmpty added in v0.0.14

func (s *StoreSquasher) IsEmpty() bool

func (*StoreSquasher) String added in v0.0.14

func (s *StoreSquasher) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL