Documentation ¶
Index ¶
- Variables
- func As[T Steper](s Steper) []T
- func DontPanic(w *Workflow)
- func Is[T Steper](s Steper) bool
- func LogValue(step Steper) logValue
- func String(step Steper) string
- type Adapter
- type AddStep
- func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]
- func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]
- func (as AddStep[S]) InputDependsOn(adapts ...Adapter[S]) AddStep[S]
- func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
- func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]
- func (as AddStep[S]) When(when Condition) AddStep[S]
- type AddSteps
- type Condition
- type ErrCancel
- type ErrCycleDependency
- type ErrInput
- type ErrPanic
- type ErrSkip
- type ErrUnexpectStepInitStatus
- type ErrWorkflow
- type Function
- func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]
- func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]
- func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]
- func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]
- type MockStep
- type NamedStep
- type NonOpStep
- type Notify
- type Phase
- type RetryOption
- type Set
- type State
- func (s *State) AddUpstream(up Steper)
- func (s *State) GetError() error
- func (s *State) GetStatus() StepStatus
- func (s *State) GetStatusError() StatusError
- func (s *State) Input(ctx context.Context) error
- func (s *State) MergeConfig(sc *StepConfig)
- func (s *State) Option() *StepOption
- func (s *State) SetError(err error)
- func (s *State) SetStatus(ss StepStatus)
- func (s *State) Upstreams() Set[Steper]
- type StatusError
- type StepConfig
- type StepOption
- type StepStatus
- type StepTree
- type Steper
- type StringerNamedStep
- type Workflow
- func (w *Workflow) Add(was ...WorkflowAdder) *Workflow
- func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow
- func (w *Workflow) Do(ctx context.Context) error
- func (w *Workflow) Empty() bool
- func (w *Workflow) Init(was ...WorkflowAdder) *Workflow
- func (w *Workflow) IsPhaseTerminated(phase Phase) bool
- func (w *Workflow) IsTerminated() bool
- func (s *Workflow) Options(opts ...WorkflowOption) *Workflow
- func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow
- func (w *Workflow) PhaseOf(step Steper) Phase
- func (w *Workflow) RootOf(step Steper) Steper
- func (w *Workflow) StateOf(step Steper) *State
- func (w *Workflow) Steps() []Steper
- func (w *Workflow) Unwrap() []Steper
- func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError
- type WorkflowAdder
- type WorkflowOption
Constants ¶
This section is empty.
Variables ¶
var DefaultRetryOption = RetryOption{
Backoff: backoff.NewExponentialBackOff(),
Attempts: 3,
}
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")
var WorkflowPhases = []Phase{PhaseInit, PhaseMain, PhaseDefer}
WorkflowPhases defines the order in which Workflow executes phases. New phases can be added to this list, but please keep the built-in phases.
For example:
    var PhaseDebug flow.Phase = "Debug"

    func init() {
        flow.WorkflowPhases = []flow.Phase{flow.PhaseInit, flow.PhaseMain, PhaseDebug, flow.PhaseDefer}
    }
With this in your package, Workflow will execute the steps in PhaseDebug after PhaseMain and before PhaseDefer.
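The ordering idea can be sketched without the package itself. The snippet below is only an illustration: `Phase`, the three constants and the helper `insertBefore` are local stand-ins defined here, not identifiers exported by this package.

```go
package main

import "fmt"

// Phase is a local stand-in for the package's Phase type.
type Phase string

// Local stand-ins for the built-in phases; the string values are assumptions.
const (
	PhaseInit  Phase = "Init"
	PhaseMain  Phase = "Main"
	PhaseDefer Phase = "Defer"
)

// insertBefore returns a copy of phases with p placed right before target.
// If target is not present, p is appended at the end.
// Hypothetical helper, for illustration only.
func insertBefore(phases []Phase, p, target Phase) []Phase {
	out := make([]Phase, 0, len(phases)+1)
	inserted := false
	for _, ph := range phases {
		if ph == target && !inserted {
			out = append(out, p)
			inserted = true
		}
		out = append(out, ph)
	}
	if !inserted {
		out = append(out, p)
	}
	return out
}

func main() {
	phases := []Phase{PhaseInit, PhaseMain, PhaseDefer}
	fmt.Println(insertBefore(phases, "Debug", PhaseDefer)) // [Init Main Debug Defer]
}
```

Building a new slice instead of mutating the input keeps the built-in order intact for any other reader of the original slice.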
Functions ¶
Types ¶
type Adapter ¶
type AddStep ¶
func Step ¶
Step declares Step ready to be added into Workflow.
The main difference between Step() and Steps() is that Step() allows adding Input and InputDependsOn for the Step.
    Step(a).Input(func(ctx context.Context, a *A) error {
        // fill a
        return nil
    })
func (AddStep[S]) Input ¶
Input adds Input callback for the Step(s).
Input callback will be called before Do, and the order will respect the order of declarations.
    Step(a).
        Input(/* 1. this Input will be called first */).
        InputDependsOn(Adapt(up, /* 2. then receive Output from up */)).
        Input(/* 3. this Input is after up's Output */)
    Step(a).Input(/* 4. this Input is after all */)
func (AddStep[S]) InputDependsOn ¶
InputDependsOn declares a dependency between Steps and feeds data from Upstream to Downstream.
It's useful when the Downstream needs some data from Upstream, and the data is not available until Upstream is done. The Input callback does not take the Upstream's result into account; it only requires that the Upstream has terminated.
Due to limitations of Go's generic type system, use the Adapt function to work around the type check.
    Step(down).InputDependsOn(
        Adapt(up, func(_ context.Context, u *Up, d *Down) error {
            // fill Down from Up
            // here Up is terminated, and Down has not started yet
            return nil
        }),
    )
func (AddStep[S]) Retry ¶
func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
type AddSteps ¶
type AddSteps map[Steper]*StepConfig
func Pipe ¶
Pipe creates a pipeline in Workflow.
    workflow.Add(
        Pipe(a, b, c), // a -> b -> c
    )
The above code is equivalent to:
    workflow.Add(
        Step(b).DependsOn(a),
        Step(c).DependsOn(b),
    )
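The equivalence between a pipeline and pairwise dependencies can be sketched on plain strings. `pipeDeps` is a hypothetical helper written for this illustration and is not part of the package.

```go
package main

import "fmt"

// pipeDeps returns, for a pipeline of step names, a map from each step to the
// single step it depends on. The first step has no entry.
// Hypothetical helper, for illustration only.
func pipeDeps(steps ...string) map[string]string {
	deps := make(map[string]string)
	for i := 1; i < len(steps); i++ {
		deps[steps[i]] = steps[i-1]
	}
	return deps
}

func main() {
	deps := pipeDeps("a", "b", "c")
	fmt.Println(deps["b"], deps["c"]) // a b
}
```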
func Steps ¶
Steps declares a series of Steps ready to be added into Workflow.
The Steps declared are mutually independent.
    workflow.Add(
        Steps(a, b, c),                 // a, b, c will be executed in parallel
        Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel
    )
func (AddSteps) DependsOn ¶
DependsOn declares dependency on the given Steps.
Step(a).DependsOn(b, c)
Then b, c should happen-before a.
func (AddSteps) Done ¶
func (as AddSteps) Done() map[Steper]*StepConfig
func (AddSteps) Retry ¶
func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps
Retry customizes how the Step should be retried.
If it's never called, the Step will not be retried. The RetryOption has a DefaultRetryOption as base to be modified.
type Condition ¶
type Condition func(ctx context.Context, ups map[Steper]StatusError) StepStatus
Condition is a function that determines the next status of a Step. Condition makes the decision based on the status and result of all the Upstream Steps. Condition is only called when all Upstreams are terminated.
    var (
        DefaultCondition Condition = AllSucceeded
        // DefaultIsCanceled is used to determine whether an error is being regarded as canceled.
        DefaultIsCanceled = func(err error) bool {
            switch {
            case errors.Is(err, context.Canceled),
                errors.Is(err, context.DeadlineExceeded),
                errors.Is(err, ErrCancel{}):
                return true
            }
            return false
        }
    )
type ErrCancel ¶
type ErrCancel struct {
// contains filtered or unexported fields
}
type ErrCycleDependency ¶
There is a cycle-dependency in your Workflow!!!
func (ErrCycleDependency) Error ¶
func (e ErrCycleDependency) Error() string
type ErrSkip ¶
type ErrSkip struct {
// contains filtered or unexported fields
}
type ErrUnexpectStepInitStatus ¶
type ErrUnexpectStepInitStatus map[Steper]StepStatus
Step status is not Pending when Workflow starts to run.
func (ErrUnexpectStepInitStatus) Error ¶
func (e ErrUnexpectStepInitStatus) Error() string
type ErrWorkflow ¶
type ErrWorkflow map[Steper]StatusError
ErrWorkflow contains all errors reported from terminated Steps in Workflow.
Keys are root Steps, values are their status and error.
func (ErrWorkflow) Error ¶
func (e ErrWorkflow) Error() string
ErrWorkflow will be printed as:
Step: [Status] error message
func (ErrWorkflow) IsNil ¶
func (e ErrWorkflow) IsNil() bool
func (ErrWorkflow) MarshalJSON ¶
func (e ErrWorkflow) MarshalJSON() ([]byte, error)
MarshalJSON allows us to marshal ErrWorkflow to json.
    {
        "Step": {
            "status": "Status",
            "error": "error message"
        }
    }
func (ErrWorkflow) Unwrap ¶
func (e ErrWorkflow) Unwrap() []error
type Function ¶
type Function[I, O any] struct {
    Name   string
    Input  I
    Output O
    DoFunc func(context.Context, I) (O, error)
}
Function wraps an arbitrary function as a Step.
type MockStep ¶
MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.
type NamedStep ¶
NamedStep is a wrapper of Steper, it gives your step a name by overriding String() method.
type NonOpStep ¶
type NonOpStep struct {
Name string
}
NonOpStep is used as a virtual node to handle dependencies between steps.
type Notify ¶
type Notify struct {
    BeforeStep func(ctx context.Context, step Steper) context.Context
    AfterStep  func(ctx context.Context, step Steper, err error)
}
Notify will be called before and after each step being executed.
Example ¶
    workflow := new(Workflow)
    workflow.Add(
        Step(Func("dummy step", func(ctx context.Context) error {
            fmt.Println("inside step")
            return fmt.Errorf("step error")
        })),
    ).Options(
        WithNotify(Notify{
            BeforeStep: func(ctx context.Context, step Steper) context.Context {
                fmt.Printf("before step: %s\n", step)
                return ctx
            },
            AfterStep: func(ctx context.Context, step Steper, err error) {
                fmt.Printf("after step: %s error: %s\n", step, err)
            },
        }),
    )
    _ = workflow.Do(context.Background())
Output:
    before step: dummy step
    inside step
    after step: dummy step error: step error
type Phase ¶
type Phase string
Phase clusters Steps into different execution phases.
Workflow supports three built-in phases: Init, Main and Defer. They derive from the common Go pattern below:
    func init() {}
    func main() {
        defer func() {}
    }
- The next phase starts only after all Steps in the previous phase have terminated.
- Even if the Steps in the previous phase are not successful, the next phase will still start.
- The order of Steps in the same phase is not guaranteed. (defer is not a stack!)
Custom phases can be added to WorkflowPhases.
type RetryOption ¶
type RetryOption struct {
    Timeout  time.Duration // 0 means no timeout, it's per-retry timeout
    Attempts uint64        // 0 means no limit
    StopIf   func(ctx context.Context, attempt uint64, since time.Duration, err error) bool
    Backoff  backoff.BackOff
    Notify   backoff.Notify
    Timer    backoff.Timer
}
RetryOption customizes retry behavior of a Step in Workflow.
type Set ¶
type Set[T comparable] map[T]struct{}
type State ¶
type State struct {
    StatusError
    Config *StepConfig
    sync.RWMutex
}
State is the internal state of a Step in a Workflow.
It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status may be read and written from different goroutines, so an RWMutex protects it.
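The locking pattern described here can be sketched as follows. The type `state` and its string status are simplified stand-ins for this illustration, not the package's State.

```go
package main

import (
	"fmt"
	"sync"
)

// state guards a status value with a read-write mutex so that many readers
// can proceed together while writers get exclusive access.
type state struct {
	mu     sync.RWMutex
	status string
}

// GetStatus takes the read lock, so concurrent readers do not block each other.
func (s *state) GetStatus() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.status
}

// SetStatus takes the write lock, excluding readers and other writers.
func (s *state) SetStatus(v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.status = v
}

func main() {
	s := &state{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.GetStatus() // safe concurrent read
		}()
	}
	s.SetStatus("Running")
	wg.Wait()
	fmt.Println(s.GetStatus()) // Running
}
```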
func (*State) AddUpstream ¶
func (*State) GetStatus ¶
func (s *State) GetStatus() StepStatus
func (*State) GetStatusError ¶
func (s *State) GetStatusError() StatusError
func (*State) MergeConfig ¶
func (s *State) MergeConfig(sc *StepConfig)
func (*State) Option ¶
func (s *State) Option() *StepOption
func (*State) SetStatus ¶
func (s *State) SetStatus(ss StepStatus)
type StatusError ¶
type StatusError struct {
    Status StepStatus
    Err    error
}
StatusError contains the status and error of a Step.
func (StatusError) Error ¶
func (e StatusError) Error() string
StatusError will be printed as:
[Status] error message
func (StatusError) MarshalJSON ¶
func (e StatusError) MarshalJSON() ([]byte, error)
MarshalJSON allows us to marshal StatusError to json.
    {
        "status": "Status",
        "error": "error message"
    }
func (StatusError) Unwrap ¶
func (e StatusError) Unwrap() error
type StepConfig ¶
type StepConfig struct {
    Upstreams Set[Steper]                 // Upstreams of the Step, means these Steps should happen-before this Step
    Input     func(context.Context) error // Input callback of the Step, will be called before Do
    Option    func(*StepOption)           // Option customize the Step settings
}
func (*StepConfig) AddOption ¶
func (sc *StepConfig) AddOption(opt func(*StepOption))
func (*StepConfig) Merge ¶
func (sc *StepConfig) Merge(other *StepConfig)
type StepOption ¶
type StepOption struct {
    RetryOption *RetryOption   // RetryOption customize how the Step should be retried, default (nil) means no retry.
    Condition   Condition      // Condition decides whether Workflow should execute the Step, default to DefaultCondition.
    Timeout     *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout.
}
type StepStatus ¶
type StepStatus string
StepStatus describes the status of a Step.
const (
    Pending   StepStatus = ""
    Running   StepStatus = "Running"
    Failed    StepStatus = "Failed"
    Succeeded StepStatus = "Succeeded"
    Canceled  StepStatus = "Canceled"
    Skipped   StepStatus = "Skipped"
)
func AllSucceeded ¶
func AllSucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus
AllSucceeded: all Upstreams are Succeeded
func Always ¶
func Always(context.Context, map[Steper]StatusError) StepStatus
Always: as long as all Upstreams are terminated
func AnyFailed ¶
func AnyFailed(ctx context.Context, ups map[Steper]StatusError) StepStatus
AnyFailed: any Upstream is Failed
func BeCanceled ¶
func BeCanceled(ctx context.Context, ups map[Steper]StatusError) StepStatus
BeCanceled: only run when the workflow is canceled
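The following sketch shows how conditions of this kind could be written over a map of upstream statuses. It assumes that a condition returns Running to let the Step proceed and Skipped otherwise; that return convention, the local `status` type and the string-keyed map are assumptions of this example, and cancellation handling is omitted.

```go
package main

import "fmt"

// status is a local stand-in for StepStatus.
type status string

const (
	running   status = "Running"
	failed    status = "Failed"
	succeeded status = "Succeeded"
	skipped   status = "Skipped"
)

// allSucceeded lets the step run only if every upstream succeeded.
func allSucceeded(ups map[string]status) status {
	for _, s := range ups {
		if s != succeeded {
			return skipped
		}
	}
	return running
}

// anyFailed lets the step run only if at least one upstream failed.
func anyFailed(ups map[string]status) status {
	for _, s := range ups {
		if s == failed {
			return running
		}
	}
	return skipped
}

func main() {
	ups := map[string]status{"build": succeeded, "test": failed}
	fmt.Println(allSucceeded(ups), anyFailed(ups)) // Skipped Running
}
```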
func (StepStatus) IsTerminated ¶
func (s StepStatus) IsTerminated() bool
func (StepStatus) String ¶
func (s StepStatus) String() string
type StepTree ¶
StepTree is a tree data structure of steps that helps Workflow track Nested Steps.
Why is StepTree needed? ¶
What if someone adds a Step and its Nested Step to Workflow?
    doSomeThing := &DoSomeThing{}
    decorated := &Decorator{Steper: doSomeThing}
    workflow.Add(
        Step(doSomeThing),
        Step(decorated),
    )
decorated.Do() will call doSomeThing.Do() internally, and obviously we don't want doSomeThing.Do() to be called twice.
StepTree is the solution to this problem.
What is StepTree? ¶
Let's dive into the definitions. If some Step wraps another Step, then
    type Parent struct {
        Child Steper
    }
    type Parent struct { // This Parent "branches"
        Children []Steper
    }
Then we can draw a tree like:
    ┌────┐  ┌────┐     ┌────┐
    │ R1 │  │ R2 │     │ R3 │
    └─┬──┘  └─┬──┘     └─┬──┘
    ┌─┴──┐  ┌─┴──┐     ┌─┴──┐
    │ L1 │  │ T2 │     │ B3 │
    └────┘  └─┬──┘     └─┬──┘
              │       ┌──┴────┐
            ┌─┴──┐  ┌─┴──┐  ┌─┴──┐
            │ L2 │  │ L3 │  │ T3 │
            └────┘  └────┘  └─┬──┘
                            ┌─┴──┐
                            │ L4 │
                            └────┘
Where
- [R]oot: the root Step, there isn't other Step wrapping it.
- [L]eaf: the leaf Step, it doesn't wrap any Step inside.
- [T]runk: the trunk Step, it has method Unwrap() Steper, one Child.
- [B]ranch: the branch Step, it has method Unwrap() []Steper, multiple Children.
Then the StepTree built for the above tree is:
    StepTree{
        R1: R1, // root's value is itself
        L1: R1,
        T2: R2, // trunk is "transparent"
        L2: R2,
        B3: R3,
        L3: B3, // to the lowest [B]ranch ancestor
        T3: B3,
        L4: B3,
        ...
    }
StepTree is a data structure that
- keys are all Steps in track
- values are the root of that Step, or lowest ancestor that branches.
If we bring Workflow into the tree, all sub-Workflows are "branch" Steps.
The contract between Nested Step and Workflow is:
Once a Step wraps other Steps, it is responsible for orchestrating them.
So from the Workflow's perspective, it only needs to orchestrate the root Steps, to make sure all Steps are executed in the right order.
type Steper ¶
Steper describes the requirement for a Step, which is the basic unit of a Workflow.
Implement this interface to let Workflow orchestrate your Steps. Note that your implementation should be a pointer type.
type StringerNamedStep ¶
func (*StringerNamedStep) String ¶
func (sns *StringerNamedStep) String() string
func (*StringerNamedStep) Unwrap ¶
func (sns *StringerNamedStep) Unwrap() Steper
type Workflow ¶
type Workflow struct {
    DontPanic bool // whether recover panic from Step(s)
    // contains filtered or unexported fields
}
Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).
The Steps are connected via dependency, use Step(), Steps() or Pipe() to add Steps into Workflow.
    workflow.Add(
        Step(a),
        Steps(b, c).DependsOn(a), // a -> b, c
        Pipe(d, e, f),            // d -> e -> f
    )
Workflow will execute Steps in a topological order, each Step will be executed in a separate goroutine. Workflow guarantees that
Before a Step goroutine starts, all its Upstream Steps are terminated, and registered Input callbacks are called.
- Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details.
- Workflow supports Workflow-level configuration, check WorkflowOption for details.
- Workflow supports executing Steps phase by phase, check Phase for details.
- Workflow supports Nested Steps, check Is(), As() and StepTree for details.
func (*Workflow) Add ¶
func (w *Workflow) Add(was ...WorkflowAdder) *Workflow
Add Steps into Workflow in phase Main.
func (*Workflow) Defer ¶
func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow
Defer adds Steps into Workflow in phase Defer.
func (*Workflow) Do ¶
Do starts the Step execution in topological order, and waits until all Steps terminated.
Do will block the current goroutine.
func (*Workflow) Init ¶
func (w *Workflow) Init(was ...WorkflowAdder) *Workflow
Init adds Steps into Workflow in phase Init.
func (*Workflow) IsPhaseTerminated ¶
func (*Workflow) IsTerminated ¶
IsTerminated returns true if all Steps terminated.
func (*Workflow) Options ¶
func (s *Workflow) Options(opts ...WorkflowOption) *Workflow
func (*Workflow) PhaseAdd ¶
func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow
PhaseAdd adds Steps into a specific phase.
func (*Workflow) StateOf ¶
StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.
func (*Workflow) UpstreamOf ¶
func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError
UpstreamOf returns all upstream Steps and their status / error of the Step.
type WorkflowAdder ¶
type WorkflowAdder interface {
Done() map[Steper]*StepConfig
}
Implement this interface to be added into Workflow!
type WorkflowOption ¶
type WorkflowOption func(*Workflow)
WorkflowOption alters the behavior of a Workflow.
func WithClock ¶
func WithClock(clock clock.Clock) WorkflowOption
func WithMaxConcurrency ¶
func WithMaxConcurrency(n int) WorkflowOption
WithMaxConcurrency limits the maximum number of Steps that can be in the Running status at the same time.
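A common way to cap concurrency in Go is a buffered channel used as a semaphore. The sketch below shows that general technique only; `runLimited` is a hypothetical function, limit is assumed to be positive, and nothing here claims to be how the package enforces its limit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs all tasks concurrently, with at most limit of them running
// at once, and returns the highest concurrency it observed.
func runLimited(limit int, tasks []func()) int {
	sem := make(chan struct{}, limit) // one slot per allowed running task
	var (
		mu            sync.Mutex
		running, peak int
		wg            sync.WaitGroup
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot, blocks when full
			defer func() { <-sem }() // release the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 8)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(time.Millisecond) }
	}
	fmt.Println(runLimited(2, tasks) <= 2) // true
}
```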
func WithNotify ¶
func WithNotify(notify Notify) WorkflowOption