Documentation ¶
Index ¶
- Constants
- Variables
- func As[T Steper](s Steper) []T
- func ConditionOrDefault(cond Condition) func(context.Context, map[Steper]StepResult) StepStatus
- func Has[T Steper](s Steper) bool
- func HasStep(step, target Steper) bool
- func Keys[M ~map[K]V, K comparable, V any](m M) []K
- func LogValue(step Steper) logValue
- func String(step Steper) string
- func Values[M ~map[K]V, K comparable, V any](m M) []V
- func WithStackTraces(skip, depth int, ignores ...func(runtime.Frame) bool) func(error) error
- type AddStep
- func (as AddStep[S]) AfterStep(afters ...AfterStep) AddStep[S]
- func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]
- func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]
- func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]
- func (as AddStep[S]) Output(fns ...func(context.Context, S) error) AddStep[S]
- func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
- func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]
- func (as AddStep[S]) When(when Condition) AddStep[S]
- type AddSteps
- func (as AddSteps) AddToWorkflow() map[Steper]*StepConfig
- func (as AddSteps) AfterStep(afters ...AfterStep) AddSteps
- func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps
- func (as AddSteps) DependsOn(ups ...Steper) AddSteps
- func (as AddSteps) Merge(others ...AddSteps) AddSteps
- func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps
- func (as AddSteps) Timeout(timeout time.Duration) AddSteps
- func (as AddSteps) When(cond Condition) AddSteps
- type AfterStep
- type BeforeStep
- type BranchCheck
- type BranchCheckFunc
- type Builder
- type Condition
- type ErrBeforeStep
- type ErrCancel
- type ErrCycleDependency
- type ErrPanic
- type ErrSkip
- type ErrSucceed
- type ErrWithStackTraces
- type ErrWorkflow
- type Function
- func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]
- func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]
- func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]
- func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]
- type IfBranch
- type MockStep
- type NamedStep
- type NoOpStep
- type RetryEvent
- type RetryOption
- type Set
- type State
- func (s *State) AddUpstream(up Steper)
- func (s *State) After(ctx context.Context, step Steper, err error) error
- func (s *State) Before(ctx context.Context, step Steper) (context.Context, error)
- func (s *State) GetError() error
- func (s *State) GetStatus() StepStatus
- func (s *State) GetStepResult() StepResult
- func (s *State) MergeConfig(sc *StepConfig)
- func (s *State) Option() *StepOption
- func (s *State) SetError(err error)
- func (s *State) SetStatus(ss StepStatus)
- func (s *State) Upstreams() Set[Steper]
- type StepBuilder
- type StepConfig
- type StepOption
- type StepResult
- type StepStatus
- func AllSucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus
- func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StepResult) StepStatus
- func Always(context.Context, map[Steper]StepResult) StepStatus
- func AnyFailed(ctx context.Context, ups map[Steper]StepResult) StepStatus
- func AnySucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus
- func BeCanceled(ctx context.Context, ups map[Steper]StepResult) StepStatus
- func StatusFromError(err error) StepStatus
- type Steper
- type StringerNamedStep
- type SwitchBranch
- func (s *SwitchBranch[T]) AddToWorkflow() map[Steper]*StepConfig
- func (s *SwitchBranch[T]) Case(step Steper, check BranchCheckFunc[T]) *SwitchBranch[T]
- func (s *SwitchBranch[T]) Cases(steps []Steper, check BranchCheckFunc[T]) *SwitchBranch[T]
- func (s *SwitchBranch[T]) Default(step ...Steper) *SwitchBranch[T]
- func (s *SwitchBranch[T]) When(cond Condition) *SwitchBranch[T]
- type TraverseDecision
- type Workflow
- func (w *Workflow) Add(was ...Builder) *Workflow
- func (w *Workflow) Do(ctx context.Context) error
- func (w *Workflow) Empty() bool
- func (w *Workflow) IsTerminated() bool
- func (w *Workflow) Reset() error
- func (w *Workflow) RootOf(step Steper) Steper
- func (w *Workflow) StateOf(step Steper) *State
- func (w *Workflow) Steps() []Steper
- func (w *Workflow) Unwrap() []Steper
- func (w *Workflow) UpstreamOf(step Steper) map[Steper]StepResult
Constants ¶
const ( TraverseContinue = iota // TraverseContinue continue the traversal TraverseStop // TraverseStop stop and exit the traversal immediately TraverseEndBranch // TraverseEndBranch end the current branch, but continue sibling branches )
Variables ¶
var DefaultRetryOption = RetryOption{
Backoff: backoff.NewExponentialBackOff(),
Attempts: 3,
}
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")
Functions ¶
func As ¶
As finds all steps in the tree of step that match the target type, and returns them. The returned steps are ordered by pre-order traversal.
func ConditionOrDefault ¶ added in v0.0.7
func ConditionOrDefault(cond Condition) func(context.Context, map[Steper]StepResult) StepStatus
ConditionOrDefault will use DefaultCondition if cond is nil.
func Keys ¶ added in v0.0.3
func Keys[M ~map[K]V, K comparable, V any](m M) []K
Keys returns the keys of the map m. The keys will be in an indeterminate order.
func LogValue ¶
func LogValue(step Steper) logValue
LogValue is used with log/slog, you can use it like:
logger.With("step", LogValue(step))
Use it to prevent expensive String() calls, instead of:
logger.With("step", String(step))
func Values ¶ added in v0.1.1
func Values[M ~map[K]V, K comparable, V any](m M) []V
Values returns the values of the map m. The values will be in an indeterminate order.
Types ¶
type AddStep ¶
func Step ¶
Step declares Step ready to be added into Workflow.
The main difference between Step() and Steps() is that Step() allows adding Input for the Step.
Step(a).Input(func(ctx context.Context, a *A) error { /* fill a */ })
func (AddStep[S]) BeforeStep ¶ added in v0.0.3
func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]
func (AddStep[S]) Input ¶
Input adds BeforeStep callback for the Step(s).
Input callbacks will be called before Do, and the order will respect the order of declarations.
Step(a). Input(/* 1. this Input will be called first */). Input(/* 2. this Input will be called after 1. */) Step(a).Input(/* 3. this Input is after all */)
The Input callbacks are executed at runtime and per-try.
func (AddStep[S]) Output ¶ added in v0.1.1
Output can pass the results of the Step to outer scope. Output is only triggered when the Step is successful (returns nil error).
Output actually adds AfterStep callback for the Step(s).
The Output callbacks are executed at runtime and per-try.
func (AddStep[S]) Retry ¶
func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]
type AddSteps ¶
type AddSteps map[Steper]*StepConfig
func BatchPipe ¶ added in v0.0.3
BatchPipe creates a batched pipeline in Workflow.
workflow.Add( BatchPipe( Steps(a, b), Steps(c, d, e), Steps(f), ), )
The above code is equivalent to:
workflow.Add( Steps(c, d, e).DependsOn(a, b), Steps(f).DependsOn(c, d, e), )
func Pipe ¶
Pipe creates a pipeline in Workflow.
workflow.Add( Pipe(a, b, c), // a -> b -> c )
The above code is equivalent to:
workflow.Add( Step(b).DependsOn(a), Step(c).DependsOn(b), )
func Steps ¶
Steps declares a series of Steps ready to be added into Workflow.
The Steps declared are mutually independent.
workflow.Add( Steps(a, b, c), // a, b, c will be executed in parallel Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel )
func (AddSteps) AddToWorkflow ¶ added in v0.1.0
func (as AddSteps) AddToWorkflow() map[Steper]*StepConfig
AddToWorkflow implements Builder
func (AddSteps) AfterStep ¶ added in v0.0.3
AfterStep adds AfterStep callback for the Step(s).
The AfterStep callback will be called after Do, and passes the error to the next AfterStep callback. The order of execution will respect the order of declarations. The AfterStep callbacks are able to change the error returned by Do. The AfterStep callbacks are executed at runtime and per-try.
func (AddSteps) BeforeStep ¶ added in v0.0.3
func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps
BeforeStep adds BeforeStep callback for the Step(s).
The BeforeStep callback will be called before Do, and returns when the first error occurs. The order of execution will respect the order of declarations. The BeforeStep callbacks are able to change the context.Context fed into Do. The BeforeStep callbacks are executed at runtime and per-try.
func (AddSteps) DependsOn ¶
DependsOn declares dependency on the given Steps.
Step(a).DependsOn(b, c)
Then b, c should happen-before a.
func (AddSteps) Retry ¶
func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps
Retry customizes how the Step should be retried.
Step will be retried as long as this option is configured.
w.Add( Step(a), // not retry Step(b).Retry(func(opt *RetryOption) { // will retry 3 times opt.Attempts = 3 }), Step(c).Retry(nil), // will use DefaultRetryOption! )
type BeforeStep ¶ added in v0.0.3
BeforeStep defines a callback that is called BEFORE the step is executed.
type BranchCheck ¶ added in v0.0.7
type BranchCheck[T Steper] struct { Check BranchCheckFunc[T] OK bool Error error }
BranchCheck represents a branch to be checked.
func (*BranchCheck[T]) Do ¶ added in v0.0.7
func (bc *BranchCheck[T]) Do(ctx context.Context, target T)
type BranchCheckFunc ¶ added in v0.0.7
BranchCheckFunc checks the target and returns true if the branch should be selected.
type Builder ¶ added in v0.1.4
type Builder interface {
AddToWorkflow() map[Steper]*StepConfig
}
Builder builds a Workflow by adding Steps.
func Mock ¶ added in v0.0.3
Mock helps to mock a step in Workflow.
w.Add( flow.Mock(step, func(ctx context.Context) error {}), )
func Name ¶ added in v0.0.3
Name can rename a Step.
workflow.Add( Step(a), Name(a, "StepA"), )
Attention: Name will wrap the original Step
func NameStringer ¶ added in v0.0.3
NameStringer can rename a Step with a fmt.Stringer, which allows String() method to be called at runtime.
type Condition ¶
type Condition func(ctx context.Context, ups map[Steper]StepResult) StepStatus
Condition is a function that determines the next status of a Step. Condition makes the decision based on the status and result of all the Upstream Steps. Condition is only called when all Upstream Steps are terminated.
var ( // DefaultCondition used in workflow, defaults to AllSucceeded DefaultCondition Condition = AllSucceeded // DefaultIsCanceled is used to determine whether an error is being regarded as canceled. DefaultIsCanceled = func(err error) bool { switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), StatusFromError(err) == Canceled: return true } return false } )
type ErrBeforeStep ¶ added in v0.1.0
type ErrBeforeStep struct {
// contains filtered or unexported fields
}
func (ErrBeforeStep) Unwrap ¶ added in v0.1.0
func (e ErrBeforeStep) Unwrap() error
type ErrCancel ¶
type ErrCancel struct {
// contains filtered or unexported fields
}
type ErrCycleDependency ¶
ErrCycleDependency means there is a cycle-dependency in your Workflow!!!
func (ErrCycleDependency) Error ¶
func (e ErrCycleDependency) Error() string
type ErrSkip ¶
type ErrSkip struct {
// contains filtered or unexported fields
}
type ErrSucceed ¶
type ErrSucceed struct {
// contains filtered or unexported fields
}
func Succeed ¶
func Succeed(err error) ErrSucceed
Succeed marks the current step as `Succeeded`, while still reporting the error.
func (ErrSucceed) Unwrap ¶
func (e ErrSucceed) Unwrap() error
type ErrWithStackTraces ¶ added in v0.0.8
ErrWithStackTraces saves stack frames into the error, and prints the error in the following format:
error message Stack Traces: file:line
func (ErrWithStackTraces) Error ¶ added in v0.0.8
func (e ErrWithStackTraces) Error() string
func (ErrWithStackTraces) StackTraces ¶ added in v0.0.8
func (e ErrWithStackTraces) StackTraces() []string
func (ErrWithStackTraces) Unwrap ¶ added in v0.0.10
func (e ErrWithStackTraces) Unwrap() error
type ErrWorkflow ¶
type ErrWorkflow map[Steper]StepResult
ErrWorkflow contains all errors reported from terminated Steps in Workflow.
Keys are root Steps, values are its status and error.
func (ErrWorkflow) AllSucceeded ¶
func (e ErrWorkflow) AllSucceeded() bool
func (ErrWorkflow) AllSucceededOrSkipped ¶
func (e ErrWorkflow) AllSucceededOrSkipped() bool
func (ErrWorkflow) Error ¶
func (e ErrWorkflow) Error() string
ErrWorkflow will be printed as:
Step: [Status] error message
func (ErrWorkflow) Unwrap ¶
func (e ErrWorkflow) Unwrap() []error
type Function ¶
type Function[I, O any] struct { Name string Input I Output O DoFunc func(context.Context, I) (O, error) }
Function wraps an arbitrary function as a Step.
type IfBranch ¶ added in v0.0.7
type IfBranch[T Steper] struct { Target T // the target to check BranchCheck BranchCheck[T] ThenStep []Steper ElseStep []Steper Cond Condition // Cond is the When condition for both ThenStep and ElseStep, not target Step! }
IfBranch adds the target step and the then and else steps to the workflow, then checks the target step to determine which branch to take.
func If ¶ added in v0.0.7
func If[T Steper](target T, check BranchCheckFunc[T]) *IfBranch[T]
If adds a conditional branch to the workflow.
If(someStep, func(ctx context.Context, someStep *SomeStep) (bool, error) { // branch condition here, true -> Then, false -> Else. // if error is returned, then fail the selected branch step. }). Then(thenStep). Else(elseStep)
func (*IfBranch[T]) AddToWorkflow ¶ added in v0.1.0
func (i *IfBranch[T]) AddToWorkflow() map[Steper]*StepConfig
type MockStep ¶
MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.
type NamedStep ¶
NamedStep is a wrapper of Steper, it gives your step a name by overriding String() method.
type RetryEvent ¶ added in v0.1.2
RetryEvent is the event fired when a retry happens
type RetryOption ¶
type RetryOption struct { TimeoutPerTry time.Duration // 0 means no timeout Attempts uint64 // 0 means no limit // NextBackOff is called after each retry to determine the next backoff duration. // Notice that if the attempts limit is reached, or the context times out, or BackOff fires backoff.Stop, // this function will not be called. // // RetryEvent: the event records attempt, duration since the start, and the error of the last try. // nextBackOff: the next backoff duration calculated by the inner BackOff NextBackOff func(ctx context.Context, re RetryEvent, nextBackOff time.Duration) time.Duration Backoff backoff.BackOff Notify backoff.Notify Timer backoff.Timer }
RetryOption customizes retry behavior of a Step in Workflow.
type Set ¶
type Set[T comparable] map[T]struct{}
type State ¶
type State struct { StepResult Config *StepConfig sync.RWMutex }
State is the internal state of a Step in a Workflow.
It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status could be read / written from different goroutines, so use RWMutex to protect it.
func (*State) AddUpstream ¶
func (*State) GetStatus ¶
func (s *State) GetStatus() StepStatus
func (*State) GetStepResult ¶ added in v0.1.0
func (s *State) GetStepResult() StepResult
func (*State) MergeConfig ¶
func (s *State) MergeConfig(sc *StepConfig)
func (*State) Option ¶
func (s *State) Option() *StepOption
func (*State) SetStatus ¶
func (s *State) SetStatus(ss StepStatus)
type StepBuilder ¶ added in v0.0.4
type StepBuilder struct {
// contains filtered or unexported fields
}
StepBuilder allows building the internal Steps when adding them into a Workflow.
type StepImpl struct {} func (s *StepImpl) Unwrap() []flow.Steper { return /* internal steps */ } func (s *StepImpl) Do(ctx context.Context) error { /* ... */ } func (s *StepImpl) BuildStep() { /* build internal steps */ } workflow.Add( flow.Step(new(StepImpl)), // here will call StepImpl.BuildStep() once implicitly )
func (*StepBuilder) BuildStep ¶ added in v0.0.4
func (sb *StepBuilder) BuildStep(s Steper)
BuildStep calls the BuildStep() method of the Steper if it's implemented, and ensures it's called only once for each Steper.
type StepConfig ¶
type StepConfig struct { Upstreams Set[Steper] // Upstreams of the Step, means these Steps should happen-before this Step Before []BeforeStep // Before callbacks of the Step, will be called before Do After []AfterStep // After callbacks of the Step, will be called after Do Option []func(*StepOption) // Option customize the Step settings }
func (*StepConfig) Merge ¶
func (sc *StepConfig) Merge(other *StepConfig)
type StepOption ¶
type StepOption struct { RetryOption *RetryOption // RetryOption customize how the Step should be retried, default (nil) means no retry. Condition Condition // Condition decides whether Workflow should execute the Step, default to DefaultCondition. Timeout *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout. }
type StepResult ¶ added in v0.1.0
type StepResult struct { Status StepStatus Err error }
StepResult contains the status and error of a Step.
func (StepResult) Error ¶ added in v0.1.0
func (e StepResult) Error() string
StepResult will be printed as:
[Status] error message
func (StepResult) Unwrap ¶ added in v0.1.0
func (e StepResult) Unwrap() error
type StepStatus ¶
type StepStatus string
StepStatus describes the status of a Step.
const ( Pending StepStatus = "" Running StepStatus = "Running" Failed StepStatus = "Failed" Succeeded StepStatus = "Succeeded" Canceled StepStatus = "Canceled" Skipped StepStatus = "Skipped" )
func AllSucceeded ¶
func AllSucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus
AllSucceeded: all Upstreams are Succeeded
func AllSucceededOrSkipped ¶ added in v0.0.3
func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StepResult) StepStatus
AllSucceededOrSkipped: all Upstreams are Succeeded or Skipped
func Always ¶
func Always(context.Context, map[Steper]StepResult) StepStatus
Always: as long as all Upstreams are terminated
func AnyFailed ¶
func AnyFailed(ctx context.Context, ups map[Steper]StepResult) StepStatus
AnyFailed: any Upstream is Failed
func AnySucceeded ¶ added in v0.0.7
func AnySucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus
AnySucceeded: any Upstream is Succeeded
func BeCanceled ¶
func BeCanceled(ctx context.Context, ups map[Steper]StepResult) StepStatus
BeCanceled: only run when the workflow is canceled
func StatusFromError ¶
func StatusFromError(err error) StepStatus
StatusFromError gets the StepStatus from error.
func (StepStatus) IsTerminated ¶
func (s StepStatus) IsTerminated() bool
func (StepStatus) String ¶
func (s StepStatus) String() string
type Steper ¶
Steper describes the requirement for a Step, which is the basic unit of a Workflow.
Implement this interface to allow Workflow orchestrating your Steps.
Notice Steper will be saved in Workflow as map key, so it's supposed to be 'comparable' type like pointer.
type StringerNamedStep ¶
func (*StringerNamedStep) String ¶
func (sns *StringerNamedStep) String() string
func (*StringerNamedStep) Unwrap ¶
func (sns *StringerNamedStep) Unwrap() Steper
type SwitchBranch ¶ added in v0.0.7
type SwitchBranch[T Steper] struct { Target T CasesToCheck map[Steper]*BranchCheck[T] DefaultStep []Steper Cond Condition }
SwitchBranch adds the target step, the case steps and the default step to the workflow, then checks the target step to determine which branch to take.
func Switch ¶ added in v0.0.7
func Switch[T Steper](target T) *SwitchBranch[T]
Switch adds a switch branch to the workflow.
Switch(someStep). Case(case1, func(ctx context.Context, someStep *SomeStep) (bool, error) { // branch condition here, true to select this branch // error will fail the case }). Default(defaultStep), // the step to run if all case checks return false )
func (*SwitchBranch[T]) AddToWorkflow ¶ added in v0.1.0
func (s *SwitchBranch[T]) AddToWorkflow() map[Steper]*StepConfig
func (*SwitchBranch[T]) Case ¶ added in v0.0.7
func (s *SwitchBranch[T]) Case(step Steper, check BranchCheckFunc[T]) *SwitchBranch[T]
Case adds a case to the switch branch.
func (*SwitchBranch[T]) Cases ¶ added in v0.0.7
func (s *SwitchBranch[T]) Cases(steps []Steper, check BranchCheckFunc[T]) *SwitchBranch[T]
Cases adds multiple cases to the switch branch. The check function will be executed for each case step.
func (*SwitchBranch[T]) Default ¶ added in v0.0.7
func (s *SwitchBranch[T]) Default(step ...Steper) *SwitchBranch[T]
Default adds default step(s) to the switch branch.
func (*SwitchBranch[T]) When ¶ added in v0.0.7
func (s *SwitchBranch[T]) When(cond Condition) *SwitchBranch[T]
When adds a condition to all case steps and default, not the Target!
type TraverseDecision ¶ added in v0.1.0
type TraverseDecision int
func Traverse ¶ added in v0.1.0
func Traverse(s Steper, f func(Steper, []Steper) TraverseDecision, walked ...Steper) TraverseDecision
Traverse performs a pre-order traversal of the tree of step.
type Workflow ¶
type Workflow struct { MaxConcurrency int // MaxConcurrency limits the max concurrency of running Steps DontPanic bool // DontPanic suppresses panics and returns them as errors instead SkipAsError bool // SkipAsError marks skipped Steps as an error if true, otherwise ignore them Clock clock.Clock // Clock for retry and unit test StepBuilder // StepBuilder to call BuildStep() for Steps // contains filtered or unexported fields }
Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).
The Steps are connected via dependency, use Step(), Steps() or Pipe(), BatchPipe() to add Steps into Workflow.
workflow.Add( Step(a), Steps(b, c).DependsOn(a), // a -> b, c Pipe(d, e, f), // d -> e -> f BatchPipe( Steps(g, h), Steps(i, j), ), // g, h -> i, j )
Workflow will execute Steps in a topological order, each Step will be executed in a separate goroutine.
Workflow guarantees that
Before a Step goroutine starts, all its Upstream Steps are `terminated`.
Check `StepStatus` and `Condition` for details.
Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details. Workflow supports Composite Steps, check Has(), As() and HasStep() for details.
func (*Workflow) Do ¶
Do starts the Step execution in topological order, and waits until all Steps terminated.
Do will block the current goroutine.
func (*Workflow) IsTerminated ¶
IsTerminated returns true if all Steps terminated.
func (*Workflow) StateOf ¶
StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.
func (*Workflow) UpstreamOf ¶
func (w *Workflow) UpstreamOf(step Steper) map[Steper]StepResult
UpstreamOf returns all upstream Steps and their status and error.