Documentation ¶
Index ¶
- Variables
- func As[T Steper](s Steper) []T
- func Is[T Steper](s Steper) bool
- func Keys[M ~map[K]V, K comparable, V any](m M) []K
- func LogValue(step Steper) logValue
- func String(step Steper) string
- type AddStep
- func (as AddStep[S]) AfterStep(afters ...AfterStep) AddStep[S]
- func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]
- func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]
- func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]
- func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
- func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]
- func (as AddStep[S]) When(when Condition) AddStep[S]
- type AddSteps
- func (as AddSteps) AfterStep(afters ...AfterStep) AddSteps
- func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps
- func (as AddSteps) DependsOn(ups ...Steper) AddSteps
- func (as AddSteps) Done() map[Steper]*StepConfig
- func (as AddSteps) Merge(other AddSteps) AddSteps
- func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps
- func (as AddSteps) Timeout(timeout time.Duration) AddSteps
- func (as AddSteps) When(cond Condition) AddSteps
- type AfterStep
- type BeforeStep
- type Condition
- type ErrBefore
- type ErrCancel
- type ErrCycleDependency
- type ErrPanic
- type ErrSkip
- type ErrSucceed
- type ErrUnexpectedStepInitStatus
- type ErrWorkflow
- type ErrWrappedStepAlreadyInTree
- type Function
- func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]
- func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]
- func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]
- func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]
- type MockStep
- type NamedStep
- type NonOpStep
- type Phase
- type RetryOption
- type Set
- type State
- func (s *State) AddUpstream(up Steper)
- func (s *State) After(ctx context.Context, step Steper, err error) error
- func (s *State) Before(ctx context.Context, step Steper) (context.Context, error)
- func (s *State) GetError() error
- func (s *State) GetStatus() StepStatus
- func (s *State) GetStatusError() StatusError
- func (s *State) MergeConfig(sc *StepConfig)
- func (s *State) Option() *StepOption
- func (s *State) SetError(err error)
- func (s *State) SetStatus(ss StepStatus)
- func (s *State) Upstreams() Set[Steper]
- type StatusError
- type StepBuilder
- type StepConfig
- type StepOption
- type StepStatus
- func AllSucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus
- func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StatusError) StepStatus
- func Always(context.Context, map[Steper]StatusError) StepStatus
- func AnyFailed(ctx context.Context, ups map[Steper]StatusError) StepStatus
- func BeCanceled(ctx context.Context, ups map[Steper]StatusError) StepStatus
- func StatusFromError(err error) StepStatus
- type StepTree
- type Steper
- type StringerNamedStep
- type Workflow
- func (w *Workflow) Add(was ...WorkflowAdder) *Workflow
- func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow
- func (w *Workflow) Do(ctx context.Context) error
- func (w *Workflow) Empty() bool
- func (w *Workflow) Init(was ...WorkflowAdder) *Workflow
- func (w *Workflow) IsPhaseTerminated(phase Phase) bool
- func (w *Workflow) IsTerminated() bool
- func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow
- func (w *Workflow) PhaseOf(step Steper) Phase
- func (w *Workflow) RootOf(step Steper) Steper
- func (w *Workflow) StateOf(step Steper) *State
- func (w *Workflow) Steps() []Steper
- func (w *Workflow) Unwrap() []Steper
- func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError
- type WorkflowAdder
Constants ¶
This section is empty.
Variables ¶
var DefaultRetryOption = RetryOption{
Backoff: backoff.NewExponentialBackOff(),
Attempts: 3,
}
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")
var WorkflowPhases = []Phase{PhaseInit, PhaseMain, PhaseDefer}
WorkflowPhases defines the order of phases Workflow executes. New phases can be added to this, please support the built-in phases.
i.e.
var PhaseDebug flow.Phase = "Debug" func init() { flow.WorkflowPhases = []flow.Phase{flow.PhaseInit, flow.PhaseMain, PhaseDebug, flow.PhaseDefer} }
Then in your package, workflow will execute the steps in PhaseDebug after PhaseMain, before PhaseDefer.
Functions ¶
func As ¶
As finds all steps in the tree of step that matches target type, and returns them. The sequence of the returned steps is preorder traversal.
func Keys ¶ added in v0.0.3
func Keys[M ~map[K]V, K comparable, V any](m M) []K
Keys returns the keys of the map m. The keys will be in an indeterminate order.
Types ¶
type AddStep ¶
func Step ¶
Step declares Step ready to be added into Workflow.
The main difference between Step() and Steps() is that, Step() allows to add Input and InputDependsOn for the Step.
Step(a).Input(func(ctx context.Context, a *A) error { // fill a }))
func (AddStep[S]) BeforeStep ¶ added in v0.0.3
func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]
func (AddStep[S]) Input ¶
Input adds BeforeStep callback for the Step(s).
Input callbacks will be called before Do, and the order will respect the order of declarations.
Step(a). Input(/* 1. this Input will be called first */). Input(/* 2. this Input is after up's Output */) Step(a).Input(/* 3. this Input is after all */)
The Input callbacks are executed at runtime and per try.
func (AddStep[S]) Retry ¶
func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
type AddSteps ¶
type AddSteps map[Steper]*StepConfig
func BatchPipe ¶ added in v0.0.3
BatchPipe creates a batched pipeline in Workflow.
workflow.Add( BatchPipe( Steps(a, b), Steps(c, d, e), Steps(f), ), )
The above code is equivalent to:
workflow.Add( Steps(c, d, e).DependsOn(a, b), Steps(f).DependsOn(c, d, e), )
func Pipe ¶
Pipe creates a pipeline in Workflow.
workflow.Add( Pipe(a, b, c), // a -> b -> c )
The above code is equivalent to:
workflow.Add( Step(b).DependsOn(a), Step(c).DependsOn(b), )
func Steps ¶
Steps declares a series of Steps ready to be added into Workflow.
The Steps declared are mutually independent.
workflow.Add( Steps(a, b, c), // a, b, c will be executed in parallel Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel )
func (AddSteps) AfterStep ¶ added in v0.0.3
AfterStep adds AfterStep callback for the Step(s).
The AfterStep callback will be called after Do, and pass the error to next AfterStep callback. The order of execution will respect the order of declarations. The AfterStep callbacks are able to change the error returned by Do. The AfterStep callbacks are executed at runtime and per try.
func (AddSteps) BeforeStep ¶ added in v0.0.3
func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps
BeforeStep adds BeforeStep callback for the Step(s).
The BeforeStep callback will be called before Do, and return when first error occurs. The order of execution will respect the order of declarations. The BeforeStep callbacks are able to change the context.Context feed into Do. The BeforeStep callbacks are executed at runtime and per try.
func (AddSteps) DependsOn ¶
DependsOn declares dependency on the given Steps.
Step(a).DependsOn(b, c)
Then b, c should happen-before a.
func (AddSteps) Done ¶
func (as AddSteps) Done() map[Steper]*StepConfig
Done implements WorkflowAdder
func (AddSteps) Retry ¶
func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps
Retry customize how the Step should be retried.
Step will be retried as long as this option is configured.
w.Add( Step(a), // not retry Step(b).Retry(func(opt *RetryOption) { // will retry 3 times opt.MaxAttempts = 3 }), Step(c).Retry(nil), // will use DefaultRetryOption! )
type BeforeStep ¶ added in v0.0.3
BeforeStep defines callback being called BEFORE step being executed.
type Condition ¶
type Condition func(ctx context.Context, ups map[Steper]StatusError) StepStatus
Condition is a function to determine what's the next status of Step. Condition makes the decision based on the status and result of all the Upstream Steps. Condition is only called when all Upstream Steps are terminated.
var ( // DefaultCondition used in workflow, defaults to AllSucceeded DefaultCondition Condition = AllSucceeded // DefaultIsCanceled is used to determine whether an error is being regarded as canceled. DefaultIsCanceled = func(err error) bool { switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), StatusFromError(err) == Canceled: return true } return false } )
type ErrBefore ¶ added in v0.0.3
type ErrBefore struct {
// contains filtered or unexported fields
}
type ErrCancel ¶
type ErrCancel struct {
// contains filtered or unexported fields
}
type ErrCycleDependency ¶
ErrCycleDependency means there is a cycle-dependency in your Workflow!!!
func (ErrCycleDependency) Error ¶
func (e ErrCycleDependency) Error() string
type ErrSkip ¶
type ErrSkip struct {
// contains filtered or unexported fields
}
type ErrSucceed ¶
type ErrSucceed struct {
// contains filtered or unexported fields
}
func Succeed ¶
func Succeed(err error) ErrSucceed
Succeed marks the current step as `Succeeded`, while still reports the error.
func (ErrSucceed) Unwrap ¶
func (e ErrSucceed) Unwrap() error
type ErrUnexpectedStepInitStatus ¶ added in v0.0.3
type ErrUnexpectedStepInitStatus map[Steper]StepStatus
ErrUnexpectedStepInitStatus is Step status not Pending when Workflow starts to run.
func (ErrUnexpectedStepInitStatus) Error ¶ added in v0.0.3
func (e ErrUnexpectedStepInitStatus) Error() string
type ErrWorkflow ¶
type ErrWorkflow map[Steper]StatusError
ErrWorkflow contains all errors reported from terminated Steps in Workflow.
Keys are root Steps, values are its status and error.
func (ErrWorkflow) AllSucceeded ¶
func (e ErrWorkflow) AllSucceeded() bool
func (ErrWorkflow) AllSucceededOrSkipped ¶
func (e ErrWorkflow) AllSucceededOrSkipped() bool
func (ErrWorkflow) Error ¶
func (e ErrWorkflow) Error() string
ErrWorkflow will be printed as:
Step: [Status] error message
func (ErrWorkflow) MarshalJSON ¶
func (e ErrWorkflow) MarshalJSON() ([]byte, error)
MarshalJSON allows us to marshal ErrWorkflow to json.
{ "Step": { "status": "Status", "error": "error message" } }
func (ErrWorkflow) Unwrap ¶
func (e ErrWorkflow) Unwrap() []error
type ErrWrappedStepAlreadyInTree ¶ added in v0.0.3
type ErrWrappedStepAlreadyInTree struct { StepAlreadyThere Steper NewAncestor Steper OldAncestor Steper }
func (ErrWrappedStepAlreadyInTree) Error ¶ added in v0.0.3
func (e ErrWrappedStepAlreadyInTree) Error() string
type Function ¶
type Function[I, O any] struct { Name string Input I Output O DoFunc func(context.Context, I) (O, error) }
Function wraps an arbitrary function as a Step.
type MockStep ¶
MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.
type NamedStep ¶
NamedStep is a wrapper of Steper, it gives your step a name by overriding String() method.
type NonOpStep ¶
type NonOpStep struct {
Name string
}
this will be used as a virtual node to handle the dependency of steps
type Phase ¶
type Phase string
Phase groups Steps into different execution phases.
Workflow supports three built-in phases: Init, Main and Defer. It derives from the below common go pattern:
func init() {} func main() { defer func() {} }
- Only all Steps in previous phase terminated, the next phase will start. - Even if the steps in previous phase are not successful, the next phase will always start. - The order of steps in the same phase is not guaranteed. (defer here is not stack!)
Customized phase can be added to WorkflowPhases.
type RetryOption ¶
type RetryOption struct { TimeoutPerTry time.Duration // 0 means no timeout Attempts uint64 // 0 means no limit StopIf func(ctx context.Context, attempt uint64, since time.Duration, err error) bool Backoff backoff.BackOff Notify backoff.Notify Timer backoff.Timer }
RetryOption customizes retry behavior of a Step in Workflow.
type Set ¶
type Set[T comparable] map[T]struct{}
type State ¶
type State struct { StatusError Config *StepConfig sync.RWMutex }
State is the internal state of a Step in a Workflow.
It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status could be read / write from different goroutines, so use RWMutex to protect it.
func (*State) AddUpstream ¶
func (*State) GetStatus ¶
func (s *State) GetStatus() StepStatus
func (*State) GetStatusError ¶
func (s *State) GetStatusError() StatusError
func (*State) MergeConfig ¶
func (s *State) MergeConfig(sc *StepConfig)
func (*State) Option ¶
func (s *State) Option() *StepOption
func (*State) SetStatus ¶
func (s *State) SetStatus(ss StepStatus)
type StatusError ¶
type StatusError struct { Status StepStatus Err error }
StatusError contains the status and error of a Step.
func (StatusError) Error ¶
func (e StatusError) Error() string
StatusError will be printed as:
[Status] error message
func (StatusError) MarshalJSON ¶
func (e StatusError) MarshalJSON() ([]byte, error)
MarshalJSON allows us to marshal StatusError to json.
{ "status": "Status", "error": "error message" }
func (StatusError) Unwrap ¶
func (e StatusError) Unwrap() error
type StepBuilder ¶ added in v0.0.4
type StepBuilder interface {
BuildStep()
}
StepBuilder allows to build the internal Steps when adding into Workflow.
type StepImpl struct {} func (s *StepImpl) Do(ctx context.Context) error { /* ... */ } func (s *StepImpl) BuildStep() { /* build internal steps */ } workflow.Add( flow.Step(new(StepImpl)), // here will call StepImpl.BuildStep() implicitly )
type StepConfig ¶
type StepConfig struct { Upstreams Set[Steper] // Upstreams of the Step, means these Steps should happen-before this Step Before BeforeStep // Before callbacks of the Step, will be called before Do After AfterStep // After callbacks of the Step, will be called before Do Option func(*StepOption) // Option customize the Step settings }
func (*StepConfig) AddAfter ¶ added in v0.0.3
func (sc *StepConfig) AddAfter(after AfterStep)
func (*StepConfig) AddBefore ¶ added in v0.0.3
func (sc *StepConfig) AddBefore(before BeforeStep)
func (*StepConfig) AddOption ¶
func (sc *StepConfig) AddOption(opt func(*StepOption))
func (*StepConfig) Merge ¶
func (sc *StepConfig) Merge(other *StepConfig)
type StepOption ¶
type StepOption struct { RetryOption *RetryOption // RetryOption customize how the Step should be retried, default (nil) means no retry. Condition Condition // Condition decides whether Workflow should execute the Step, default to DefaultCondition. Timeout *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout. }
type StepStatus ¶
type StepStatus string
StepStatus describes the status of a Step.
const ( Pending StepStatus = "" Running StepStatus = "Running" Failed StepStatus = "Failed" Succeeded StepStatus = "Succeeded" Canceled StepStatus = "Canceled" Skipped StepStatus = "Skipped" )
func AllSucceeded ¶
func AllSucceeded(ctx context.Context, ups map[Steper]StatusError) StepStatus
AllSucceeded: all Upstreams are Succeeded
func AllSucceededOrSkipped ¶ added in v0.0.3
func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StatusError) StepStatus
AllSucceededOrSkipped: all Upstreams are Succeeded or Skipped
func Always ¶
func Always(context.Context, map[Steper]StatusError) StepStatus
Always: as long as all Upstreams are terminated
func AnyFailed ¶
func AnyFailed(ctx context.Context, ups map[Steper]StatusError) StepStatus
AnyFailed: any Upstream is Failed
func BeCanceled ¶
func BeCanceled(ctx context.Context, ups map[Steper]StatusError) StepStatus
BeCanceled: only run when the workflow is canceled
func StatusFromError ¶
func StatusFromError(err error) StepStatus
StatusFromError gets the StepStatus from error.
func (StepStatus) IsTerminated ¶
func (s StepStatus) IsTerminated() bool
func (StepStatus) String ¶
func (s StepStatus) String() string
type StepTree ¶
StepTree is a tree data structure of steps, it helps Workflow tracks Nested Steps.
Why StepTree is needed? ¶
What if someone add a Step and its Nested Step to Workflow?
doSomeThing := &DoSomeThing{} decorated := &Decorator{Steper: step} workflow.Add( Step(doSomeThing), Step(decorated), )
docorated.Do() will call doSomeThing.Do() internally, and apparently, we don't want doSomeThing.Do() being called twice.
StepTree is the solution to the above questions.
What is StepTree? ¶
Let's dive into the definitions, if some Step wrap another Step, then
type Parent struct { Child Steper } type Parent struct { // This Parent "branches" Children []Steper }
Then we can draw a tree like:
┌────┐ ┌────┐ ┌────┐ │ R1 │ │ R2 │ │ R3 │ └─┬──┘ └─┬──┘ └─┬──┘ ┌─┴──┐ ┌─┴──┐ ┌─┴──┐ │ L1 │ │ T2 │ │ B3 │ └────┘ └─┬──┘ └─┬──┘ │ ┌──┴────┐ ┌─┴──┐ ┌─┴──┐ ┌─┴──┐ │ L2 │ │ L3 │ │ T3 │ └────┘ └────┘ └─┬──┘ ┌─┴──┐ │ L4 │ └────┘
Where
- [R]oot: the root Step, there isn't other Step wrapping it.
- [L]eaf: the leaf Step, it doesn't wrap any Step inside.
- [T]runk: the trunk Step, it has method Unwrap() Steper, one Child.
- [B]ranch: the branch Step, it has method Unwrap() []Steper, multiple Children.
Then the StepTree built for the above tree is:
StepTree{ R1: R1, // root's value is itself L1: R1, T2: R2, L2: T2, B3: R3, L3: B3, T3: B3, L4: T3, ... }
StepTree is a data structure that
- keys are all Steps in track
- values are the ancestor Steps of the corresponding key
If we consider sub-workflow into the tree, all sub-Workflow are "branch" Steps.
The contract between Nested Step and Workflow is:
Once a Step "wrap" other Steps, it should have responsibility to orchestrate the inner steps.
So from the Workflow's perspective, it only needs to orchestrate the root Steps, to make sure all Steps are executed in the right order.
func (StepTree) Add ¶
Add a step and all it's wrapped steps to the tree.
Return the steps that were roots, but now are wrapped and taken place by the new root step.
- If step is already in the tree, it's no-op.
- If step is new, the step will becomes a new root. and all its inner steps will be added to the tree.
- If one of the inner steps is already in tree, panic.
type Steper ¶
Steper describes the requirement for a Step, which is basic unit of a Workflow.
Implement this interface to allow Workflow orchestrating your Steps.
Notice Steper will be saved in Workflow as map key, so it's supposed to be 'comparable'.
type StringerNamedStep ¶
func (*StringerNamedStep) String ¶
func (sns *StringerNamedStep) String() string
func (*StringerNamedStep) Unwrap ¶
func (sns *StringerNamedStep) Unwrap() Steper
type Workflow ¶
type Workflow struct { MaxConcurrency int // MaxConcurrency limits the max concurrency of running Steps DontPanic bool // DontPanic suppress panics, instead return it as error OKToSkip bool // OKToSkip returns nil if all Steps succeeded or skipped, otherwise only return nil if all Steps succeeded Clock clock.Clock // Clock for unit test // contains filtered or unexported fields }
Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).
The Steps are connected via dependency, use Step(), Steps() or Pipe(), BatchPipe() to add Steps into Workflow.
workflow.Add( Step(a), Steps(b, c).DependsOn(a), // a -> b, c Pipe(d, e, f), // d -> e -> f BatchPipe( Steps(g, h), Steps(i, j), ), // g, h -> i, j )
Workflow will execute Steps in a topological order, each Step will be executed in a separate goroutine.
Workflow guarantees that
Before a Step goroutine starts, all its Upstream Steps are `terminated`.
Check `StepStatus` and `Condition` for details.
Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details. Workflow supports executing Steps phase in phase, check Phase for details. Workflow supports Nested Steps, check Is(), As() and StepTree for details.
func (*Workflow) Add ¶
func (w *Workflow) Add(was ...WorkflowAdder) *Workflow
Add Steps into Workflow in phase Main.
func (*Workflow) Defer ¶
func (w *Workflow) Defer(was ...WorkflowAdder) *Workflow
Defer adds Steps into Workflow in phase Defer.
func (*Workflow) Do ¶
Do starts the Step execution in topological order, and waits until all Steps terminated.
Do will block the current goroutine.
func (*Workflow) Init ¶
func (w *Workflow) Init(was ...WorkflowAdder) *Workflow
Init adds Steps into Workflow in phase Init.
func (*Workflow) IsPhaseTerminated ¶
IsPhaseTerminated returns true if all Steps in the phase terminated.
func (*Workflow) IsTerminated ¶
IsTerminated returns true if all Steps terminated.
func (*Workflow) PhaseAdd ¶
func (w *Workflow) PhaseAdd(phase Phase, was ...WorkflowAdder) *Workflow
PhaseAdd add Steps into specific phase.
func (*Workflow) StateOf ¶
StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.
func (*Workflow) UpstreamOf ¶
func (w *Workflow) UpstreamOf(step Steper) map[Steper]StatusError
UpstreamOf returns all upstream Steps and their status and error.
type WorkflowAdder ¶
type WorkflowAdder interface {
Done() map[Steper]*StepConfig
}
WorkflowAdder is addable into Workflow!
func Mock ¶ added in v0.0.3
func Mock[T Steper](step T, do func(context.Context) error) WorkflowAdder
Mock helps to mock a step in Workflow.
w.Add( flow.Mock(step, func(ctx context.Context) error {}), )
func Name ¶ added in v0.0.3
func Name(step Steper, name string) WorkflowAdder
Name can rename a Step.
workflow.Add( Step(a), Name(a, "StepA"), )
Attention: Name will wrap the original Step
func NameFunc ¶ added in v0.0.5
func NameFunc(step Steper, fn func() string) WorkflowAdder
NameFunc can rename a Step with a runtime function.
func NameStringer ¶ added in v0.0.3
func NameStringer(step Steper, name fmt.Stringer) WorkflowAdder
NameStringer can rename a Step with a fmt.Stringer, which allows String() method to be called at runtime.
func Names ¶ added in v0.0.5
func Names(m map[Steper]string) WorkflowAdder
Names can rename multiple Steps.
workflow.Add( Names(map[Steper]string{ stepA: "A", stepB: "B", }, )