workflow

package
v0.20.0-rc.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package workflow provides the workflow execution engine.

Index

Examples

Constants

View Source
const (
	// WorkflowInputKey is the key in the workflow map for input.
	WorkflowInputKey = "input"
	// WorkflowStepsKey is the key in the workflow map for the steps.
	WorkflowStepsKey = "steps"
)
View Source
const OrDisabledTag = "!ordisabled"

OrDisabledTag is the yaml tag to specify that the following code should be interpreted as a `oneof` type with two possible outputs: the expr specified or the disabled output.

View Source
const SoftOptionalTag = "!soft-optional"

SoftOptionalTag is the tag to specify that the field, when used in an object, should be optional with an optional DAG dependency.

View Source
const WaitOptionalTag = "!wait-optional"

WaitOptionalTag is the tag to specify that the field, when used in an object, should be optional with a completion DAG dependency.

View Source
const YamlDiscriminatorKey = "discriminator"

YamlDiscriminatorKey is the key to specify the discriminator inside a !oneof section.

View Source
const YamlExprTag = "!expr"

YamlExprTag is the key to specify that the following code should be interpreted as an expression.

View Source
const YamlOneOfKey = "one_of"

YamlOneOfKey is the key to specify the oneof options within a !oneof section.

View Source
const YamlOneOfTag = "!oneof"

YamlOneOfTag is the yaml tag that allows the section to be interpreted as a OneOf.

Variables

View Source
var ErrEmptyWorkflowFile = fmt.Errorf("empty workflow file provided in context")

ErrEmptyWorkflowFile signals that the workflow file was provided, but it was empty.

View Source
var ErrNoSteps = fmt.Errorf("no steps defined in workflow")

ErrNoSteps signals that the workflow has no steps defined.

View Source
var ErrWorkflowAborted = fmt.Errorf("workflow execution aborted")

ErrWorkflowAborted indicates that the workflow execution was intentionally aborted.

Functions

func BuildNamespaces added in v0.19.1

func BuildNamespaces(stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema]) map[string]map[string]*schema.ObjectSchema

BuildNamespaces creates a namespaced collection of objects for the inputs and outputs of each stage in the step's lifecycles. It maps namespace id (path) to object id to object schema.

func GetOutputNodeID added in v0.4.0

func GetOutputNodeID(stepID string, stageID string, outputID string) string

GetOutputNodeID returns the DAG node ID for a stage output.

func GetSchema

func GetSchema() *schema.TypedScopeSchema[*Workflow]

GetSchema returns the entire workflow schema.

func GetStageNodeID added in v0.4.0

func GetStageNodeID(stepID string, stageID string) string

GetStageNodeID returns the DAG node ID for a stage.

func PrintObjectNamespaceTable added in v0.19.1

func PrintObjectNamespaceTable(output io.Writer, allNamespaces map[string]map[string]*schema.ObjectSchema, logger log.Logger)

PrintObjectNamespaceTable constructs and writes a tidy table of workflow Objects and their namespaces to the given output destination.

Types

type DAGItem added in v0.4.0

type DAGItem struct {
	// Kind discriminates between the input and step nodes.
	Kind DAGItemKind
	// StepID is only filled for step types.
	StepID string
	// StageID is the stage of the step provider this item refers to.
	StageID string
	// OutputID is the ID of the output of the step stage.
	OutputID string
	// OutputSchema contains the output-specific schema for this item.
	OutputSchema schema.StepOutput
	// Data is the processed input containing expressions.
	Data any
	// DataSchema is the corresponding schema for the Data once the expressions are resolved.
	DataSchema schema.Type
	// Provider is the runnable step from the step provider that can be executed.
	Provider step.RunnableStep
}

DAGItem is the internal structure of the DAG.

func (DAGItem) String added in v0.4.0

func (d DAGItem) String() string

String provides an identifier for this DAG item constructed from the contents.

type DAGItemKind added in v0.4.0

type DAGItemKind string

DAGItemKind is the type of DAG node items.

const (
	// DAGItemKindInput indicates a DAG node for the workflow input.
	DAGItemKindInput DAGItemKind = "input"
	// DAGItemKindStepStage indicates a DAG node for a stage.
	DAGItemKindStepStage DAGItemKind = "stepStage"
	// DAGItemKindStepStageOutput indicates a DAG node for an output of a stage.
	DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput"
	// DAGItemKindOutput indicates a DAG node for the workflow output.
	DAGItemKindOutput DAGItemKind = "output"
	// DagItemKindDependencyGroup indicates a DAG node used to complete a part of
	// an input or output that needs dependencies grouped, typically for OR dependencies
	// or optional dependencies.
	DagItemKindDependencyGroup DAGItemKind = "dependencyGroup"
)

type DependencyError added in v0.4.0

type DependencyError struct {
	ID      string   `json:"id"`
	Path    []string `json:"path"`
	Message string   `json:"message"`
	Cause   error    `json:"cause"`
}

DependencyError describes an error while preparing dependencies.

func (DependencyError) Error added in v0.4.0

func (d DependencyError) Error() string

Error returns the error message.

func (DependencyError) Unwrap added in v0.4.0

func (d DependencyError) Unwrap() error

Unwrap returns the original error that caused this error.

type ErrInvalidInput added in v0.4.0

type ErrInvalidInput struct {
	Cause error
}

ErrInvalidInput indicates that the input data is invalid because it does not match the declared schema.

func (ErrInvalidInput) Error added in v0.4.0

func (e ErrInvalidInput) Error() string

func (ErrInvalidInput) Unwrap added in v0.4.0

func (e ErrInvalidInput) Unwrap() error

type ErrInvalidInputYAML added in v0.4.0

type ErrInvalidInputYAML struct {
	Cause error
}

ErrInvalidInputYAML indicates that the input YAML is syntactically invalid.

func (ErrInvalidInputYAML) Error added in v0.4.0

func (e ErrInvalidInputYAML) Error() string

func (ErrInvalidInputYAML) Unwrap added in v0.4.0

func (e ErrInvalidInputYAML) Unwrap() error

type ErrInvalidState added in v0.8.0

type ErrInvalidState struct {
	// contains filtered or unexported fields
}

ErrInvalidState indicates that the workflow failed due to an invalid state.

func (ErrInvalidState) Error added in v0.8.0

func (e ErrInvalidState) Error() string

Error returns an explanation on why the error happened.

type ErrInvalidWorkflow added in v0.4.0

type ErrInvalidWorkflow struct {
	Cause error
}

ErrInvalidWorkflow indicates that the workflow structure was invalid.

func (ErrInvalidWorkflow) Error added in v0.4.0

func (e ErrInvalidWorkflow) Error() string

func (ErrInvalidWorkflow) Unwrap added in v0.4.0

func (e ErrInvalidWorkflow) Unwrap() error

type ErrInvalidWorkflowYAML added in v0.4.0

type ErrInvalidWorkflowYAML struct {
	Cause error
}

ErrInvalidWorkflowYAML signals an invalid YAML in the workflow file.

func (ErrInvalidWorkflowYAML) Error added in v0.4.0

func (e ErrInvalidWorkflowYAML) Error() string

func (ErrInvalidWorkflowYAML) Unwrap added in v0.4.0

func (e ErrInvalidWorkflowYAML) Unwrap() error

type ErrNoMorePossibleOutputs added in v0.19.1

type ErrNoMorePossibleOutputs struct {
	// contains filtered or unexported fields
}

ErrNoMorePossibleOutputs indicates that the workflow has terminated due to it being impossible to resolve an output. This means that steps that the output(s) depended on did not have the required results.

func (ErrNoMorePossibleOutputs) Error added in v0.19.1

func (e ErrNoMorePossibleOutputs) Error() string

Error returns an explanation on why the error happened.

type ErrNoMorePossibleSteps added in v0.4.0

type ErrNoMorePossibleSteps struct {
	// contains filtered or unexported fields
}

ErrNoMorePossibleSteps indicates that the workflow has finished, but the output cannot be constructed.

func (ErrNoMorePossibleSteps) Error added in v0.4.0

func (e ErrNoMorePossibleSteps) Error() string

Error returns an explanation on why the error happened.

type ExecutableWorkflow added in v0.4.0

type ExecutableWorkflow interface {
	// Input returns the input schema of the workflow.
	Input() schema.Scope

	// DAG returns the directed acyclic graph for this workflow.
	DAG() dgraph.DirectedGraph[*DAGItem]

	// Execute runs a workflow until it finishes or until the context expires with the specified input. The input
	// must only contain primitives (float, int, bool, string, map, slice) and may not contain structs and other
	// elements. The output will consist of the output ID, the returned output data corresponding to the output IDs
	// schema, or if an error happened, the error.
	Execute(ctx context.Context, serializedInput any) (outputID string, outputData any, err error)

	// OutputSchema returns the schema for the possible outputs of this workflow.
	OutputSchema() map[string]*schema.StepOutputSchema

	// Namespaces returns a namespaced collection of objects for the inputs
	// and outputs of each stage in the step's lifecycles.
	// It maps namespace id (path) to object id to object schema.
	Namespaces() map[string]map[string]*schema.ObjectSchema
}

ExecutableWorkflow is a workflow that has been prepared by the executor and is ready to be run.

type Executor added in v0.4.0

type Executor interface {
	// Prepare processes the workflow for execution. The workflow parameter should contain the workflow description,
	// where each step input consists only of primitive types or expressions. The workflowContext variable should
	// contain all files from around the workflow, which will allow for evaluating expression functions against these
	// additional files.
	Prepare(
		workflow *Workflow,
		workflowContext map[string][]byte,
	) (ExecutableWorkflow, error)
}

Executor is a tool to execute workflows.

Example
package main

import (
	"context"
	"fmt"
	"go.flow.arcalot.io/engine/config"
	"go.flow.arcalot.io/engine/internal/builtinfunctions"

	"go.arcalot.io/lang"
	"go.arcalot.io/log/v2"
	"go.flow.arcalot.io/engine/internal/step/dummy"
	"go.flow.arcalot.io/engine/internal/step/registry"
	"go.flow.arcalot.io/engine/workflow"
)

var workflowYAML = `---
version: v0.2.0
input:
  root: RootObject
  objects:
    RootObject:
      id: RootObject
      properties:
        name:
          type:
            type_id: string
steps:
  say_hi:
    kind: dummy
    name: !expr $.input.name
outputs:
  success:
    message: !expr $.steps.say_hi.greet.success.message
`

func main() {
	logger := log.NewLogger(log.LevelDebug, log.NewGoLogWriter())
	stepRegistry := lang.Must2(registry.New(
		dummy.New(),
	))

	executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry, builtinfunctions.GetFunctions())
	if err != nil {
		panic(err)
	}

	yamlConverter := workflow.NewYAMLConverter(stepRegistry)
	decodedWorkflow, err := yamlConverter.FromYAML([]byte(workflowYAML))
	if err != nil {
		panic(err)
	}

	preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{
		"name": "Arca Lot",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"])
}
Output:

success: Hello Arca Lot!

func NewExecutor added in v0.4.0

func NewExecutor(
	logger log.Logger,
	config *config.Config,
	stepRegistry step.Registry,
	callableFunctions map[string]schema.CallableFunction,
) (Executor, error)

NewExecutor creates a new executor instance for workflows.

type Workflow

type Workflow struct {
	// Version determines which set of the arcaflow workflow external interface will be used in the workflow.
	Version string `json:"version"`
	// Input describe the input schema for a workflow. These values can be referenced from expressions. The structure
	// must be a scope described in primitive types. This is done so later on a forward reference to a step input can
	// be used.
	Input any `json:"input"`
	// Steps contains the possible steps in this workflow. The data set must contain a valid step structure where the
	// inputs to stages may consist only of primitive types and expressions.
	Steps map[string]any `json:"steps"`
	// Outputs lets you define one or more outputs. The outputs should be keyed by their output ID (e.g. "success") and
	// the value should be the data you wish to output. The data may contain expressions to construct the output.
	Outputs map[string]any `json:"outputs"`
	// OutputSchema is an optional override for the automatically inferred output schema from the Outputs data and
	// expressions. The keys must be the output IDs from Outputs and the values must be a StepOutputSchema object as
	// per the Arcaflow schema.
	OutputSchema map[string]*schema.StepOutputSchema `json:"outputSchema"`
	// Output is the legacy way to define a single output. It conflicts the "outputs" field and if filled, will create a
	// "success" output.
	//
	// Deprecated: use Outputs instead.
	Output any `json:"output"`
}

Workflow is the primary data structure describing workflows.

type YAMLConverter added in v0.4.0

type YAMLConverter interface {
	FromYAML(data []byte) (*Workflow, error)
}

YAMLConverter converts a raw YAML into a usable workflow.

func NewYAMLConverter added in v0.4.0

func NewYAMLConverter(stepRegistry step.Registry) YAMLConverter

NewYAMLConverter creates a YAMLConverter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL