Documentation
Overview ¶
Package workflow provides the workflow execution engine.
Index ¶
- Constants
- Variables
- func BuildNamespaces(stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema]) map[string]map[string]*schema.ObjectSchema
- func GetOutputNodeID(stepID string, stageID string, outputID string) string
- func GetSchema() *schema.TypedScopeSchema[*Workflow]
- func GetStageNodeID(stepID string, stageID string) string
- func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, ...)
- type DAGItem
- type DAGItemKind
- type DependencyError
- type ErrInvalidInput
- type ErrInvalidInputYAML
- type ErrInvalidState
- type ErrInvalidWorkflow
- type ErrInvalidWorkflowYAML
- type ErrNoMorePossibleOutputs
- type ErrNoMorePossibleSteps
- type ExecutableWorkflow
- type Executor
- type Workflow
- type YAMLConverter
Examples ¶
Constants ¶
const (
    // WorkflowInputKey is the key in the workflow map for input.
    WorkflowInputKey = "input"
    // WorkflowStepsKey is the key in the workflow map for the steps.
    WorkflowStepsKey = "steps"
)
const OrDisabledTag = "!ordisabled"
OrDisabledTag is the yaml tag to specify that the following code should be interpreted as a `oneof` type with two possible outputs: the expr specified or the disabled output.
const SoftOptionalTag = "!soft-optional"
SoftOptionalTag is the tag to specify that the field, when used in an object, should be optional with an optional DAG dependency.
const WaitOptionalTag = "!wait-optional"
WaitOptionalTag is the tag to specify that the field, when used in an object, should be optional with a completion DAG dependency.
const YamlDiscriminatorKey = "discriminator"
YamlDiscriminatorKey is the key to specify the discriminator inside a !oneof section.
const YamlExprTag = "!expr"
YamlExprTag is the YAML tag that specifies that the following code should be interpreted as an expression.
const YamlOneOfKey = "one_of"
YamlOneOfKey is the key to specify the oneof options within a !oneof section.
const YamlOneOfTag = "!oneof"
YamlOneOfTag is the yaml tag that allows the section to be interpreted as a OneOf.
Variables ¶
var ErrEmptyWorkflowFile = fmt.Errorf("empty workflow file provided in context")
ErrEmptyWorkflowFile signals that the workflow file was provided, but it was empty.
var ErrNoSteps = fmt.Errorf("no steps defined in workflow")
ErrNoSteps signals that the workflow has no steps defined.
var ErrWorkflowAborted = fmt.Errorf("workflow execution aborted")
ErrWorkflowAborted indicates that the workflow execution was intentionally aborted.
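Because these variables are plain sentinel errors, callers would typically match them with `errors.Is` rather than by comparing message strings. The following is a minimal sketch under stated assumptions: `errWorkflowAborted` is a local stand-in that copies the documented declaration so the example compiles without the engine, and `runStep` is a hypothetical caller that wraps the sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// errWorkflowAborted is a local stand-in mirroring the documented
// declaration of ErrWorkflowAborted. It is not the package variable itself.
var errWorkflowAborted = fmt.Errorf("workflow execution aborted")

// runStep is a hypothetical caller that adds context while wrapping the sentinel.
func runStep(id string) error {
	return fmt.Errorf("step %q: %w", id, errWorkflowAborted)
}

// isAborted reports whether err is, or wraps, the sentinel error.
func isAborted(err error) bool {
	return errors.Is(err, errWorkflowAborted)
}

func main() {
	err := runStep("say_hi")
	fmt.Println(err)
	fmt.Println("aborted:", isAborted(err))
}
```

Wrapping with `%w` keeps the sentinel reachable through the error chain, so added context does not break the match.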
Functions ¶
func BuildNamespaces ¶ added in v0.19.1
func BuildNamespaces(stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema]) map[string]map[string]*schema.ObjectSchema
BuildNamespaces creates a namespaced collection of objects for the inputs and outputs of each stage in the step's lifecycles. It maps namespace id (path) to object id to object schema.
func GetOutputNodeID ¶ added in v0.4.0
func GetOutputNodeID(stepID string, stageID string, outputID string) string
GetOutputNodeID returns the DAG node ID for a stage output.
func GetSchema ¶
func GetSchema() *schema.TypedScopeSchema[*Workflow]
GetSchema returns the entire workflow schema.
func GetStageNodeID ¶ added in v0.4.0
func GetStageNodeID(stepID string, stageID string) string
GetStageNodeID returns the DAG node ID for a stage.
func PrintObjectNamespaceTable ¶ added in v0.19.1
func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, logger log.Logger)
PrintObjectNamespaceTable constructs and writes a tidy table of workflow Objects and their namespaces to the given output destination.
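The nested map shape shared by BuildNamespaces and PrintObjectNamespaceTable (namespace ID to object ID to object schema) can be illustrated with a small standalone sketch. It does not reproduce the engine's actual table layout: `objectSchema` is a hypothetical stand-in for `*schema.ObjectSchema`, the namespace paths are made up for illustration, and the two-column format is an assumption.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"sort"
	"text/tabwriter"
)

// objectSchema is a hypothetical stand-in for *schema.ObjectSchema.
type objectSchema struct {
	ID string
}

// sortedRows flattens the nested map into (namespace, object ID) pairs
// and sorts them so the output is stable across runs.
func sortedRows(all map[string]map[string]*objectSchema) [][2]string {
	var rows [][2]string
	for ns, objects := range all {
		for id := range objects {
			rows = append(rows, [2]string{ns, id})
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		if rows[i][0] != rows[j][0] {
			return rows[i][0] < rows[j][0]
		}
		return rows[i][1] < rows[j][1]
	})
	return rows
}

// printNamespaceTable writes one aligned row per (namespace, object) pair.
func printNamespaceTable(out io.Writer, all map[string]map[string]*objectSchema) error {
	w := tabwriter.NewWriter(out, 0, 4, 2, ' ', 0)
	fmt.Fprintln(w, "NAMESPACE\tOBJECT")
	for _, row := range sortedRows(all) {
		fmt.Fprintf(w, "%s\t%s\n", row[0], row[1])
	}
	return w.Flush()
}

func main() {
	// Illustrative data only; real namespace paths come from the step lifecycles.
	all := map[string]map[string]*objectSchema{
		"$.steps.say_hi.greet.success": {"Greeting": {ID: "Greeting"}},
		"$.input":                      {"RootObject": {ID: "RootObject"}},
	}
	if err := printNamespaceTable(os.Stdout, all); err != nil {
		panic(err)
	}
}
```

Sorting before printing matters because Go map iteration order is randomized.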
Types ¶
type DAGItem ¶ added in v0.4.0
type DAGItem struct {
    // Kind discriminates between the input and step nodes.
    Kind DAGItemKind
    // StepID is only filled for step types.
    StepID string
    // StageID is the stage of the step provider this item refers to.
    StageID string
    // OutputID is the ID of the output of the step stage.
    OutputID string
    // OutputSchema contains the output-specific schema for this item.
    OutputSchema schema.StepOutput
    // Data is the processed input containing expressions.
    Data any
    // DataSchema is the corresponding schema for the Data once the expressions are resolved.
    DataSchema schema.Type
    // Provider is the runnable step from the step provider that can be executed.
    Provider step.RunnableStep
}
DAGItem is the internal structure of the DAG.
type DAGItemKind ¶ added in v0.4.0
type DAGItemKind string
DAGItemKind is the type of DAG node items.
const (
    // DAGItemKindInput indicates a DAG node for the workflow input.
    DAGItemKindInput DAGItemKind = "input"
    // DAGItemKindStepStage indicates a DAG node for a stage.
    DAGItemKindStepStage DAGItemKind = "stepStage"
    // DAGItemKindStepStageOutput indicates a DAG node for an output of a stage.
    DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput"
    // DAGItemKindOutput indicates a DAG node for the workflow output.
    DAGItemKindOutput DAGItemKind = "output"
    // DagItemKindDependencyGroup indicates a DAG node used to complete a part of
    // an input or output that needs dependencies grouped, typically for OR dependencies
    // or optional dependencies.
    DagItemKindDependencyGroup DAGItemKind = "dependencyGroup"
)
type DependencyError ¶ added in v0.4.0
type DependencyError struct {
    ID      string   `json:"id"`
    Path    []string `json:"path"`
    Message string   `json:"message"`
    Cause   error    `json:"cause"`
}
DependencyError describes an error while preparing dependencies.
func (DependencyError) Error ¶ added in v0.4.0
func (d DependencyError) Error() string
Error returns the error message.
func (DependencyError) Unwrap ¶ added in v0.4.0
func (d DependencyError) Unwrap() error
Unwrap returns the original error that caused this error.
type ErrInvalidInput ¶ added in v0.4.0
type ErrInvalidInput struct {
Cause error
}
ErrInvalidInput indicates that the input data is invalid because it does not match the declared schema.
func (ErrInvalidInput) Error ¶ added in v0.4.0
func (e ErrInvalidInput) Error() string
func (ErrInvalidInput) Unwrap ¶ added in v0.4.0
func (e ErrInvalidInput) Unwrap() error
type ErrInvalidInputYAML ¶ added in v0.4.0
type ErrInvalidInputYAML struct {
Cause error
}
ErrInvalidInputYAML indicates that the input YAML is syntactically invalid.
func (ErrInvalidInputYAML) Error ¶ added in v0.4.0
func (e ErrInvalidInputYAML) Error() string
func (ErrInvalidInputYAML) Unwrap ¶ added in v0.4.0
func (e ErrInvalidInputYAML) Unwrap() error
type ErrInvalidState ¶ added in v0.8.0
type ErrInvalidState struct {
// contains filtered or unexported fields
}
ErrInvalidState indicates that the workflow failed due to an invalid state.
func (ErrInvalidState) Error ¶ added in v0.8.0
func (e ErrInvalidState) Error() string
Error returns an explanation on why the error happened.
type ErrInvalidWorkflow ¶ added in v0.4.0
type ErrInvalidWorkflow struct {
Cause error
}
ErrInvalidWorkflow indicates that the workflow structure was invalid.
func (ErrInvalidWorkflow) Error ¶ added in v0.4.0
func (e ErrInvalidWorkflow) Error() string
func (ErrInvalidWorkflow) Unwrap ¶ added in v0.4.0
func (e ErrInvalidWorkflow) Unwrap() error
type ErrInvalidWorkflowYAML ¶ added in v0.4.0
type ErrInvalidWorkflowYAML struct {
Cause error
}
ErrInvalidWorkflowYAML signals invalid YAML in the workflow file.
func (ErrInvalidWorkflowYAML) Error ¶ added in v0.4.0
func (e ErrInvalidWorkflowYAML) Error() string
func (ErrInvalidWorkflowYAML) Unwrap ¶ added in v0.4.0
func (e ErrInvalidWorkflowYAML) Unwrap() error
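The error types above share one shape: a struct carrying a Cause, plus Error and Unwrap methods. That shape lets callers branch on the error type with `errors.As` and still reach the root cause with `errors.Is`. The sketch below uses local mirror types so it runs without the engine. The type names, the message formats, and the `classify` helper are assumptions for illustration, not the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// invalidYAMLError mirrors the documented shape of ErrInvalidWorkflowYAML.
// The message format is an assumption.
type invalidYAMLError struct {
	Cause error
}

func (e invalidYAMLError) Error() string { return "invalid workflow YAML: " + e.Cause.Error() }
func (e invalidYAMLError) Unwrap() error { return e.Cause }

// invalidWorkflowError mirrors the documented shape of ErrInvalidWorkflow.
type invalidWorkflowError struct {
	Cause error
}

func (e invalidWorkflowError) Error() string { return "invalid workflow: " + e.Cause.Error() }
func (e invalidWorkflowError) Unwrap() error { return e.Cause }

// errRoot is a hypothetical low-level cause used to show Unwrap at work.
var errRoot = errors.New("unexpected end of document")

// sampleYAMLError builds a wrapped error the way a caller might receive it.
func sampleYAMLError() error {
	return fmt.Errorf("loading workflow: %w", invalidYAMLError{Cause: errRoot})
}

// classify is a hypothetical helper that maps an error to a hint by type.
func classify(err error) string {
	var yamlErr invalidYAMLError
	var wfErr invalidWorkflowError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &yamlErr):
		return "fix the YAML syntax"
	case errors.As(err, &wfErr):
		return "fix the workflow structure"
	default:
		return "unexpected error"
	}
}

func main() {
	err := sampleYAMLError()
	fmt.Println(classify(err))
	// Unwrap makes the root cause visible through both wrapping layers.
	fmt.Println(errors.Is(err, errRoot))
}
```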
type ErrNoMorePossibleOutputs ¶ added in v0.19.1
type ErrNoMorePossibleOutputs struct {
// contains filtered or unexported fields
}
ErrNoMorePossibleOutputs indicates that the workflow terminated because an output could not be resolved: the steps that the output(s) depended on did not produce the required results.
func (ErrNoMorePossibleOutputs) Error ¶ added in v0.19.1
func (e ErrNoMorePossibleOutputs) Error() string
Error returns an explanation on why the error happened.
type ErrNoMorePossibleSteps ¶ added in v0.4.0
type ErrNoMorePossibleSteps struct {
// contains filtered or unexported fields
}
ErrNoMorePossibleSteps indicates that the workflow has finished, but the output cannot be constructed.
func (ErrNoMorePossibleSteps) Error ¶ added in v0.4.0
func (e ErrNoMorePossibleSteps) Error() string
Error returns an explanation on why the error happened.
type ExecutableWorkflow ¶ added in v0.4.0
type ExecutableWorkflow interface {
    // Input returns the input schema of the workflow.
    Input() schema.Scope

    // DAG returns the directed acyclic graph for this workflow.
    DAG() dgraph.DirectedGraph[*DAGItem]

    // Execute runs a workflow until it finishes or until the context expires with the specified input. The input
    // must only contain primitives (float, int, bool, string, map, slice) and may not contain structs and other
    // elements. The output will consist of the output ID, the returned output data corresponding to the output IDs
    // schema, or if an error happened, the error.
    Execute(ctx context.Context, serializedInput any) (outputID string, outputData any, err error)

    // OutputSchema returns the schema for the possible outputs of this workflow.
    OutputSchema() map[string]*schema.StepOutputSchema

    // Namespaces returns a namespaced collection of objects for the inputs
    // and outputs of each stage in the step's lifecycles.
    // It maps namespace id (path) to object id to object schema.
    Namespaces() map[string]map[string]*schema.ObjectSchema
}
ExecutableWorkflow is a workflow that has been prepared by the executor and is ready to be run.
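Execute requires the serialized input to consist only of primitives (float, int, bool, string, map, slice). A caller could check this before executing, roughly as sketched below. `checkPrimitive` is a hypothetical helper, not part of this package. Accepting nil values and rejecting unsigned integers are assumptions made for the sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// checkPrimitive walks v recursively and returns an error naming the first
// value whose kind is outside the documented list of primitives.
func checkPrimitive(v any, path string) error {
	if v == nil {
		// Assumption: nil is tolerated so that absent optional values pass.
		return nil
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Bool, reflect.String,
		reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Float32, reflect.Float64:
		return nil
	case reflect.Slice:
		for i := 0; i < rv.Len(); i++ {
			elemPath := fmt.Sprintf("%s[%d]", path, i)
			if err := checkPrimitive(rv.Index(i).Interface(), elemPath); err != nil {
				return err
			}
		}
		return nil
	case reflect.Map:
		iter := rv.MapRange()
		for iter.Next() {
			key := iter.Key().Interface()
			keyPath := fmt.Sprintf("%s[%v]", path, key)
			// Keys must be primitives too.
			if err := checkPrimitive(key, keyPath+" (key)"); err != nil {
				return err
			}
			if err := checkPrimitive(iter.Value().Interface(), keyPath); err != nil {
				return err
			}
		}
		return nil
	default:
		return fmt.Errorf("%s: unsupported kind %s", path, rv.Kind())
	}
}

func main() {
	good := map[string]any{"name": "Arca Lot", "retries": 3, "tags": []any{"a", true, 2.5}}
	fmt.Println(checkPrimitive(good, "$"))

	bad := map[string]any{"name": struct{}{}}
	fmt.Println(checkPrimitive(bad, "$"))
}
```

Reporting the path of the offending value makes it easier to locate a stray struct in deeply nested input.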
type Executor ¶ added in v0.4.0
type Executor interface {
    // Prepare processes the workflow for execution. The workflow parameter should contain the workflow description,
    // where each step input consists only of primitive types or expressions. The workflowContext variable should
    // contain all files from around the workflow, which will allow for evaluating expression functions against these
    // additional files.
    Prepare(
        workflow *Workflow,
        workflowContext map[string][]byte,
    ) (ExecutableWorkflow, error)
}
Executor is a tool to execute workflows.
Example ¶
package main

import (
    "context"
    "fmt"

    "go.flow.arcalot.io/engine/config"
    "go.flow.arcalot.io/engine/internal/builtinfunctions"

    "go.arcalot.io/lang"
    "go.arcalot.io/log/v2"
    "go.flow.arcalot.io/engine/internal/step/dummy"
    "go.flow.arcalot.io/engine/internal/step/registry"
    "go.flow.arcalot.io/engine/workflow"
)

var workflowYAML = `---
version: v0.2.0
input:
  root: RootObject
  objects:
    RootObject:
      id: RootObject
      properties:
        name:
          type:
            type_id: string
steps:
  say_hi:
    kind: dummy
    name: !expr $.input.name
outputs:
  success:
    message: !expr $.steps.say_hi.greet.success.message
`

func main() {
    logger := log.NewLogger(log.LevelDebug, log.NewGoLogWriter())
    stepRegistry := lang.Must2(registry.New(
        dummy.New(),
    ))

    executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry, builtinfunctions.GetFunctions())
    if err != nil {
        panic(err)
    }

    yamlConverter := workflow.NewYAMLConverter(stepRegistry)
    decodedWorkflow, err := yamlConverter.FromYAML([]byte(workflowYAML))
    if err != nil {
        panic(err)
    }

    preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil)
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{
        "name": "Arca Lot",
    })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"])
}

Output:
success: Hello Arca Lot!
type Workflow ¶
type Workflow struct {
    // Version determines which set of the arcaflow workflow external interface will be used in the workflow.
    Version string `json:"version"`
    // Input describes the input schema for a workflow. These values can be referenced from expressions. The structure
    // must be a scope described in primitive types. This is done so later on a forward reference to a step input can
    // be used.
    Input any `json:"input"`
    // Steps contains the possible steps in this workflow. The data set must contain a valid step structure where the
    // inputs to stages may consist only of primitive types and expressions.
    Steps map[string]any `json:"steps"`
    // Outputs lets you define one or more outputs. The outputs should be keyed by their output ID (e.g. "success") and
    // the value should be the data you wish to output. The data may contain expressions to construct the output.
    Outputs map[string]any `json:"outputs"`
    // OutputSchema is an optional override for the automatically inferred output schema from the Outputs data and
    // expressions. The keys must be the output IDs from Outputs and the values must be a StepOutputSchema object as
    // per the Arcaflow schema.
    OutputSchema map[string]*schema.StepOutputSchema `json:"outputSchema"`
    // Output is the legacy way to define a single output. It conflicts with the "outputs" field and, if filled, will
    // create a "success" output.
    //
    // Deprecated: use Outputs instead.
    Output any `json:"output"`
}
Workflow is the primary data structure describing workflows.
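To make the field layout concrete, the sketch below decodes a JSON document into a local mirror of the struct and flags the documented conflict between the legacy `output` field and `outputs`. This is an illustration under assumptions: `workflowDoc` copies only the JSON tags (OutputSchema is left out to avoid depending on the schema package), and `decode` and `checkOutputs` are hypothetical helpers. How the engine itself reports the conflict is not shown here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// workflowDoc mirrors the JSON tags of the documented Workflow struct,
// omitting OutputSchema so the sketch has no external dependencies.
type workflowDoc struct {
	Version string         `json:"version"`
	Input   any            `json:"input"`
	Steps   map[string]any `json:"steps"`
	Outputs map[string]any `json:"outputs"`
	Output  any            `json:"output"`
}

// decode parses a JSON document into the mirror struct.
func decode(data string) (workflowDoc, error) {
	var w workflowDoc
	err := json.Unmarshal([]byte(data), &w)
	return w, err
}

// checkOutputs is a hypothetical pre-flight check for the documented
// conflict between the deprecated "output" field and "outputs".
func checkOutputs(w workflowDoc) error {
	if w.Output != nil && len(w.Outputs) > 0 {
		return errors.New(`"output" and "outputs" are both set; use only "outputs"`)
	}
	return nil
}

func main() {
	w, err := decode(`{
		"version": "v0.2.0",
		"steps": {"say_hi": {"kind": "dummy"}},
		"outputs": {"success": {"message": "hi"}}
	}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(w.Version, len(w.Steps), checkOutputs(w))
}
```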
type YAMLConverter ¶ added in v0.4.0
YAMLConverter converts raw YAML into a usable workflow.
func NewYAMLConverter ¶ added in v0.4.0
func NewYAMLConverter(stepRegistry step.Registry) YAMLConverter
NewYAMLConverter creates a YAMLConverter.