Documentation ¶
Overview ¶
Package flow provides a low-level workflow manager based on a CUE Instance.
A Task defines an operational unit in a Workflow and corresponds to a struct in a CUE instance. This package does not define what a Task looks like in a CUE Instance. Instead, the user of this package must supply a TaskFunc that creates a Runner for cue.Values that are deemed to be a Task.
Tasks may depend on other tasks, but cyclic dependencies are not allowed. A Task A depends on another Task B if A, directly or indirectly, has a reference to any field of Task B, including its root.
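The dependency rule can be pictured as reachability in a graph of references. The following is a minimal sketch using only the standard library; the refs type and both helper functions are hypothetical and are not part of this package, which derives dependencies from CUE references itself.

```go
package main

import "fmt"

// refs is a hypothetical stand-in for the references found in a CUE value:
// refs["b"] = ["a"] means task b refers to some field of task a.
type refs map[string][]string

// dependsOn reports whether task a depends on task b, directly or indirectly.
func dependsOn(g refs, a, b string) bool {
	seen := map[string]bool{}
	var visit func(n string) bool
	visit = func(n string) bool {
		for _, m := range g[n] {
			if m == b {
				return true
			}
			if !seen[m] {
				seen[m] = true
				if visit(m) {
					return true
				}
			}
		}
		return false
	}
	return visit(a)
}

// hasCycle reports whether any task depends on itself.
func hasCycle(g refs) bool {
	for n := range g {
		if dependsOn(g, n, n) {
			return true
		}
	}
	return false
}

func main() {
	g := refs{"b": {"a"}, "c": {"b"}}
	fmt.Println(dependsOn(g, "c", "a")) // c refers to b, which refers to a
	fmt.Println(dependsOn(g, "a", "c"))
	fmt.Println(hasCycle(g))
	g["a"] = []string{"c"} // closes the loop a -> c -> b -> a
	fmt.Println(hasCycle(g))
}
```

In this model a workflow is valid only if hasCycle reports false.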
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/cuecontext"
	"cuelang.org/go/tools/flow"
)

func main() {
	ctx := cuecontext.New()
	v := ctx.CompileString(`
	a: {
		input:  "world"
		output: string
	}
	b: {
		input:  a.output
		output: string
	}
	`)
	if err := v.Err(); err != nil {
		log.Fatal(err)
	}
	controller := flow.New(nil, v, ioTaskFunc)
	if err := controller.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

func ioTaskFunc(v cue.Value) (flow.Runner, error) {
	inputPath := cue.ParsePath("input")

	input := v.LookupPath(inputPath)
	if !input.Exists() {
		return nil, nil
	}

	return flow.RunnerFunc(func(t *flow.Task) error {
		inputVal, err := t.Value().LookupPath(inputPath).String()
		if err != nil {
			return fmt.Errorf("input not of type string")
		}

		outputVal := fmt.Sprintf("hello %s", inputVal)
		fmt.Printf("setting %s.output to %q\n", t.Path(), outputVal)

		return t.Fill(map[string]string{
			"output": outputVal,
		})
	}), nil
}

Output:
setting a.output to "hello world"
setting b.output to "hello hello world"
Index ¶
- Variables
- type Config
- type Controller
- type Runner
- type RunnerFunc
- type State
- type Task
- func (t *Task) Context() context.Context
- func (t *Task) Dependencies() []*Task
- func (t *Task) Err() error
- func (t *Task) Fill(x interface{}) error
- func (t *Task) Index() int
- func (t *Task) Path() cue.Path
- func (t *Task) PathDependencies(p cue.Path) []*Task
- func (t *Task) State() State
- func (t *Task) Stats() stats.Counts
- func (t *Task) Value() cue.Value
- type TaskFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrAbort may be returned by a task to avoid processing downstream tasks.
	// This can be used by control nodes to influence execution.
	ErrAbort = errors.New("abort dependant tasks without failure")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Root limits the search for tasks to be within the path indicated to root.
	// For the cue command, this is set to ["command"]. The default value is
	// for all tasks to be root.
	Root cue.Path

	// InferTasks allows tasks to be defined outside of the Root. Such tasks
	// will only be included in the workflow if any of their fields is
	// referenced by any of the tasks defined within Root.
	//
	// CAVEAT EMPTOR: this feature is mostly provided for backwards
	// compatibility with v0.2. A problem with this approach is that it will
	// look for task structs within arbitrary data. So if not careful, there may
	// be spurious matches.
	InferTasks bool

	// IgnoreConcrete ignores references for which the values are already
	// concrete and cannot change.
	IgnoreConcrete bool

	// FindHiddenTasks allows tasks to be defined in hidden fields.
	FindHiddenTasks bool

	// UpdateFunc is called whenever the information in the controller is
	// updated. This includes directly after initialization. The task may be
	// nil if this call is not the result of a task completing.
	UpdateFunc func(c *Controller, t *Task) error
}
A Config defines options for interpreting an Instance as a Workflow.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
A Controller defines a set of Tasks to be executed.
func New ¶
func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller
New creates a Controller for a given Instance and TaskFunc.
The instance value can either be a *cue.Instance or a cue.Value.
func (*Controller) Run ¶
func (c *Controller) Run(ctx context.Context) error
Run runs the tasks of a workflow until completion.
func (*Controller) Stats ¶ added in v0.5.0
func (c *Controller) Stats() (counts stats.Counts)
Stats reports statistics on the total number of CUE operations used.
This is an experimental method and the API is likely to change. The Counts.String method will likely stay and is the safest way to use this API.
This currently should only be called after completion or within a call to UpdateFunc.
func (*Controller) Tasks ¶
func (c *Controller) Tasks() []*Task
Tasks reports the tasks that are currently registered with the controller.
This may currently only be called before Run is called or from within a call to UpdateFunc. Task pointers returned by this call are not guaranteed to be the same between successive calls to this method.
func (*Controller) Value ¶ added in v0.4.3
func (c *Controller) Value() cue.Value
Value returns the value managed by the controller.
It is safe to use the value only after Run() has returned. It panics if the flow is running.
type Runner ¶
type Runner interface {
	// Run runs a Task. If any of the tasks it depends on returned an error it
	// is passed to this task. It reports an error upon failure.
	//
	// Any results to be returned can be set by calling Fill on the passed task.
	//
	// TODO: what is a good contract for receiving and passing errors and abort.
	//
	// If for a returned error x errors.Is(x, ErrAbort), all dependant tasks
	// will not be run, without this being an error.
	Run(t *Task, err error) error
}
A Runner executes a Task.
type RunnerFunc ¶
type RunnerFunc func(t *Task) error
A RunnerFunc runs a Task.
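RunnerFunc follows the usual Go adapter pattern, in which a function type gains a method so that an ordinary function can satisfy an interface. The following sketch shows the pattern with local stand-in types. The Task struct here is hypothetical, and the Run method body is an assumption about how such an adapter is typically written rather than a copy of this package's code.

```go
package main

import "fmt"

// Task is a hypothetical, stripped-down stand-in for flow.Task.
type Task struct{ Name string }

// Runner mirrors the interface documented above.
type Runner interface {
	Run(t *Task, err error) error
}

// RunnerFunc is assumed to be a plain function type, which is how the
// package example at the top uses it.
type RunnerFunc func(t *Task) error

// Run lets RunnerFunc satisfy Runner. This sketch ignores the upstream error.
func (f RunnerFunc) Run(t *Task, err error) error { return f(t) }

func main() {
	var r Runner = RunnerFunc(func(t *Task) error {
		fmt.Println("running", t.Name)
		return nil
	})
	if err := r.Run(&Task{Name: "a"}, nil); err != nil {
		fmt.Println("error:", err)
	}
}
```

A runner that needs to inspect the error passed from upstream tasks would implement Runner directly instead of going through the adapter.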
type State ¶
type State int
A State indicates the state of a Task.
The following state diagram indicates the possible state transitions:
       Ready
    ↗︎        ↘︎
Waiting  ←  Running
    ↘︎        ↙︎
    Terminated
A Task may move from Waiting to Terminated if one of the tasks on which it depends fails.
NOTE: transitions from Running to Waiting are currently not supported. In the future this may be possible if a task depends on continuously running tasks that send updates.
const (
	// Waiting indicates a task is blocked on input from another task.
	//
	// NOTE: although this is currently not implemented, a task could
	// theoretically move from the Running to Waiting state.
	Waiting State = iota

	// Ready means a task is ready to run, but currently not running.
	Ready

	// Running indicates a goroutine is currently active for a task and that
	// it is not Waiting.
	Running

	// Terminated means a task has stopped running either because it terminated
	// while Running or was aborted by a task on which it depends. The error
	// value of a Task indicates the reason for the termination.
	Terminated
)
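The transitions in the diagram can be written down as a small lookup table. This is a sketch only: State and its constants are redeclared locally, and the allowed table and canMove helper are hypothetical names that do not exist in this package.

```go
package main

import "fmt"

// State and its constants are local copies of the declarations above.
type State int

const (
	Waiting State = iota
	Ready
	Running
	Terminated
)

// allowed lists the transitions described as currently supported.
// Running -> Waiting is left out because the note says it is not supported.
var allowed = map[State][]State{
	Waiting: {Ready, Terminated},
	Ready:   {Running},
	Running: {Terminated},
}

// canMove reports whether a task may go from one state to another.
func canMove(from, to State) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove(Waiting, Ready))
	fmt.Println(canMove(Waiting, Terminated)) // a dependency failed
	fmt.Println(canMove(Running, Waiting))    // not supported at present
}
```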
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
A Task contains the context for a single task execution. Tasks may be run concurrently.
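To give a feel for what running tasks concurrently while respecting dependencies can look like, here is a minimal standard-library sketch. It is not how the Controller is implemented. The runAll function and its map-based input are hypothetical, and the sketch assumes the dependency map is acyclic and that every dependency is also a key.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs every task in its own goroutine. A task starts only after all
// tasks it depends on have finished. deps must be acyclic and every
// dependency must also be a key of deps. It returns the completion order.
func runAll(deps map[string][]string) []string {
	// One channel per task, closed when that task has finished.
	done := make(map[string]chan struct{}, len(deps))
	for name := range deps {
		done[name] = make(chan struct{})
	}

	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	for name, ds := range deps {
		wg.Add(1)
		go func(name string, ds []string) {
			defer wg.Done()
			for _, d := range ds {
				<-done[d] // block until the dependency has finished
			}
			mu.Lock()
			order = append(order, name)
			mu.Unlock()
			close(done[name])
		}(name, ds)
	}
	wg.Wait()
	return order
}

func main() {
	order := runAll(map[string][]string{
		"a": nil,
		"b": {"a"},
		"c": {"a"},
	})
	fmt.Println(order[0]) // a finishes first; b and c may finish in either order
}
```

Tasks b and c have no dependency on each other, so only their position after a is guaranteed.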
func (*Task) Dependencies ¶
func (t *Task) Dependencies() []*Task
Dependencies reports the Tasks t depends on.
This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.
func (*Task) Err ¶
func (t *Task) Err() error
Err returns the error of a completed Task.
This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.
func (*Task) Fill ¶
func (t *Task) Fill(x interface{}) error
Fill fills in values of the Controller's configuration for the current task. The changes take effect after the task completes.
This method may currently only be called by the runner.
func (*Task) Path ¶
func (t *Task) Path() cue.Path
Path reports the path of the Task within the Instance in which it is defined. The Path is always valid.
func (*Task) PathDependencies ¶
func (t *Task) PathDependencies(p cue.Path) []*Task
PathDependencies reports the dependencies found for a value at the given path.
This may currently only be called before Run is called or from within a call to UpdateFunc.
func (*Task) State ¶
func (t *Task) State() State
State is the current state of the Task.
This method may currently only be called before Run is called, after a Task has completed, or from within a call to UpdateFunc.