Documentation ¶
Index ¶
- Variables
- func ImageQualifiers(image string) (img string, aws, docker bool)
- func WithBackground(ctx context.Context, wg WaitGroup) (context.Context, context.CancelFunc)
- type Config
- type Context
- type Edge
- type Eval
- func (e *Eval) CacheWrite(ctx context.Context, f *Flow) error
- func (e *Eval) Do(ctx context.Context) error
- func (e *Eval) Err() error
- func (e *Eval) Flow() *Flow
- func (e *Eval) LogFlow(ctx context.Context, f *Flow)
- func (e *Eval) LogSummary(log *log.Logger)
- func (e *Eval) Mutate(f *Flow, muts ...interface{})
- func (e *Eval) Requirements() reflow.Requirements
- func (e *Eval) Value() values.T
- type EvalConfig
- type ExecArg
- type Flow
- func (f *Flow) AbbrevCmd() string
- func (f *Flow) CacheKeys() []digest.Digest
- func (f *Flow) Canonicalize(config Config) *Flow
- func (f *Flow) Copy() *Flow
- func (f *Flow) DebugString() string
- func (f *Flow) Digest() digest.Digest
- func (f *Flow) ExecArg(i int) ExecArg
- func (f *Flow) ExecConfig() reflow.ExecConfig
- func (f *Flow) ExecReset()
- func (f *Flow) ExecString(cache bool) string
- func (f *Flow) Fork(flow *Flow)
- func (f *Flow) Label(ident string)
- func (f *Flow) MapInit()
- func (f *Flow) NExecArg() int
- func (f *Flow) Requirements() (req reflow.Requirements)
- func (f *Flow) String() string
- func (f *Flow) Visitor() *FlowVisitor
- func (f *Flow) WriteDigest(w io.Writer)
- type FlowVisitor
- type Fork
- type KContext
- type Mutation
- type Node
- type Op
- type Repair
- type SetReserved
- type Snapshotter
- type State
- type Status
- type Value
- type WaitGroup
- Bugs
Constants ¶
This section is empty.
Variables ¶
var Digester = digest.Digester(crypto.SHA256)
Digester is the Digester used throughout reflow. We use a SHA256 digest.
var Universe string
Universe is the global namespace for digest computation.
Functions ¶
func ImageQualifiers ¶
ImageQualifiers analyzes the given image for the presence of backdoor qualifiers, strips the image of them, and returns a boolean for each known qualifier if present.
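The qualifier syntax is not spelled out here, so the following is only a minimal sketch of the strip-and-report behavior. It assumes, hypothetically, that qualifiers are `$`-prefixed suffixes named `aws` and `docker`; the function `imageQualifiers` is an illustrative stand-in, not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// imageQualifiers is a hypothetical stand-in for ImageQualifiers.
// ASSUMPTION: qualifiers are "$"-separated suffixes ("$aws", "$docker");
// the documentation above does not specify the actual syntax.
func imageQualifiers(image string) (img string, aws, docker bool) {
	parts := strings.Split(image, "$")
	img = parts[0] // the image name with all qualifiers stripped
	for _, q := range parts[1:] {
		switch q {
		case "aws":
			aws = true
		case "docker":
			docker = true
		}
		// Unknown qualifiers are silently dropped in this sketch.
	}
	return img, aws, docker
}

func main() {
	img, aws, docker := imageQualifiers("ubuntu:20.04$aws")
	fmt.Println(img, aws, docker) // prints: ubuntu:20.04 true false
}
```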
func WithBackground ¶
WithBackground returns a new context.Context with an affiliated Context, accessible via Background. The background context may be canceled with the returned cancellation function. The supplied WaitGroup is used to inform the caller of pending background operations: wg.Add(1) is called for each call to Background; wg.Done is called when the context returned from Background is disposed of through (Context).Complete.
Types ¶
type Config ¶
type Config struct {
	// HashV1 should be set to true if the flow should use the legacy
	// "v1" hash algorithm.
	HashV1 bool
}
Config stores flow configuration information. Configs modulate Flow behavior.
type Context ¶
Context is a context.Context that is used for background operations within reflow. In addition to providing a common background context for operations, it also carries a WaitGroup, so that the caller can wait for background operation completion.
func Background ¶
Background returns the Context associated with the given parent context.Context. If there is no associated context, it returns a fresh Context without an affiliated WaitGroup.
type Edge ¶
Edge represents a dependency from a flow node to another.
func (Edge) Attributes ¶
Attributes implements encoding.Attributer. Blue edges are dynamically explored edges.
type Eval ¶
type Eval struct {
	// EvalConfig is the evaluation configuration used in this
	// evaluation.
	EvalConfig
	// contains filtered or unexported fields
}
Eval is an evaluator for Flows.
func NewEval ¶
func NewEval(root *Flow, config EvalConfig) *Eval
NewEval creates and initializes a new evaluator using the provided evaluation configuration and root flow.
func (*Eval) CacheWrite ¶
CacheWrite writes the cache entry for flow f, with objects in the provided source repository. CacheWrite returns nil on success, or else the first error encountered.
func (*Eval) Do ¶
Do evaluates a flow (as provided in Init) and returns its value, or error.
There are two evaluation modes, configured by EvalConfig.BottomUp.
When BottomUp is true, the Flow is evaluated in bottom-up mode. Each node's dependencies are evaluated (recursively); a node is evaluated when all of its dependencies are complete (and error free). Before a node is run, its result is first looked up in the configured cache. If there is a cache hit, evaluation completes without any work done. Only the node's value is downloaded; its objects are fetched lazily. When a node is ready to be evaluated, we check that all of the objects that it depends on are present in the executor's repository; missing objects are retrieved from cache. If these objects are not present in the cache (this can happen if the object is removed from the cache's repository after the cache lookup was done but before the transfer began), evaluation fails with a restartable error.
When BottomUp is false, the flow is evaluated first top-down, and then bottom-up. In this mode, objects are looked up first in the top-down phase; a node's dependencies are explored only on cache miss. Once this phase is complete, evaluation proceeds in bottom-up mode. Object retrieval is as in bottom-up mode.
Eval keeps track of the evaluation state of each node; these are described in the documentation for State.
Evaluation is performed by simplification: ready nodes are added to a todo list. Single-step evaluation yields either a fully evaluated node (where (*Flow).Value is set to its result) or a new Flow node (whose (*Flow).Parent is always set to its ancestor). Evaluations are restartable.
This provides a simple evaluation scheme that also does not leave any parallelism "on the ground".
Eval employs a conservative admission controller to ensure that we do not exceed available resources.
The root flow is canonicalized before evaluation.
TODO(marius): wait for all nodes to complete before returning (early) when cancelling...
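As a rough illustration of the bottom-up mode described above, the sketch below evaluates a small dependency graph with a todo list and a cache that is consulted before a node is run. It is a toy model under loud assumptions: the node type, evalBottomUp, and exampleGraph are hypothetical, values are plain ints, the cache is keyed by name rather than by digest, and there is no concurrency, admission control, or error handling.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a flow node.
type node struct {
	name  string
	deps  []*node
	run   func(args []int) int
	value int
}

// evalBottomUp evaluates root and reports which nodes actually ran.
// A node becomes ready once all of its dependencies are complete.
func evalBottomUp(root *node, cache map[string]int) (int, []string) {
	pending := map[*node]int{}        // unfinished dependency count per node
	dependents := map[*node][]*node{} // reverse edges
	var todo []*node                  // ready nodes
	var ran []string

	var visit func(n *node)
	visit = func(n *node) {
		if _, ok := pending[n]; ok {
			return
		}
		pending[n] = len(n.deps)
		if len(n.deps) == 0 {
			todo = append(todo, n)
		}
		for _, d := range n.deps {
			dependents[d] = append(dependents[d], n)
			visit(d)
		}
	}
	visit(root)

	for len(todo) > 0 {
		n := todo[0]
		todo = todo[1:]
		if v, ok := cache[n.name]; ok {
			n.value = v // cache hit: no work is done
		} else {
			args := make([]int, len(n.deps))
			for i, d := range n.deps {
				args[i] = d.value
			}
			n.value = n.run(args)
			cache[n.name] = n.value
			ran = append(ran, n.name)
		}
		// Completing n may make its dependents ready.
		for _, p := range dependents[n] {
			pending[p]--
			if pending[p] == 0 {
				todo = append(todo, p)
			}
		}
	}
	return root.value, ran
}

// exampleGraph builds double(sum(a, b)) with a = 2 and b = 3.
func exampleGraph() *node {
	a := &node{name: "a", run: func([]int) int { return 2 }}
	b := &node{name: "b", run: func([]int) int { return 3 }}
	sum := &node{name: "sum", deps: []*node{a, b}, run: func(v []int) int { return v[0] + v[1] }}
	return &node{name: "double", deps: []*node{sum}, run: func(v []int) int { return 2 * v[0] }}
}

func main() {
	cache := map[string]int{}
	v, ran := evalBottomUp(exampleGraph(), cache)
	fmt.Println(v, ran) // prints: 10 [a b sum double]
	v, ran = evalBottomUp(exampleGraph(), cache)
	fmt.Println(v, len(ran)) // prints: 10 0
}
```

In this sketch a second evaluation against the same cache runs nothing. A top-down phase would additionally avoid exploring the dependencies of a node whose lookup hits.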
func (*Eval) LogSummary ¶
LogSummary prints an execution summary to the provided logger.
func (*Eval) Mutate ¶
Mutate safely applies a set of mutations which may be applied concurrently with each other.
func (*Eval) Requirements ¶
func (e *Eval) Requirements() reflow.Requirements
Requirements returns the minimum and maximum resource requirements for this Eval's flow.
type EvalConfig ¶
type EvalConfig struct {
	// Scheduler is used to run tasks.
	// The scheduler must use the same repository as the evaluator.
	Scheduler *sched.Scheduler

	// Predictor is used to predict the tasks' resource usage. It
	// will only be used if a Scheduler is defined.
	Predictor *predictor.Predictor

	// Snapshotter is used to snapshot source URLs into unloaded
	// filesets. If non-nil, then files are delay-loaded.
	Snapshotter Snapshotter

	// An (optional) logger to which the evaluation transcript is printed.
	Log *log.Logger

	// DotWriter is an (optional) writer where the evaluator will write the flowgraph to in dot format.
	DotWriter io.Writer

	// Status gets evaluation status reports.
	Status *status.Group

	// An (optional) logger to print evaluation trace.
	Trace *log.Logger

	// Repository is the main, shared repository between evaluations.
	Repository reflow.Repository

	// Assoc is the main, shared assoc that is used to store cache and
	// metadata associations.
	Assoc assoc.Assoc

	// AssertionGenerator is the implementation for generating assertions.
	AssertionGenerator reflow.AssertionGenerator

	// Assert is the policy to use for asserting cached Assertions.
	Assert reflow.Assert

	// RunID is a unique identifier for the run
	RunID taskdb.RunID

	// CacheMode determines whether the evaluator reads from
	// or writes to the cache. If CacheMode is nonzero, Assoc,
	// Repository, and Transferer must be non-nil.
	CacheMode infra2.CacheMode

	// RecomputeEmpty determines whether cached empty values
	// are recomputed.
	RecomputeEmpty bool

	// BottomUp determines whether we perform bottom-up only
	// evaluation, skipping the top-down phase.
	BottomUp bool

	// PostUseChecksum indicates whether input filesets are checksummed after use.
	PostUseChecksum bool

	// Config stores the flow config to be used.
	Config Config

	// ImageMap stores the canonical names of the images.
	// A canonical name has a fully qualified registry host,
	// and image digest instead of image tag.
	ImageMap map[string]string

	// CacheLookupTimeout is the timeout for cache lookups.
	// After the timeout expires, a cache lookup is considered
	// a miss.
	CacheLookupTimeout time.Duration

	// Invalidate is a function that determines whether or not f's cached
	// results should be invalidated.
	Invalidate func(f *Flow) bool

	// Labels is the labels for this run.
	Labels pool.Labels

	// MaxResources is the max resources that can be used for a single task for this evaluation
	MaxResources reflow.Resources
}
EvalConfig provides runtime configuration for evaluation instances.
func (EvalConfig) String ¶
func (e EvalConfig) String() string
String returns a human-readable form of the evaluation configuration.
type ExecArg ¶
type ExecArg struct {
	// Out tells whether this argument is an output argument.
	Out bool
	// Index is the dependency index represented by this argument.
	Index int
}
ExecArg indexes arguments to dependencies.
type Flow ¶
type Flow struct {
	// The operation represented by this node. See Op
	// for definitions.
	Op Op

	// Parent is set when a node is Forked.
	Parent *Flow

	// Deps holds this Flow's data dependencies.
	Deps []*Flow

	// Config stores this Flow's config.
	Config Config

	Image string // OpExec
	Cmd string // OpExec
	URL *url.URL // OpIntern, Extern
	Re *regexp.Regexp // Groupby, Collect
	Repl string // Collect
	MapFunc func(*Flow) *Flow // Map, MapMerge
	MapFlow *Flow // Map
	K func(vs []values.T) *Flow // K
	Kctx func(ctx KContext, v []values.T) *Flow // Kctx
	Coerce func(values.T) (values.T, error) // Coerce

	// Argmap maps exec arguments to dependencies. (OpExec).
	Argmap []ExecArg
	// OutputIsDir tells whether the output i is a directory.
	OutputIsDir []bool

	// Original fields if this Flow was rewritten with canonical values.
	OriginalImage string

	// Argstrs stores a symbolic argument name, used for pretty printing
	// and debugging.
	Argstrs []string

	// FlowDigest stores, for Val, K and Coerce, a digest representing
	// just the operation or value.
	FlowDigest digest.Digest

	// ExtraDigest is considered as an additional digestible material of this flow
	// and included in the flow's logical and physical digest computation.
	ExtraDigest digest.Digest

	// A human-readable identifier for the node, for use in
	// debugging output, etc.
	Ident string

	// Source code position of this node.
	Position string

	// State stores the evaluation state of the node; see State
	// for details.
	State State

	// Resources indicates the expected resource usage of this node.
	// Currently it is only defined for OpExec.
	Resources reflow.Resources

	// Reserved stores the amount of resources that have been reserved
	// on behalf of this node.
	Reserved reflow.Resources

	// FlowRequirements stores the requirements indicated by
	// Requirements.
	FlowRequirements reflow.Requirements

	// Value stores the Value to which the node was evaluated.
	Value values.T
	// Err stores any evaluation error that occurred during flow evaluation.
	Err *errors.Error

	// The total runtime for evaluating this node.
	Runtime time.Duration

	// The current owning executor of this Flow.
	Owner reflow.Executor

	// The exec working on this node.
	Exec reflow.Exec

	// Cached stores whether the flow was retrieved from cache.
	Cached bool

	// RunInfo stores a subset of the exec's inspect data, used for logging.
	RunInfo reflow.ExecRunInfo

	Tracked bool

	Status *status.Task

	Data []byte // Data

	// MustIntern is set to true if an OpIntern must be
	// fully interned and cannot be pre-resolved.
	MustIntern bool

	// Dirty is used by the evaluator to track which nodes are dirtied
	// by this node: once the node has been evaluated, these flows
	// may be eligible for evaluation.
	Dirty []*Flow

	// Pending maintains a map of this node's dependent nodes that
	// are pending evaluation. It is maintained by the evaluator to trigger
	// evaluation.
	Pending map[*Flow]bool

	// NonDeterministic, in the case of Execs, denotes if the exec is non-deterministic.
	NonDeterministic bool

	// ExecDepIncorrectCacheKeyBug is set for nodes that are known to be impacted by a bug
	// which causes the cache keys to be incorrectly computed.
	// See https://github.com/grailbio/reflow/pull/128 or T41260.
	ExecDepIncorrectCacheKeyBug bool
	// contains filtered or unexported fields
}
Flow defines an AST for data flows. It is a logical union of ops as defined by type Op. Child nodes witness computational dependencies and must therefore be evaluated before their parents.
func (*Flow) CacheKeys ¶
CacheKeys returns all the valid cache keys for this flow node. They are returned in order from most concrete to least concrete.
func (*Flow) Canonicalize ¶
Canonicalize returns a canonical version of Flow f, where semantically equivalent flows (as per Flow.Digest) are collapsed into one.
func (*Flow) DebugString ¶
DebugString returns a human readable representation of the flow appropriate for debugging.
func (*Flow) Digest ¶
Digest produces a digest of Flow f. The digest captures the entirety of the Flow's semantics: two flows with the same digest must evaluate to the same value. Map Flows are canonicalized by passing a no-op Flow to their MapFunc.
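The idea that a node's digest covers its own operation plus the digests of everything it depends on can be sketched as a Merkle-style hash. The example below is an assumption-laden illustration: the flow type, its fields, and the exact bytes written are hypothetical and do not reproduce the package's actual digestible material. It uses SHA-256, matching the Digester variable documented above.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"io"
)

// flow is a hypothetical, cut-down node used only for this sketch.
type flow struct {
	Op   string
	Cmd  string
	Deps []*flow
}

// writeDigest writes the node's digestible material to w.
// The writer is a hash, whose Write never fails, so errors are not checked.
func (f *flow) writeDigest(w io.Writer) {
	io.WriteString(w, f.Op)
	w.Write([]byte{0}) // separator so that ("ab","c") differs from ("a","bc")
	io.WriteString(w, f.Cmd)
	w.Write([]byte{0})
	for _, d := range f.Deps {
		sum := d.digest()
		w.Write(sum[:])
	}
}

// digest returns the SHA-256 digest of the node and, transitively, its dependencies.
func (f *flow) digest() [sha256.Size]byte {
	h := sha256.New()
	f.writeDigest(h)
	var sum [sha256.Size]byte
	copy(sum[:], h.Sum(nil))
	return sum
}

func main() {
	in := &flow{Op: "intern", Cmd: "s3://example-bucket/input/"} // hypothetical URL
	a := &flow{Op: "exec", Cmd: "wc -l", Deps: []*flow{in}}
	b := &flow{Op: "exec", Cmd: "wc -l", Deps: []*flow{in}}
	fmt.Println(a.digest() == b.digest()) // prints: true
}
```

Two structurally identical nodes hash to the same digest, which is what allows semantically equivalent flows to share cache entries.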
func (*Flow) ExecArg ¶
ExecArg returns the ith ExecArg. It is drawn from f.Argmap if it is defined, or else it is just the ith input argument.
ExecArg panics if i >= f.NExecArg().
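The documented relationship between Argmap, ExecArg, and NExecArg can be illustrated as follows. This is a sketch only: the flow type is a hypothetical cut-down version of Flow, and "defined" is assumed here to mean a non-nil Argmap.

```go
package main

import "fmt"

// ExecArg mirrors the documented struct.
type ExecArg struct {
	Out   bool
	Index int
}

// flow is a hypothetical cut-down Flow with only the fields needed here.
type flow struct {
	Deps   []*flow
	Argmap []ExecArg
}

// NExecArg returns the length of Argmap if it is set (assumed: non-nil),
// or else the number of dependencies.
func (f *flow) NExecArg() int {
	if f.Argmap != nil {
		return len(f.Argmap)
	}
	return len(f.Deps)
}

// ExecArg returns the ith argument, panicking when i is out of range.
func (f *flow) ExecArg(i int) ExecArg {
	if i < 0 || i >= f.NExecArg() {
		panic("exec argument index out of range")
	}
	if f.Argmap != nil {
		return f.Argmap[i]
	}
	// Without an Argmap, argument i is simply input dependency i.
	return ExecArg{Index: i}
}

func main() {
	dep := &flow{}
	plain := &flow{Deps: []*flow{dep, dep}}
	mapped := &flow{
		Deps:   []*flow{dep},
		Argmap: []ExecArg{{Out: true, Index: 0}, {Index: 0}},
	}
	fmt.Println(plain.NExecArg(), plain.ExecArg(1))   // prints: 2 {false 1}
	fmt.Println(mapped.NExecArg(), mapped.ExecArg(0)) // prints: 2 {true 0}
}
```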
func (*Flow) ExecConfig ¶
func (f *Flow) ExecConfig() reflow.ExecConfig
ExecConfig returns the flow's exec configuration. The flow's dependencies must already be computed before invoking ExecConfig. ExecConfig is valid only for Intern, Extern, and Exec ops.
func (*Flow) ExecReset ¶
func (f *Flow) ExecReset()
ExecReset resets all flow parameters related to running a single exec.
func (*Flow) ExecString ¶
ExecString renders a string representing the operation performed by this node. Cache should be set to true if the result was retrieved from cache; in this case, values from dependencies are not rendered, since they may not be available. The returned string has the following format:
how:digest(ident) shortvalue = execstring (runtime transfer rate)
where "execstring" is a string indicating the operation performed.
For example, the ExecString of a node that interns a directory of FASTQs looks like this:
ecae46a4(inputfastq) val<161216_E00472_0063_AH7LWNALXX/CNVS-LUAD-120587007-cfDNA-WGS-Rep1_S1_L001_R1_001.fastq.gz=87f7ca18, ...492.45GB> = intern "s3://grail-avalon/samples/CNVS-LUAD-120587007-cfDNA-WGS-Rep1/fastq/" (1h29m33.908587144s 93.83MB/s)
func (*Flow) Fork ¶
Fork creates a new fork of this flow. The current version of Flow f becomes the parent flow.
func (*Flow) Label ¶
Label labels this flow with ident. It then recursively labels its ancestors. Labeling stops when a node is already labeled.
func (*Flow) MapInit ¶
func (f *Flow) MapInit()
MapInit initializes Flow.MapFlow from the supplied MapFunc.
func (*Flow) NExecArg ¶
NExecArg returns the number of exec arguments of this node. If f.Argmap is defined, it returns the length of the argument map, or else the number of dependencies.
func (*Flow) Requirements ¶
func (f *Flow) Requirements() (req reflow.Requirements)
Requirements computes the minimum and maximum resource requirements for this flow. It currently assumes that the width of any map operation is infinite.
BUG(marius): Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.
TODO(marius): include width hints on map nodes
TODO(marius): account for static parallelism too.
func (*Flow) String ¶
String returns a shallow and human readable string representation of the flow.
func (*Flow) Visitor ¶
func (f *Flow) Visitor() *FlowVisitor
Visitor returns a new FlowVisitor rooted at this node.
func (*Flow) WriteDigest ¶
WriteDigest writes the digestible material of f to w. The io.Writer is assumed to be produced by a Digester, and hence infallible. Errors are not checked.
type FlowVisitor ¶
type FlowVisitor struct {
	*Flow
	// contains filtered or unexported fields
}
FlowVisitor implements a convenient visitor for flow graphs.
func (*FlowVisitor) Push ¶
func (v *FlowVisitor) Push(f *Flow)
Push pushes node f onto visitor stack.
func (*FlowVisitor) Visit ¶
func (v *FlowVisitor) Visit()
Visit pushes the current node's children onto the visitor stack, including both data and control dependencies.
func (*FlowVisitor) Walk ¶
func (v *FlowVisitor) Walk() bool
Walk visits the next flow node on the stack. Walk returns false when it runs out of nodes to visit; it also guarantees that each node is visited only once.
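The Push, Visit, and Walk methods combine into a simple traversal loop. The sketch below shows one way such a stack-based visitor can work; the flow and visitor types and newVisitor are hypothetical stand-ins, and only data dependencies are modeled (the control dependencies mentioned above are omitted).

```go
package main

import "fmt"

// flow is a hypothetical cut-down node.
type flow struct {
	Ident string
	Deps  []*flow
}

// visitor embeds the current node, in the manner of FlowVisitor.
type visitor struct {
	*flow
	stack []*flow
	seen  map[*flow]bool
}

// newVisitor returns a visitor rooted at root.
func newVisitor(root *flow) *visitor {
	return &visitor{stack: []*flow{root}, seen: map[*flow]bool{}}
}

// Push pushes node f onto the visitor stack.
func (v *visitor) Push(f *flow) { v.stack = append(v.stack, f) }

// Visit pushes the current node's dependencies onto the stack.
func (v *visitor) Visit() {
	for _, d := range v.flow.Deps {
		v.Push(d)
	}
}

// Walk advances to the next unvisited node; it returns false when none remain.
func (v *visitor) Walk() bool {
	for len(v.stack) > 0 {
		f := v.stack[len(v.stack)-1]
		v.stack = v.stack[:len(v.stack)-1]
		if f == nil || v.seen[f] {
			continue // skip nodes already visited
		}
		v.seen[f] = true
		v.flow = f
		return true
	}
	return false
}

func main() {
	a := &flow{Ident: "a"}
	b := &flow{Ident: "b", Deps: []*flow{a}}
	c := &flow{Ident: "c", Deps: []*flow{a, b}}
	// a is reachable twice but is printed only once.
	for v := newVisitor(c); v.Walk(); v.Visit() {
		fmt.Println(v.Ident)
	}
}
```

Because Visit is a separate step, a caller can skip it for a given node to prune that node's subgraph from the traversal.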
type KContext ¶
type KContext interface {
	// Context is the supplied context.
	context.Context
	// Repository returns the repository.
	Repository() reflow.Repository
}
KContext is the context provided to a continuation (Kctx).
type Mutation ¶
type Mutation int
Mutation is a type of mutation.
const (
	Invalid Mutation = iota
	// Cached is the mutation that sets the flow's Cached flag.
	Cached
	// Refresh is the mutation that refreshes the status of the flow node.
	Refresh
	// MustIntern sets the flow's MustIntern flag to true.
	MustIntern
	// NoStatus indicates that a flow node's status should not be updated.
	NoStatus
	// Propagate is the mutation that propagates a flow's dependency assertions
	// to the flow's result Fileset. Results in a no-op if the flow has no result fileset.
	Propagate
)
type Node ¶
type Node struct {
*Flow
}
Node is a flow node in the dot graph.
func (Node) Attributes ¶
Attributes implements encoding.Attributer.
type Op ¶
type Op int
Op is an enum representing operations that may be performed in a Flow.
const (
	// Exec runs a command in a docker container on the inputs
	// represented by the Flow's dependencies.
	Exec Op = 1 + iota
	// Intern imports datasets from URLs.
	Intern
	// Extern exports values to URLs.
	Extern
	// Groupby applies a regular expression to group an input value.
	Groupby
	// Map applies a function (which returns a Flow) to each element
	// in the input.
	Map
	// Collect filters and rewrites values.
	Collect
	// Merge merges a set of flows.
	Merge
	// Val returns a value.
	Val
	// Pullup merges a set of results into one value.
	Pullup
	// K is a flow continuation.
	K
	// Coerce is a flow value coercion. (Errors allowed.)
	Coerce
	// Requirements modifies the flow's requirements.
	Requirements
	// Data evaluates to a literal (inline) piece of data.
	Data
	// Kctx is a flow continuation with access to the evaluator context.
	Kctx
)
func (Op) DigestString ¶
type Repair ¶
type Repair struct {
	// EvalConfig is the repair's configuration. Only Assoc and Repository need
	// to be configured.
	EvalConfig
	// GetLimit is applied to the assoc's get requests.
	GetLimit *limiter.Limiter
	// NumWrites is incremented for each new assoc entry written by the repair job.
	NumWrites int64
	// contains filtered or unexported fields
}
Repair performs cache-repair for flows. Repair can forward-migrate Reflow caches when more key types are added.
Repair works by simulating evaluation using logical cache keys (and performing direct evaluation on the cached metadata where it can) and then writing new cache keys back to the assoc.
func NewRepair ¶
func NewRepair(config EvalConfig) *Repair
NewRepair returns a new repair configured with the provided EvalConfig.
The caller must call (*Repair).Go before submitting evaluations through (*Repair).Do.
func (*Repair) Do ¶
Do repairs the flow f. Repair is performed by using cached evaluations to populate values, and, when the cache is missing entries and a value can be computed immediately (i.e., without consulting an executor), computing that value. Flows that are successfully evaluated this way (sustaining no errors) are written back with their completed set of cache keys.
Only OpExec flows are written back.
type SetReserved ¶
SetReserved sets the flow's Reserved resources.
type Snapshotter ¶
Snapshotter provides an interface for snapshotting source URL data into unloaded filesets.
type State ¶
type State int64
State is an enum representing the state of a Flow node during evaluation.
const (
	// Init indicates that the flow is initialized but not evaluated
	Init State = iota
	// NeedLookup indicates that the evaluator should perform a
	// cache lookup on the flow node.
	NeedLookup
	// Lookup indicates that the evaluator is currently performing a
	// cache lookup of the flow node. After a successful cache lookup,
	// the node is transferred to Done, and the (cached) value is
	// attached to the flow node. The objects may not be transferred into
	// the evaluator's repository.
	Lookup
	// TODO indicates that the evaluator should consider the node
	// for evaluation once its dependencies are completed.
	TODO
	// Ready indicates that the node is ready for evaluation and should
	// be scheduled by the evaluator. A node is ready only once all of its
	// dependent objects are available in the evaluator's repository.
	Ready
	// NeedSubmit indicates the task is ready to be submitted to the
	// scheduler.
	NeedSubmit
	// Running indicates that the node is currently being evaluated by
	// the evaluator.
	Running
	Execing
	// Done indicates that the node has completed evaluation.
	Done
	// Max is the number of flow states.
	Max
)
State denotes a Flow node's state during evaluation. Flows begin their life in Init, where they remain until they are examined by the evaluator. The precise state transitions depend on the evaluation mode (whether it is evaluating bottom-up or top-down, and whether a cache is used), but generally follow the order in which they are laid out here.
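Because the states are declared with iota in roughly the order a node passes through them, they can be compared numerically and used as array indices, with Max giving the array size. The following sketch reproduces the constant block and adds a hypothetical stateName helper for illustration; that helper is not part of the documented API.

```go
package main

import "fmt"

// State mirrors the documented enum.
type State int64

const (
	Init State = iota
	NeedLookup
	Lookup
	TODO
	Ready
	NeedSubmit
	Running
	Execing
	Done
	Max // the number of flow states
)

// stateNames is indexed by State; Max sizes the array.
var stateNames = [Max]string{
	Init:       "Init",
	NeedLookup: "NeedLookup",
	Lookup:     "Lookup",
	TODO:       "TODO",
	Ready:      "Ready",
	NeedSubmit: "NeedSubmit",
	Running:    "Running",
	Execing:    "Execing",
	Done:       "Done",
}

// stateName is a hypothetical helper that returns a printable name for s.
func stateName(s State) string {
	if s < 0 || s >= Max {
		return fmt.Sprintf("State(%d)", s)
	}
	return stateNames[s]
}

func main() {
	// Per-state counters, as an evaluator might keep for a summary.
	var counts [Max]int
	for _, s := range []State{Init, Ready, Done, Done} {
		counts[s]++
	}
	for s := Init; s < Max; s++ {
		fmt.Printf("%-10s %d\n", stateName(s), counts[s])
	}
}
```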
Notes ¶
Bugs ¶
Requirements cannot accurately determine disk space requirements as they depend on object liveness. They are currently treated as entirely ephemeral.