array

package
v0.0.0-...-267b159
Published: Dec 17, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

const AwsBatchTaskType = "aws-batch"
const (
	ErrSystem errors.ErrorCode = "SYSTEM_ERROR"
)

Variables

This section is empty.

Functions

func AssembleFinalOutputs

func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tCtx pluginCore.TaskExecutionContext,
	terminalPhase arrayCore.Phase, terminalVersion uint32, state *arrayCore.State) (*arrayCore.State, error)

Assembles a single outputs.pb that contains all the outputs of the subtasks and writes it to the final OutputWriter. This step can be expensive (hence the metrics), which is why it is offloaded to a background process.

func CatalogBitsetToLiteralCollection

func CatalogBitsetToLiteralCollection(catalogResults *bitarray.BitSet, size int) *idlCore.LiteralCollection

When an AWS Batch array job kicks off, each job is given its index within the array job in an environment variable. The SDK uses this index to look up the original index of the subtask in the output of this function. For example, if there are five subtasks originally but 0-2 are cached in Catalog, then an array job with two jobs kicks off. The first job has an AWS-supplied index of 0, which resolves to 3 through this function, and the second has an index of 1, which resolves to 4. The size argument is needed because the BitSet may allocate more bits (its capacity) than were originally requested. If you make a BitSet with 10 bits, it may create 64 in the background, so you need to keep track of how many were actually requested.

func CheckTaskOutput

func CheckTaskOutput(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, childIdx, originalIdx int) (core.Phase, error)

func ConstructCatalogReaderWorkItems

func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskReader, inputs []io.InputReader,
	outputs []io.OutputWriter) ([]catalog.DownloadRequest, error)

func ConstructCatalogUploadRequests

func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier,
	cacheVersion string, taskInterface idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet,
	inputReaders []io.InputReader, outputReaders []io.OutputReader) ([]catalog.UploadRequest, error)

func ConstructOutputReader

func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference,
	index int) (io.OutputReader, error)

func ConstructOutputReaders

func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference,
	size int) ([]io.OutputReader, error)

func ConstructOutputWriter

func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference,
	index int) (io.OutputWriter, error)

func ConstructOutputWriters

func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference,
	size int) ([]io.OutputWriter, error)

func ConstructRemoteFileInputReaders

func ConstructRemoteFileInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference,
	size int) ([]io.InputReader, error)

func ConstructStaticInputReaders

func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputLiterals map[string]*idlCore.Literal, arrayJobSize int) []io.InputReader

ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their inputs already populated.

func DetermineDiscoverability

func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContext, maxArrayJobSize int64, state *arrayCore.State) (
	*arrayCore.State, error)

DetermineDiscoverability checks whether any subtasks were previously cached. If there are, only the non-cached tasks are submitted as an ArrayJob. The ArrayJob then has a different size, and each task gets a new index that differs from its original one. To find the original index, we construct an indexLookup array. The subtask can find its original index in indexLookup[JOB_ARRAY_INDEX], where JOB_ARRAY_INDEX is an environment variable in the pod.
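
From the subtask's side, the lookup might look like the sketch below. The `originalIndex` helper is hypothetical, and the example takes the environment variable's value as a string argument so that it runs without any environment set up; a real subtask would read it from the environment, for example with `os.Getenv("JOB_ARRAY_INDEX")`, using whatever variable name the platform actually provides.

```go
package main

import (
	"fmt"
	"strconv"
)

// originalIndex is a hypothetical helper that resolves the index assigned by
// the array job (passed as the raw environment variable value) to the
// subtask's original index via indexLookup.
func originalIndex(indexLookup []int, envValue string) (int, error) {
	jobIdx, err := strconv.Atoi(envValue)
	if err != nil {
		return 0, fmt.Errorf("parsing job array index %q: %w", envValue, err)
	}
	if jobIdx < 0 || jobIdx >= len(indexLookup) {
		return 0, fmt.Errorf("job array index %d out of range [0, %d)", jobIdx, len(indexLookup))
	}
	return indexLookup[jobIdx], nil
}

func main() {
	// Subtasks 0-2 were cached, so only original indices 3 and 4 run.
	indexLookup := []int{3, 4}

	for _, envValue := range []string{"0", "1"} {
		idx, err := originalIndex(indexLookup, envValue)
		if err != nil {
			panic(err)
		}
		fmt.Printf("JOB_ARRAY_INDEX=%s -> original index %d\n", envValue, idx)
	}
}
```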

func GetInputReader

func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader

func NewLiteralScalarOfInteger

func NewLiteralScalarOfInteger(number int64) *idlCore.Literal

func RunArrayTestsEndToEnd

func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceIteration)

func WriteToCatalog

func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, catalogClient catalog.AsyncClient,
	workItems []catalog.UploadRequest) (bool, error)

func WriteToDiscovery

func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State, phaseOnSuccess arrayCore.Phase, versionOnSuccess uint32) (*arrayCore.State, []*core.ExternalResource, error)

Types

type AdvanceIteration

type AdvanceIteration func(ctx context.Context, tCtx core.TaskExecutionContext) error

type OutputAssembler

type OutputAssembler struct {
	workqueue.IndexedWorkQueue
}

OutputAssembler represents an indexed work queue that aggregates the outputs of subtasks.

func NewErrorAssembler

func NewErrorAssembler(maxErrorMessageLength int, workQueueConfig workqueue.Config, scope promutils.Scope) (OutputAssembler, error)

func NewOutputAssembler

func NewOutputAssembler(workQueueConfig workqueue.Config, scope promutils.Scope) (OutputAssembler, error)

func (OutputAssembler) Queue

func (o OutputAssembler) Queue(ctx context.Context, id workqueue.WorkItemID, item *outputAssembleItem) error

type StaticInputReader

type StaticInputReader struct {
	io.InputFilePaths
	// contains filtered or unexported fields
}

StaticInputReader complies with the io.InputReader interface but has the input already populated.

func NewStaticInputReader

func NewStaticInputReader(inputPaths io.InputFilePaths, input *idlCore.LiteralMap) StaticInputReader

func (StaticInputReader) Get

Directories

Path Synopsis
This package deals with the communication with AWS-Batch and adapting its APIs to the nebula-plugin model.
