flow

v0.0.1-alpha4

Note: this package is not in the latest version of its module.
Published: Jan 31, 2024 License: MIT Imports: 10 Imported by: 1

README

Workflow - a Go library that organizes steps with dependencies into a DAG (Directed Acyclic Graph)

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.

Documentation

Constants

This section is empty.

Variables

var DefaultRetryOption = RetryOption{
	Backoff:  backoff.NewExponentialBackOff(),
	Attempts: 3,
}
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")
var WorkflowPhases = []Phase{PhaseInit, PhaseMain, PhaseDefer}

WorkflowPhases defines the order in which Workflow executes phases. New phases can be added to it; please keep the built-in phases in the list.

For example:

var PhaseDebug flow.Phase = "Debug"

func init() {
	flow.WorkflowPhases = []flow.Phase{flow.PhaseInit, flow.PhaseMain, PhaseDebug, flow.PhaseDefer}
}

Then, in your package, Workflow executes the steps in PhaseDebug after PhaseMain and before PhaseDefer.

Functions

func As

func As[T Steper](s Steper) []T

As finds all steps in the tree of step that match the target type, and returns them.

func DontPanic

func DontPanic(w *Workflow)

func Is

func Is[T Steper](s Steper) bool

Is reports whether any step in step's tree matches the target type.
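The tree walk behind As and Is can be sketched as a depth-first traversal that follows `Unwrap() Steper` (one child) and `Unwrap() []Steper` (many children), the two wrapping conventions described under StepTree. `asSteps`, `isStep`, `Leaf`, `Trunk` and `Branch` are hypothetical names for this standalone sketch; the package's own traversal may differ in details such as ordering.

```go
package main

import (
	"context"
	"fmt"
)

// Steper matches the shape of the package's Steper interface.
type Steper interface {
	Do(context.Context) error
}

// Leaf wraps nothing.
type Leaf struct{ Name string }

func (l *Leaf) Do(context.Context) error { return nil }

// Trunk wraps exactly one step and exposes it via Unwrap() Steper.
type Trunk struct{ Child Steper }

func (t *Trunk) Do(ctx context.Context) error { return t.Child.Do(ctx) }

func (t *Trunk) Unwrap() Steper { return t.Child }

// Branch wraps several steps and exposes them via Unwrap() []Steper.
type Branch struct{ Children []Steper }

func (b *Branch) Do(ctx context.Context) error {
	for _, c := range b.Children {
		if err := c.Do(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (b *Branch) Unwrap() []Steper { return b.Children }

// asSteps walks the step tree depth-first and collects every step of type T.
func asSteps[T Steper](s Steper) []T {
	var found []T
	if t, ok := s.(T); ok {
		found = append(found, t)
	}
	switch u := s.(type) {
	case interface{ Unwrap() Steper }:
		found = append(found, asSteps[T](u.Unwrap())...)
	case interface{ Unwrap() []Steper }:
		for _, c := range u.Unwrap() {
			found = append(found, asSteps[T](c)...)
		}
	}
	return found
}

// isStep reports whether any step in the tree has type T.
func isStep[T Steper](s Steper) bool { return len(asSteps[T](s)) > 0 }

func main() {
	tree := &Trunk{Child: &Branch{Children: []Steper{&Leaf{"a"}, &Leaf{"b"}}}}
	fmt.Println(len(asSteps[*Leaf](tree)), isStep[*Branch](tree)) // 2 true
}
```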

func LogValue

func LogValue(step Steper) logValue

LogValue is intended for use with log/slog, like:

logger.With("step", LogValue(step))

Use it instead of the following, to prevent expensive String() calls:

logger.With("step", String(step))

func String

func String(step Steper) string

String unwraps step and returns a proper string representation.

Types

type Adapter

type Adapter[S Steper] struct {
	Upstream Steper
	Flow     func(context.Context, S) error
}

func Adapt

func Adapt[U, D Steper](up U, fn func(context.Context, U, D) error) Adapter[D]

Adapt bridges Upstream and Downstream by defining how data flows between them.

Use it with InputDependsOn.

Step(down).InputDependsOn(
	Adapt(up, func(_ context.Context, u *Up, d *Down) error {
		// fill Down from Up
		// here Up is terminated, and Down has not started yet
		return nil
	}),
)

type AddStep

type AddStep[S Steper] struct {
	AddSteps
	Steps []S
}

func Step

func Step[S Steper](steps ...S) AddStep[S]

Step declares Step(s) ready to be added into a Workflow.

The main difference between Step() and Steps() is that Step() allows adding Input and InputDependsOn to the Step.

Step(a).Input(func(ctx context.Context, a *A) error {
	// fill a
	return nil
})

func (AddStep[S]) DependsOn

func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]

func (AddStep[S]) Input

func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]

Input adds Input callback for the Step(s).

Input callbacks are called before Do, in the order in which they are declared.

Step(a).
	Input(/* 1. this Input will be called first */).
	InputDependsOn(Adapt(up, /* 2. then receive Output from up */)).
	Input(/* 3. this Input is after up's Output */)
Step(a).Input(/* 4. this Input is after all */)

func (AddStep[S]) InputDependsOn

func (as AddStep[S]) InputDependsOn(adapts ...Adapter[S]) AddStep[S]

InputDependsOn declares a dependency between Steps and feeds data from Upstream to Downstream.

It's useful when the Downstream needs some data from Upstream that is not available until Upstream is done. The Input callback is called regardless of the Upstream's result, as long as the Upstream has terminated.

Due to limitations of Go's generic type system, use the Adapt function to work around the type check.

Step(down).InputDependsOn(
	Adapt(up, func(_ context.Context, u *Up, d *Down) error {
		// fill Down from Up
		// here Up is terminated, and Down has not started yet
		return nil
	}),
)

func (AddStep[S]) Retry

func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]

func (AddStep[S]) Timeout

func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]

func (AddStep[S]) When

func (as AddStep[S]) When(when Condition) AddStep[S]

type AddSteps

type AddSteps map[Steper]*StepConfig

func Pipe

func Pipe(steps ...Steper) AddSteps

Pipe creates a pipeline in Workflow.

workflow.Add(
	Pipe(a, b, c), // a -> b -> c
)

The above code is equivalent to:

workflow.Add(
	Step(b).DependsOn(a),
	Step(c).DependsOn(b),
)

func Steps

func Steps(steps ...Steper) AddSteps

Steps declares a series of Steps ready to be added into Workflow.

The Steps declared are mutually independent.

workflow.Add(
	Steps(a, b, c),                 // a, b, c will be executed in parallel
	Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel
)

func (AddSteps) DependsOn

func (as AddSteps) DependsOn(ups ...Steper) AddSteps

DependsOn declares dependency on the given Steps.

Step(a).DependsOn(b, c)

Then b and c happen-before a.

func (AddSteps) Done

func (as AddSteps) Done() map[Steper]*StepConfig

func (AddSteps) Retry

func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps

Retry customizes how the Step should be retried.

If Retry is never called, the Step is not retried. The functions passed to Retry modify a RetryOption that starts from DefaultRetryOption.

func (AddSteps) Timeout

func (as AddSteps) Timeout(timeout time.Duration) AddSteps

Timeout sets the Step level timeout.

func (AddSteps) When

func (as AddSteps) When(cond Condition) AddSteps

When sets the Condition for the Step.

type Condition

type Condition func(ctx context.Context, ups map[Steper]StatusError) StepStatus

Condition is a function that determines the next status of a Step. It makes the decision based on the status and result of all the Upstream Steps, and it is only called when all Upstreams have terminated.

var (
	DefaultCondition Condition = AllSucceeded
	// DefaultIsCanceled is used to determine whether an error is being regarded as canceled.
	DefaultIsCanceled = func(err error) bool {
		switch _, isCancel := err.(ErrCancel); {
		case errors.Is(err, context.Canceled),
			errors.Is(err, context.DeadlineExceeded),
			isCancel:
			return true
		}
		return false
	}
)

type ErrCancel

type ErrCancel struct {
	// contains filtered or unexported fields
}

func Cancel

func Cancel(err error) ErrCancel

Cancel terminates the current step and sets its status to `Canceled`. Note that Cancel only takes effect when Do returns the ErrCancel directly, not wrapped in another error.

Example:

func (s *SomeStepImpl) Do(ctx context.Context) error {
	err := flow.Cancel(fmt.Errorf("some error"))
	return err // this will cancel the step
	// return fmt.Errorf("wrap error: %w", err) // this would fail the step instead of canceling it
}

func (ErrCancel) Unwrap

func (e ErrCancel) Unwrap() error

type ErrCycleDependency

type ErrCycleDependency map[Steper][]Steper

ErrCycleDependency reports a cyclic dependency in your Workflow.

func (ErrCycleDependency) Error

func (e ErrCycleDependency) Error() string

type ErrInput

type ErrInput struct {
	// contains filtered or unexported fields
}

func (ErrInput) Unwrap

func (e ErrInput) Unwrap() error

type ErrPanic

type ErrPanic struct {
	// contains filtered or unexported fields
}

func (ErrPanic) Unwrap

func (e ErrPanic) Unwrap() error

type ErrSkip

type ErrSkip struct {
	// contains filtered or unexported fields
}

func Skip

func Skip(err error) ErrSkip

Skip terminates the current step and sets its status to `Skipped`. Note that Skip only takes effect when Do returns the ErrSkip directly, not wrapped in another error.

func (ErrSkip) Unwrap

func (e ErrSkip) Unwrap() error

type ErrUnexpectStepInitStatus

type ErrUnexpectStepInitStatus map[Steper]StepStatus

ErrUnexpectStepInitStatus reports the Steps whose status is not Pending when the Workflow starts to run.

func (ErrUnexpectStepInitStatus) Error

type ErrWorkflow

type ErrWorkflow map[Steper]StatusError

ErrWorkflow contains all errors reported from terminated Steps in Workflow.

Keys are root Steps; values are their status and error.

func (ErrWorkflow) AllSucceeded

func (e ErrWorkflow) AllSucceeded() bool

func (ErrWorkflow) AllSucceededOrSkipped

func (e ErrWorkflow) AllSucceededOrSkipped() bool

func (ErrWorkflow) Error

func (e ErrWorkflow) Error() string

ErrWorkflow will be printed as:

Step: [Status]
	error message

func (ErrWorkflow) MarshalJSON

func (e ErrWorkflow) MarshalJSON() ([]byte, error)

MarshalJSON marshals ErrWorkflow to JSON in the form:

{
	"Step": {
		"status": "Status",
		"error": "error message"
	}
}

func (ErrWorkflow) Unwrap

func (e ErrWorkflow) Unwrap() []error

type Function

type Function[I, O any] struct {
	Name   string
	Input  I
	Output O
	DoFunc func(context.Context, I) (O, error)
}

Function wraps an arbitrary function as a Step.

func Func

func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]

Func constructs a Step from an arbitrary function.

func FuncI

func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]

func FuncIO

func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]

func FuncO

func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]

func (*Function[I, O]) Do

func (f *Function[I, O]) Do(ctx context.Context) error

func (*Function[I, O]) String

func (f *Function[I, O]) String() string

type MockStep

type MockStep struct {
	Step   Steper
	MockDo func(context.Context) error
}

MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.

func (*MockStep) Do

func (m *MockStep) Do(ctx context.Context) error

func (*MockStep) Unwrap

func (m *MockStep) Unwrap() Steper

type NamedStep

type NamedStep struct {
	Name string
	Steper
}

NamedStep wraps a Steper and gives your step a name by overriding the String() method.

func WithName

func WithName(name string, step Steper) *NamedStep

WithName gives your step a name by wrapping it in a NamedStep, which overrides the String() method.

func (*NamedStep) String

func (ns *NamedStep) String() string

func (*NamedStep) Unwrap

func (ns *NamedStep) Unwrap() Steper

type NonOpStep

type NonOpStep struct {
	Name string
}

NonOpStep is used as a virtual node to handle dependencies between steps.

func (*NonOpStep) Do

Do implements Steper.

func (*NonOpStep) String

func (n *NonOpStep) String() string

type Notify

type Notify struct {
	BeforeStep func(ctx context.Context, step Steper) context.Context
	AfterStep  func(ctx context.Context, step Steper, err error)
}

Notify's callbacks are called before and after each step is executed.

Example
workflow := new(Workflow)
workflow.Add(
	Step(Func("dummy step", func(ctx context.Context) error {
		fmt.Println("inside step")
		return fmt.Errorf("step error")
	})),
).Options(
	WithNotify(Notify{
		BeforeStep: func(ctx context.Context, step Steper) context.Context {
			fmt.Printf("before step: %s\n", step)
			return ctx
		},
		AfterStep: func(ctx context.Context, step Steper, err error) {
			fmt.Printf("after step: %s error: %s\n", step, err)
		},
	}),
)
_ = workflow.Do(context.Background())
Output:

before step: dummy step
inside step
after step: dummy step error: step error

type Phase

type Phase string

Phase clusters Steps into different execution phases.

Workflow supports three built-in phases: Init, Main and Defer. It derives from the following common Go pattern:

func init() {}
func main() {
	defer func() {}()
}

  • The next phase starts only after all Steps in the previous phase have terminated.
  • Even if the Steps in the previous phase are not successful, the next phase still starts.
  • The order of Steps in the same phase is not guaranteed (Defer is not a stack!).

Customized phases can be added to WorkflowPhases.

const (
	PhaseUnknown Phase = ""
	PhaseInit    Phase = "Init"
	PhaseMain    Phase = "Main"
	PhaseDefer   Phase = "Defer"
)

type RetryOption

type RetryOption struct {
	Timeout  time.Duration // 0 means no timeout, it's per-retry timeout
	Attempts uint64        // 0 means no limit
	StopIf   func(ctx context.Context, attempt uint64, since time.Duration, err error) bool
	Backoff  backoff.BackOff
	Notify   backoff.Notify
	Timer    backoff.Timer
}

RetryOption customizes retry behavior of a Step in Workflow.

type Set

type Set[T comparable] map[T]struct{}

func (Set[T]) Add

func (s Set[T]) Add(vs ...T)

func (Set[T]) Has

func (s Set[T]) Has(v T) bool

func (Set[T]) Union

func (s Set[T]) Union(sets ...Set[T])

type State

type State struct {
	StatusError
	Config *StepConfig
	sync.RWMutex
}

State is the internal state of a Step in a Workflow.

It holds the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status can be read and written from different goroutines, so an RWMutex protects it.

func (*State) AddUpstream

func (s *State) AddUpstream(up Steper)

func (*State) GetError

func (s *State) GetError() error

func (*State) GetStatus

func (s *State) GetStatus() StepStatus

func (*State) GetStatusError

func (s *State) GetStatusError() StatusError

func (*State) Input

func (s *State) Input(ctx context.Context) error

func (*State) MergeConfig

func (s *State) MergeConfig(sc *StepConfig)

func (*State) Option

func (s *State) Option() *StepOption

func (*State) SetError

func (s *State) SetError(err error)

func (*State) SetStatus

func (s *State) SetStatus(ss StepStatus)

func (*State) Upstreams

func (s *State) Upstreams() Set[Steper]

type StatusError

type StatusError struct {
	Status StepStatus
	Err    error
}

StatusError contains the status and error of a Step.

func (StatusError) Error

func (e StatusError) Error() string

StatusError will be printed as:

[Status]
	error message

func (StatusError) MarshalJSON

func (e StatusError) MarshalJSON() ([]byte, error)

MarshalJSON marshals StatusError to JSON in the form:

{
	"status": "Status",
	"error": "error message"
}

func (StatusError) Unwrap

func (e StatusError) Unwrap() error

type StepConfig

type StepConfig struct {
	Upstreams Set[Steper]                 // Upstreams of the Step, means these Steps should happen-before this Step
	Input     func(context.Context) error // Input callback of the Step, will be called before Do
	Option    func(*StepOption)           // Option customize the Step settings
}

func (*StepConfig) AddInput

func (sc *StepConfig) AddInput(input func(context.Context) error)

func (*StepConfig) AddOption

func (sc *StepConfig) AddOption(opt func(*StepOption))

func (*StepConfig) Merge

func (sc *StepConfig) Merge(other *StepConfig)

type StepOption

type StepOption struct {
	RetryOption *RetryOption   // RetryOption customize how the Step should be retried, default (nil) means no retry.
	Condition   Condition      // Condition decides whether Workflow should execute the Step, default to DefaultCondition.
	Timeout     *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout.
}

type StepStatus

type StepStatus string

StepStatus describes the status of a Step.

const (
	Pending   StepStatus = ""
	Running   StepStatus = "Running"
	Failed    StepStatus = "Failed"
	Succeeded StepStatus = "Succeeded"
	Canceled  StepStatus = "Canceled"
	Skipped   StepStatus = "Skipped"
)

func AllSucceeded

func AllSucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus

AllSucceeded: all Upstreams are Succeeded

func Always

Always: as long as all Upstreams are terminated

func AnyFailed

func AnyFailed(ctx context.Context, ups map[Steper]StatusError) StepStatus

AnyFailed: any Upstream is Failed

func BeCanceled

func BeCanceled(ctx context.Context, ups map[Steper]StatusError) StepStatus

BeCanceled: only run when the workflow is canceled

func (StepStatus) IsTerminated

func (s StepStatus) IsTerminated() bool

func (StepStatus) String

func (s StepStatus) String() string

type StepTree

type StepTree map[Steper]Steper

StepTree is a tree data structure of steps; it helps Workflow track Nested Steps.

Why is StepTree needed?

What if someone adds a Step and its Nested Step to a Workflow?

doSomeThing := &DoSomeThing{}
decorated := &Decorator{Steper: doSomeThing}
workflow.Add(
	Step(doSomeThing),
	Step(decorated),
)

decorated.Do() calls doSomeThing.Do() internally, and we clearly don't want doSomeThing.Do() to be called twice.

StepTree is the solution to the above questions.

What is StepTree?

Let's dive into the definitions. If some Step wraps other Steps, then

type Parent struct { // This Parent is a "trunk": it wraps one Child
	Child Steper
}
type BranchingParent struct { // This Parent "branches": it wraps multiple Children
	Children []Steper
}

Then we can draw a tree like:

┌────┐ ┌────┐    ┌────┐
│ R1 │ │ R2 │    │ R3 │
└─┬──┘ └─┬──┘    └─┬──┘
┌─┴──┐ ┌─┴──┐    ┌─┴──┐
│ L1 │ │ T2 │    │ B3 │
└────┘ └─┬──┘    └─┬──┘
         │      ┌──┴────┐
       ┌─┴──┐ ┌─┴──┐  ┌─┴──┐
       │ L2 │ │ L3 │  │ T3 │
       └────┘ └────┘  └─┬──┘
                      ┌─┴──┐
                      │ L4 │
                      └────┘

Where

  • [R]oot: the root Step, there isn't other Step wrapping it.
  • [L]eaf: the leaf Step, it doesn't wrap any Step inside.
  • [T]runk: the trunk Step, it has method Unwrap() Steper, one Child.
  • [B]ranch: the branch Step, it has method Unwrap() []Steper, multiple Children.

Then the StepTree built for the above tree is:

StepTree{
	R1: R1, // root's value is itself
	L1: R1,
	T2: R2, // trunk is "transparent"
	L2: R2,
	B3: R3,
	L3: B3, // to the lowest [B]ranch ancestor
	T3: B3,
	L4: B3,
	...
}

StepTree is a data structure that

  • keys are all Steps in track
  • values are the root of that Step, or lowest ancestor that branches.

If we put Workflows into the tree, every sub-Workflow is a "branch" Step.

The contract between Nested Step and Workflow is:

Once a Step "wraps" other Steps, it is responsible for orchestrating them.

So from the Workflow's perspective, it only needs to orchestrate the root Steps, to make sure all Steps are executed in the right order.

func (StepTree) Add

func (sc StepTree) Add(step Steper) (oldRoots Set[Steper])

Add adds a step and all its descendant steps to the tree.

If step is already in the tree, it's a no-op. If step is new, it becomes a new root.

func (StepTree) IsRoot

func (st StepTree) IsRoot(step Steper) bool

func (StepTree) RootOf

func (st StepTree) RootOf(step Steper) Steper

func (StepTree) Roots

func (sc StepTree) Roots() Set[Steper]

type Steper

type Steper interface {
	Do(context.Context) error
}

Steper describes the requirement for a Step, which is the basic unit of a Workflow.

Implement this interface to let Workflow orchestrate your Steps. Note that your implementation should be a pointer type.

func ToSteps

func ToSteps[S Steper](steps []S) []Steper

ToSteps converts a slice of a Steper implementation ([]S) to []Steper.

steps := []*someStepImpl{ ... }
workflow.Add(
	Steps(ToSteps(steps)...),
)

type StringerNamedStep

type StringerNamedStep struct {
	Name fmt.Stringer
	Steper
}

func (*StringerNamedStep) String

func (sns *StringerNamedStep) String() string

func (*StringerNamedStep) Unwrap

func (sns *StringerNamedStep) Unwrap() Steper

type Workflow

type Workflow struct {
	DontPanic bool // whether recover panic from Step(s)
	// contains filtered or unexported fields
}

Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).

Steps are connected via dependencies; use Step(), Steps() or Pipe() to add Steps into a Workflow.

workflow.Add(
	Step(a),
	Steps(b, c).DependsOn(a), // a -> b, c
	Pipe(d, e, f),            // d -> e -> f
)

Workflow executes Steps in topological order, each Step in a separate goroutine. Workflow guarantees that

Before a Step goroutine starts, all its Upstream Steps are terminated, and registered Input callbacks are called.

Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details. Workflow supports Workflow-level configuration, check WorkflowOption for details. Workflow supports executing Steps phase in phase, check Phase for details. Workflow supports Nested Steps, check Is(), As() and StepTree for details.

func (*Workflow) Add

func (w *Workflow) Add(was ...WorkflowAdder) *Workflow

Add adds Steps into Workflow in phase Main.

func (*Workflow) Defer

func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow

Defer adds Steps into Workflow in phase Defer.

func (*Workflow) Do

func (w *Workflow) Do(ctx context.Context) error

Do starts the Step execution in topological order, and waits until all Steps have terminated.

Do will block the current goroutine.

func (*Workflow) Empty

func (w *Workflow) Empty() bool

func (*Workflow) Init

func (w *Workflow) Init(was ...WorkflowAdder) *Workflow

Init adds Steps into Workflow in phase Init.

func (*Workflow) IsPhaseTerminated

func (w *Workflow) IsPhaseTerminated(phase Phase) bool

func (*Workflow) IsTerminated

func (w *Workflow) IsTerminated() bool

IsTerminated returns true if all Steps have terminated.

func (*Workflow) Options

func (s *Workflow) Options(opts ...WorkflowOption) *Workflow

func (*Workflow) PhaseAdd

func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow

PhaseAdd adds Steps into a specific phase.

func (*Workflow) PhaseOf

func (w *Workflow) PhaseOf(step Steper) Phase

PhaseOf returns the execution phase of the Step.

func (*Workflow) RootOf

func (w *Workflow) RootOf(step Steper) Steper

RootOf returns the root Step of the given Step.

func (*Workflow) StateOf

func (w *Workflow) StateOf(step Steper) *State

StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.

func (*Workflow) Steps

func (w *Workflow) Steps() []Steper

Steps returns all root Steps in the Workflow.

func (*Workflow) Unwrap

func (w *Workflow) Unwrap() []Steper

func (*Workflow) UpstreamOf

func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError

UpstreamOf returns all upstream Steps and their status / error of the Step.

type WorkflowAdder

type WorkflowAdder interface {
	Done() map[Steper]*StepConfig
}

Implement this interface so your type can be added into a Workflow.

type WorkflowOption

type WorkflowOption func(*Workflow)

WorkflowOption alters the behavior of a Workflow.

func WithClock

func WithClock(clock clock.Clock) WorkflowOption

func WithMaxConcurrency

func WithMaxConcurrency(n int) WorkflowOption

WithMaxConcurrency limits the maximum number of Steps that can be in the Running status at the same time.

func WithNotify

func WithNotify(notify Notify) WorkflowOption

Directories

Path Synopsis
visual
react Module
