foreach

package
v0.20.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

Foreach step provider

This provider allows you to loop over a list of inputs and execute (potentially parallel) workflows for each item. Each subworkflow must have exactly one possible output, named "success"; these outputs are collected into a list, which becomes the result.

Usage

steps:
  your_step:
    kind: foreach
    workflow: some_workflow_file.yaml # This must be in the workflow directory
    items: !expr $.input.some_list_of_items
    parallelism: 5 # How many workflows to run in parallel
output:
  result: !expr $.steps.your_step.outputs.success.data # This will be a list of result objects
Handling errors

In case one or more subworkflows exit with an error, you can also recover.

output:
  result: !expr $.steps.your_step.failed.error.data # This will be a map of int keys to the output data of the subworkflows that executed successfully.
  errors: !expr $.steps.your_step.failed.error.messages # This will be a map of int to error messages for the subworkflows that failed.

Documentation

Overview

Package foreach provides the ability to loop over items.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(
	logger log.Logger,
	yamlParserFactory func() (workflow.YAMLConverter, error),
	executorFactory func(logger log.Logger) (workflow.Executor, error),
) (step.Provider, error)

New creates a new loop provider.

Example

ExampleNew provides an example for using the foreach provider to run subworkflows.

package main

import (
	"context"
	"fmt"
	"go.arcalot.io/lang"
	"go.arcalot.io/log/v2"
	"go.flow.arcalot.io/engine/config"
	"go.flow.arcalot.io/engine/internal/builtinfunctions"
	"go.flow.arcalot.io/engine/internal/step"
	"go.flow.arcalot.io/engine/internal/step/dummy"
	"go.flow.arcalot.io/engine/internal/step/foreach"
	"go.flow.arcalot.io/engine/internal/step/registry"
	"go.flow.arcalot.io/engine/workflow"
)

// mainWorkflow is the workflow calling the foreach step.
// It takes a list of name objects as input, runs subworkflow.yaml once per
// list item with a parallelism of 5, and exposes the collected subworkflow
// results as the "messages" field of its "success" output. The "failed"
// output carries the error information of the foreach step.
var mainWorkflow = `
version: v0.2.0
input:
  root: names
  objects:
    names:
      id: names
      properties:
        names:
          type:
            type_id: list
            items:
              type_id: object
              id: name
              properties:
                name:
                  type:
                    type_id: string
steps:
  greet:
    kind: foreach
    items: !expr $.input.names
    workflow: subworkflow.yaml
    parallelism: 5
outputs:
  success:
    messages: !expr $.steps.greet.outputs.success.data
  failed:
    error: !expr $.steps.greet.failed.error
`

// subworkflow is the workflow that is run for each item of the foreach step.
// It accepts a single name object as input, passes the name to a dummy step,
// and maps that step's message to its "success" output and the step's error
// reason to its "error" output.
var subworkflow = `
version: v0.2.0
input:
  root: name
  objects:
    name:
      id: name
      properties:
        name:
          type:
            type_id: string
steps:
  say_hi:
    kind: dummy
    name: !expr $.input.name
outputs:
  success:
    message: !expr $.steps.say_hi.greet.success.message
  error:
    reason: !expr $.steps.say_hi.greet.error.reason
`

// ExampleNew provides an example for using the foreach provider to run subworkflows.
//
// It registers the dummy and foreach step providers, prepares mainWorkflow
// with subworkflow supplied as "subworkflow.yaml", executes it with two names,
// and prints the message of every subworkflow result on a single line.
func main() {
	logCfg := log.Config{
		Level:       log.LevelError,
		Destination: log.DestinationStdout,
	}
	logger := log.New(logCfg)
	engineCfg := &config.Config{Log: logCfg}

	// The factory is handed to the foreach provider before the registry
	// exists; its stepRegistry field is filled in once the registry is built.
	wfFactory := &workflowFactory{config: engineCfg}
	dummyProvider := dummy.New()
	foreachProvider := lang.Must2(foreach.New(logger, wfFactory.createYAMLParser, wfFactory.createWorkflow))
	stepRegistry := lang.Must2(registry.New(dummyProvider, foreachProvider))
	wfFactory.stepRegistry = stepRegistry

	executor := lang.Must2(workflow.NewExecutor(
		logger,
		engineCfg,
		stepRegistry,
		builtinfunctions.GetFunctions(),
	))

	parsed := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(mainWorkflow)))
	workflowFiles := map[string][]byte{
		"subworkflow.yaml": []byte(subworkflow),
	}
	prepared := lang.Must2(executor.Prepare(parsed, workflowFiles))

	input := map[string]any{
		"names": []any{
			map[string]any{"name": "Arca"},
			map[string]any{"name": "Lot"},
		},
	}
	outputID, outputData := lang.Must3(prepared.Execute(context.Background(), input))
	if outputID != "success" {
		panic(fmt.Errorf("workflow run failed"))
	}

	// The success output holds one result object per input item.
	messages := outputData.(map[any]any)["messages"].([]any)
	for i := range messages {
		result := messages[i].(map[any]any)
		fmt.Printf("%s ", result["message"])
	}
	fmt.Println()
}

type workflowFactory struct {
	stepRegistry step.Registry
	config       *config.Config
}

// createYAMLParser returns a YAML converter backed by the factory's step
// registry. It returns an error if the step registry has not been set yet.
func (f *workflowFactory) createYAMLParser() (workflow.YAMLConverter, error) {
	if reg := f.stepRegistry; reg != nil {
		return workflow.NewYAMLConverter(reg), nil
	}
	return nil, fmt.Errorf("YAML converter not available yet, please call the factory function after the engine has initialized")
}

// createWorkflow returns a workflow executor that uses the given logger, the
// factory's configuration, its step registry, and the built-in functions.
// It returns an error if the step registry has not been set yet.
func (f *workflowFactory) createWorkflow(logger log.Logger) (workflow.Executor, error) {
	stepR := f.stepRegistry
	if stepR == nil {
		// This factory produces an executor, so the error names the executor
		// rather than the YAML converter.
		return nil, fmt.Errorf("workflow executor not available yet, please call the factory function after the engine has initialized")
	}
	return workflow.NewExecutor(logger, f.config, stepR, builtinfunctions.GetFunctions())
}
Output:

Hello Arca! Hello Lot!

Types

type StageID

type StageID string

StageID contains the identifiers for the stages.

const (
	// StageIDExecute is the stage in which the subworkflow is executing.
	StageIDExecute StageID = "execute"
	// StageIDOutputs is the stage that provides the output data of the subworkflow.
	StageIDOutputs StageID = "outputs"
	// StageIDFailed is the stage that provides the error reason from the subworkflow.
	StageIDFailed StageID = "failed"
	// StageIDEnabling is the stage in which the step is waiting to be enabled.
	// It is kept as a separate stage to ensure that the step exits immediately if disabled.
	StageIDEnabling StageID = "enabling"
	// StageIDDisabled is the stage indicating that the step was disabled.
	StageIDDisabled StageID = "disabled"
	// StageIDClosed is the stage indicating that the workflow has exited, or did not start,
	// due to workflow termination or step cancellation.
	StageIDClosed StageID = "closed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL