step

package
v0.17.0-beta1
Warning

This package is not in the latest version of its module.

Published: May 6, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

Arcaflow pluggable step system

This library provides pluggable steps to Arcaflow.

In Arcaflow, a workflow consists of one or more steps that are executed in parallel or in sequence. This library helps provide a uniform API to implement new step types. For example, the plugin step kind runs an Arcaflow plugin using a deployer (e.g. a container engine).

How pluggable steps work

Each pluggable step provider goes through several stages during its life.

First, the provider does not have any configuration. In this state, the provider cannot provide any meaningful schema information other than for the provider configuration itself. For example, when the plugin provider is in this state, it has not yet deployed the plugin and cannot say what options the specific plugin wants.

Second, the provider can perform an action to obtain a "runnable step". In the case of the plugin provider, it deploys the plugin using the local deployer and queries its schema. After this, the schema information is available and the step can be run.

Finally, the step can be run. In a running state the step goes through a set of lifecycle stages, each of which can require an input and can produce an output. For example, the plugin provider will, given the correct input, first be in the configuration stage, then advance to deploying and then running, and finally move to the finished stage.

Creating your own pluggable step

Provider

To create your own pluggable step, you must first implement the Provider interface described in provider.go. This interface describes the provider in its first phase. Here are the functions you must implement:

  • Kind(): this function must return a globally unique value that the user supplies in the kind field of their workflow.
  • Lifecycle(): this function must return a lifecycle object that describes which lifecycle stages the step can go through. You don't have to provide a schema for these lifecycle stages yet. (See Lifecycle below for more details.)
  • ProviderSchema(): this function returns the fields the user supplies in their workflow file to configure the provider, along with their schemas. For example, the plugin provider uses plugin as a field name, where the user is expected to provide a plugin container image. Be careful: these field names must not conflict with the run properties or with any lifecycle stage input fields. The values the user provides for these fields are what LoadSchema() receives.
  • RunProperties(): this function returns the set of property names that hold the configuration when the step is run. For example, the plugin provider uses this to let users choose which step to run when a plugin provides multiple steps. These names must not conflict with the ProviderSchema() fields or any lifecycle stage.
  • LoadSchema(): this function takes the values of the ProviderSchema() fields (and the workflow context) as input and loads the schema. It returns a runnable step, which already holds the schema information about the step being run.
Lifecycle

The lifecycle is a simple description of the stages your provider goes through. For example, the first stage of the plugin provider looks like this:

var configurationLifecycleStage = step.LifecycleStage{
    // ID uniquely identifies the stage within the provider.
    ID:           "configuring",
    // This is how the stage is called while it is running.
    RunningName:  "configuring",
    // This is how the stage is called once it has finished.
    FinishedName: "configured",
    // The workflow fields that hold the input for this stage.
    InputFields: map[string]struct{}{
        "step": {},
    },
    // These stages can possibly run next.
    NextStages: []string{
        "deploying", "configuration_failed",
    },
    // True would indicate that this stage is an incurable problem in the workflow.
    Fatal: false,
}

You can then set up your lifecycle:

var lifecycle = step.Lifecycle[step.LifecycleStage]{
    InitialStage: "configuring",
    Stages: []step.LifecycleStage{
        configurationLifecycleStage,
        // ...
    },
}
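Because stages refer to each other by ID, a typo in InitialStage or NextStages is easy to miss. The following sketch shows one way to sanity-check a lifecycle before using it. It assumes trimmed-down local copies of the types (non-generic, ID and NextStages only), and validateLifecycle is a hypothetical helper, not a function provided by this package.

```go
package main

import "fmt"

// LifecycleStage and Lifecycle are trimmed-down local copies of the step types.
type LifecycleStage struct {
	ID         string
	NextStages []string
}

type Lifecycle struct {
	InitialStage string
	Stages       []LifecycleStage
}

// validateLifecycle is a hypothetical helper: it checks that stage IDs are unique,
// that the initial stage exists, and that every NextStages entry names a declared stage.
func validateLifecycle(l Lifecycle) error {
	ids := make(map[string]struct{}, len(l.Stages))
	for _, s := range l.Stages {
		if _, dup := ids[s.ID]; dup {
			return fmt.Errorf("duplicate stage ID %q", s.ID)
		}
		ids[s.ID] = struct{}{}
	}
	if _, ok := ids[l.InitialStage]; !ok {
		return fmt.Errorf("initial stage %q is not declared", l.InitialStage)
	}
	for _, s := range l.Stages {
		for _, next := range s.NextStages {
			if _, ok := ids[next]; !ok {
				return fmt.Errorf("stage %q refers to undeclared next stage %q", s.ID, next)
			}
		}
	}
	return nil
}

func main() {
	lifecycle := Lifecycle{
		InitialStage: "configuring",
		Stages: []LifecycleStage{
			{ID: "configuring", NextStages: []string{"deploying", "configuration_failed"}},
			{ID: "deploying"},
			{ID: "configuration_failed"},
		},
	}
	fmt.Println(validateLifecycle(lifecycle)) // prints <nil>
}
```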

Later on, you will need to provide the lifecycle stages with schema information, which you can do as follows:

var lifecycleStageWithSchema = step.LifecycleStageWithSchema{
    // Embed the lifecycle stage from before.
    LifecycleStage: configurationLifecycleStage,
    // Provide a schema for each field listed in InputFields.
    // mySchema is a placeholder for your *schema.PropertySchema; the step field is always required.
    InputSchema: map[string]*schema.PropertySchema{
        "step": mySchema,
    },
    // Add your possible outputs here.
    Outputs: map[string]*schema.StepOutputSchema{},
}
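The LifecycleStage documentation below notes that InputFields must match the later lifecycle with schema. A small check can enforce that each declared input field has a schema and vice versa. This is a sketch with simplified local types (propertySchema stands in for *schema.PropertySchema, and Outputs is omitted); inputMismatches is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// propertySchema stands in for *schema.PropertySchema (assumption for this sketch).
type propertySchema struct{}

type LifecycleStage struct {
	ID          string
	InputFields map[string]struct{}
}

// LifecycleStageWithSchema embeds LifecycleStage, as the documented type does.
type LifecycleStageWithSchema struct {
	LifecycleStage
	InputSchema map[string]*propertySchema
}

// inputMismatches is a hypothetical check that lists fields declared in InputFields
// without a schema, and schema entries that are not declared in InputFields.
func inputMismatches(s LifecycleStageWithSchema) (missingSchema, undeclared []string) {
	for field := range s.InputFields {
		if _, ok := s.InputSchema[field]; !ok {
			missingSchema = append(missingSchema, field)
		}
	}
	for field := range s.InputSchema {
		if _, ok := s.InputFields[field]; !ok {
			undeclared = append(undeclared, field)
		}
	}
	sort.Strings(missingSchema)
	sort.Strings(undeclared)
	return missingSchema, undeclared
}

func main() {
	stage := LifecycleStageWithSchema{
		LifecycleStage: LifecycleStage{
			ID:          "configuring",
			InputFields: map[string]struct{}{"step": {}},
		},
		InputSchema: map[string]*propertySchema{"step": {}},
	}
	missing, extra := inputMismatches(stage)
	fmt.Println(missing, extra) // prints [] []
}
```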
Runnable steps

Next, you need to create a runnable step. It must implement three functions:

  • RunSchema() provides a schema that describes the properties required to run the step provided by this provider. These must match the RunProperties() of the provider. For example, the plugin provider uses this to specify the step a user should choose if the plugin provides more than one.
  • Lifecycle() must return the lifecycle for the given run input, but this time with schema information (see above).
  • Start() starts the step. It takes the initial input specified by RunSchema(), a run ID, and a StageChangeHandler that is notified whenever the step finishes its current lifecycle stage. The callback also indicates whether the step needs more input to proceed to the next stage, which allows you to feed data to the step incrementally. Start() must return a running step.
Stage change handler

The stage change handler allows you to react to events happening in a stage:

  • OnStageChange() is the callback that notifies the handler that the previous stage has finished with a specific output. It indicates which stage the provider moved to next, and whether it is waiting for input to be provided via ProvideStageInput.
  • OnStepComplete() is called when the step has completed a final stage in its lifecycle and communicates the output.
Running steps

Finally, you will need to implement the running step construct. It has the following functions:

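  • ProvideStageInput() gives you the opportunity to provide input for a stage so that it may continue. It must only return once the provider has transitioned to the next stage based on that input; otherwise, race conditions may occur.
  • CurrentStage() returns the stage the step provider is currently in, whether or not that stage has finished.
  • State() returns the current RunningStepState of the step (starting, waiting for input, running, or finished).
  • Close() shuts down the step and cleans up the resources associated with it.
  • ForceClose() shuts down the step forcefully.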

Important things to note

A step provider is called from multiple goroutines and must be thread-safe. The stage and state variables MUST be consistent at all times, and the provider MUST advance through the states properly (e.g., it must not skip over the "running" state).
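One way to keep the stage and state consistent is to guard both with a single mutex, so no reader can observe a stage from one transition paired with a state from another. The sketch below is hypothetical: runningStep, the greeting stage, and the required name input field are illustrative, and the types are simplified local copies rather than the package's own. ProvideStageInput only returns after the transition has happened, as the RunningStep documentation requires.

```go
package main

import (
	"fmt"
	"sync"
)

// RunningStepState mirrors the documented string type and two of its constants.
type RunningStepState string

const (
	RunningStepStateWaitingForInput RunningStepState = "waiting_for_input"
	RunningStepStateRunning         RunningStepState = "running"
)

// runningStep is a hypothetical implementation. One mutex guards both stage and
// state so they are always read and written together.
type runningStep struct {
	mu    sync.Mutex
	stage string
	state RunningStepState
}

func (r *runningStep) CurrentStage() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.stage
}

func (r *runningStep) State() RunningStepState {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.state
}

// ProvideStageInput performs the transition while holding the lock, so it only
// returns after the step has moved on and callers cannot race with the change.
func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if stage != r.stage || r.state != RunningStepStateWaitingForInput {
		return fmt.Errorf("stage %q is not waiting for input (current: %q, %s)", stage, r.stage, r.state)
	}
	if _, ok := input["name"]; !ok { // "name" is a hypothetical required input field
		return fmt.Errorf("missing input field %q", "name")
	}
	r.state = RunningStepStateRunning // advance into "running" without skipping it
	return nil
}

func main() {
	step := &runningStep{stage: "greeting", state: RunningStepStateWaitingForInput}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = step.CurrentStage() // concurrent reads are safe thanks to the mutex
			_ = step.State()
		}()
	}
	err := step.ProvideStageInput("greeting", map[string]any{"name": "Arca"})
	wg.Wait()
	fmt.Println(err, step.State()) // prints <nil> running
}
```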

Documentation

Overview

Package step provides the abstract definition of an Arcaflow workflow step. Implementations of its interfaces define how each step type is run, such as the "plugin" kind, which executes plugins.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrProviderNotFound

type ErrProviderNotFound struct {
	Kind       string
	ValidKinds []string
}

ErrProviderNotFound is an error indicating that a provider kind was not found.

func (ErrProviderNotFound) Error

func (e ErrProviderNotFound) Error() string

Error returns the error message.

type Lifecycle

type Lifecycle[StageType lifecycleStage] struct {
	// InitialStage contains the first stage this step enters.
	InitialStage string
	// Stages contains the list of stages for this provider.
	Stages []StageType
}

Lifecycle describes the lifecycle of a step. Each stage in the lifecycle can, but is not required to, have an input schema and an output schema. The stage can also declare the next possible stages.

This lifecycle information is used to build the dependency tree of this step.

func (Lifecycle[StageType]) DAG

func (l Lifecycle[StageType]) DAG() (dgraph.DirectedGraph[StageType], error)

DAG will return a directed acyclic graph of the lifecycle.
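DAG() builds its graph with the package's dgraph dependency. Independently of that library, the NextStages links alone are enough to order the stages or detect a cycle. The sketch below uses Kahn's algorithm on a trimmed-down local LifecycleStage; topoOrder is a hypothetical helper and does not reproduce what DAG() returns.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

type LifecycleStage struct {
	ID         string
	NextStages []string
}

// topoOrder is a hypothetical helper: it returns the stages in an order where every
// stage appears before its NextStages, or an error if the links contain a cycle.
func topoOrder(stages []LifecycleStage) ([]string, error) {
	indegree := map[string]int{}
	edges := map[string][]string{}
	for _, s := range stages {
		if _, ok := indegree[s.ID]; !ok {
			indegree[s.ID] = 0
		}
		for _, next := range s.NextStages {
			edges[s.ID] = append(edges[s.ID], next)
			indegree[next]++
		}
	}
	var queue []string
	for id, d := range indegree {
		if d == 0 {
			queue = append(queue, id)
		}
	}
	sort.Strings(queue) // deterministic starting order
	var order []string
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, next := range edges[id] {
			indegree[next]--
			if indegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	if len(order) != len(indegree) {
		return nil, errors.New("lifecycle contains a cycle")
	}
	return order, nil
}

func main() {
	stages := []LifecycleStage{
		{ID: "configuring", NextStages: []string{"deploying", "configuration_failed"}},
		{ID: "deploying", NextStages: []string{"running"}},
		{ID: "running", NextStages: []string{"finished"}},
		{ID: "configuration_failed"},
		{ID: "finished"},
	}
	order, err := topoOrder(stages)
	fmt.Println(order, err) // prints [configuring deploying configuration_failed running finished] <nil>
}
```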

type LifecycleStage

type LifecycleStage struct {
	// ID uniquely identifies the stage within the current provider.
	ID string
	// WaitingName describes this stage when waiting for input.
	WaitingName string
	// RunningName describes this stage when it is running.
	RunningName string
	// FinishedName specifies how to call this stage once it is complete.
	FinishedName string
	// InputFields provides the set of fields containing the input data of this stage. This must match the later
	// lifecycle with schema.
	InputFields map[string]struct{}
	// NextStages describes the possible next stages. The provider advances to one of these stages automatically and
	// will pause if there is no input available.
	// It will automatically create a DAG node between the current and the described next stages to ensure
	// that it is running in order.
	NextStages []string
	// Fatal indicates that this stage should be treated as fatal unless handled by the workflow.
	Fatal bool
}

LifecycleStage is the description of a single stage within a step lifecycle.

func (LifecycleStage) Identifier

func (l LifecycleStage) Identifier() string

Identifier is a helper function for getting the ID.

func (LifecycleStage) NextStageIDs

func (l LifecycleStage) NextStageIDs() []string

NextStageIDs is a helper function that returns the next possible stages.
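Lifecycle is generic over an unexported lifecycleStage constraint whose definition this page does not show. Given that LifecycleStage exposes Identifier() and NextStageIDs(), and LifecycleStageWithSchema embeds LifecycleStage, a plausible reading is that the constraint requires those two methods. The sketch below assumes exactly that and shows how embedding lets both stage types satisfy it; the edges helper is hypothetical.

```go
package main

import "fmt"

// lifecycleStage is an assumed constraint; the real one is unexported and may differ.
type lifecycleStage interface {
	Identifier() string
	NextStageIDs() []string
}

type LifecycleStage struct {
	ID         string
	NextStages []string
}

func (l LifecycleStage) Identifier() string { return l.ID }

func (l LifecycleStage) NextStageIDs() []string { return l.NextStages }

// LifecycleStageWithSchema gets both methods through embedding.
type LifecycleStageWithSchema struct {
	LifecycleStage
	Outputs map[string]string // placeholder for map[string]*schema.StepOutputSchema
}

type Lifecycle[StageType lifecycleStage] struct {
	InitialStage string
	Stages       []StageType
}

// edges is a hypothetical generic helper that works for either stage type.
func edges[S lifecycleStage](l Lifecycle[S]) []string {
	var out []string
	for _, s := range l.Stages {
		for _, next := range s.NextStageIDs() {
			out = append(out, s.Identifier()+" -> "+next)
		}
	}
	return out
}

func main() {
	plain := Lifecycle[LifecycleStage]{
		InitialStage: "configuring",
		Stages:       []LifecycleStage{{ID: "configuring", NextStages: []string{"deploying"}}},
	}
	withSchema := Lifecycle[LifecycleStageWithSchema]{
		InitialStage: "configuring",
		Stages:       []LifecycleStageWithSchema{{LifecycleStage: plain.Stages[0]}},
	}
	fmt.Println(edges(plain), edges(withSchema)) // prints [configuring -> deploying] [configuring -> deploying]
}
```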

type LifecycleStageWithSchema

type LifecycleStageWithSchema struct {
	LifecycleStage

	// InputSchema describes the schema for the required input of the current stage. This may be nil if the stage
	// requires no input.
	InputSchema map[string]*schema.PropertySchema
	Outputs     map[string]*schema.StepOutputSchema
}

LifecycleStageWithSchema contains information about the possible outputs of a lifecycle stage. This is available after the step lifecycle has started.

type Provider

type Provider interface {
	// Kind returns the identifier that uniquely identifies this provider.
	// e.g. "plugin"
	Kind() string

	// Lifecycle describes the lifecycle of the step implemented by this provider.
	Lifecycle() Lifecycle[LifecycleStage]

	// ProviderSchema provides the basic schema of the provider that needs to be fulfilled in order to load the schema
	// itself. The returned value must provide the names of fields and their partial properties.
	ProviderSchema() map[string]*schema.PropertySchema

	// RunProperties provides the names of properties needed to run the step. The properties are provided as a set to
	// limit conflicts, but must, nevertheless, not conflict with any fields in the ProviderSchema or any lifecycle stage.
	RunProperties() map[string]struct{}

	// LoadSchema prompts this provider to load its schema and return a step that can actually be run. The provided
	// inputs are guaranteed to match the schema returned by ProviderSchema.
	LoadSchema(inputs map[string]any, workflowContext map[string][]byte) (RunnableStep, error)
}

Provider is the description of an item that fits in a workflow. Its implementations provide the basis for workflow execution.

type Registry

type Registry interface {
	// Schema provides a generic schema for all steps.
	Schema() *schema.OneOfSchema[string]
	// SchemaByKind returns the schema of a single provider.
	SchemaByKind(kind string) (schema.Object, error)
	// GetByKind returns a provider by its kind value, or
	GetByKind(kind string) (Provider, error)
	// List returns a map of all step providers mapped by their kind values.
	List() map[string]Provider
}

Registry holds the providers for possible steps in workflows. The registry must call Register() on each Provider immediately after creation.
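A map keyed by kind is a natural fit for GetByKind and List, and ErrProviderNotFound already carries the fields needed to report the valid kinds. This is a hypothetical sketch: mapRegistry, newRegistry, and kindOnly are illustrative names, Provider is reduced to Kind(), the error message format is assumed, and Schema and SchemaByKind are omitted because they need the plugin SDK schema types.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Provider is reduced to Kind() for this sketch.
type Provider interface{ Kind() string }

type ErrProviderNotFound struct {
	Kind       string
	ValidKinds []string
}

// Error uses an assumed message format.
func (e ErrProviderNotFound) Error() string {
	return fmt.Sprintf("provider %q not found (valid: %s)", e.Kind, strings.Join(e.ValidKinds, ", "))
}

// mapRegistry is a hypothetical registry implementing GetByKind and List.
type mapRegistry struct{ providers map[string]Provider }

// newRegistry rejects two providers that claim the same kind, since kinds must be unique.
func newRegistry(providers ...Provider) (*mapRegistry, error) {
	m := make(map[string]Provider, len(providers))
	for _, p := range providers {
		if _, dup := m[p.Kind()]; dup {
			return nil, fmt.Errorf("duplicate provider kind %q", p.Kind())
		}
		m[p.Kind()] = p
	}
	return &mapRegistry{providers: m}, nil
}

func (r *mapRegistry) GetByKind(kind string) (Provider, error) {
	if p, ok := r.providers[kind]; ok {
		return p, nil
	}
	valid := make([]string, 0, len(r.providers))
	for k := range r.providers {
		valid = append(valid, k)
	}
	sort.Strings(valid)
	return nil, ErrProviderNotFound{Kind: kind, ValidKinds: valid}
}

// List returns a copy so callers cannot mutate the registry's internal map.
func (r *mapRegistry) List() map[string]Provider {
	out := make(map[string]Provider, len(r.providers))
	for k, p := range r.providers {
		out[k] = p
	}
	return out
}

// kindOnly is a hypothetical provider that only knows its kind.
type kindOnly string

func (k kindOnly) Kind() string { return string(k) }

func main() {
	reg, err := newRegistry(kindOnly("plugin"), kindOnly("foreach"))
	if err != nil {
		panic(err)
	}
	_, err = reg.GetByKind("dummy")
	fmt.Println(err) // prints provider "dummy" not found (valid: foreach, plugin)
}
```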

type RunnableStep

type RunnableStep interface {
	// Lifecycle describes the lifecycle of this step. The data provided is guaranteed to match the RunSchema.
	Lifecycle(input map[string]any) (Lifecycle[LifecycleStageWithSchema], error)
	// RunSchema provides a schema that describes the properties that are required to run the step provided by this
	// provider. These fields must match the RunProperties from the Provider.
	RunSchema() map[string]*schema.PropertySchema
	// Start begins the step execution. This does not necessarily mean that any action is happening, but the step
	// lifecycle will begin and enter its first stage, possibly waiting for input. The provided input is guaranteed to
	// match the RunSchema.
	Start(
		input map[string]any,
		runID string,
		stageChangeHandler StageChangeHandler,
	) (RunningStep, error)
}

RunnableStep is a step that already has a schema and can be run.

type RunningStep

type RunningStep interface {
	// ProvideStageInput gives you the opportunity to provide input for a stage so that it may continue.
	// The ProvideStageInput must ensure that it only returns once the provider has transitioned to the next
	// stage based on the input, otherwise race conditions may happen.
	ProvideStageInput(stage string, input map[string]any) error
	// CurrentStage returns the stage the step provider is currently in, no matter if it is finished or not.
	CurrentStage() string
	// State returns information about the current step.
	State() RunningStepState
	// Close shuts down the step and cleans up the resources associated with the step.
	Close() error
	// ForceClose shuts down the step forcefully.
	ForceClose() error
}

RunningStep is the representation of a step that is currently executing.

type RunningStepState

type RunningStepState string

RunningStepState is the state any running step can be in.

const (
	// RunningStepStateStarting indicates that the step hasn't processed its first stage yet.
	RunningStepStateStarting RunningStepState = "starting"
	// RunningStepStateWaitingForInput indicates that the step is currently blocked because it is missing input.
	RunningStepStateWaitingForInput RunningStepState = "waiting_for_input"
	// RunningStepStateRunning indicates that the step is working.
	RunningStepStateRunning RunningStepState = "running"
	// RunningStepStateFinished indicates that the step has finished, including failure cases.
	RunningStepStateFinished RunningStepState = "finished"
)
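These states pair naturally with the WaitingName, RunningName, and FinishedName fields of LifecycleStage. As a sketch, assuming a UI wants to label the current stage, a hypothetical displayName helper could pick the name that matches the step's state and fall back to the stage ID otherwise (for example in the starting state, or when a name is unset). The types are trimmed-down local copies, and the "waiting for configuration" label is an illustrative value.

```go
package main

import "fmt"

type RunningStepState string

const (
	RunningStepStateStarting        RunningStepState = "starting"
	RunningStepStateWaitingForInput RunningStepState = "waiting_for_input"
	RunningStepStateRunning         RunningStepState = "running"
	RunningStepStateFinished        RunningStepState = "finished"
)

// LifecycleStage keeps only the naming fields of the documented struct.
type LifecycleStage struct {
	ID           string
	WaitingName  string
	RunningName  string
	FinishedName string
}

// displayName is a hypothetical helper that picks the human-readable name of a stage
// for a given step state, falling back to the stage ID when no name applies or is set.
func displayName(stage LifecycleStage, state RunningStepState) string {
	var name string
	switch state {
	case RunningStepStateWaitingForInput:
		name = stage.WaitingName
	case RunningStepStateRunning:
		name = stage.RunningName
	case RunningStepStateFinished:
		name = stage.FinishedName
	}
	if name == "" {
		return stage.ID
	}
	return name
}

func main() {
	stage := LifecycleStage{ID: "configuring", WaitingName: "waiting for configuration",
		RunningName: "configuring", FinishedName: "configured"}
	for _, state := range []RunningStepState{RunningStepStateStarting, RunningStepStateWaitingForInput,
		RunningStepStateRunning, RunningStepStateFinished} {
		fmt.Printf("%s: %s\n", state, displayName(stage, state))
	}
}
```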

type StageChangeHandler

type StageChangeHandler interface {
	// OnStageChange is the callback that notifies the handler of the fact that the previous stage has finished with
	// the specified output. It indicates which next stage the provider moved to, and if it is waiting for input to
	// be provided via ProvideStageInput.
	//
	// The previous stage may be nil if the callback is called on the first stage the provider enters. The output of the
	// previous stage may also be nil if the stage did not declare any outputs.
	OnStageChange(
		step RunningStep,
		previousStage *string,
		previousStageOutputID *string,
		previousStageOutput *any,
		newStage string,
		inputAvailable bool,
		wg *sync.WaitGroup,
	)

	// OnStepComplete is called when the step has completed a final stage in its lifecycle and communicates the output.
	// The previous output may be nil if the previous stage did not declare any outputs.
	OnStepComplete(
		step RunningStep,
		previousStage string,
		previousStageOutputID *string,
		previousStageOutput *any,
		wg *sync.WaitGroup,
	)
}

StageChangeHandler is a callback hook for reacting to changes in stages. The calling party will be notified of the provider advancing stages with this hook.

Directories

Path      Synopsis
dummy     Package dummy is a step provider that just says hello.
foreach   Package foreach provides the ability to loop over items.
plugin    Package plugin provides a step provider that executes container-based Arcaflow plugins using deployers.
registry  Package registry provides the step registry, joining the step providers together.
