Documentation
¶
Overview ¶
Package workflow provides the workflow execution engine.
Index ¶
- Variables
- func GetOutputNodeID(stepID string, stageID string, outputID string) string
- func GetStageNodeID(stepID string, stageID string) string
- type DAGItem
- type DAGItemKind
- type DependencyError
- type ErrInvalidInput
- type ErrInvalidInputYAML
- type ErrInvalidWorkflow
- type ErrInvalidWorkflowYAML
- type ErrNoMorePossibleSteps
- type ExecutableWorkflow
- type Executor
- type Workflow
- type YAMLConverter
Examples ¶
Constants ¶
This section is empty.
Variables ¶
// ErrEmptyWorkflowFile signals that the workflow file was provided, but it was empty.
// NOTE(review): the message contains no format verbs, so errors.New would be the
// idiomatic constructor here.
var ErrEmptyWorkflowFile = fmt.Errorf("empty workflow file provided in context")
ErrEmptyWorkflowFile signals that the workflow file was provided, but it was empty.
// ErrNoSteps signals that the workflow has no steps defined.
// NOTE(review): the message contains no format verbs, so errors.New would be the
// idiomatic constructor here.
var ErrNoSteps = fmt.Errorf("no steps defined in workflow")
ErrNoSteps signals that the workflow has no steps defined.
// ErrWorkflowAborted indicates that the workflow execution was intentionally aborted.
// NOTE(review): the message contains no format verbs, so errors.New would be the
// idiomatic constructor here.
var ErrWorkflowAborted = fmt.Errorf("workflow execution aborted")
ErrWorkflowAborted indicates that the workflow execution was intentionally aborted.
Functions ¶
func GetOutputNodeID ¶ added in v0.4.0
GetOutputNodeID returns the DAG node ID for a stage output.
func GetStageNodeID ¶ added in v0.4.0
GetStageNodeID returns the DAG node ID for a stage.
Types ¶
type DAGItem ¶ added in v0.4.0
// DAGItem is the internal structure of the DAG: the item type the workflow graph
// is parameterized with.
type DAGItem struct {
	// Kind tells which kind of DAG node this item is: workflow input, step stage,
	// step stage output, or workflow output (see the DAGItemKind constants).
	Kind DAGItemKind
	// StepID is only filled for step types.
	StepID string
	// StageID is the stage of the step provider this item refers to.
	StageID string
	// OutputID is the ID of the output of the step stage.
	OutputID string
	// OutputSchema contains the output-specific schema for this item.
	OutputSchema schema.StepOutput
	// Data is the processed input containing expressions.
	Data any
	// DataSchema is the corresponding schema for the Data once the expressions are resolved.
	DataSchema schema.Type
	// Provider is the runnable step from the step provider that can be executed.
	Provider step.RunnableStep
}
DAGItem is the internal structure of the DAG.
type DAGItemKind ¶ added in v0.4.0
// DAGItemKind is the string-based type used to label the kind of a DAG node item.
type DAGItemKind string
DAGItemKind is the type of DAG node items.
// The kinds a DAG node item can have.
const (
	// DAGItemKindInput indicates a DAG node for the workflow input.
	DAGItemKindInput DAGItemKind = "input"
	// DAGItemKindStepStage indicates a DAG node for a stage.
	DAGItemKindStepStage DAGItemKind = "stepStage"
	// DAGItemKindStepStageOutput indicates a DAG node for an output of a stage.
	DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput"
	// DAGItemKindOutput indicates a DAG node for the workflow output.
	DAGItemKindOutput DAGItemKind = "output"
)
type DependencyError ¶ added in v0.4.0
// DependencyError describes an error while preparing dependencies.
type DependencyError struct {
	// ID identifies the item the error relates to.
	// NOTE(review): presumably a DAG node ID — confirm against the code that builds this error.
	ID string `json:"id"`
	// Path is a list of path segments locating the problem.
	// NOTE(review): the exact meaning of the segments is not visible here — confirm.
	Path []string `json:"path"`
	// Message is the human-readable description of the problem.
	Message string `json:"message"`
	// Cause is the underlying error; presumably the value Unwrap returns — confirm.
	// NOTE(review): encoding/json serializes an error by its concrete type's exported
	// fields, so many error values marshal as {} — verify the "cause" tag gives the
	// intended output.
	Cause error `json:"cause"`
}
DependencyError describes an error while preparing dependencies.
func (DependencyError) Error ¶ added in v0.4.0
func (d DependencyError) Error() string
Error returns the error message.
func (DependencyError) Unwrap ¶ added in v0.4.0
func (d DependencyError) Unwrap() error
Unwrap returns the original error that caused this error.
type ErrInvalidInput ¶ added in v0.4.0
// ErrInvalidInput indicates that the input data is invalid because it does not
// match the declared schema.
type ErrInvalidInput struct {
	// Cause is the underlying error; presumably the value Unwrap returns — confirm.
	Cause error
}
ErrInvalidInput indicates that the input data is invalid because it does not match the declared schema.
func (ErrInvalidInput) Error ¶ added in v0.4.0
func (e ErrInvalidInput) Error() string
func (ErrInvalidInput) Unwrap ¶ added in v0.4.0
func (e ErrInvalidInput) Unwrap() error
type ErrInvalidInputYAML ¶ added in v0.4.0
// ErrInvalidInputYAML indicates that the input YAML is syntactically invalid.
type ErrInvalidInputYAML struct {
	// Cause is the underlying error; presumably the value Unwrap returns — confirm.
	Cause error
}
ErrInvalidInputYAML indicates that the input YAML is syntactically invalid.
func (ErrInvalidInputYAML) Error ¶ added in v0.4.0
func (e ErrInvalidInputYAML) Error() string
func (ErrInvalidInputYAML) Unwrap ¶ added in v0.4.0
func (e ErrInvalidInputYAML) Unwrap() error
type ErrInvalidWorkflow ¶ added in v0.4.0
// ErrInvalidWorkflow indicates that the workflow structure was invalid.
type ErrInvalidWorkflow struct {
	// Cause is the underlying error; presumably the value Unwrap returns — confirm.
	Cause error
}
ErrInvalidWorkflow indicates that the workflow structure was invalid.
func (ErrInvalidWorkflow) Error ¶ added in v0.4.0
func (e ErrInvalidWorkflow) Error() string
func (ErrInvalidWorkflow) Unwrap ¶ added in v0.4.0
func (e ErrInvalidWorkflow) Unwrap() error
type ErrInvalidWorkflowYAML ¶ added in v0.4.0
// ErrInvalidWorkflowYAML signals invalid YAML in the workflow file.
type ErrInvalidWorkflowYAML struct {
	// Cause is the underlying error; presumably the value Unwrap returns — confirm.
	Cause error
}
ErrInvalidWorkflowYAML signals invalid YAML in the workflow file.
func (ErrInvalidWorkflowYAML) Error ¶ added in v0.4.0
func (e ErrInvalidWorkflowYAML) Error() string
func (ErrInvalidWorkflowYAML) Unwrap ¶ added in v0.4.0
func (e ErrInvalidWorkflowYAML) Unwrap() error
type ErrNoMorePossibleSteps ¶ added in v0.4.0
// ErrNoMorePossibleSteps indicates that the workflow has finished, but the output
// cannot be constructed.
type ErrNoMorePossibleSteps struct {
	// contains filtered or unexported fields
}
ErrNoMorePossibleSteps indicates that the workflow has finished, but the output cannot be constructed.
func (ErrNoMorePossibleSteps) Error ¶ added in v0.4.0
func (e ErrNoMorePossibleSteps) Error() string
Error returns an explanation of why the error happened.
type ExecutableWorkflow ¶ added in v0.4.0
// ExecutableWorkflow is a workflow that has been prepared by the executor and is
// ready to be run.
type ExecutableWorkflow interface {
	// Input returns the input schema of the workflow.
	Input() schema.Scope

	// DAG returns the directed acyclic graph for this workflow.
	DAG() dgraph.DirectedGraph[*DAGItem]

	// Execute runs the workflow with the specified input until it finishes or until
	// the context expires. The input must only contain primitives (float, int, bool,
	// string, map, slice) and may not contain structs and other elements.
	//
	// It returns the output ID and the output data corresponding to that output ID's
	// schema, or, if an error happened, the error.
	Execute(
		ctx context.Context,
		input any,
	) (outputID string, outputData any, err error)

	// OutputSchema returns the schema for the possible outputs of this workflow.
	OutputSchema() map[string]*schema.StepOutputSchema
}
ExecutableWorkflow is a workflow that has been prepared by the executor and is ready to be run.
type Executor ¶ added in v0.4.0
// Executor is a tool to execute workflows.
type Executor interface {
	// Prepare processes the workflow for execution.
	//
	// The workflow parameter should contain the workflow description, where each step
	// input consists only of primitive types or expressions. The workflowContext
	// parameter should contain all files from around the workflow, which allows
	// expression functions to be evaluated against these additional files.
	//
	// It returns the workflow in its executable form, or an error.
	Prepare(
		workflow *Workflow,
		workflowContext map[string][]byte,
	) (ExecutableWorkflow, error)
}
Executor is a tool to execute workflows.
Example ¶
package main

import (
	"context"
	"fmt"

	"go.arcalot.io/lang"
	"go.arcalot.io/log/v2"
	"go.flow.arcalot.io/engine/config"
	// The internal/... packages below can only be imported from inside the
	// go.flow.arcalot.io/engine module, so this example builds only within it.
	"go.flow.arcalot.io/engine/internal/step/dummy"
	"go.flow.arcalot.io/engine/internal/step/registry"
	"go.flow.arcalot.io/engine/workflow"
)

// workflowYAML is the workflow used by the example: it declares a single string
// input called "name", one step "say_hi" of kind "dummy" fed with that input, and
// a "success" output whose message is taken from the step's greet stage.
var workflowYAML = `---
input:
  root: RootObject
  objects:
    RootObject:
      id: RootObject
      properties:
        name:
          type:
            type_id: string
steps:
  say_hi:
    kind: dummy
    name: !expr $.input.name
outputs:
  success:
    message: !expr $.steps.say_hi.greet.success.message
`

// main walks through the full lifecycle: build an executor, decode the YAML into a
// Workflow, prepare it, execute it with an input, and print the resulting output.
func main() {
	logger := log.NewLogger(log.LevelDebug, log.NewGoLogWriter())

	// The step registry holds the step providers the workflow may reference; here
	// only the dummy provider is registered.
	stepRegistry := lang.Must2(registry.New(
		dummy.New(),
	))

	executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry)
	if err != nil {
		panic(err)
	}

	yamlConverter := workflow.NewYAMLConverter(stepRegistry)
	decodedWorkflow, err := yamlConverter.FromYAML([]byte(workflowYAML))
	if err != nil {
		panic(err)
	}

	// No additional workflow context files are needed for this workflow.
	preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The input only contains primitives, as Execute requires.
	outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{
		"name": "Arca Lot",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"])
}
Output: success: Hello Arca Lot!
type Workflow ¶
// Workflow is the primary data structure describing workflows.
type Workflow struct {
	// Input describes the input schema for a workflow. These values can be referenced
	// from expressions. The structure must be a scope described in primitive types.
	// This is done so that later on a forward reference to a step input can be used.
	Input any `json:"input"`
	// Steps contains the possible steps in this workflow. The data set must contain a
	// valid step structure where the inputs to stages may consist only of primitive
	// types and expressions.
	Steps map[string]any `json:"steps"`
	// Outputs lets you define one or more outputs. The outputs should be keyed by their
	// output ID (e.g. "success") and the value should be the data you wish to output.
	// The data may contain expressions to construct the output.
	Outputs map[string]any `json:"outputs"`
	// OutputSchema is an optional override for the automatically inferred output schema
	// from the Outputs data and expressions. The keys must be the output IDs from
	// Outputs and the values must be a StepOutputSchema object as per the Arcaflow
	// schema.
	OutputSchema map[string]any `json:"outputSchema"`
	// Output is the legacy way to define a single output. It conflicts with the
	// "outputs" field and, if filled, will create a "success" output.
	//
	// Deprecated: use Outputs instead.
	Output any `json:"output"`
}
Workflow is the primary data structure describing workflows.
type YAMLConverter ¶ added in v0.4.0
YAMLConverter converts raw YAML into a usable workflow.
func NewYAMLConverter ¶ added in v0.4.0
func NewYAMLConverter(stepRegistry step.Registry) YAMLConverter
NewYAMLConverter creates a YAMLConverter.