Documentation ¶
Index ¶
- Constants
- func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, ...) (*arrayCore.State, error)
- func CatalogBitsetToLiteralCollection(catalogResults *bitarray.BitSet, size int) *idlCore.LiteralCollection
- func CheckTaskOutput(ctx context.Context, dataStore *storage.DataStore, ...) (core.Phase, error)
- func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskReader, inputs []io.InputReader, ...) ([]catalog.DownloadRequest, error)
- func ConstructCatalogUploadRequests(keyID *idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier, ...) ([]catalog.UploadRequest, error)
- func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, ...) (io.OutputReader, error)
- func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.OutputReader, error)
- func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, ...) (io.OutputWriter, error)
- func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.OutputWriter, error)
- func ConstructRemoteFileInputReaders(ctx context.Context, dataStore *storage.DataStore, ...) ([]io.InputReader, error)
- func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputLiterals map[string]*idlCore.Literal, ...) []io.InputReader
- func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContext, maxArrayJobSize int64, ...) (*arrayCore.State, error)
- func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader
- func NewLiteralScalarOfInteger(number int64) *idlCore.Literal
- func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceIteration)
- func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, ...) (bool, error)
- func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State, ...) (*arrayCore.State, []*core.ExternalResource, error)
- type AdvanceIteration
- type OutputAssembler
- type StaticInputReader
Constants ¶
const AwsBatchTaskType = "aws-batch"
const (
ErrSystem errors.ErrorCode = "SYSTEM_ERROR"
)
Variables ¶
This section is empty.
Functions ¶
func AssembleFinalOutputs ¶
func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tCtx pluginCore.TaskExecutionContext, terminalPhase arrayCore.Phase, terminalVersion uint32, state *arrayCore.State) (*arrayCore.State, error)
Assembles a single outputs.pb that contain all the outputs of the subtasks and write them to the final OutputWriter. This step can potentially be expensive (hence the metrics) and why it's offloaded to a background process.
func CatalogBitsetToLiteralCollection ¶
func CatalogBitsetToLiteralCollection(catalogResults *bitarray.BitSet, size int) *idlCore.LiteralCollection
When an AWS Batch array job kicks off, it is given the index of the array job in an environment variable. The SDK will use this index to look up the real index of the job using the output of this function. That is, if there are five subtasks originally, but 0-2 are cached in Catalog, then an array job with two jobs will kick off. The first job will have an AWS supplied index of 0, which will resolve to 3 from this function, and the second will have an index of 1, which will resolve to 4. The size argument to this function is needed because the BitSet may create more bits (has a capacity) higher than the original requested amount. If you make a BitSet with 10 bits, it may create 64 in the background, so you need to keep track of how many were actually requested.
func CheckTaskOutput ¶
func ConstructCatalogReaderWorkItems ¶
func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskReader, inputs []io.InputReader, outputs []io.OutputWriter) ([]catalog.DownloadRequest, error)
func ConstructCatalogUploadRequests ¶
func ConstructCatalogUploadRequests(keyID *idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier, cacheVersion string, cacheIgnoreInputVars []string, taskInterface *idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet, inputReaders []io.InputReader, outputReaders []io.OutputReader) ([]catalog.UploadRequest, error)
func ConstructOutputReader ¶
func ConstructOutputReader(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, index int) (io.OutputReader, error)
func ConstructOutputReaders ¶
func ConstructOutputReaders(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputReader, error)
func ConstructOutputWriter ¶
func ConstructOutputWriter(ctx context.Context, dataStore *storage.DataStore, outputPrefix, outputSandbox storage.DataReference, index int) (io.OutputWriter, error)
func ConstructOutputWriters ¶
func ConstructOutputWriters(ctx context.Context, dataStore *storage.DataStore, outputPrefix, baseOutputSandbox storage.DataReference, size int) ([]io.OutputWriter, error)
func ConstructRemoteFileInputReaders ¶
func ConstructRemoteFileInputReaders(ctx context.Context, dataStore *storage.DataStore, inputPrefix storage.DataReference, size int) ([]io.InputReader, error)
func ConstructStaticInputReaders ¶
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputLiterals map[string]*idlCore.Literal, arrayJobSize int) []io.InputReader
ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their inputs already populated.
func DetermineDiscoverability ¶
func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContext, maxArrayJobSize int64, state *arrayCore.State) ( *arrayCore.State, error)
DetermineDiscoverability checks if there are any previously cached tasks. If there are we will only submit an ArrayJob for the non-cached tasks. The ArrayJob is now a different size, and each task will get a new index location which is different than their original location. To find the original index we construct an indexLookup array. The subtask can find it's original index value in indexLookup[JOB_ARRAY_INDEX] where JOB_ARRAY_INDEX is an environment variable in the pod
func GetInputReader ¶
func GetInputReader(tCtx core.TaskExecutionContext, taskTemplate *idlCore.TaskTemplate) io.InputReader
func RunArrayTestsEndToEnd ¶
func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceIteration)
func WriteToCatalog ¶
func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, catalogClient catalog.AsyncClient, workItems []catalog.UploadRequest) (bool, error)
Types ¶
type AdvanceIteration ¶
type AdvanceIteration func(ctx context.Context, tCtx core.TaskExecutionContext) error
type OutputAssembler ¶
type OutputAssembler struct {
workqueue.IndexedWorkQueue
}
Represents an indexed work queue that aggregates outputs of sub tasks.
func NewErrorAssembler ¶
func NewOutputAssembler ¶
func (OutputAssembler) Queue ¶
func (o OutputAssembler) Queue(ctx context.Context, id workqueue.WorkItemID, item *outputAssembleItem) error
type StaticInputReader ¶
type StaticInputReader struct { io.InputFilePaths // contains filtered or unexported fields }
StaticInputReader complies with the io.InputReader interface but has the input already populated.
func NewStaticInputReader ¶
func NewStaticInputReader(inputPaths io.InputFilePaths, input *idlCore.LiteralMap) StaticInputReader
func (StaticInputReader) Get ¶
func (i StaticInputReader) Get(_ context.Context) (*idlCore.LiteralMap, error)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model.
|
This package deals with the communication with AWS-Batch and adopting its APIs to the flyte-plugin model. |