Documentation ¶
Overview ¶
Package ioutils contains utilities for interacting with the IO layer of the KozmoPropeller Metastore. For example, it provides utilities for reading inputs, writing outputs, and computing output paths and prefixes. These helpers are intended to be used by KozmoPropeller and aim to reduce the burden of implementing simple IO functions.
Index ¶
- Constants
- func ConstructCheckpointPath(store storage.ReferenceConstructor, rawOutputPrefix storage.DataReference) storage.DataReference
- func GenerateAlphabet(b []rune) []rune
- func GenerateArabicNumerals(b []rune) []rune
- func GetIndexLookupPath(ctx context.Context, store storage.ReferenceConstructor, ...) (res storage.DataReference, err error)
- func GetTaskTemplatePath(ctx context.Context, store storage.ReferenceConstructor, ...) (storage.DataReference, error)
- func NewCachedInputReader(ctx context.Context, in io.InputReader) io.InputReader
- func NewDeterministicUniqueRawOutputPath(ctx context.Context, ...) (io.RawOutputPaths, error)
- func NewLazyUploadingTaskReader(baseTaskReader SimpleTaskReader, remotePath storage.DataReference, ...) pluginsCore.TaskReader
- func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths
- func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, ...) (io.RawOutputPaths, error)
- func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePath storage.DataReference, ...) (io.RawOutputPaths, error)
- func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, ...) (io.RawOutputPaths, error)
- type BufferedOutputWriter
- type InMemoryOutputReader
- func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error)
- func (r InMemoryOutputReader) Exists(_ context.Context) (bool, error)
- func (r InMemoryOutputReader) IsError(ctx context.Context) (bool, error)
- func (r InMemoryOutputReader) IsFile(_ context.Context) bool
- func (r InMemoryOutputReader) Read(_ context.Context) (*core.LiteralMap, *io.ExecutionError, error)
- func (r InMemoryOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error)
- type PrecomputedShardSelector
- type RemoteCheckpointPaths
- type RemoteFileInputReader
- type RemoteFileOutputPaths
- func (w RemoteFileOutputPaths) GetDeckPath() storage.DataReference
- func (w RemoteFileOutputPaths) GetErrorPath() storage.DataReference
- func (w RemoteFileOutputPaths) GetFuturesPath() storage.DataReference
- func (w RemoteFileOutputPaths) GetOutputPath() storage.DataReference
- func (w RemoteFileOutputPaths) GetOutputPrefixPath() storage.DataReference
- type RemoteFileOutputReader
- func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error)
- func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error)
- func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error)
- func (r RemoteFileOutputReader) IsFile(ctx context.Context) bool
- func (r RemoteFileOutputReader) Read(ctx context.Context) (*core.LiteralMap, *io.ExecutionError, error)
- func (r RemoteFileOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error)
- type RemoteFileOutputWriter
- type ShardSelector
- type SimpleInputFilePath
- type SimpleTaskReader
Constants ¶
const ( // InputsSuffix specifies the name of the file that contains the task inputs in the form core.LiteralMap InputsSuffix = "inputs.pb" // TaskTemplateSuffix In case a task requests for a task template, it is passed into the task using this filename. // The format is of type core.TaskTemplate TaskTemplateSuffix = "task.pb" // FuturesSuffix specifies that for dynamic workflows, the futures files is written with this filename/suffix. // The format is core.DynamicJobSpec FuturesSuffix = "futures.pb" // OutputsSuffix specifies that outputs are assumed to be written to this "file"/"suffix" under the given prefix // The outputs file has a format of core.LiteralMap OutputsSuffix = "outputs.pb" // ErrorsSuffix specifies that the errors are written to this prefix/file under the given prefix. The Error File // has a format of core.ErrorDocument ErrorsSuffix = "error.pb" IndexLookupSuffix = "indexlookup.pb" // CheckpointPrefix specifies the common prefix that can be used as a directory where all the checkpoint information // will be stored. This directory is under the raw output-prefix path CheckpointPrefix = "_kozmocheckpoints" )
const (
ErrFailedRead string = "READ_FAILED"
)
Variables ¶
This section is empty.
Functions ¶
func ConstructCheckpointPath ¶
func ConstructCheckpointPath(store storage.ReferenceConstructor, rawOutputPrefix storage.DataReference) storage.DataReference
ConstructCheckpointPath returns a checkpoint path under the given rawOutputPrefix, following the conventions of the store.
func GenerateAlphabet ¶
GenerateAlphabet generates the entire Latin alphabet, appends it to the passed-in slice, and returns the new slice.
func GenerateArabicNumerals ¶
GenerateArabicNumerals generates all Arabic numerals, appends them to the passed-in slice, and returns the new slice.
func GetIndexLookupPath ¶
func GetIndexLookupPath(ctx context.Context, store storage.ReferenceConstructor, prefix storage.DataReference) (res storage.DataReference, err error)
GetIndexLookupPath returns the index lookup path, formed by appending IndexLookupSuffix to the given prefix.
func GetTaskTemplatePath ¶
func GetTaskTemplatePath(ctx context.Context, store storage.ReferenceConstructor, base storage.DataReference) (storage.DataReference, error)
GetTaskTemplatePath returns the protobuf file path where the TaskTemplate is stored.
func NewCachedInputReader ¶
func NewCachedInputReader(ctx context.Context, in io.InputReader) io.InputReader
NewCachedInputReader creates a new read-through cached input reader. The returned reader is not thread-safe. It caches the inputs on a successful read from the underlying input reader.
func NewDeterministicUniqueRawOutputPath ¶
func NewDeterministicUniqueRawOutputPath(ctx context.Context, rawOutputPrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error)
NewDeterministicUniqueRawOutputPath constructs an output path that is deterministic and unique within the given outputPrefix. No sharding is performed.
func NewLazyUploadingTaskReader ¶
func NewLazyUploadingTaskReader(baseTaskReader SimpleTaskReader, remotePath storage.DataReference, store storage.ProtobufStore) pluginsCore.TaskReader
NewLazyUploadingTaskReader decorates an existing TaskReader and adds the ability to lazily upload the task template to a remote location, only when the location information is accessed.
func NewRawOutputPaths ¶
func NewRawOutputPaths(_ context.Context, rawOutputPrefix storage.DataReference) io.RawOutputPaths
NewRawOutputPaths returns a simple output sandbox at the given path.
func NewShardedDeterministicRawOutputPath ¶
func NewShardedDeterministicRawOutputPath(ctx context.Context, sharder ShardSelector, basePrefix, outputMetadataPrefix storage.DataReference, store storage.ReferenceConstructor) (io.RawOutputPaths, error)
NewShardedDeterministicRawOutputPath creates a deterministic RawOutputPath whose path is distributed based on the ShardSelector passed in. Determinism depends on the outputMetadataPrefix. This is a potential performance problem, as creating a new RawOutputPath may be expensive because it hashes the outputMetadataPrefix. The final RawOutputPath is created in the shard selected by the sharder under the basePrefix, and then a hashed value of the outputMetadataPrefix is appended.
func NewShardedRawOutputPath ¶
func NewShardedRawOutputPath(ctx context.Context, sharder ShardSelector, basePath storage.DataReference, uniqueID string, store storage.ReferenceConstructor) (io.RawOutputPaths, error)
NewShardedRawOutputPath creates an OutputSandbox in the basePath using the uniqueID and a sharder. This implementation is faster than the Randomized strategy.
func NewTaskIDRawOutputPath ¶
func NewTaskIDRawOutputPath(ctx context.Context, rawOutputPrefix storage.DataReference, taskID *core2.TaskExecutionIdentifier, store storage.ReferenceConstructor) (io.RawOutputPaths, error)
NewTaskIDRawOutputPath generates a RawOutputPath that looks like the TaskExecutionID and can easily be cross-referenced with the Kozmo-generated TaskExecution ID.
Types ¶
type BufferedOutputWriter ¶
type BufferedOutputWriter struct { io.OutputFilePaths // contains filtered or unexported fields }
BufferedOutputWriter simply records the io.OutputReader passed to Put; the recorded reader can then be accessed using GetReader.
func NewBufferedOutputWriter ¶
func NewBufferedOutputWriter(ctx context.Context, paths io.OutputFilePaths) *BufferedOutputWriter
NewBufferedOutputWriter returns a new BufferedOutputWriter for the given output file paths.
func (*BufferedOutputWriter) GetReader ¶
func (o *BufferedOutputWriter) GetReader() io.OutputReader
func (*BufferedOutputWriter) Put ¶
func (o *BufferedOutputWriter) Put(ctx context.Context, reader io.OutputReader) error
type InMemoryOutputReader ¶
type InMemoryOutputReader struct { DeckPath *storage.DataReference // contains filtered or unexported fields }
func NewInMemoryOutputReader ¶
func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader
func (InMemoryOutputReader) DeckExists ¶
func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error)
func (InMemoryOutputReader) Exists ¶
func (r InMemoryOutputReader) Exists(_ context.Context) (bool, error)
func (InMemoryOutputReader) IsError ¶
func (r InMemoryOutputReader) IsError(ctx context.Context) (bool, error)
func (InMemoryOutputReader) Read ¶
func (r InMemoryOutputReader) Read(_ context.Context) (*core.LiteralMap, *io.ExecutionError, error)
func (InMemoryOutputReader) ReadError ¶
func (r InMemoryOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error)
type PrecomputedShardSelector ¶
type PrecomputedShardSelector struct {
// contains filtered or unexported fields
}
PrecomputedShardSelector distributes data into one of the precomputed buckets. The bucket is determined deterministically from the input string s.
func (*PrecomputedShardSelector) GetShardPrefix ¶
GetShardPrefix generates a deterministic shard ID for the given string s.
type RemoteCheckpointPaths ¶
type RemoteCheckpointPaths struct { RemoteFileOutputPaths // contains filtered or unexported fields }
RemoteCheckpointPaths implements the CheckpointPaths interface, building on top of the OutputFilePaths interface.
func NewCheckpointRemoteFilePaths ¶
func NewCheckpointRemoteFilePaths(ctx context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.RawOutputPaths, previousCheckpointPath storage.DataReference) RemoteCheckpointPaths
NewCheckpointRemoteFilePaths returns a new object constructed with an optional previousCheckpointPath and derives a new checkpointPath from the outputPrefix
func NewReadOnlyOutputFilePaths ¶
func NewReadOnlyOutputFilePaths(ctx context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference) RemoteCheckpointPaths
NewReadOnlyOutputFilePaths can be used when data is only to be read from an existing remote location
func (RemoteCheckpointPaths) GetCheckpointPrefix ¶
func (r RemoteCheckpointPaths) GetCheckpointPrefix() storage.DataReference
GetCheckpointPrefix returns a new checkpoint path under the raw output prefix.
func (RemoteCheckpointPaths) GetPreviousCheckpointsPrefix ¶
func (r RemoteCheckpointPaths) GetPreviousCheckpointsPrefix() storage.DataReference
GetPreviousCheckpointsPrefix returns the Prefix path for checkpoints for the previous attempt, or "" if this is the first attempt
type RemoteFileInputReader ¶
type RemoteFileInputReader struct { io.InputFilePaths // contains filtered or unexported fields }
func NewRemoteFileInputReader ¶
func NewRemoteFileInputReader(_ context.Context, store storage.ProtobufStore, inputPaths io.InputFilePaths) RemoteFileInputReader
func (RemoteFileInputReader) Get ¶
func (r RemoteFileInputReader) Get(ctx context.Context) (*core.LiteralMap, error)
type RemoteFileOutputPaths ¶
type RemoteFileOutputPaths struct { // Arbitrary supplied of the RawOutputPath io.RawOutputPaths // contains filtered or unexported fields }
RemoteFileOutputPaths records all metadata output paths / keys on a remote storage system, e.g. S3 / GCS or any other key-value store. In theory, if storage.DataReference can support BigTable, this will work with it as well.
func NewRemoteFileOutputPaths ¶
func NewRemoteFileOutputPaths(_ context.Context, store storage.ReferenceConstructor, outputPrefix storage.DataReference, sandbox io.RawOutputPaths) RemoteFileOutputPaths
NewRemoteFileOutputPaths returns a RemoteFileOutputPaths object, where all the paths are configured using the given outputPrefix and constructed using the storage.ReferenceConstructor
func (RemoteFileOutputPaths) GetDeckPath ¶
func (w RemoteFileOutputPaths) GetDeckPath() storage.DataReference
func (RemoteFileOutputPaths) GetErrorPath ¶
func (w RemoteFileOutputPaths) GetErrorPath() storage.DataReference
func (RemoteFileOutputPaths) GetFuturesPath ¶
func (w RemoteFileOutputPaths) GetFuturesPath() storage.DataReference
func (RemoteFileOutputPaths) GetOutputPath ¶
func (w RemoteFileOutputPaths) GetOutputPath() storage.DataReference
func (RemoteFileOutputPaths) GetOutputPrefixPath ¶
func (w RemoteFileOutputPaths) GetOutputPrefixPath() storage.DataReference
type RemoteFileOutputReader ¶
type RemoteFileOutputReader struct {
// contains filtered or unexported fields
}
func NewRemoteFileOutputReader ¶
func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader
func (RemoteFileOutputReader) DeckExists ¶
func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error)
func (RemoteFileOutputReader) Exists ¶
func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error)
func (RemoteFileOutputReader) IsError ¶
func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error)
func (RemoteFileOutputReader) IsFile ¶
func (r RemoteFileOutputReader) IsFile(ctx context.Context) bool
func (RemoteFileOutputReader) Read ¶
func (r RemoteFileOutputReader) Read(ctx context.Context) (*core.LiteralMap, *io.ExecutionError, error)
func (RemoteFileOutputReader) ReadError ¶
func (r RemoteFileOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error)
type RemoteFileOutputWriter ¶
type RemoteFileOutputWriter struct { io.OutputFilePaths // contains filtered or unexported fields }
RemoteFileOutputWriter adds storage Write APIs to output paths / keys. In retrospect, the term `path` should generally be replaced with `key`.
func NewRemoteFileOutputWriter ¶
func NewRemoteFileOutputWriter(_ context.Context, store storage.ProtobufStore, outputFilePaths io.OutputFilePaths) RemoteFileOutputWriter
NewRemoteFileOutputWriter returns a writer that records all outputs to remote files / objects. Given outputs, it automatically writes them to the configured outputFile / key.
func (RemoteFileOutputWriter) Put ¶
func (w RemoteFileOutputWriter) Put(ctx context.Context, reader io.OutputReader) error
type ShardSelector ¶
ShardSelector allows shard selection for an OutputSandbox.
func NewBase36PrefixShardSelector ¶
func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error)
NewBase36PrefixShardSelector creates a PrecomputedShardSelector with 36*36 unique shards. Each shard has the format {[0-9a-z][0-9a-z]}, i.e. it is two characters long.
func NewConstantShardSelector ¶
func NewConstantShardSelector(shards []string) ShardSelector
NewConstantShardSelector returns a ShardSelector that uses the given shards to select a shard.
type SimpleInputFilePath ¶
type SimpleInputFilePath struct {
// contains filtered or unexported fields
}
func NewInputFilePaths ¶
func NewInputFilePaths(_ context.Context, store storage.ReferenceConstructor, inputPathPrefix storage.DataReference) SimpleInputFilePath
func (SimpleInputFilePath) GetInputPath ¶
func (s SimpleInputFilePath) GetInputPath() storage.DataReference
func (SimpleInputFilePath) GetInputPrefixPath ¶
func (s SimpleInputFilePath) GetInputPrefixPath() storage.DataReference
type SimpleTaskReader ¶
type SimpleTaskReader interface {
Read(ctx context.Context) (*core.TaskTemplate, error)
}
SimpleTaskReader provides only the Read method of the TaskReader interface. It exists so that an existing reader can conveniently be wrapped by NewLazyUploadingTaskReader.