Documentation ¶
Index ¶
- Constants
- func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, ...) (*arrayCore.State, error)
- func CatalogBitsetToLiteralCollection(catalogResults *bitarray.BitSet, size int) *idlCore.LiteralCollection
- func CheckTaskOutput(ctx context.Context, dataStore *storage.DataStore, ...) (core.Phase, error)
- func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskReader, inputs []io.InputReader, ...) ([]catalog.DownloadRequest, error)
- func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier, ...) ([]catalog.UploadRequest, error)
- func ConstructInputReaders(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.InputReader, error)
- func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, ...) (io.OutputReader, error)
- func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.OutputReader, error)
- func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, ...) (io.OutputWriter, error)
- func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.OutputWriter, error)
- func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State) (*arrayCore.State, error)
- func NewLiteralScalarOfInteger(number int64) *idlCore.Literal
- func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceIteration)
- func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, ...) (bool, error)
- func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State, ...) (*arrayCore.State, error)
- type AdvanceIteration
- type OutputAssembler
Constants ¶
const (
ErrSystem errors.ErrorCode = "SYSTEM_ERROR"
)
Variables ¶
This section is empty.
Functions ¶
func AssembleFinalOutputs ¶
func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tCtx pluginCore.TaskExecutionContext, terminalPhase arrayCore.Phase, state *arrayCore.State) (*arrayCore.State, error)
Assembles a single outputs.pb that contain all the outputs of the subtasks and write them to the final OutputWriter. This step can potentially be expensive (hence the metrics) and why it's offloaded to a background process.
func CatalogBitsetToLiteralCollection ¶
func CatalogBitsetToLiteralCollection(catalogResults *bitarray.BitSet, size int) *idlCore.LiteralCollection
When an AWS Batch array job kicks off, it is given the index of the array job in an environment variable. The SDK will use this index to look up the real index of the job using the output of this function. That is, if there are five subtasks originally, but 0-2 are cached in Catalog, then an array job with two jobs will kick off. The first job will have an AWS supplied index of 0, which will resolve to 3 from this function, and the second will have an index of 1, which will resolve to 4. The size argument to this function is needed because the BitSet may create more bits (has a capacity) higher than the original requested amount. If you make a BitSet with 10 bits, it may create 64 in the background, so you need to keep track of how many were actually requested.
func CheckTaskOutput ¶
func ConstructCatalogReaderWorkItems ¶
func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskReader, inputs []io.InputReader, outputs []io.OutputWriter) ([]catalog.DownloadRequest, error)
func ConstructCatalogUploadRequests ¶
func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier, cacheVersion string, taskInterface idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet, inputReaders []io.InputReader, outputReaders []io.OutputReader) ([]catalog.UploadRequest, error)
func ConstructInputReaders ¶
func ConstructInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference, size int) ([]io.InputReader, error)
func ConstructOutputReader ¶
func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, index int) (io.OutputReader, error)
func ConstructOutputReaders ¶
func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputReader, error)
func ConstructOutputWriter ¶
func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference, index int) (io.OutputWriter, error)
func ConstructOutputWriters ¶
func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputWriter, error)
func DetermineDiscoverability ¶
func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State) ( *arrayCore.State, error)
Check if there are any previously cached tasks. If there are we will only submit an ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location which is different than their original location. To find the original index we construct an indexLookup array. The subtask can find it's original index value in indexLookup[JOB_ARRAY_INDEX] where JOB_ARRAY_INDEX is an environment variable in the pod
func RunArrayTestsEndToEnd ¶
func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceIteration)
func WriteToCatalog ¶
func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, catalogClient catalog.AsyncClient, workItems []catalog.UploadRequest) (bool, error)
Types ¶
type AdvanceIteration ¶
type AdvanceIteration func(ctx context.Context, tCtx core.TaskExecutionContext) error
type OutputAssembler ¶
type OutputAssembler struct {
workqueue.IndexedWorkQueue
}
Represents an indexed work queue that aggregates outputs of sub tasks.
func NewErrorAssembler ¶
func NewOutputAssembler ¶
func (OutputAssembler) Queue ¶
func (o OutputAssembler) Queue(ctx context.Context, id workqueue.WorkItemID, item *outputAssembleItem) error
Directories ¶
Path | Synopsis |
---|---|
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model.
|
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model. |