workflow

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package workflow provides the workflow execution engine.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrEmptyWorkflowFile = fmt.Errorf("empty workflow file provided in context")

ErrEmptyWorkflowFile signals that the workflow file was provided, but it was empty.

View Source
var ErrNoSteps = fmt.Errorf("no steps defined in workflow")

ErrNoSteps signals that the workflow has no steps defined.

View Source
var ErrWorkflowAborted = fmt.Errorf("workflow execution aborted")

ErrWorkflowAborted indicates that the workflow execution was intentionally aborted.

Functions

func GetOutputNodeID added in v0.4.0

func GetOutputNodeID(stepID string, stageID string, outputID string) string

GetOutputNodeID returns the DAG node ID for a stage output.

func GetStageNodeID added in v0.4.0

func GetStageNodeID(stepID string, stageID string) string

GetStageNodeID returns the DAG node ID for a stage.

Types

type DAGItem added in v0.4.0

type DAGItem struct {
	// Kind discriminates between the input and step nodes.
	Kind DAGItemKind
	// StepID is only filled for step types.
	StepID string
	// StageID is the stage of the step provider this item refers to.
	StageID string
	// OutputID is the ID of the output of the step stage.
	OutputID string
	// OutputSchema contains the output-specific schema for this item.
	OutputSchema schema.StepOutput
	// Data is the processed input containing expressions.
	Data any
	// DataSchema is the corresponding schema for the Data once the expressions are resolved.
	DataSchema schema.Type
	// Provider is the runnable step from the step provider that can be executed.
	Provider step.RunnableStep
}

DAGItem is the internal structure of the DAG.

func (DAGItem) String added in v0.4.0

func (d DAGItem) String() string

String provides an identifier for this DAG item constructed from the contents.

type DAGItemKind added in v0.4.0

type DAGItemKind string

DAGItemKind is the type of DAG node items.

const (
	// DAGItemKindInput indicates a DAG node for the workflow input.
	DAGItemKindInput DAGItemKind = "input"
	// DAGItemKindStepStage indicates a DAG node for a stage.
	DAGItemKindStepStage DAGItemKind = "stepStage"
	// DAGItemKindStepStageOutput indicates a DAG node for an output of a stage.
	DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput"
	// DAGItemKindOutput indicates a DAG node for the workflow output.
	DAGItemKindOutput DAGItemKind = "output"
)

type DependencyError added in v0.4.0

type DependencyError struct {
	ID      string   `json:"id"`
	Path    []string `json:"path"`
	Message string   `json:"message"`
	Cause   error    `json:"cause"`
}

DependencyError describes an error while preparing dependencies.

func (DependencyError) Error added in v0.4.0

func (d DependencyError) Error() string

Error returns the error message.

func (DependencyError) Unwrap added in v0.4.0

func (d DependencyError) Unwrap() error

Unwrap returns the original error that caused this error.

type ErrInvalidInput added in v0.4.0

type ErrInvalidInput struct {
	Cause error
}

ErrInvalidInput indicates that the input data is invalid because it does not match the declared schema.

func (ErrInvalidInput) Error added in v0.4.0

func (e ErrInvalidInput) Error() string

func (ErrInvalidInput) Unwrap added in v0.4.0

func (e ErrInvalidInput) Unwrap() error

type ErrInvalidInputYAML added in v0.4.0

type ErrInvalidInputYAML struct {
	Cause error
}

ErrInvalidInputYAML indicates that the input YAML is syntactically invalid.

func (ErrInvalidInputYAML) Error added in v0.4.0

func (e ErrInvalidInputYAML) Error() string

func (ErrInvalidInputYAML) Unwrap added in v0.4.0

func (e ErrInvalidInputYAML) Unwrap() error

type ErrInvalidWorkflow added in v0.4.0

type ErrInvalidWorkflow struct {
	Cause error
}

ErrInvalidWorkflow indicates that the workflow structure was invalid.

func (ErrInvalidWorkflow) Error added in v0.4.0

func (e ErrInvalidWorkflow) Error() string

func (ErrInvalidWorkflow) Unwrap added in v0.4.0

func (e ErrInvalidWorkflow) Unwrap() error

type ErrInvalidWorkflowYAML added in v0.4.0

type ErrInvalidWorkflowYAML struct {
	Cause error
}

ErrInvalidWorkflowYAML signals an invalid YAML in the workflow file.

func (ErrInvalidWorkflowYAML) Error added in v0.4.0

func (e ErrInvalidWorkflowYAML) Error() string

func (ErrInvalidWorkflowYAML) Unwrap added in v0.4.0

func (e ErrInvalidWorkflowYAML) Unwrap() error

type ErrNoMorePossibleSteps added in v0.4.0

type ErrNoMorePossibleSteps struct {
	// contains filtered or unexported fields
}

ErrNoMorePossibleSteps indicates that the workflow has finished, but the output cannot be constructed.

func (ErrNoMorePossibleSteps) Error added in v0.4.0

func (e ErrNoMorePossibleSteps) Error() string

Error returns an explanation on why the error happened.

type ExecutableWorkflow added in v0.4.0

type ExecutableWorkflow interface {
	// Input returns the input schema of the workflow.
	Input() schema.Scope

	// DAG returns the directed acyclic graph for this workflow.
	DAG() dgraph.DirectedGraph[*DAGItem]

	// Execute runs a workflow until it finishes or until the context expires with the specified input. The input
	// must only contain primitives (float, int, bool, string, map, slice) and may not contain structs and other
	// elements. The output will consist of the output ID, the returned output data corresponding to the output IDs
	// schema, or if an error happened, the error.
	Execute(
		ctx context.Context,
		input any,
	) (outputID string, outputData any, err error)

	// OutputSchema returns the schema for the possible outputs of this workflow.
	OutputSchema() map[string]*schema.StepOutputSchema
}

ExecutableWorkflow is a workflow that has been prepared by the executor and is ready to be run.

type Executor added in v0.4.0

type Executor interface {
	// Prepare processes the workflow for execution. The workflow parameter should contain the workflow description,
	// where each step input consists only of primitive types or expressions. The workflowContext variable should
	// contain all files from around the workflow, which will allow for evaluating expression functions against these
	// additional files.
	Prepare(
		workflow *Workflow,
		workflowContext map[string][]byte,
	) (ExecutableWorkflow, error)
}

Executor is a tool to execute workflows.

Example
package main

import (
	"context"
	"fmt"
	"go.flow.arcalot.io/engine/config"

	"go.arcalot.io/lang"
	"go.arcalot.io/log/v2"
	"go.flow.arcalot.io/engine/internal/step/dummy"
	"go.flow.arcalot.io/engine/internal/step/registry"
	"go.flow.arcalot.io/engine/workflow"
)

var workflowYAML = `---
input:
  root: RootObject
  objects:
    RootObject:
      id: RootObject
      properties:
        name:
          type:
            type_id: string
steps:
  say_hi:
    kind: dummy
    name: !expr $.input.name
outputs:
  success:
    message: !expr $.steps.say_hi.greet.success.message
`

func main() {
	logger := log.NewLogger(log.LevelDebug, log.NewGoLogWriter())
	stepRegistry := lang.Must2(registry.New(
		dummy.New(),
	))

	executor, err := workflow.NewExecutor(logger, &config.Config{}, stepRegistry)
	if err != nil {
		panic(err)
	}

	yamlConverter := workflow.NewYAMLConverter(stepRegistry)
	decodedWorkflow, err := yamlConverter.FromYAML([]byte(workflowYAML))
	if err != nil {
		panic(err)
	}

	preparedWorkflow, err := executor.Prepare(decodedWorkflow, nil)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{
		"name": "Arca Lot",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", outputID, outputData.(map[any]any)["message"])
}
Output:

success: Hello Arca Lot!

func NewExecutor added in v0.4.0

func NewExecutor(
	logger log.Logger,
	config *config.Config,
	stepRegistry step.Registry,
) (Executor, error)

NewExecutor creates a new executor instance for workflows.

type Workflow

type Workflow struct {
	// Input describe the input schema for a workflow. These values can be referenced from expressions. The structure
	// must be a scope described in primitive types. This is done so later on a forward reference to a step input can
	// be used.
	Input any `json:"input"`
	// Steps contains the possible steps in this workflow. The data set must contain a valid step structure where the
	// inputs to stages may consist only of primitive types and expressions.
	Steps map[string]any `json:"steps"`
	// Outputs lets you define one or more outputs. The outputs should be keyed by their output ID (e.g. "success") and
	// the value should be the data you wish to output. The data may contain expressions to construct the output.
	Outputs map[string]any `json:"outputs"`
	// OutputSchema is an optional override for the automatically inferred output schema from the Outputs data and
	// expressions. The keys must be the output IDs from Outputs and the values must be a StepOutputSchema object as
	// per the Arcaflow schema.
	OutputSchema map[string]any `json:"outputSchema"`
	// Output is the legay way to define a single output. It conflicts the "outputs" field and if filled, will create a
	// "success" output.
	//
	// Deprecated: use Outputs instead.
	Output any `json:"output"`
}

Workflow is the primary data structure describing workflows.

type YAMLConverter added in v0.4.0

type YAMLConverter interface {
	FromYAML(data []byte) (*Workflow, error)
}

YAMLConverter converts a raw YAML into a usable workflow.

func NewYAMLConverter added in v0.4.0

func NewYAMLConverter(stepRegistry step.Registry) YAMLConverter

NewYAMLConverter creates a YAMLConverter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL