Documentation ¶
Index ¶
- func CacheProbe(key string) bool
- func ClearObjStore() (err kv.Error)
- func ExtractMergeDoc(x1, x2 interface{}) (results string, err kv.Error)
- func GetCancelWrapper(cancel context.CancelFunc, msg string, logger *log.Logger) context.CancelFunc
- func GetHitsMisses(hash string) (hits int, misses int)
- func GetQueuePatterns() (matcher *regexp.Regexp, mismatcher *regexp.Regexp)
- func GoGetConst(dir string, constName string) (v [][]string, err kv.Error)
- func InitObjStore(ctx context.Context, backing string, size int64, removedC chan os.FileInfo, ...) (err kv.Error)
- func InitQueueMatcher(ctx context.Context, namespace string, mapname string, logger *log.Logger) (err []kv.Error)
- func IsInTest() (isTest bool)
- func JSONEditor(oldDoc string, directives []string) (result string, err kv.Error)
- func MergeExperiment(x1, x2 interface{}) (interface{}, kv.Error)
- func NewLocalStorage() (s *localStorage, err kv.Error)
- func NewObjStore(ctx context.Context, spec *StoreOpts, errorC chan kv.Error) (oStore *objStore, err kv.Error)
- func ObjStoreFootPrint() (max int64)
- func ParseBytes(val string) (bytes uint64, err error)
- func Reverse(in string) (reversed string)
- func RunScript(ctx context.Context, scriptPath string, output *os.File, tmpDir string, ...) (err kv.Error)
- func ServiceVirtualEnvCache(ctx context.Context)
- func SetVEnvCacheExpirationPeriod(period time.Duration)
- type ArtifactCache
- func (cache *ArtifactCache) Close()
- func (cache *ArtifactCache) Fetch(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (size int64, warns []kv.Error, err kv.Error)
- func (cache *ArtifactCache) Hash(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (hash string, err kv.Error)
- func (cache *ArtifactCache) Local(group string, dir string, file string) (fn string, err kv.Error)
- func (cache *ArtifactCache) Restore(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (uploaded bool, warns []kv.Error, err kv.Error)
- type Backoffs
- type BufferedWriter
- type EnvSubstituter
- type LocalQueue
- func (fq *LocalQueue) Exists(ctx context.Context, subscription string) (exists bool, err kv.Error)
- func (fq *LocalQueue) Get(subscription string) (Msg []byte, MsgID string, err kv.Error)
- func (fq *LocalQueue) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)
- func (fq *LocalQueue) GetQueuesRefreshInterval() time.Duration
- func (fq *LocalQueue) GetShortQName(qt *task.QueueTask) (shortName string, err kv.Error)
- func (fq *LocalQueue) GetWorkCheckInterval() time.Duration
- func (fq *LocalQueue) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)
- func (fq *LocalQueue) IsEncrypted() (encrypted bool)
- func (fq *LocalQueue) Publish(queueName string, contentType string, msg []byte, allow_create bool) (err kv.Error)
- func (fq *LocalQueue) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
- func (fq *LocalQueue) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan string, err kv.Error)
- func (fq *LocalQueue) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
- type LogFilterer
- type LogOutputProvider
- type ObjDownloader
- type ObjDownloaderFactory
- type OutputFilter
- type OutputWriter
- type Report
- type Storage
- type StoreOpts
- type TimeEMA
- type Trigger
- type VirtualEnv
- type VirtualEnvCache
- type VirtualEnvEntry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CacheProbe ¶
CacheProbe can be used to test the validity of the cache for a previously cached item.
func ClearObjStore ¶
ClearObjStore can be used by clients to erase the contents of the object store cache.
func ExtractMergeDoc ¶
ExtractMergeDoc merges the two JSON-marshalable values x1 and x2 and returns the result as a string.
func GetCancelWrapper ¶
func GetCancelWrapper(cancel context.CancelFunc, msg string, logger *log.Logger) context.CancelFunc
GetCancelWrapper wraps a cancel function with additional tracing capabilities for debugging.
func GetHitsMisses ¶
func GoGetConst ¶
GoGetConst retrieves data structures from source code within the code directories that can contain information useful to utilities that visit the code for testing purposes. It is used mainly to retrieve the command line parameters that packages use during testing, so that when tests are run by external, application-neutral software the code under test can parameterize itself.
func InitObjStore ¶
func InitObjStore(ctx context.Context, backing string, size int64, removedC chan os.FileInfo, errorC chan kv.Error) (err kv.Error)
InitObjStore sets up the backing store for our object store cache. The size specified can be any byte amount.
The triggerC channel is functional when the err value is nil. This channel can be used to manually trigger the disk caching subsystem.
func InitQueueMatcher ¶
func IsInTest ¶
func IsInTest() (isTest bool)
IsInTest examines the OS arguments passed to the running software to detect whether the go test framework is present. It varies from the cudaInTest approach in that it will work if the tests were enabled in another module.
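One way such a check can be made is sketched below. It assumes that detection relies on the "-test." prefix carried by the flags that go test passes to test binaries; the helper name looksLikeGoTest is hypothetical and the real IsInTest may inspect the arguments differently.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// looksLikeGoTest reports whether any argument carries the "-test." prefix
// used by the flags that the go test framework passes to test binaries.
// This is an assumed heuristic, not the package's confirmed logic.
func looksLikeGoTest(args []string) bool {
	for _, arg := range args {
		if strings.HasPrefix(arg, "-test.") {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println("running under go test:", looksLikeGoTest(os.Args))
}
```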
func JSONEditor ¶
JSONEditor accepts a source JSON document and an array of edits to apply to it, and processes each edit as either an RFC 7386 (JSON Merge Patch) or an RFC 6902 (JSON Patch) edit, depending on which of the two it validates as.
func MergeExperiment ¶
MergeExperiment merges the two JSON-marshalable values x1 and x2, preferring x1 over x2 except where x1 and x2 are JSON objects, in which case the keys from both objects are included and their values merged recursively.
It returns an error if x1 or x2 cannot be JSON-marshaled.
func NewLocalStorage ¶
NewLocalStorage is used to allocate and initialize a struct that acts as a receiver
func NewObjStore ¶
func NewObjStore(ctx context.Context, spec *StoreOpts, errorC chan kv.Error) (oStore *objStore, err kv.Error)
NewObjStore is used to instantiate an object store for the runner that includes a cache.
func ObjStoreFootPrint ¶
func ObjStoreFootPrint() (max int64)
ObjStoreFootPrint can be used to determine what the current footprint of the artifact cache is
func ParseBytes ¶
ParseBytes returns the number of bytes represented by the input string.
This function uses the go-humanize library.
Typical inputs include, by way of example, '6gb', '6 GB', and '6 GiB'. Inputs support SI and IEC sizes. For more information please review https://github.com/dustin/go-humanize/blob/master/bytes.go
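To show the difference between SI and IEC suffixes without pulling in the third-party dependency, here is a standard-library-only sketch. The function parseSize and its unit table are hypothetical and cover only a small subset of suffixes; ParseBytes itself delegates to go-humanize, which handles more forms.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// unitMultipliers holds a subset of SI (powers of 1000) and IEC (powers of
// 1024) suffixes, keyed in lower case.
var unitMultipliers = map[string]uint64{
	"":    1,
	"b":   1,
	"kb":  1000,
	"mb":  1000 * 1000,
	"gb":  1000 * 1000 * 1000,
	"tb":  1000 * 1000 * 1000 * 1000,
	"kib": 1 << 10,
	"mib": 1 << 20,
	"gib": 1 << 30,
	"tib": 1 << 40,
}

// parseSize splits the leading number from the trailing unit and scales it.
func parseSize(val string) (uint64, error) {
	s := strings.TrimSpace(val)
	idx := strings.IndexFunc(s, func(r rune) bool {
		return !unicode.IsDigit(r) && r != '.'
	})
	if idx == -1 {
		idx = len(s)
	}
	num, err := strconv.ParseFloat(s[:idx], 64)
	if err != nil {
		return 0, fmt.Errorf("invalid size %q: %w", val, err)
	}
	unit := strings.ToLower(strings.TrimSpace(s[idx:]))
	mult, ok := unitMultipliers[unit]
	if !ok {
		return 0, fmt.Errorf("unknown unit %q in %q", unit, val)
	}
	return uint64(num * float64(mult)), nil
}

func main() {
	for _, in := range []string{"6gb", "6 GB", "6 GiB"} {
		n, err := parseSize(in)
		fmt.Println(in, n, err)
	}
}
```

The first two inputs use the SI gigabyte (1000^3 bytes) while the third uses the IEC gibibyte (1024^3 bytes), so the results differ even though the leading number is the same.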
func RunScript ¶
func RunScript(ctx context.Context, scriptPath string, output *os.File, tmpDir string, runKey string, logger *log.Logger) (err kv.Error)
RunScript will use a generated script file and will run it to completion while marshalling results and files from the computation. RunScript is a blocking call and will only return upon completion or termination of the process it starts.
func ServiceVirtualEnvCache ¶
Types ¶
type ArtifactCache ¶
type ArtifactCache struct {
	sync.Mutex

	// This can be used by the application layer to receive diagnostic and other information
	// about errors occurring inside the caching tracker etc and surface these errors to
	// the logging system
	ErrorC chan kv.Error
	// contains filtered or unexported fields
}
ArtifactCache is used to encapsulate and store hashes, typically file hashes, and prevent duplicated uploads from occurring needlessly
func NewArtifactCache ¶
func NewArtifactCache() (cache *ArtifactCache)
NewArtifactCache initializes a hash tracker for artifact related files and passes it back to the caller. The tracking structure can be used to track files that have already been downloaded / uploaded and also includes a channel that can be used to receive error notifications.
func (*ArtifactCache) Close ¶
func (cache *ArtifactCache) Close()
Close will clean up the cache of hashes and close the error reporting channel associated with the cache tracker
func (*ArtifactCache) Fetch ¶
func (cache *ArtifactCache) Fetch(ctx context.Context, art *request.Artifact, projectId string, group string, maxBytes int64, env map[string]string, dir string) (size int64, warns []kv.Error, err kv.Error)
Fetch can be used to retrieve an artifact from a storage layer implementation, while passing through the lens of a caching filter that prevents unneeded downloads.
func (*ArtifactCache) Hash ¶
func (cache *ArtifactCache) Hash(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (hash string, err kv.Error)
Hash is used to obtain the hash of an artifact from the backing store implementation being used by the storage implementation
func (*ArtifactCache) Local ¶
Local returns the local disk based file name for the artifact's expanded archive files.
func (*ArtifactCache) Restore ¶
func (cache *ArtifactCache) Restore(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (uploaded bool, warns []kv.Error, err kv.Error)
Restore uploads the artifacts that have been marked mutable and that have changed.
type Backoffs ¶
type Backoffs struct {
// contains filtered or unexported fields
}
Backoffs uses a cache with a TTL on the cache items to maintain a set of blocking directives for resources, where the cache expiry time is the applicable time for the blocker.
func GetBackoffs ¶
func GetBackoffs() (b *Backoffs)
GetBackoffs retrieves a reference to a singleton of the Backoffs structure
type BufferedWriter ¶
type BufferedWriter struct {
// contains filtered or unexported fields
}
func (*BufferedWriter) Close ¶
func (wr *BufferedWriter) Close() error
type EnvSubstituter ¶
type EnvSubstituter struct {
// contains filtered or unexported fields
}
type LocalQueue ¶
type LocalQueue struct {
	RootDir string // full file path to root queues "server" directory
	// contains filtered or unexported fields
}
LocalQueue "project" is basically a local root directory containing queue subdirectories.
func NewLocalQueue ¶
func (*LocalQueue) Exists ¶
Exists checks that the file queue named "subscription" exists as a subdirectory under the root "server" directory.
func (*LocalQueue) GetQueuesRefreshInterval ¶
func (fq *LocalQueue) GetQueuesRefreshInterval() time.Duration
func (*LocalQueue) GetShortQName ¶
GetShortQName is useful for storing queue-specific information in collections etc.
func (*LocalQueue) GetWorkCheckInterval ¶
func (fq *LocalQueue) GetWorkCheckInterval() time.Duration
func (*LocalQueue) HasWork ¶
func (fq *LocalQueue) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)
HasWork will look at the local file queue to see if there is any pending work. The function is called in an attempt to see if there is any point in processing new work without a lot of overhead.
func (*LocalQueue) IsEncrypted ¶
func (fq *LocalQueue) IsEncrypted() (encrypted bool)
func (*LocalQueue) Refresh ¶
func (fq *LocalQueue) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
Refresh will examine the local file queues "server" and extract a list of the queues that relate to StudioML work.
func (*LocalQueue) Responder ¶
func (fq *LocalQueue) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan string, err kv.Error)
Responder is used to open a connection to an existing response queue if one was made available and also to provision a channel into which the runner can place report messages
func (*LocalQueue) Work ¶
func (fq *LocalQueue) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
Work will connect to the FileQueue "server" identified in the receiver, fq, and will see if any work can be found on the queue identified by the go runner subscription and present work to the handler for processing
type LogFilterer ¶
type LogFilterer struct {
// contains filtered or unexported fields
}
func (*LogFilterer) Filter ¶
func (lf *LogFilterer) Filter(input []byte) []byte
type LogOutputProvider ¶
func GetFilteredOutputWriter ¶
func GetFilteredOutputWriter(externOut *os.File, logger *log.Logger, filter OutputFilter) LogOutputProvider
type ObjDownloader ¶
type ObjDownloaderFactory ¶
var (
DownloaderFactory ObjDownloaderFactory
)
func (*ObjDownloaderFactory) GetDownloader ¶
func (*ObjDownloaderFactory) RemoveDownloader ¶
func (f *ObjDownloaderFactory) RemoveDownloader(key string)
func (*ObjDownloaderFactory) SetBackingDir ¶
func (f *ObjDownloaderFactory) SetBackingDir(dir string)
type OutputFilter ¶
func GetLogFilterer ¶
func GetLogFilterer(logger *log.Logger) OutputFilter
type OutputWriter ¶
func (*OutputWriter) Close ¶
func (or *OutputWriter) Close() error
func (*OutputWriter) GetWriters ¶
func (or *OutputWriter) GetWriters() (io.Writer, io.Writer)
type Storage ¶
type Storage interface {
	// Gather will retrieve contents of the named storage object using a prefix treating any items retrieved as individual files, invokes Fetch
	//
	Gather(ctx context.Context, keyPrefix string, outputDir string, maxBytes int64, tap io.Writer, failFast bool) (size int64, warnings []kv.Error, err kv.Error)

	// Fetch will retrieve contents of the named storage object and optionally unpack it into the
	// user specified output directory
	//
	Fetch(ctx context.Context, name string, unpack bool, output string, maxBytes int64, tap io.Writer) (size int64, warnings []kv.Error, err kv.Error)

	// Deposit is a directory archive and upload, deduplication is implemented outside of this interface
	//
	Deposit(ctx context.Context, src string, dest string) (warnings []kv.Error, err kv.Error)

	// Hash can be used to retrieve the hash of the contents of the file. The hash is
	// retrieved not computed and so is a lightweight operation common to both S3 and Google Storage.
	// The hash on some storage platforms is not a plain MD5 but uses multiple hashes from file
	// segments to increase the speed of hashing and also to reflect the multipart download
	// processing that was used for the file, for a full explanation please see
	// https://stackoverflow.com/questions/12186993/what-is-the-algorithm-to-compute-the-amazon-s3-etag-for-a-file-larger-than-5gb
	//
	Hash(ctx context.Context, name string) (hash string, err kv.Error)

	Close()
}
Storage defines an interface for implementations of a studioml artifact store
type StoreOpts ¶
type StoreOpts struct {
	Art       *request.Artifact
	ProjectID string
	Group     string
	Env       map[string]string
	Validate  bool
}
StoreOpts is used to encapsulate a storage implementation with the runner and studioml data needed
type TimeEMA ¶
TimeEMA is used to store exponential moving averages for a time duration
func NewTimeEMA ¶
NewTimeEMA creates a new exponential moving average of a time duration for a set of time windows with an initial execution time duration set
func (*TimeEMA) Get ¶
Get retrieves a single time duration moving average for a specified window of time
type Trigger ¶
type Trigger struct {
	T <-chan struct{}
	C chan time.Time
	// contains filtered or unexported fields
}
Trigger is a data structure that encapsulates a timer and a channel which together are used to send messages to a downstream Go channel. The main Trigger use case is to allow a regular action to be scheduled via a timer while also allowing unit tests, for example, to activate the same action.
func NewTrigger ¶
NewTrigger accepts a timer and a channel that together can be used to send messages into a channel that is encapsulated within the returned t data structure
type VirtualEnv ¶
type VirtualEnv struct {
	Request *request.Request
	Script  string
	// contains filtered or unexported fields
}
VirtualEnv encapsulates the context from which a Python virtual environment is to be instantiated, including items such as the list of pip installables that should be loaded and the shell script to run.
func NewVirtualEnv ¶
func NewVirtualEnv(rqst *request.Request, dir string, uniqueID string, logger *log.Logger) (env *VirtualEnv, err kv.Error)
NewVirtualEnv builds the VirtualEnv data structure from data received across the wire from a studioml client.
func (*VirtualEnv) Close ¶
func (*VirtualEnv) Close() (err kv.Error)
Close is used to close any resources which the encapsulated VirtualEnv may have consumed.
func (*VirtualEnv) Make ¶
func (p *VirtualEnv) Make(ctx context.Context, alloc *resources.Allocated, e interface{}) (err kv.Error, evalDone bool)
Make is used to write a script file that is generated for the specific TF tasks studioml has sent. It also receives the ID of the Python virtual environment to be used for running the given evaluation task.
func (*VirtualEnv) Run ¶
Run will use a generated script file and will run it to completion while marshalling results and files from the computation. Run is a blocking call and will only return upon completion or termination of the process it starts. Run is called by the processor runScript receiver.
type VirtualEnvCache ¶
type VirtualEnvEntry ¶
Source Files ¶
- artifacts.go
- backoffs.go
- context_helper.go
- execoutput.go
- execscript.go
- executionEMA.go
- experiment_json.go
- filteroutput.go
- go.go
- localfilequeue.go
- localstorage.go
- objectdownloader.go
- objectstore.go
- pythonenv.go
- pythonenvcache.go
- queuematcher.go
- runnerreports.go
- storage.go
- strings.go
- trigger.go
- units.go