Documentation ¶
Overview ¶
Package reflow implements the core data structures and (abstract) runtime for Reflow.
Reflow is a system for distributed program execution. The programs are described by Flows, which are an abstract specification of the program's execution. Each Flow node can take any number of other Flows as dependent inputs and perform some (local) execution over these inputs in order to compute some output value.
Reflow supports a limited form of dynamic dependencies: a Flow may evaluate to a list of values, each of which may be executed independently. This mechanism also provides parallelism.
The system orchestrates Flow execution by evaluating the flow in the manner of an abstract syntax tree; see Eval for more details.
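To make this concrete, here is a minimal sketch (URL, image, and command are hypothetical; assumes imports of net/url and the reflow package) of a two-node flow in which an exec depends on an intern. Field usage follows the Flow type documented below.

    // A sketch, not a canonical construction: an OpIntern node
    // feeding an OpExec node.
    u, _ := url.Parse("s3://example-bucket/input/") // hypothetical URL
    intern := &reflow.Flow{Op: reflow.OpIntern, URL: u, Ident: "input"}
    exec := &reflow.Flow{
        Op:    reflow.OpExec,
        Deps:  []*reflow.Flow{intern}, // intern's output is exec's input
        Image: "ubuntu",
        Cmd:   "wc -l %s > %s",
        Ident: "count",
    }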
Index ¶
- Variables
- func WithBackground(ctx context.Context, wg WaitGroup) (context.Context, context.CancelFunc)
- type Arg
- type Cache
- type CacheMode
- type Config
- type Context
- type Eval
- func (e *Eval) CacheWrite(ctx context.Context, f *Flow, repo Repository) error
- func (e *Eval) Do(ctx context.Context) error
- func (e *Eval) Err() error
- func (e *Eval) Flow() *Flow
- func (e *Eval) LogFlow(ctx context.Context, f *Flow)
- func (e *Eval) LogSummary(log *log.Logger)
- func (e *Eval) Mutate(f *Flow, muts ...interface{})
- func (e *Eval) Need() Requirements
- func (e *Eval) Requirements() Requirements
- func (e *Eval) Stealer() *Stealer
- func (e *Eval) Value() values.T
- type EvalConfig
- type Exec
- type ExecArg
- type ExecConfig
- type ExecInspect
- type Executor
- type File
- type Fileset
- func (v Fileset) AnyEmpty() bool
- func (v Fileset) Digest() digest.Digest
- func (v Fileset) Empty() bool
- func (v Fileset) Equal(w Fileset) bool
- func (v Fileset) Files() []File
- func (v Fileset) Flatten() []Fileset
- func (v Fileset) Flow() *Flow
- func (v Fileset) N() int
- func (v Fileset) Pullup() Fileset
- func (v Fileset) Short() string
- func (v Fileset) Size() int64
- func (v Fileset) String() string
- func (v Fileset) WriteDigest(w io.Writer)
- type Flow
- func (f *Flow) CacheKeys() []digest.Digest
- func (f *Flow) Canonicalize(config Config) *Flow
- func (f *Flow) Copy() *Flow
- func (f *Flow) DebugString() string
- func (f *Flow) Digest() digest.Digest
- func (f *Flow) ExecArg(i int) ExecArg
- func (f *Flow) ExecString(cache bool) string
- func (f *Flow) Fork(flow *Flow)
- func (f *Flow) Label(ident string)
- func (f *Flow) MapInit()
- func (f *Flow) NExecArg() int
- func (f *Flow) PhysicalDigest() digest.Digest
- func (f *Flow) Requirements() (req Requirements)
- func (f *Flow) String() string
- func (f *Flow) Visitor() *FlowVisitor
- func (f *Flow) WriteDigest(w io.Writer)
- type FlowState
- type FlowVisitor
- type Fork
- type Gauges
- type Liveset
- type Mutation
- type Op
- type Profile
- type Repository
- type Requirements
- type Reserve
- type Resources
- func (r *Resources) Add(x, y Resources) *Resources
- func (r Resources) Available(s Resources) bool
- func (r Resources) Equal(s Resources) bool
- func (r *Resources) Max(x, y Resources) *Resources
- func (r *Resources) Min(x, y Resources) *Resources
- func (r *Resources) Scale(s Resources, factor float64) *Resources
- func (r Resources) ScaledDistance(u Resources) float64
- func (r *Resources) Set(s Resources) *Resources
- func (r Resources) String() string
- func (r *Resources) Sub(x, y Resources) *Resources
- type Result
- type Stealer
- type Transferer
- type Unreserve
- type Value
- type WaitGroup
- Bugs
Constants ¶
This section is empty.
Variables ¶
var Digester = digest.Digester(crypto.SHA256)
Digester is the Digester used throughout reflow. We use a SHA256 digest.
var Universe string
Universe is the global namespace for digest computation.
Functions ¶
func WithBackground ¶
WithBackground returns a new context.Context with an affiliated Context, accessible via Background. The background context may be canceled with the returned cancellation function. The supplied WaitGroup is used to inform the caller of pending background operations: wg.Add(1) is called for each call to Background; wg.Done is called when the context returned from Background is disposed of through (Context).Complete.
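A sketch of the intended pattern follows, assuming a *sync.WaitGroup satisfies WaitGroup (only Add and Done are required per the description above):

    var w sync.WaitGroup
    ctx, cancel := reflow.WithBackground(context.Background(), &w)
    defer cancel()

    bg := reflow.Background(ctx) // calls w.Add(1)
    go func() {
        defer bg.Complete() // calls w.Done when the operation is disposed of
        // ... perform a background operation ...
    }()

    w.Wait() // block until all background operations have completed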
Types ¶
type Arg ¶
type Arg struct {
    // Out is true if this is an output argument.
    Out bool
    // Fileset is the fileset used as an input argument.
    Fileset *Fileset `json:",omitempty"`
    // Index is the output argument index.
    Index int
}
Arg represents an exec argument (either input or output).
type Cache ¶
type Cache interface {
    // Lookup returns the value associated with a (digest) key.
    // Lookup returns an error flagged errors.NotExist when there
    // is no such value.
    //
    // Lookup should also check to make sure that the objects
    // actually exist, and provide a reasonable guarantee that they'll
    // be available for transfer.
    //
    // TODO(marius): allow the caller to maintain a lease on the desired
    // objects so that garbage collection can (safely) be run
    // concurrently with flows. This isn't a correctness concern (the
    // flows may be restarted), but rather one of efficiency.
    Lookup(context.Context, digest.Digest) (Fileset, error)

    // Transfer transmits the file objects associated with value v
    // (usually retrieved by Lookup) to the repository dst. Transfer
    // should be used in place of direct (cache) repository access since
    // it may apply additional policies (e.g., rate limiting, etc.)
    Transfer(ctx context.Context, dst Repository, v Fileset) error

    // NeedTransfer returns the set of files in the Fileset v that are absent
    // in the provided repository.
    NeedTransfer(ctx context.Context, dst Repository, v Fileset) ([]File, error)

    // Write stores the Value v, whose file objects exist in Repository repo,
    // under the key id. If the repository is nil no objects are transferred.
    Write(ctx context.Context, id digest.Digest, v Fileset, repo Repository) error

    // Delete removes the value named by id from this cache.
    Delete(ctx context.Context, id digest.Digest) error

    // Repository returns this cache's underlying repository. It should
    // not be used for data transfer during the course of evaluation; see
    // Transfer.
    Repository() Repository
}
A Cache stores Values and their associated File objects for later retrieval. Caches may be temporary: objects are not guaranteed to persist.
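A sketch of a read-through lookup followed by a transfer into an executor's repository. The errors.Is form is an assumption about Reflow's errors package; cache, key, and executor are assumed to be in scope.

    fs, err := cache.Lookup(ctx, key)
    if err != nil {
        if errors.Is(errors.NotExist, err) {
            // Cache miss: evaluate the node instead.
        }
        return err
    }
    // Make the cached objects available in the executor's repository.
    if err := cache.Transfer(ctx, executor.Repository(), fs); err != nil {
        return err
    }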
type CacheMode ¶
type CacheMode int
CacheMode is a bitmask that tells how caching is to be used in the evaluator.
const (
    // CacheOff is CacheMode's default value and indicates
    // no caching (read or write) is to be performed.
    CacheOff CacheMode = 0
    // CacheRead indicates that cache lookups should be performed
    // during evaluation.
    CacheRead CacheMode = 1 << iota
    // CacheWrite indicates that the evaluator should write evaluation
    // results to the cache.
    CacheWrite
)
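Because CacheMode is a bitmask, modes compose with bitwise OR and are tested with bitwise AND:

    mode := reflow.CacheRead | reflow.CacheWrite // read and write caching
    if mode&reflow.CacheRead != 0 {
        // Perform cache lookups during evaluation.
    }
    if mode == reflow.CacheOff {
        // No caching at all.
    }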
type Config ¶
type Config struct {
    // HashV1 should be set to true if the flow should use the legacy
    // "v1" hash algorithm.
    HashV1 bool
}
Config stores flow configuration information. Configs modulate Flow behavior.
type Context ¶
Context is a context.Context that is used for background operations within reflow. In addition to providing a common background context for operations, it also carries a WaitGroup, so that the caller can wait for background operation completion.
func Background ¶
Background returns the Context associated with the given parent context.Context. If there is no associated context, it returns a fresh Context without an affiliated WaitGroup.
type Eval ¶
type Eval struct {
    // EvalConfig is the evaluation configuration used in this
    // evaluation.
    EvalConfig
    // contains filtered or unexported fields
}
Eval is an evaluator for Flows.
func NewEval ¶
func NewEval(root *Flow, config EvalConfig) *Eval
NewEval creates and initializes a new evaluator using the provided evaluation configuration and root flow.
func (*Eval) CacheWrite ¶
CacheWrite writes the cache entry for flow f, with objects in the provided source repository. CacheWrite returns nil on success, or else the first error encountered.
func (*Eval) Do ¶
Do evaluates a flow (as provided in NewEval) and returns its value, or error.
There are two evaluation modes, configured by EvalConfig.BottomUp.
When BottomUp is true, the Flow is evaluated in bottom-up mode. Each node's dependencies are evaluated (recursively); a node is evaluated when all of its dependencies are complete (and error free). Before a node is run, its result is first looked up in the configured cache. If there is a cache hit, the node is considered complete without any work done. Only the node's value is downloaded; its objects are fetched lazily. When a node is ready to be evaluated, we check that all of the objects that it depends on are present in the executor's repository; missing objects are retrieved from cache. If these objects are not present in the cache (this can happen if the object is removed from the cache's repository after the cache lookup was done but before the transfer began), evaluation fails with a restartable error.
When BottomUp is false, the flow is evaluated first top-down, and then bottom up. In this mode, objects are looked up first in the top-down phase; a node's dependencies are explored only on cache miss. Once this phase is complete, evaluation proceeds in bottom-up mode. Object retrieval is as in bottom-up mode.
Eval keeps track of the evaluation state of each node; these are described in the documentation for FlowState.
Evaluation is performed by simplification: ready nodes are added to a todo list. Single-step evaluation yields either a fully evaluated node (where (*Flow).Value is set to its result) or a new Flow node (whose (*Flow).Parent is always set to its ancestor). Evaluations are restartable.
Eval permits supplementary workers to steal nodes to evaluate. These workers are responsible for transferring any necessary data between the Eval's repository and the worker's. Once a Flow node has been stolen, it is owned by the worker until it is returned; the worker must set the Flow node's state appropriately.
This provides a simple evaluation scheme that also does not leave any parallelism "on the ground".
Eval employs a conservative admission controller to ensure that we do not exceed available resources.
The root flow is canonicalized before evaluation.
Eval reclaims unreachable objects after each exec has completed, provided e.GC is set to true.
TODO(marius): wait for all nodes to complete before returning (early) when cancelling...
TODO(marius): explore making use of CAS flow states, so that we don't have to separately track pending nodes internally (so we don't clobber stolen nodes).
TODO(marius): permit "steal-only" mode. The only provision for this setup is that the parent must contain some sort of global repository (e.g., S3).
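Putting the pieces together, a typical evaluation loop looks roughly like the following sketch. The executor, repo, a (an assoc.Assoc), and transferer values are assumed to be initialized elsewhere; only the documented EvalConfig fields are used.

    eval := reflow.NewEval(root, reflow.EvalConfig{
        Executor:   executor,
        Repository: repo,
        Assoc:      a,
        Transferer: transferer,
        CacheMode:  reflow.CacheRead | reflow.CacheWrite,
    })
    if err := eval.Do(ctx); err != nil {
        return err // evaluation machinery failed
    }
    if err := eval.Err(); err != nil {
        return err // the flow itself evaluated to an error
    }
    result := eval.Value() // a values.T; for execs, typically a Fileset
    _ = result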
func (*Eval) LogSummary ¶
LogSummary prints an execution summary to the provided logger.
func (*Eval) Mutate ¶
Mutate safely applies a set of mutations vis-a-vis the garbage collector. Mutations may be applied concurrently with each other; mutations are not applied during garbage collection.
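A sketch of how a completed node might be recorded, assuming (as the Mutation and FlowState types below suggest) that both FlowState and Mutation values are accepted as mutations:

    // Mark f done and record that its value came from cache (assumed usage).
    e.Mutate(f, reflow.FlowDone, reflow.Cached)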
func (*Eval) Need ¶
func (e *Eval) Need() Requirements
Need returns the total resource requirements needed in order to avoid queueing work.
func (*Eval) Requirements ¶
func (e *Eval) Requirements() Requirements
Requirements returns the minimum and maximum resource requirements for this Eval's flow.
func (*Eval) Stealer ¶
Stealer returns a Stealer from which flow nodes may be stolen. This permits an external worker to perform the work implied by the returned Flow, which is always in FlowReady state. When the external worker has completed processing (or decided not to process after all), the node must be returned via Return.
type EvalConfig ¶
type EvalConfig struct {
    // The executor to which execs are submitted.
    Executor Executor

    // An (optional) logger to which the evaluation transcript is printed.
    Log *log.Logger

    // Status gets evaluation status reports.
    Status *status.Group

    // An (optional) logger to print evaluation trace.
    Trace *log.Logger

    // Transferer is used to arrange transfers between repositories,
    // including nodes and caches.
    Transferer Transferer

    // Repository is the main, shared repository between evaluations.
    Repository Repository

    // Assoc is the main, shared assoc that is used to store cache and
    // metadata associations.
    Assoc assoc.Assoc

    // CacheMode determines whether the evaluator reads from
    // or writes to the cache. If CacheMode is nonzero, Assoc,
    // Repository, and Transferer must be non-nil.
    CacheMode CacheMode

    // NoCacheExtern determines whether externs are cached.
    NoCacheExtern bool

    // GC tells whether Eval should perform garbage collection
    // after each exec has completed.
    GC bool

    // RecomputeEmpty determines whether cached empty values
    // are recomputed.
    RecomputeEmpty bool

    // BottomUp determines whether we perform bottom-up only
    // evaluation, skipping the top-down phase.
    BottomUp bool

    // Config stores the flow config to be used.
    Config Config

    // CacheLookupTimeout is the timeout for cache lookups.
    // After the timeout expires, a cache lookup is considered
    // a miss.
    CacheLookupTimeout time.Duration

    // Invalidate is a function that determines whether or not f's cached
    // results should be invalidated.
    Invalidate func(f *Flow) bool
}
EvalConfig provides runtime configuration for evaluation instances.
func (EvalConfig) String ¶
func (e EvalConfig) String() string
String returns a human-readable form of the evaluation configuration.
type Exec ¶
type Exec interface {
    // ID returns the digest of the exec. This is equivalent to the
    // Digest of the value computed by the Exec.
    ID() digest.Digest

    // URI names execs in a process-agnostic fashion.
    URI() string

    // Result returns the exec's result after it has been completed.
    Result(ctx context.Context) (Result, error)

    // Inspect inspects the exec. It can be called at any point in the
    // Exec's lifetime.
    Inspect(ctx context.Context) (ExecInspect, error)

    // Wait awaits completion of the Exec.
    Wait(ctx context.Context) error

    // Logs returns the standard error and/or standard output of the Exec.
    // If it is called during execution, and if follow is true, it follows
    // the logs until completion of execution.
    // Completed Execs return the full set of available logs.
    Logs(ctx context.Context, stdout, stderr, follow bool) (io.ReadCloser, error)

    // Shell invokes /bin/bash inside an Exec. It can be invoked only when
    // the Exec is executing. r provides the shell input. The returned read
    // closer has the shell output. The caller has to close the read closer
    // once done.
    // TODO(pgopal) - Implement shell for zombie execs.
    Shell(ctx context.Context) (io.ReadWriteCloser, error)

    // Promote installs this exec's objects into the alloc's repository.
    Promote(context.Context) error
}
An Exec computes a Value. It is created from an ExecConfig; the Exec interface permits waiting on completion, and inspection of results as well as ongoing execution.
type ExecArg ¶
type ExecArg struct {
    // Out tells whether this argument is an output argument.
    Out bool
    // Index is the dependency index represented by this argument.
    Index int
}
ExecArg indexes arguments to dependencies.
type ExecConfig ¶
type ExecConfig struct {
    // The type of exec: "exec", "intern", "extern"
    Type string

    // A human-readable name for the exec.
    Ident string

    // intern, extern: the URL from which data is fetched or to which
    // data is pushed.
    URL string

    // exec: the docker image used to perform an exec
    Image string

    // exec: the Sprintf-able command that is to be run inside of the
    // Docker image.
    Cmd string

    // exec: the set of arguments (one per %s in Cmd) passed to the command
    // extern: the single argument which is to be exported
    Args []Arg

    // exec: the resource requirements for the exec
    Resources

    // NeedAWSCreds indicates the exec needs AWS credentials defined in
    // its environment: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and
    // AWS_SESSION_TOKEN will be available with the user's default
    // credentials.
    NeedAWSCreds bool

    // OutputIsDir tells whether an output argument (by index)
    // is a directory.
    OutputIsDir []bool `json:",omitempty"`
}
ExecConfig contains all the necessary information to perform an exec.
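For illustration, an ExecConfig for a simple command might look as follows. The image, command, and resource figures are hypothetical, input is an assumed Fileset in scope, and the Resources literal assumes the map form described under type Resources below.

    cfg := reflow.ExecConfig{
        Type:  "exec",
        Ident: "gunzip",
        Image: "ubuntu",
        // One %s per entry in Args, in order.
        Cmd: "gunzip -c %s > %s",
        Args: []reflow.Arg{
            {Fileset: &input},     // input argument
            {Out: true, Index: 0}, // output argument 0
        },
        Resources: reflow.Resources{"cpu": 1, "mem": 2e9, "disk": 1e10},
    }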
func (ExecConfig) String ¶
func (e ExecConfig) String() string
type ExecInspect ¶
type ExecInspect struct {
    Created time.Time
    Config  ExecConfig
    State   string        // "created", "waiting", "running", .., "zombie"
    Status  string        // human-readable status
    Error   *errors.Error `json:",omitempty"` // non-nil on runtime error
    Profile Profile

    // Gauges are used to export realtime exec stats. They are used only
    // while the Exec is in running state.
    Gauges Gauges

    // Commands running from top, for live inspection.
    Commands []string

    Docker types.ContainerJSON // Docker inspect output.
}
ExecInspect describes the current state of an Exec.
func (ExecInspect) Runtime ¶
func (e ExecInspect) Runtime() time.Duration
Runtime computes the exec's runtime based on Docker's timestamps.
type Executor ¶
type Executor interface {
    // Put creates a new Exec at id. It is idempotent.
    Put(ctx context.Context, id digest.Digest, exec ExecConfig) (Exec, error)

    // Get retrieves the Exec named id.
    Get(ctx context.Context, id digest.Digest) (Exec, error)

    // Remove deletes an Exec.
    Remove(ctx context.Context, id digest.Digest) error

    // Execs lists all Execs known to the Executor.
    Execs(ctx context.Context) ([]Exec, error)

    // Resources indicates the total amount of resources available at the Executor.
    Resources() Resources

    // Repository returns the Repository associated with this Executor.
    Repository() Repository
}
Executor manages Execs and their values.
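A sketch of the lifecycle these interfaces imply, using cfg from the ExecConfig example above. Deriving the id via Digester.FromString is an assumption made for illustration; Reflow's own key derivation differs.

    id := reflow.Digester.FromString(cfg.String()) // hypothetical key derivation
    x, err := executor.Put(ctx, id, cfg)           // idempotent
    if err != nil {
        return err
    }
    if err := x.Wait(ctx); err != nil { // await completion
        return err
    }
    res, err := x.Result(ctx)
    if err != nil {
        return err
    }
    if res.Err != nil {
        return res.Err // the exec itself failed
    }
    // Install the exec's objects into the executor's repository.
    if err := x.Promote(ctx); err != nil {
        return err
    }
    fs := res.Fileset
    _ = fs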
type File ¶
type File struct {
    // The digest of the contents of the file.
    ID digest.Digest

    // The size of the file.
    Size int64
}
File represents a name-by-hash file.
type Fileset ¶
type Fileset struct {
    List []Fileset       `json:",omitempty"`
    Map  map[string]File `json:"Fileset,omitempty"`
}
Fileset is the result of an evaluated flow. A Fileset may either be a list of Filesets or a map of paths to Files.
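The two forms compose, as in this small sketch (d1 is a placeholder digest; sizes are hypothetical):

    var d1 digest.Digest // placeholder digest
    fs := reflow.Fileset{
        Map: map[string]reflow.File{
            "sample.fastq.gz": {ID: d1, Size: 100},
        },
    }
    list := reflow.Fileset{List: []reflow.Fileset{fs, fs}}
    _ = list.N()       // total number of files
    _ = list.Size()    // total size in bytes
    _ = list.Flatten() // shallow list of constituent Filesets
    _ = list.Pullup()  // merge into a single map-form Fileset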
func (Fileset) AnyEmpty ¶
AnyEmpty tells whether this value, or any of its constituent values, contains no files.
func (Fileset) Digest ¶
Digest returns a digest representing the value. Digests preserve semantics: two values with the same digest are considered to be equivalent.
func (Fileset) Equal ¶
Equal reports whether v is equal to w. Two values are equal when they produce the same digest.
func (Fileset) Flatten ¶
Flatten is a convenience function to flatten (shallowly) the value v, returning a list of Filesets. If the value is a list value, the list is returned; otherwise a unary list containing the value v is returned.
func (Fileset) Short ¶
Short returns a short, human-readable string representing the value. Its intended use is for pretty-printed output. In particular, hashes are abbreviated, and lists display only the first member, followed by ellipsis. For example, a list of values is printed as:
list<val<sample.fastq.gz=f2c59c40>, ...50MB>
func (Fileset) String ¶
String returns a full, human-readable string representing the value v. Unlike Short, String is fully descriptive: it contains the full digest and lists are complete. For example:
list<sample.fastq.gz=sha256:f2c59c40a1d71c0c2af12d38a2276d9df49073c08360d72320847efebc820160, sample2.fastq.gz=sha256:59eb82c49448e349486b29540ad71f4ddd7f53e5a204d50997f054d05c939adb>
func (Fileset) WriteDigest ¶
WriteDigest writes the digestible material for v to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.
type Flow ¶
type Flow struct {
    // The operation represented by this node. See Op
    // for definitions.
    Op Op

    // Parent is set when a node is Forked.
    Parent *Flow

    // Deps holds this Flow's data dependencies.
    Deps []*Flow

    // Config stores this Flow's config.
    Config Config

    Image   string                           // OpExec
    Cmd     string                           // OpExec
    URL     *url.URL                         // OpIntern, OpExtern
    Re      *regexp.Regexp                   // OpGroupby, OpCollect
    Repl    string                           // OpCollect
    MapFunc func(*Flow) *Flow                // OpMap, OpMapMerge
    MapFlow *Flow                            // OpMap
    K       func(vs []values.T) *Flow        // OpK
    Coerce  func(values.T) (values.T, error) // OpCoerce

    // Argmap maps exec arguments to dependencies. (OpExec)
    Argmap []ExecArg
    // OutputIsDir tells whether the output i is a directory.
    OutputIsDir []bool

    // Argstrs stores a symbolic argument name, used for pretty printing
    // and debugging.
    Argstrs []string

    // FlowDigest stores, for OpVal and OpK, a digest representing
    // just the operation or value.
    FlowDigest digest.Digest

    // A human-readable identifier for the node, for use in
    // debugging output, etc.
    Ident string

    // Source code position of this node.
    Position string

    // State stores the evaluation state of the node; see FlowState
    // for details.
    State FlowState

    // Resources indicates the expected resource usage of this node.
    // Currently it is only defined for OpExec.
    Resources Resources

    // Reserved stores the amount of resources that have been reserved
    // on behalf of this node.
    Reserved Resources

    // FlowRequirements stores the requirements indicated by
    // OpRequirements.
    FlowRequirements Requirements

    // Value stores the Value to which the node was evaluated.
    Value values.T

    // Err stores any evaluation error that occurred during flow evaluation.
    Err *errors.Error

    // The total runtime for evaluating this node.
    Runtime time.Duration

    // The current owning executor of this Flow.
    Owner Executor

    // The exec working on this node.
    Exec Exec

    // Cached stores whether the flow was retrieved from cache.
    Cached bool

    // The amount of data to be transferred.
    TransferSize data.Size

    // Inspect stores an exec's inspect output.
    Inspect ExecInspect

    Tracked bool

    Status *status.Task

    Data []byte // OpData

    // contains filtered or unexported fields
}
Flow defines an AST for data flows. It is a logical union of ops as defined by type Op. Child nodes witness computational dependencies and must therefore be evaluated before their parents.
func (*Flow) CacheKeys ¶
CacheKeys returns all the valid cache keys for this flow node. They are returned in order from most concrete to least concrete.
func (*Flow) Canonicalize ¶
Canonicalize returns a canonical version of Flow f, where semantically equivalent flows (as per Flow.Digest) are collapsed into one.
func (*Flow) DebugString ¶
DebugString returns a human readable representation of the flow appropriate for debugging.
func (*Flow) Digest ¶
Digest produces a digest of Flow f. The digest captures the entirety of the Flow's semantics: two flows with the same digest must evaluate to the same value. OpMap Flows are canonicalized by passing a no-op Flow to their MapFunc.
func (*Flow) ExecArg ¶
ExecArg returns the ith ExecArg. It is drawn from f.Argmap if it is defined; otherwise it is simply the i'th input argument.
ExecArg panics if i >= f.NExecArg().
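A sketch of how a caller might enumerate an exec's arguments, mapping input arguments back to their supplying dependencies:

    for i := 0; i < f.NExecArg(); i++ {
        arg := f.ExecArg(i)
        if arg.Out {
            // Output argument arg.Index.
            continue
        }
        dep := f.Deps[arg.Index] // the dependency supplying this input
        _ = dep
    }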
func (*Flow) ExecString ¶
ExecString renders a string representing the operation performed by this node. Cache should be set to true if the result was retrieved from cache; in this case, values from dependencies are not rendered since they may not be available. The returned string has the following format:
how:digest(ident) shortvalue = execstring (runtime transfer rate)
where "execstring" is a string indicating the operation performed.
For example, the ExecString of a node that interns a directory of FASTQs looks like this:
ecae46a4(inputfastq) val<161216_E00472_0063_AH7LWNALXX/CNVS-LUAD-120587007-cfDNA-WGS-Rep1_S1_L001_R1_001.fastq.gz=87f7ca18, ...492.45GB> = intern "s3://grail-avalon/samples/CNVS-LUAD-120587007-cfDNA-WGS-Rep1/fastq/" (1h29m33.908587144s 93.83MB/s)
func (*Flow) Fork ¶
Fork creates a new fork of this flow. The current version of Flow f becomes the parent flow.
func (*Flow) Label ¶
Label labels this flow with ident. It then recursively labels its ancestors. Labeling stops when a node is already labeled.
func (*Flow) MapInit ¶
func (f *Flow) MapInit()
MapInit initializes Flow.MapFlow from the supplied MapFunc.
func (*Flow) NExecArg ¶
NExecArg returns the number of exec arguments of this node. If f.Argmap is defined, it returns the length of the argument map, or else the number of dependencies.
func (*Flow) PhysicalDigest ¶
PhysicalDigest computes the physical digest of the Flow f, reflecting the actual underlying operation to be performed, and not the logical one.
It is an error to call PhysicalDigest on nodes whose dependencies are not fully resolved (i.e., in state FlowDone and containing a Fileset value), or on nodes not of type OpExec, OpIntern, or OpExtern. This is because the physical input values must be available to compute the digest.
PhysicalDigest returns an empty digest when a physical digest is not computable for node f.
func (*Flow) Requirements ¶
func (f *Flow) Requirements() (req Requirements)
Requirements computes the minimum and maximum resource requirements for this flow. It currently assumes that the width of any map operation is infinite.
BUG(marius): Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.
TODO(marius): include width hints on map nodes
TODO(marius): account for static parallelism too.
func (*Flow) String ¶
String returns a shallow, human-readable string representation of the flow.
func (*Flow) Visitor ¶
func (f *Flow) Visitor() *FlowVisitor
Visitor returns a new FlowVisitor rooted at this node.
func (*Flow) WriteDigest ¶
WriteDigest writes the digestible material of f to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.
type FlowState ¶
type FlowState int64
FlowState is an enum representing the state of a Flow node during evaluation.
const (
    // FlowInit indicates that the flow is initialized but not evaluated.
    FlowInit FlowState = iota

    // FlowNeedLookup indicates that the evaluator should perform a
    // cache lookup on the flow node.
    FlowNeedLookup

    // FlowLookup indicates that the evaluator is currently performing a
    // cache lookup of the flow node. After a successful cache lookup,
    // the node is transferred to FlowDone, and the (cached) value is
    // attached to the flow node. The objects may not be transferred into
    // the evaluator's repository.
    FlowLookup

    // FlowTODO indicates that the evaluator should consider the node
    // for evaluation once its dependencies are completed.
    FlowTODO

    // FlowNeedTransfer indicates that the evaluator should transfer all
    // objects needed for execution into the evaluator's repository.
    FlowNeedTransfer

    // FlowTransfer indicates that the evaluator is currently
    // transferring the flow's dependent objects from cache.
    FlowTransfer

    // FlowReady indicates that the node is ready for evaluation and should
    // be scheduled by the evaluator. A node is ready only once all of its
    // dependent objects are available in the evaluator's repository.
    FlowReady

    // FlowRunning indicates that the node is currently being evaluated by
    // the evaluator.
    FlowRunning

    // FlowDone indicates that the node has completed evaluation.
    FlowDone

    // FlowMax is the number of flow states.
    FlowMax
)
FlowState denotes a Flow node's state during evaluation. Flows begin their life in FlowInit, where they remain until they are examined by the evaluator. The precise state transitions depend on the evaluation mode (whether it is evaluating bottom-up or top-down, and whether a cache is used), but generally follow the order in which they are laid out here.
type FlowVisitor ¶
type FlowVisitor struct {
    *Flow
    // contains filtered or unexported fields
}
FlowVisitor implements a convenient visitor for flow graphs.
func (*FlowVisitor) Push ¶
func (v *FlowVisitor) Push(f *Flow)
Push pushes node f onto the visitor stack.
func (*FlowVisitor) Visit ¶
func (v *FlowVisitor) Visit()
Visit pushes the current node's children onto the visitor stack, including both data and control dependencies.
func (*FlowVisitor) Walk ¶
func (v *FlowVisitor) Walk() bool
Walk visits the next flow node on the stack. Walk returns false when it runs out of nodes to visit; it also guarantees that each node is visited only once.
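Walk and Visit combine into the usual traversal loop; a sketch rooted at an assumed root node:

    for v := root.Visitor(); v.Walk(); v.Visit() {
        // v.Flow is the node currently being visited; Visit then queues
        // its children (data and control dependencies) for later Walks.
        fmt.Println(v.Flow)
    }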
type Liveset ¶
type Liveset interface {
    // Contains returns true if the given object definitely is in the
    // set; it may rarely return true when the object is not.
    Contains(digest.Digest) bool
}
A Liveset contains a possibly approximate judgement about live objects.
type Mutation ¶
type Mutation int
Mutation is a type of mutation.
const (
    // Incr is the mutation that increments the reference count used for
    // GC.
    Incr Mutation = iota
    // Decr is the mutation that decrements the reference count used for
    // GC.
    Decr
    // Cached is the mutation that sets the flow's Cached flag.
    Cached
    // Refresh is the mutation that refreshes the status of the flow node.
    Refresh
)
type Op ¶
type Op int
Op is an enum representing operations that may be performed in a Flow.
const (
    // OpExec runs a command in a docker container on the inputs
    // represented by the Flow's dependencies.
    OpExec Op = 1 + iota
    // OpIntern imports datasets from URLs.
    OpIntern
    // OpExtern exports values to URLs.
    OpExtern
    // OpGroupby applies a regular expression to group an input value.
    OpGroupby
    // OpMap applies a function (which returns a Flow) to each element
    // in the input.
    OpMap
    // OpCollect filters and rewrites values.
    OpCollect
    // OpMerge merges a set of flows.
    OpMerge
    // OpVal returns a value.
    OpVal
    // OpPullup merges a set of results into one value.
    OpPullup
    // OpK is a flow continuation.
    OpK
    // OpCoerce is a flow value coercion. (Errors allowed.)
    OpCoerce
    // OpRequirements modifies the flow's requirements.
    OpRequirements
    // OpData evaluates to a literal (inline) piece of data.
    OpData
)
func (Op) DigestString ¶
type Repository ¶
type Repository interface {
    // Collect removes from this repository any objects not in the
    // Liveset.
    Collect(context.Context, Liveset) error

    // Stat returns the File metadata for the blob with the given digest.
    // It returns errors.NotExist if the blob does not exist in this
    // repository.
    Stat(context.Context, digest.Digest) (File, error)

    // Get streams the blob named by the given Digest.
    // If it does not exist in this repository, an error with code
    // errors.NotFound will be returned.
    Get(context.Context, digest.Digest) (io.ReadCloser, error)

    // Put streams a blob to the repository and returns its
    // digest when completed.
    Put(context.Context, io.Reader) (digest.Digest, error)

    // WriteTo writes a blob identified by a Digest directly to a
    // foreign repository named by a URL. If the repository is
    // unable to write directly to the foreign repository, an error
    // with flag errors.NotSupported is returned.
    WriteTo(context.Context, digest.Digest, *url.URL) error

    // ReadFrom reads a blob identified by a Digest directly from a
    // foreign repository named by a URL. If the repository is
    // unable to read directly from the foreign repository, an error
    // with flag errors.NotSupported is returned.
    ReadFrom(context.Context, digest.Digest, *url.URL) error

    // URL returns the URL of this repository, or nil if it does not
    // have one. The returned URL may be used for direct transfers via
    // WriteTo or ReadFrom.
    URL() *url.URL
}
Repository defines an interface used for servicing blobs of data that are named-by-hash.
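A sketch of basic blob operations, assuming a concrete Repository implementation repo is at hand (strings is the standard library package):

    id, err := repo.Put(ctx, strings.NewReader("hello, world"))
    if err != nil {
        return err
    }
    file, err := repo.Stat(ctx, id) // metadata only
    if err != nil {
        return err
    }
    rc, err := repo.Get(ctx, id) // stream the blob's contents
    if err != nil {
        return err
    }
    defer rc.Close()
    _ = file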
type Requirements ¶
type Requirements struct {
    // Min is the smallest amount of resources that must be allocated
    // to satisfy the requirements.
    Min Resources

    // Width is the width of the requirements. A width of zero indicates
    // a "narrow" job: minimum describes the exact resources needed.
    // Widths greater than zero are "wide" requests: they require some
    // multiple of the minimum requirement. The distinction between a
    // width of zero and a width of one is a little subtle: width
    // represents the smallest acceptable width, and thus a width of 1
    // can be taken as a hint to allocate a higher multiple of the
    // minimum requirements, whereas a width of 0 represents a precise
    // requirement: allocating any more is likely to be wasteful.
    Width int
}
Requirements stores resource requirements, comprising the minimum amount of acceptable resources and a width.
func (*Requirements) Add ¶
func (r *Requirements) Add(s Requirements)
Add adds the provided requirements s to the requirements r: r's minimum requirements are set to the larger of the two, and the two widths are added.
func (*Requirements) AddParallel ¶
func (r *Requirements) AddParallel(s Resources)
AddParallel adds the provided resources s to the requirements, and also increases the requirement's width by one.
func (*Requirements) AddSerial ¶
func (r *Requirements) AddSerial(s Resources)
AddSerial adds the provided resources s to the requirements.
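To illustrate the difference between the two, a sketch (resource figures hypothetical; the Resources literal assumes the map form described under type Resources):

    var req reflow.Requirements
    // A serial stage: its resources contribute to the minimum requirement.
    req.AddSerial(reflow.Resources{"cpu": 4, "mem": 8e9})
    // A parallel stage: its resources are added and the width grows by one,
    // making the request "wide".
    req.AddParallel(reflow.Resources{"cpu": 1, "mem": 2e9})
    _ = req.Wide() // now true
    _ = req.Max()  // the maximum resources represented by the request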
func (Requirements) Equal ¶
func (r Requirements) Equal(s Requirements) bool
Equal reports whether r and s represent the same requirements.
func (*Requirements) Max ¶
func (r *Requirements) Max() Resources
Max is the maximum amount of resources represented by this resource request.
func (Requirements) String ¶
func (r Requirements) String() string
String renders a human-readable representation of r.
func (*Requirements) Wide ¶
func (r *Requirements) Wide() bool
Wide returns whether these requirements represent a wide resource request.
type Resources ¶
Resources describes a set of labeled resources. Each resource is described by a string label and assigned a value. The zero value of Resources represents the resources with zeros for all labels.
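Since the arithmetic methods write their result into the receiver and return it, a typical pattern looks like the sketch below. The map-literal form and the cpu/mem labels are assumptions consistent with the description above; the direction of Available (whether s fits within r) is also an assumption.

    r1 := reflow.Resources{"cpu": 2, "mem": 4e9}
    r2 := reflow.Resources{"cpu": 1, "mem": 1e9}
    var total reflow.Resources
    total.Add(r1, r2) // total = r1 + r2
    if r1.Available(r2) {
        // r2's resources are available from r1 (assumed semantics).
    }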
func (Resources) Equal ¶
Equal tells whether the resources r and s are equal in all dimensions of both r and s.
func (*Resources) Scale ¶
Scale sets r to the scaled resources s[key]*factor for all keys and returns r.
func (Resources) ScaledDistance ¶
ScaledDistance returns the distance between two resources computed as a sum of the differences in memory, cpu and disk with some predefined scaling.
type Result ¶
type Result struct {
    // Fileset is the fileset produced by an exec.
    Fileset Fileset `json:",omitempty"`

    // Err is the error produced by an exec.
    Err *errors.Error `json:",omitempty"`
}
Result is the result of an exec.
type Stealer ¶
type Stealer struct {
// contains filtered or unexported fields
}
A Stealer coordinates work stealing with an Eval. A Stealer instance is obtained by (*Eval).Stealer.
func (*Stealer) Admit ¶
Admit returns a channel that will return a stolen Flow node that makes use of at most max resources. Only one Admit can be active at a time: if Admit is called while another is outstanding, the first Admit is cancelled, closing its channel.
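A sketch of a stealing worker; max is the largest amount of resources the worker can offer, and Return's exact signature is an assumption based on the Stealer documentation above:

    var max reflow.Resources // resources this worker can offer (hypothetical)
    s := eval.Stealer()
    for {
        f, ok := <-s.Admit(max)
        if !ok {
            break // this Admit was superseded, or no more work
        }
        // f is in FlowReady state; perform its work, transferring
        // objects between repositories as needed, then return it.
        s.Return(f) // assumed signature
    }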
type Transferer ¶
type Transferer interface {
    // Transfer transfers a set of files from the src to the dst
    // repository. A transfer manager may apply policies (e.g., rate
    // limits and concurrency limits) to these transfers.
    Transfer(ctx context.Context, dst, src Repository, files ...File) error

    // NeedTransfer returns the subset of files absent from the dst
    // repository.
    NeedTransfer(ctx context.Context, dst Repository, files ...File) ([]File, error)
}
Transferer defines an interface used for management of transfers between multiple repositories.
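A sketch of the check-then-transfer pattern the interface suggests, with fs, dst, and src assumed in scope:

    files := fs.Files() // all files in a Fileset
    missing, err := transferer.NeedTransfer(ctx, dst, files...)
    if err != nil {
        return err
    }
    // Move only the files dst does not already hold.
    if err := transferer.Transfer(ctx, dst, src, missing...); err != nil {
        return err
    }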
type Unreserve ¶
type Unreserve Resources
Unreserve subtracts resources from the flow's reservation.
Notes ¶
Bugs ¶
Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
assoc | Package assoc defines data types for associative maps used within Reflow.
assoc/dydbassoc | Package dydbassoc implements an assoc.Assoc based on AWS's DynamoDB.
batch | Package batch implements support for running batches of reflow (stateful) evaluations.
cmd |
cmd/reflowlet | Reflowlet is the agent process that is run on nodes in a Reflow cluster.
config | Package config defines an interface for configuring a Reflow instance.
config/all | Package all imports all standard configuration providers in Reflow.
config/awsenvconfig | Package awsenvconfig configures AWS credentials to be derived from the user's environment.
config/ec2metadataconfig | Package ec2config defines and registers configuration providers using Amazon's EC2 metadata service.
config/httpscaconfig | Package httpscaconfig defines a configuration provider named "httpsca" which can be used to configure HTTPS certificates via an on-disk certificate authority.
config/httpsconfig | Package httpsconfig defines a configuration provider named "file" which can be used to configure HTTPS certificates.
config/s3config | Package s3config defines a configuration provider named "s3" which can be used to configure S3-based caches.
ec2cluster | Package ec2cluster implements support for maintaining elastic clusters of Reflow instances on EC2.
errors | Package errors provides a standard error definition for use in Reflow.
internal |
internal/ec2authenticator | Package ec2authenticator implements Docker repository authentication for ECR using an AWS SDK session and a root.
internal/ecrauth | Package ecrauth provides an interface and utilities for authenticating AWS EC2 ECR Docker repositories.
internal/scanner | Package scanner provides a scanner and tokenizer for UTF-8-encoded text.
internal/status | Package status provides facilities for reporting statuses from a number of tasks working towards a common goal.
internal/wg | Package wg implements a channel-enabled WaitGroup.
lang | Package lang implements the reflow language.
pooltest | Package pooltest tests pools.
testutil | Package testutil provides utilities for testing code that involves pools.
log | Package log implements leveling and teeing on top of Go's standard logs package.
pool | Package pool implements resource pools for reflow.
pool/client | Package client implements a remoting client for reflow pools.
pool/server | Package server exposes a pool implementation for remote access.
repository | Package repository provides common ways to dial reflow.Repository implementations; it also provides some common utilities for working with repositories.
repository/client | Package client implements repository REST client.
repository/file | Package file implements a filesystem-backed repository.
repository/s3 | Package s3 implements an S3-backed repository.
repository/server | Package server implements a Repository REST server.
rest | Package rest provides a framework for serving and accessing hierarchical resource-based APIs.
syntax | Package syntax implements the Reflow language.
test |
test/flow | Package flow contains a number of constructors for Flow nodes that are convenient for testing.
test/testutil | Package testutil contains various utilities for testing Reflow functionality.
tool | Package tool implements the reflow command.
trace | Package trace provides a tracing system for Reflow events.
types | Package types contains data structures and algorithms for dealing with value types in Reflow.
values | Package values defines data structures for representing (runtime) values in Reflow.