flow

package module
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: MIT Imports: 10 Imported by: 1

README

Workflow - a library organizes steps with dependencies into DAG (Directed-Acyclic-Graph) for Go

Go Report Card Go Test Status Go Test Coverage

Overview

Strongly encourage everyone to read examples in the examples directory to have a quick understanding of how to use this library.

go-workflow helps Go developers organize steps with dependencies into a Directed-Acyclic-Graph (DAG).

  • It provides a simple and flexible way to define and execute a workflow.
  • It is easy to implement steps and compose them into a composite step.
  • It uses goroutine to execute steps concurrently.
  • It supports retry, timeout, and other configurations for each step.
  • It supports callbacks to hook before / after each step.

See it in action:

package yours

import (
    "context"

    flow "github.com/Azure/go-workflow"
)

type Step struct{ Value string }

// All required for a step is `Do(context.Context) error`
func (s *Step) Do(ctx context.Context) error {
    fmt.Println(s.Value)
    return nil
}

func main() {
    // declare steps
    var (
        a = new(Step)
        b = &Step{Value: "B"}
        c = flow.Func("declare from anonymous function", func(ctx context.Context) error {
            fmt.Println("C")
            return nil
        })
    )
    // compose steps into a workflow!
    w := new(flow.Workflow)
    w.Add(
        flow.Step(b).DependsOn(a),     // use DependsOn to define dependencies
        flow.Steps(a, b).DependsOn(c), // final execution order: c -> a -> b

        // other configurations, like retry, timeout, condition, etc.
        flow.Step(c).
            Retry(func(ro *flow.RetryOption) {
                ro.Attempts = 3 // retry 3 times
            }).
            Timeout(10*time.Minute), // timeout after 10 minutes

        // use Input to change step at runtime
        flow.Step(a).Input(func(ctx context.Context, a *Step) error {
            a.Value = "A"
            return nil
        }),
    )
    // execute the workflow and block until all steps are terminated
    err := w.Do(context.Background())
}

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultRetryOption = RetryOption{
	Backoff:  backoff.NewExponentialBackOff(),
	Attempts: 3,
}
View Source
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")
View Source
var WorkflowPhases = []Phase{PhaseInit, PhaseMain, PhaseDefer}

WorkflowPhases defines the order of phases Workflow executes. New phases can be added to this, please support the built-in phases.

i.e.

var PhaseDebug flow.Phase = "Debug"

func init() {
	flow.WorkflowPhases = []flow.Phase{flow.PhaseInit, flow.PhaseMain, PhaseDebug, flow.PhaseDefer}
}

Then in your package, workflow will execute the steps in PhaseDebug after PhaseMain, before PhaseDefer.

Functions

func As

func As[T Steper](s Steper) []T

As finds all steps in the tree of step that matches target type, and returns them. The sequence of the returned steps is preorder traversal.

func ConditionOrDefault added in v0.0.7

func ConditionOrDefault(cond Condition) func(context.Context, map[Steper]StatusError) StepStatus

ConditionOrDefault will use DefaultCondition if cond is nil.

func Is

func Is[T Steper](s Steper) bool

Is reports whether there is any step in tree matches target type.

func IsStep added in v0.0.7

func IsStep(step, target Steper) bool

IsStep reports whether there is any step matches target step.

func Keys added in v0.0.3

func Keys[M ~map[K]V, K comparable, V any](m M) []K

Keys returns the keys of the map m. The keys will be in an indeterminate order.

func LogValue

func LogValue(step Steper) logValue

LogValue is used with log/slog, you can use it like:

logger.With("step", LogValue(step))

To prevent expensive String() calls,

logger.With("step", String(step))

func String

func String(step Steper) string

String unwraps step and returns a proper string representation.

Types

type AddStep

type AddStep[S Steper] struct {
	AddSteps
	Steps []S
}

func Step

func Step[S Steper](steps ...S) AddStep[S]

Step declares Step ready to be added into Workflow.

The main difference between Step() and Steps() is that, Step() allows to add Input and InputDependsOn for the Step.

Step(a).Input(func(ctx context.Context, a *A) error {
	// fill a
}))

func (AddStep[S]) AfterStep added in v0.0.3

func (as AddStep[S]) AfterStep(afters ...AfterStep) AddStep[S]

func (AddStep[S]) BeforeStep added in v0.0.3

func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]

func (AddStep[S]) DependsOn

func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]

func (AddStep[S]) Input

func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]

Input adds BeforeStep callback for the Step(s).

Input callbacks will be called before Do, and the order will respect the order of declarations.

Step(a).
	Input(/* 1. this Input will be called first */).
	Input(/* 2. this Input is after up's Output */)
Step(a).Input(/* 3. this Input is after all */)

The Input callbacks are executed at runtime and per try.

func (AddStep[S]) Retry

func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]

func (AddStep[S]) Timeout

func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]

func (AddStep[S]) When

func (as AddStep[S]) When(when Condition) AddStep[S]

type AddSteps

type AddSteps map[Steper]*StepConfig

func BatchPipe added in v0.0.3

func BatchPipe(batch ...AddSteps) AddSteps

BatchPipe creates a batched pipeline in Workflow.

workflow.Add(
	BatchPipe(
		Steps(a, b),
		Steps(c, d, e),
		Steps(f),
	),
)

The above code is equivalent to:

workflow.Add(
	Steps(c, d, e).DependsOn(a, b),
	Steps(f).DependsOn(c, d, e),
)

func Pipe

func Pipe(steps ...Steper) AddSteps

Pipe creates a pipeline in Workflow.

workflow.Add(
	Pipe(a, b, c), // a -> b -> c
)

The above code is equivalent to:

workflow.Add(
	Step(b).DependsOn(a),
	Step(c).DependsOn(b),
)

func Steps

func Steps(steps ...Steper) AddSteps

Steps declares a series of Steps ready to be added into Workflow.

The Steps declared are mutually independent.

workflow.Add(
	Steps(a, b, c),					// a, b, c will be executed in parallel
	Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel
)

func (AddSteps) AfterStep added in v0.0.3

func (as AddSteps) AfterStep(afters ...AfterStep) AddSteps

AfterStep adds AfterStep callback for the Step(s).

The AfterStep callback will be called after Do, and pass the error to next AfterStep callback. The order of execution will respect the order of declarations. The AfterStep callbacks are able to change the error returned by Do. The AfterStep callbacks are executed at runtime and per try.

func (AddSteps) BeforeStep added in v0.0.3

func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps

BeforeStep adds BeforeStep callback for the Step(s).

The BeforeStep callback will be called before Do, and return when first error occurs. The order of execution will respect the order of declarations. The BeforeStep callbacks are able to change the context.Context feed into Do. The BeforeStep callbacks are executed at runtime and per try.

func (AddSteps) DependsOn

func (as AddSteps) DependsOn(ups ...Steper) AddSteps

DependsOn declares dependency on the given Steps.

Step(a).DependsOn(b, c)

Then b, c should happen-before a.

func (AddSteps) Done

func (as AddSteps) Done() map[Steper]*StepConfig

Done implements WorkflowAdder

func (AddSteps) Merge added in v0.0.3

func (as AddSteps) Merge(others ...AddSteps) AddSteps

Merge another AddSteps into one.

func (AddSteps) Retry

func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps

Retry customize how the Step should be retried.

Step will be retried as long as this option is configured.

w.Add(
	Step(a), // not retry
	Step(b).Retry(func(opt *RetryOption) { // will retry 3 times
		opt.MaxAttempts = 3
	}),
	Step(c).Retry(nil), // will use DefaultRetryOption!
)

func (AddSteps) Timeout

func (as AddSteps) Timeout(timeout time.Duration) AddSteps

Timeout sets the Step level timeout.

func (AddSteps) When

func (as AddSteps) When(cond Condition) AddSteps

When set the Condition for the Step.

type AfterStep added in v0.0.3

type AfterStep func(context.Context, Steper, error) error

AfterStep defines callback being called AFTER step being executed.

type BeforeStep added in v0.0.3

type BeforeStep func(context.Context, Steper) (context.Context, error)

BeforeStep defines callback being called BEFORE step being executed.

type BranchCheck added in v0.0.7

type BranchCheck[T Steper] struct {
	Check BranchCheckFunc[T]
	OK    bool
	Error error
}

BranchCheck represents a branch to be checked.

func (*BranchCheck[T]) Do added in v0.0.7

func (bc *BranchCheck[T]) Do(ctx context.Context, target T)

type BranchCheckFunc added in v0.0.7

type BranchCheckFunc[T Steper] func(context.Context, T) (bool, error)

BranchCheckFunc checks the target and returns true if the branch should be taken.

type Condition

type Condition func(ctx context.Context, ups map[Steper]StatusError) StepStatus

Condition is a function to determine what's the next status of Step. Condition makes the decision based on the status and result of all the Upstream Steps. Condition is only called when all Upstream Steps are terminated.

var (
	// DefaultCondition used in workflow, defaults to AllSucceeded
	DefaultCondition Condition = AllSucceeded
	// DefaultIsCanceled is used to determine whether an error is being regarded as canceled.
	DefaultIsCanceled = func(err error) bool {
		switch {
		case errors.Is(err, context.Canceled),
			errors.Is(err, context.DeadlineExceeded),
			StatusFromError(err) == Canceled:
			return true
		}
		return false
	}
)

type ErrBefore added in v0.0.3

type ErrBefore struct {
	// contains filtered or unexported fields
}

func (ErrBefore) Unwrap added in v0.0.3

func (e ErrBefore) Unwrap() error

type ErrCancel

type ErrCancel struct {
	// contains filtered or unexported fields
}

func Cancel

func Cancel(err error) ErrCancel

Cancel marks the current step as `Canceled`, and reports the error.

func (ErrCancel) Unwrap

func (e ErrCancel) Unwrap() error

type ErrCycleDependency

type ErrCycleDependency map[Steper][]Steper

ErrCycleDependency means there is a cycle-dependency in your Workflow!!!

func (ErrCycleDependency) Error

func (e ErrCycleDependency) Error() string

type ErrPanic

type ErrPanic struct {
	// contains filtered or unexported fields
}

func (ErrPanic) Unwrap

func (e ErrPanic) Unwrap() error

type ErrSkip

type ErrSkip struct {
	// contains filtered or unexported fields
}

func Skip

func Skip(err error) ErrSkip

Skip marks the current step as `Skipped`, and reports the error.

func (ErrSkip) Unwrap

func (e ErrSkip) Unwrap() error

type ErrSucceed

type ErrSucceed struct {
	// contains filtered or unexported fields
}

func Succeed

func Succeed(err error) ErrSucceed

Succeed marks the current step as `Succeeded`, while still reports the error.

func (ErrSucceed) Unwrap

func (e ErrSucceed) Unwrap() error

type ErrWorkflow

type ErrWorkflow map[Steper]StatusError

ErrWorkflow contains all errors reported from terminated Steps in Workflow.

Keys are root Steps, values are its status and error.

func (ErrWorkflow) AllSucceeded

func (e ErrWorkflow) AllSucceeded() bool

func (ErrWorkflow) AllSucceededOrSkipped

func (e ErrWorkflow) AllSucceededOrSkipped() bool

func (ErrWorkflow) Error

func (e ErrWorkflow) Error() string

ErrWorkflow will be printed as:

Step: [Status]
	error message

func (ErrWorkflow) MarshalJSON

func (e ErrWorkflow) MarshalJSON() ([]byte, error)

MarshalJSON allows us to marshal ErrWorkflow to json.

{
	"Step": {
		"status": "Status",
		"error": "error message"
	}
}

func (ErrWorkflow) Unwrap

func (e ErrWorkflow) Unwrap() []error

type ErrWrappedStepAlreadyInTree added in v0.0.3

type ErrWrappedStepAlreadyInTree struct {
	StepAlreadyThere Steper
	NewAncestor      Steper
	OldAncestor      Steper
}

func (ErrWrappedStepAlreadyInTree) Error added in v0.0.3

type Function

type Function[I, O any] struct {
	Name   string
	Input  I
	Output O
	DoFunc func(context.Context, I) (O, error)
}

Function wraps an arbitrary function as a Step.

func Func

func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]

Func constructs a Step from an arbitrary function

func FuncI

func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]

func FuncIO

func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]

func FuncO

func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]

func (*Function[I, O]) Do

func (f *Function[I, O]) Do(ctx context.Context) error

func (*Function[I, O]) String

func (f *Function[I, O]) String() string

type IfBranch added in v0.0.7

type IfBranch[T Steper] struct {
	Target      T // the target to check
	BranchCheck BranchCheck[T]
	ThenStep    []Steper
	ElseStep    []Steper
	Cond        Condition // Cond is the When condition for both ThenStep and ElseStep, not target Step!
}

IfBranch adds target step, then and else step to workflow, and check the target step and determine which branch to go.

func If added in v0.0.7

func If[T Steper](target T, check BranchCheckFunc[T]) *IfBranch[T]

If adds a conditional branch to the workflow.

If(someStep, func(ctx context.Context, someStep *SomeStep) (bool, error) {
	// branch condition here, true -> Then, false -> Else.
	// if error is returned, then fail the selected branch step.
}).
Then(thenStep).
Else(elseStep)

func (*IfBranch[T]) Done added in v0.0.7

func (i *IfBranch[T]) Done() map[Steper]*StepConfig

func (*IfBranch[T]) Else added in v0.0.7

func (i *IfBranch[T]) Else(el ...Steper) *IfBranch[T]

Else adds steps to the Else branch.

func (*IfBranch[T]) Then added in v0.0.7

func (i *IfBranch[T]) Then(th ...Steper) *IfBranch[T]

Then adds steps to the Then branch.

func (*IfBranch[T]) When added in v0.0.7

func (i *IfBranch[T]) When(cond Condition) *IfBranch[T]

When adds a condition to both Then and Else steps, not the Target! Default to DefaultCondition.

type MockStep

type MockStep struct {
	Step   Steper
	MockDo func(context.Context) error
}

MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.

func (*MockStep) Do

func (m *MockStep) Do(ctx context.Context) error

func (*MockStep) Unwrap

func (m *MockStep) Unwrap() Steper

type NamedStep

type NamedStep struct {
	Name string
	Steper
}

NamedStep is a wrapper of Steper, it gives your step a name by overriding String() method.

func (*NamedStep) String

func (ns *NamedStep) String() string

func (*NamedStep) Unwrap

func (ns *NamedStep) Unwrap() Steper

type NonOpStep

type NonOpStep struct {
	Name string
}

this will be used as a virtual node to handle the dependency of steps

func (*NonOpStep) Do

Do implements Steper.

func (*NonOpStep) String

func (n *NonOpStep) String() string

type Phase

type Phase string

Phase groups Steps into different execution phases.

Workflow supports three built-in phases: Init, Main and Defer. It derives from the below common go pattern:

func init() {}
func main() {
	defer func() {}
}

- Only all Steps in previous phase terminated, the next phase will start. - Even if the steps in previous phase are not successful, the next phase will always start. - The order of steps in the same phase is not guaranteed. (defer here is not stack!)

Customized phase can be added to WorkflowPhases.

const (
	PhaseUnknown Phase = ""
	PhaseInit    Phase = "Init"
	PhaseMain    Phase = "Main"
	PhaseDefer   Phase = "Defer"
)

type RetryOption

type RetryOption struct {
	TimeoutPerTry time.Duration // 0 means no timeout
	Attempts      uint64        // 0 means no limit
	StopIf        func(ctx context.Context, attempt uint64, since time.Duration, err error) bool
	Backoff       backoff.BackOff
	Notify        backoff.Notify
	Timer         backoff.Timer
}

RetryOption customizes retry behavior of a Step in Workflow.

type Set

type Set[T comparable] map[T]struct{}

func (Set[T]) Add

func (s Set[T]) Add(vs ...T)

func (Set[T]) Flatten added in v0.0.7

func (s Set[T]) Flatten() []T

func (Set[T]) Has

func (s Set[T]) Has(v T) bool

func (Set[T]) Union

func (s Set[T]) Union(sets ...Set[T])

type State

type State struct {
	StatusError
	Config *StepConfig
	sync.RWMutex
}

State is the internal state of a Step in a Workflow.

It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status could be read / write from different goroutines, so use RWMutex to protect it.

func (*State) AddUpstream

func (s *State) AddUpstream(up Steper)

func (*State) After added in v0.0.3

func (s *State) After(ctx context.Context, step Steper, err error) error

func (*State) Before added in v0.0.3

func (s *State) Before(ctx context.Context, step Steper) (context.Context, error)

func (*State) GetError

func (s *State) GetError() error

func (*State) GetStatus

func (s *State) GetStatus() StepStatus

func (*State) GetStatusError

func (s *State) GetStatusError() StatusError

func (*State) MergeConfig

func (s *State) MergeConfig(sc *StepConfig)

func (*State) Option

func (s *State) Option() *StepOption

func (*State) SetError

func (s *State) SetError(err error)

func (*State) SetStatus

func (s *State) SetStatus(ss StepStatus)

func (*State) Upstreams

func (s *State) Upstreams() Set[Steper]

type StatusError

type StatusError struct {
	Status StepStatus
	Err    error
}

StatusError contains the status and error of a Step.

func (StatusError) Error

func (e StatusError) Error() string

StatusError will be printed as:

[Status]
	error message

func (StatusError) MarshalJSON

func (e StatusError) MarshalJSON() ([]byte, error)

MarshalJSON allows us to marshal StatusError to json.

{
	"status": "Status",
	"error": "error message"
}

func (StatusError) Unwrap

func (e StatusError) Unwrap() error

type StepBuilder added in v0.0.4

type StepBuilder interface {
	BuildStep()
}

StepBuilder allows to build the internal Steps when adding into Workflow.

type StepImpl struct {}
func (s *StepImpl) Do(ctx context.Context) error { /* ... */ }
func (s *StepImpl) BuildStep() { /* build internal steps */ }

workflow.Add(
	flow.Step(new(StepImpl)), // here will call StepImpl.BuildStep() implicitly
)

type StepConfig

type StepConfig struct {
	Upstreams Set[Steper]       // Upstreams of the Step, means these Steps should happen-before this Step
	Before    []BeforeStep      // Before callbacks of the Step, will be called before Do
	After     []AfterStep       // After callbacks of the Step, will be called before Do
	Option    func(*StepOption) // Option customize the Step settings
}

func (*StepConfig) AddOption

func (sc *StepConfig) AddOption(opt func(*StepOption))

func (*StepConfig) Merge

func (sc *StepConfig) Merge(other *StepConfig)

type StepOption

type StepOption struct {
	RetryOption *RetryOption   // RetryOption customize how the Step should be retried, default (nil) means no retry.
	Condition   Condition      // Condition decides whether Workflow should execute the Step, default to DefaultCondition.
	Timeout     *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout.
}

type StepStatus

type StepStatus string

StepStatus describes the status of a Step.

const (
	Pending   StepStatus = ""
	Running   StepStatus = "Running"
	Failed    StepStatus = "Failed"
	Succeeded StepStatus = "Succeeded"
	Canceled  StepStatus = "Canceled"
	Skipped   StepStatus = "Skipped"
)

func AllSucceeded

func AllSucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus

AllSucceeded: all Upstreams are Succeeded

func AllSucceededOrSkipped added in v0.0.3

func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StatusError) StepStatus

AllSucceededOrSkipped: all Upstreams are Succeeded or Skipped

func Always

Always: as long as all Upstreams are terminated

func AnyFailed

func AnyFailed(ctx context.Context, ups map[Steper]StatusError) StepStatus

AnyFailed: any Upstream is Failed

func AnySucceeded added in v0.0.7

func AnySucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus

AnySucceeded: any Upstream is Succeeded

func BeCanceled

func BeCanceled(ctx context.Context, ups map[Steper]StatusError) StepStatus

BeCanceled: only run when the workflow is canceled

func StatusFromError

func StatusFromError(err error) StepStatus

StatusFromError gets the StepStatus from error.

func (StepStatus) IsTerminated

func (s StepStatus) IsTerminated() bool

func (StepStatus) String

func (s StepStatus) String() string

type StepTree

type StepTree map[Steper]Steper

StepTree is a tree data structure of steps, it helps Workflow tracks Nested Steps.

Why StepTree is needed?

What if someone add a Step and its Nested Step to Workflow?

doSomeThing := &DoSomeThing{}
decorated := &Decorator{Steper: step}
workflow.Add(
	Step(doSomeThing),
	Step(decorated),
)

docorated.Do() will call doSomeThing.Do() internally, and apparently, we don't want doSomeThing.Do() being called twice.

StepTree is the solution to the above questions.

What is StepTree?

Let's dive into the definitions, if some Step wrap another Step, then

type Parent struct {
	Child Steper
}
type Parent struct { // This Parent "branches"
	Children []Steper
}

Then we can draw a tree like:

┌────┐ ┌────┐    ┌────┐
│ R1 │ │ R2 │    │ R3 │
└─┬──┘ └─┬──┘    └─┬──┘
┌─┴──┐ ┌─┴──┐    ┌─┴──┐
│ L1 │ │ T2 │    │ B3 │
└────┘ └─┬──┘    └─┬──┘
         │      ┌──┴────┐
       ┌─┴──┐ ┌─┴──┐  ┌─┴──┐
       │ L2 │ │ L3 │  │ T3 │
       └────┘ └────┘  └─┬──┘
                      ┌─┴──┐
                      │ L4 │
                      └────┘

Where

  • [R]oot: the root Step, there isn't other Step wrapping it.
  • [L]eaf: the leaf Step, it doesn't wrap any Step inside.
  • [T]runk: the trunk Step, it has method Unwrap() Steper, one Child.
  • [B]ranch: the branch Step, it has method Unwrap() []Steper, multiple Children.

Then the StepTree built for the above tree is:

StepTree{
	R1: R1, // root's value is itself
	L1: R1,
	T2: R2,
	L2: T2,
	B3: R3,
	L3: B3,
	T3: B3,
	L4: T3,
	...
}

StepTree is a data structure that

  • keys are all Steps in track
  • values are the ancestor Steps of the corresponding key

If we consider sub-workflow into the tree, all sub-Workflow are "branch" Steps.

The contract between Nested Step and Workflow is:

Once a Step "wrap" other Steps, it should have responsibility to orchestrate the inner steps.

So from the Workflow's perspective, it only needs to orchestrate the root Steps, to make sure all Steps are executed in the right order.

func (StepTree) Add

func (st StepTree) Add(step Steper) (oldRoots Set[Steper])

Add a step and all it's wrapped steps to the tree.

Return the steps that were roots, but now are wrapped and taken place by the new root step.

  • If step is already in the tree, it's no-op.
  • If step is new, the step will becomes a new root. and all its inner steps will be added to the tree.
  • If one of the inner steps is already in tree, panic.

func (StepTree) IsRoot

func (st StepTree) IsRoot(step Steper) bool

IsRoot reports whether the step is a root step.

func (StepTree) RootOf

func (st StepTree) RootOf(step Steper) Steper

RootOf returns the root step of the given step.

func (StepTree) Roots

func (st StepTree) Roots() Set[Steper]

Roots returns all root steps.

type Steper

type Steper interface {
	Do(context.Context) error
}

Steper describes the requirement for a Step, which is basic unit of a Workflow.

Implement this interface to allow Workflow orchestrating your Steps.

Notice Steper will be saved in Workflow as map key, so it's supposed to be 'comparable'.

func ToSteps

func ToSteps[S Steper](steps []S) []Steper

ToSteps converts []<StepDoer implemention> to []StepDoer.

steps := []someStepImpl{ ... }
flow.Add(
	Steps(ToSteps(steps)...),
)

type StringerNamedStep

type StringerNamedStep struct {
	Name fmt.Stringer
	Steper
}

func (*StringerNamedStep) String

func (sns *StringerNamedStep) String() string

func (*StringerNamedStep) Unwrap

func (sns *StringerNamedStep) Unwrap() Steper

type SwitchBranch added in v0.0.7

type SwitchBranch[T Steper] struct {
	Target       T
	CasesToCheck map[Steper]*BranchCheck[T]
	DefaultStep  []Steper
	Cond         Condition
	// contains filtered or unexported fields
}

SwitchBranch adds target step, cases and default step to workflow, and check the target step and determine which branch to go.

func Switch added in v0.0.7

func Switch[T Steper](target T) *SwitchBranch[T]

Switch adds a switch branch to the workflow.

Switch(someStep).
	Case(case1, func(ctx context.Context, someStep *SomeStep) (bool, error) {
		// branch condition here, true to select this branch
		// error will fail the case
	}).
	Default(defaultStep), // the step to run if all case checks return false
)

func (*SwitchBranch[T]) Case added in v0.0.7

func (s *SwitchBranch[T]) Case(step Steper, check BranchCheckFunc[T]) *SwitchBranch[T]

Case adds a case to the switch branch.

func (*SwitchBranch[T]) Cases added in v0.0.7

func (s *SwitchBranch[T]) Cases(steps []Steper, check BranchCheckFunc[T]) *SwitchBranch[T]

Cases adds multiple cases to the switch branch. The check function will be executed for each case step.

func (*SwitchBranch[T]) Default added in v0.0.7

func (s *SwitchBranch[T]) Default(step ...Steper) *SwitchBranch[T]

Default adds default step(s) to the switch branch.

func (*SwitchBranch[T]) Done added in v0.0.7

func (s *SwitchBranch[T]) Done() map[Steper]*StepConfig

func (*SwitchBranch[T]) When added in v0.0.7

func (s *SwitchBranch[T]) When(cond Condition) *SwitchBranch[T]

When adds a condition to all case steps and default, not the Target!

type Workflow

type Workflow struct {
	MaxConcurrency int         // MaxConcurrency limits the max concurrency of running Steps
	DontPanic      bool        // DontPanic suppress panics, instead return it as error
	SkipAsError    bool        // SkipAsError=true will only return nil error when all Steps succeeded, default to false, allowing some Steps skipped
	Clock          clock.Clock // Clock for unit test
	// contains filtered or unexported fields
}

Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).

The Steps are connected via dependency, use Step(), Steps() or Pipe(), BatchPipe() to add Steps into Workflow.

workflow.Add(
	Step(a),
	Steps(b, c).DependsOn(a),	// a -> b, c
	Pipe(d, e, f),              // d -> e -> f
	BatchPipe(
		Steps(g, h),
		Steps(i, j),
	),                          // g, h -> i, j
)

Workflow will execute Steps in a topological order, each Step will be executed in a separate goroutine.

Workflow guarantees that

Before a Step goroutine starts, all its Upstream Steps are `terminated`.

Check `StepStatus` and `Condition` for details.

Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details. Workflow supports executing Steps phase in phase, check Phase for details. Workflow supports Nested Steps, check Is(), As() and StepTree for details.

func (*Workflow) Add

func (w *Workflow) Add(was ...WorkflowAdder) *Workflow

Add Steps into Workflow in phase Main.

func (*Workflow) Defer

func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow

Defer adds Steps into Workflow in phase Defer.

func (*Workflow) Do

func (w *Workflow) Do(ctx context.Context) error

Do starts the Step execution in topological order, and waits until all Steps terminated.

Do will block the current goroutine.

func (*Workflow) Empty

func (w *Workflow) Empty() bool

Empty returns true if the Workflow don't have any Step.

func (*Workflow) Init

func (w *Workflow) Init(was ...WorkflowAdder) *Workflow

Init adds Steps into Workflow in phase Init.

func (*Workflow) IsPhaseTerminated

func (w *Workflow) IsPhaseTerminated(phase Phase) bool

IsPhaseTerminated returns true if all Steps in the phase terminated.

func (*Workflow) IsTerminated

func (w *Workflow) IsTerminated() bool

IsTerminated returns true if all Steps terminated.

func (*Workflow) PhaseAdd

func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow

PhaseAdd add Steps into specific phase.

func (*Workflow) PhaseOf

func (w *Workflow) PhaseOf(step Steper) Phase

PhaseOf returns the execution phase of the Step.

func (*Workflow) Reset added in v0.0.7

func (w *Workflow) Reset() error

Reset resets the Workflow to ready for a new run.

func (*Workflow) RootOf

func (w *Workflow) RootOf(step Steper) Steper

RootOf returns the root Step of the given Step.

func (*Workflow) StateOf

func (w *Workflow) StateOf(step Steper) *State

StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.

func (*Workflow) Steps

func (w *Workflow) Steps() []Steper

Steps returns all root Steps in the Workflow.

func (*Workflow) Unwrap

func (w *Workflow) Unwrap() []Steper

func (*Workflow) UpstreamOf

func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError

UpstreamOf returns all upstream Steps and their status and error.

type WorkflowAdder

type WorkflowAdder interface {
	Done() map[Steper]*StepConfig
}

WorkflowAdder is addable into Workflow!

func Mock added in v0.0.3

func Mock[T Steper](step T, do func(context.Context) error) WorkflowAdder

Mock helps to mock a step in Workflow.

w.Add(
	flow.Mock(step, func(ctx context.Context) error {}),
)

func Name added in v0.0.3

func Name(step Steper, name string) WorkflowAdder

Name can rename a Step.

workflow.Add(
	Step(a),
	Name(a, "StepA"),
)

Attention: Name will wrap the original Step

func NameFunc added in v0.0.5

func NameFunc(step Steper, fn func() string) WorkflowAdder

NameFunc can rename a Step with a runtime function.

func NameStringer added in v0.0.3

func NameStringer(step Steper, name fmt.Stringer) WorkflowAdder

NameStringer can rename a Step with a fmt.Stringer, which allows String() method to be called at runtime.

func Names added in v0.0.5

func Names(m map[Steper]string) WorkflowAdder

Names can rename multiple Steps.

workflow.Add(
	Names(map[Steper]string{
		stepA: "A",
		stepB: "B",
	},
)

Directories

Path Synopsis
visual
react Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL