flow

package module
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2024 License: MIT Imports: 10 Imported by: 2

README

Workflow - a library organizes steps with dependencies into DAG (Directed-Acyclic-Graph) for Go

Go Report Card Go Test Status Go Test Coverage

Overview

Strongly encourage everyone to read examples in the examples directory to have a quick understanding of how to use this library.

go-workflow helps Go developers organize steps with dependencies into a Directed-Acyclic-Graph (DAG).

  • It provides a simple and flexible way to define and execute a workflow.
  • It is easy to implement steps and compose them into a composite step.
  • It uses goroutine to execute steps concurrently.
  • It supports retry, timeout, and other configurations for each step.
  • It supports callbacks to hook before / after each step.

See it in action:

package yours

import (
    "context"

    flow "github.com/Azure/go-workflow"
)

type Step struct{ Value string }

// All required for a step is `Do(context.Context) error`
func (s *Step) Do(ctx context.Context) error {
    fmt.Println(s.Value)
    return nil
}

func main() {
    // declare steps
    var (
        a = new(Step)
        b = &Step{Value: "B"}
        c = flow.Func("declare from anonymous function", func(ctx context.Context) error {
            fmt.Println("C")
            return nil
        })
    )
    // compose steps into a workflow!
    w := new(flow.Workflow)
    w.Add(
        flow.Step(b).DependsOn(a),     // use DependsOn to define dependencies
        flow.Steps(a, b).DependsOn(c), // final execution order: c -> a -> b

        // other configurations, like retry, timeout, condition, etc.
        flow.Step(c).
            Retry(func(ro *flow.RetryOption) {
                ro.Attempts = 3 // retry 3 times
            }).
            Timeout(10*time.Minute), // timeout after 10 minutes

        // use Input to change step at runtime
        flow.Step(a).Input(func(ctx context.Context, a *Step) error {
            a.Value = "A"
            return nil
        }),
    )
    // execute the workflow and block until all steps are terminated
    err := w.Do(context.Background())
}

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos are subject to those third-party's policies.

Documentation

Index

Constants

View Source
const (
	TraverseContinue  = iota // TraverseContinue continue the traversal
	TraverseStop             // TraverseStop stop and exit the traversal immediately
	TraverseEndBranch        // TraverseEndBranch end the current branch, but continue sibling branches
)

Variables

View Source
var DefaultRetryOption = RetryOption{
	Backoff:  backoff.NewExponentialBackOff(),
	Attempts: 3,
}
View Source
var ErrWorkflowIsRunning = fmt.Errorf("Workflow is running, please wait for it terminated")

Functions

func As

func As[T Steper](s Steper) []T

As finds all steps in the tree of step that matches target type, and returns them. The sequence of the returned steps is pre-order traversal.

func ConditionOrDefault added in v0.0.7

func ConditionOrDefault(cond Condition) func(context.Context, map[Steper]StepResult) StepStatus

ConditionOrDefault will use DefaultCondition if cond is nil.

func Has added in v0.1.0

func Has[T Steper](s Steper) bool

Has reports whether there is any step inside matches target type.

func HasStep added in v0.1.0

func HasStep(step, target Steper) bool

HasStep reports whether there is any step matches target step.

func Keys added in v0.0.3

func Keys[M ~map[K]V, K comparable, V any](m M) []K

Keys returns the keys of the map m. The keys will be in an indeterminate order.

func LogValue

func LogValue(step Steper) logValue

LogValue is used with log/slog, you can use it like:

logger.With("step", LogValue(step))

To prevent expensive String() calls,

logger.With("step", String(step))

func String

func String(step Steper) string

String unwraps step and returns a proper string representation.

func Values added in v0.1.1

func Values[M ~map[K]V, K comparable, V any](m M) []V

Values returns the values of the map m. The values will be in an indeterminate order.

func WithStackTraces added in v0.0.8

func WithStackTraces(skip, depth int, ignores ...func(runtime.Frame) bool) func(error) error

WithStackTraces saves stack frames into error

Types

type AddStep

type AddStep[S Steper] struct {
	AddSteps
	Steps []S
}

func Step

func Step[S Steper](steps ...S) AddStep[S]

Step declares Step ready to be added into Workflow.

The main difference between Step() and Steps() is that, Step() allows to add Input for the Step.

Step(a).Input(func(ctx context.Context, a *A) error {
	// fill a
}))

func (AddStep[S]) AfterStep added in v0.0.3

func (as AddStep[S]) AfterStep(afters ...AfterStep) AddStep[S]

func (AddStep[S]) BeforeStep added in v0.0.3

func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S]

func (AddStep[S]) DependsOn

func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S]

func (AddStep[S]) Input

func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S]

Input adds BeforeStep callback for the Step(s).

Input callbacks will be called before Do, and the order will respect the order of declarations.

Step(a).
	Input(/* 1. this Input will be called first */).
	Input(/* 2. this Input will be called after 1. */)
Step(a).Input(/* 3. this Input is after all */)

The Input callbacks are executed at runtime and per-try.

func (AddStep[S]) Output added in v0.1.1

func (as AddStep[S]) Output(fns ...func(context.Context, S) error) AddStep[S]

Output can pass the results of the Step to outer scope. Output is only triggered when the Step is successful (returns nil error).

Output actually adds AfterStep callback for the Step(s).

The Output callbacks are executed at runtime and per-try.

func (AddStep[S]) Retry

func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S]

func (AddStep[S]) Timeout

func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S]

func (AddStep[S]) When

func (as AddStep[S]) When(when Condition) AddStep[S]

type AddSteps

type AddSteps map[Steper]*StepConfig

func BatchPipe added in v0.0.3

func BatchPipe(batch ...AddSteps) AddSteps

BatchPipe creates a batched pipeline in Workflow.

workflow.Add(
	BatchPipe(
		Steps(a, b),
		Steps(c, d, e),
		Steps(f),
	),
)

The above code is equivalent to:

workflow.Add(
	Steps(c, d, e).DependsOn(a, b),
	Steps(f).DependsOn(c, d, e),
)

func Pipe

func Pipe(steps ...Steper) AddSteps

Pipe creates a pipeline in Workflow.

workflow.Add(
	Pipe(a, b, c), // a -> b -> c
)

The above code is equivalent to:

workflow.Add(
	Step(b).DependsOn(a),
	Step(c).DependsOn(b),
)

func Steps

func Steps(steps ...Steper) AddSteps

Steps declares a series of Steps ready to be added into Workflow.

The Steps declared are mutually independent.

workflow.Add(
	Steps(a, b, c),					// a, b, c will be executed in parallel
	Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel
)

func (AddSteps) AddToWorkflow added in v0.1.0

func (as AddSteps) AddToWorkflow() map[Steper]*StepConfig

AddToWorkflow implements Builder

func (AddSteps) AfterStep added in v0.0.3

func (as AddSteps) AfterStep(afters ...AfterStep) AddSteps

AfterStep adds AfterStep callback for the Step(s).

The AfterStep callback will be called after Do, and pass the error to next AfterStep callback. The order of execution will respect the order of declarations. The AfterStep callbacks are able to change the error returned by Do. The AfterStep callbacks are executed at runtime and per-try.

func (AddSteps) BeforeStep added in v0.0.3

func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps

BeforeStep adds BeforeStep callback for the Step(s).

The BeforeStep callback will be called before Do, and return when first error occurs. The order of execution will respect the order of declarations. The BeforeStep callbacks are able to change the context.Context feed into Do. The BeforeStep callbacks are executed at runtime and per-try.

func (AddSteps) DependsOn

func (as AddSteps) DependsOn(ups ...Steper) AddSteps

DependsOn declares dependency on the given Steps.

Step(a).DependsOn(b, c)

Then b, c should happen-before a.

func (AddSteps) Merge added in v0.0.3

func (as AddSteps) Merge(others ...AddSteps) AddSteps

Merge another AddSteps into one.

func (AddSteps) Retry

func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps

Retry customize how the Step should be retried.

Step will be retried as long as this option is configured.

w.Add(
	Step(a), // not retry
	Step(b).Retry(func(opt *RetryOption) { // will retry 3 times
		opt.MaxAttempts = 3
	}),
	Step(c).Retry(nil), // will use DefaultRetryOption!
)

func (AddSteps) Timeout

func (as AddSteps) Timeout(timeout time.Duration) AddSteps

Timeout sets the Step level timeout.

func (AddSteps) When

func (as AddSteps) When(cond Condition) AddSteps

When set the Condition for the Step.

type AfterStep added in v0.0.3

type AfterStep func(context.Context, Steper, error) error

AfterStep defines callback being called AFTER step being executed.

type BeforeStep added in v0.0.3

type BeforeStep func(context.Context, Steper) (context.Context, error)

BeforeStep defines callback being called BEFORE step being executed.

type BranchCheck added in v0.0.7

type BranchCheck[T Steper] struct {
	Check BranchCheckFunc[T]
	OK    bool
	Error error
}

BranchCheck represents a branch to be checked.

func (*BranchCheck[T]) Do added in v0.0.7

func (bc *BranchCheck[T]) Do(ctx context.Context, target T)

type BranchCheckFunc added in v0.0.7

type BranchCheckFunc[T Steper] func(context.Context, T) (bool, error)

BranchCheckFunc checks the target and returns true if the branch should be selected.

type Builder added in v0.1.4

type Builder interface {
	AddToWorkflow() map[Steper]*StepConfig
}

Builder builds a Workflow by adding Steps.

func Mock added in v0.0.3

func Mock[T Steper](step T, do func(context.Context) error) Builder

Mock helps to mock a step in Workflow.

w.Add(
	flow.Mock(step, func(ctx context.Context) error {}),
)

func Name added in v0.0.3

func Name(step Steper, name string) Builder

Name can rename a Step.

workflow.Add(
	Step(a),
	Name(a, "StepA"),
)

Attention: Name will wrap the original Step

func NameFunc added in v0.0.5

func NameFunc(step Steper, fn func() string) Builder

NameFunc can rename a Step with a runtime function.

func NameStringer added in v0.0.3

func NameStringer(step Steper, name fmt.Stringer) Builder

NameStringer can rename a Step with a fmt.Stringer, which allows String() method to be called at runtime.

func Names added in v0.0.5

func Names(m map[Steper]string) Builder

Names can rename multiple Steps.

workflow.Add(
	Names(map[Steper]string{
		stepA: "A",
		stepB: "B",
	},
)

type Condition

type Condition func(ctx context.Context, ups map[Steper]StepResult) StepStatus

Condition is a function to determine what's the next status of Step. Condition makes the decision based on the status and result of all the Upstream Steps. Condition is only called when all Upstream Steps are terminated.

var (
	// DefaultCondition used in workflow, defaults to AllSucceeded
	DefaultCondition Condition = AllSucceeded
	// DefaultIsCanceled is used to determine whether an error is being regarded as canceled.
	DefaultIsCanceled = func(err error) bool {
		switch {
		case errors.Is(err, context.Canceled),
			errors.Is(err, context.DeadlineExceeded),
			StatusFromError(err) == Canceled:
			return true
		}
		return false
	}
)

type ErrBeforeStep added in v0.1.0

type ErrBeforeStep struct {
	// contains filtered or unexported fields
}

func (ErrBeforeStep) Unwrap added in v0.1.0

func (e ErrBeforeStep) Unwrap() error

type ErrCancel

type ErrCancel struct {
	// contains filtered or unexported fields
}

func Cancel

func Cancel(err error) ErrCancel

Cancel marks the current step as `Canceled`, and reports the error.

func (ErrCancel) Unwrap

func (e ErrCancel) Unwrap() error

type ErrCycleDependency

type ErrCycleDependency map[Steper][]Steper

ErrCycleDependency means there is a cycle-dependency in your Workflow!!!

func (ErrCycleDependency) Error

func (e ErrCycleDependency) Error() string

type ErrPanic

type ErrPanic struct {
	// contains filtered or unexported fields
}

func (ErrPanic) Unwrap

func (e ErrPanic) Unwrap() error

type ErrSkip

type ErrSkip struct {
	// contains filtered or unexported fields
}

func Skip

func Skip(err error) ErrSkip

Skip marks the current step as `Skipped`, and reports the error.

func (ErrSkip) Unwrap

func (e ErrSkip) Unwrap() error

type ErrSucceed

type ErrSucceed struct {
	// contains filtered or unexported fields
}

func Succeed

func Succeed(err error) ErrSucceed

Succeed marks the current step as `Succeeded`, while still reports the error.

func (ErrSucceed) Unwrap

func (e ErrSucceed) Unwrap() error

type ErrWithStackTraces added in v0.0.8

type ErrWithStackTraces struct {
	Err    error
	Frames []runtime.Frame
}

ErrWithStackTraces saves stack frames into error, and prints error into

error message

Stack Traces:
	file:line

func (ErrWithStackTraces) Error added in v0.0.8

func (e ErrWithStackTraces) Error() string

func (ErrWithStackTraces) StackTraces added in v0.0.8

func (e ErrWithStackTraces) StackTraces() []string

func (ErrWithStackTraces) Unwrap added in v0.0.10

func (e ErrWithStackTraces) Unwrap() error

type ErrWorkflow

type ErrWorkflow map[Steper]StepResult

ErrWorkflow contains all errors reported from terminated Steps in Workflow.

Keys are root Steps, values are its status and error.

func (ErrWorkflow) AllSucceeded

func (e ErrWorkflow) AllSucceeded() bool

func (ErrWorkflow) AllSucceededOrSkipped

func (e ErrWorkflow) AllSucceededOrSkipped() bool

func (ErrWorkflow) Error

func (e ErrWorkflow) Error() string

ErrWorkflow will be printed as:

Step: [Status]
	error message

func (ErrWorkflow) Unwrap

func (e ErrWorkflow) Unwrap() []error

type Function

type Function[I, O any] struct {
	Name   string
	Input  I
	Output O
	DoFunc func(context.Context, I) (O, error)
}

Function wraps an arbitrary function as a Step.

func Func

func Func(name string, do func(context.Context) error) *Function[struct{}, struct{}]

Func constructs a Step from an arbitrary function

func FuncI

func FuncI[I any](name string, do func(context.Context, I) error) *Function[I, struct{}]

func FuncIO

func FuncIO[I, O any](name string, do func(context.Context, I) (O, error)) *Function[I, O]

func FuncO

func FuncO[O any](name string, do func(context.Context) (O, error)) *Function[struct{}, O]

func (*Function[I, O]) Do

func (f *Function[I, O]) Do(ctx context.Context) error

func (*Function[I, O]) String

func (f *Function[I, O]) String() string

type IfBranch added in v0.0.7

type IfBranch[T Steper] struct {
	Target      T // the target to check
	BranchCheck BranchCheck[T]
	ThenStep    []Steper
	ElseStep    []Steper
	Cond        Condition // Cond is the When condition for both ThenStep and ElseStep, not target Step!
}

IfBranch adds target step, then and else step to workflow, and check the target step and determine which branch to go.

func If added in v0.0.7

func If[T Steper](target T, check BranchCheckFunc[T]) *IfBranch[T]

If adds a conditional branch to the workflow.

If(someStep, func(ctx context.Context, someStep *SomeStep) (bool, error) {
	// branch condition here, true -> Then, false -> Else.
	// if error is returned, then fail the selected branch step.
}).
Then(thenStep).
Else(elseStep)

func (*IfBranch[T]) AddToWorkflow added in v0.1.0

func (i *IfBranch[T]) AddToWorkflow() map[Steper]*StepConfig

func (*IfBranch[T]) Else added in v0.0.7

func (i *IfBranch[T]) Else(el ...Steper) *IfBranch[T]

Else adds steps to the Else branch.

func (*IfBranch[T]) Then added in v0.0.7

func (i *IfBranch[T]) Then(th ...Steper) *IfBranch[T]

Then adds steps to the Then branch.

func (*IfBranch[T]) When added in v0.0.7

func (i *IfBranch[T]) When(cond Condition) *IfBranch[T]

When adds a condition to both Then and Else steps, not the Target! Default to DefaultCondition.

type MockStep

type MockStep struct {
	Step   Steper
	MockDo func(context.Context) error
}

MockStep helps to mock a step. After building a workflow, you can mock the original step with a mock step.

func (*MockStep) Do

func (m *MockStep) Do(ctx context.Context) error

func (*MockStep) Unwrap

func (m *MockStep) Unwrap() Steper

type NamedStep

type NamedStep struct {
	Name string
	Steper
}

NamedStep is a wrapper of Steper, it gives your step a name by overriding String() method.

func (*NamedStep) String

func (ns *NamedStep) String() string

func (*NamedStep) Unwrap

func (ns *NamedStep) Unwrap() Steper

type NoOpStep added in v0.0.8

type NoOpStep struct{ Name string }

func NoOp added in v0.0.8

func NoOp(name string) *NoOpStep

NoOp constructs a step doing nothing.

func (*NoOpStep) Do added in v0.0.8

func (*NoOpStep) String added in v0.0.8

func (n *NoOpStep) String() string

type RetryEvent added in v0.1.2

type RetryEvent struct {
	Attempt uint64
	Since   time.Duration
	Error   error
}

RetryEvent is the event fired when a retry happens

type RetryOption

type RetryOption struct {
	TimeoutPerTry time.Duration // 0 means no timeout
	Attempts      uint64        // 0 means no limit
	// NextBackOff is called after each retry to determine the next backoff duration.
	// Notice if attempts limits are reach, or context timeout, or BackOff fires backoff.Stop,
	// this function will not be called.
	//
	// RetryEvent: the event records attempt, duration since the start, and the error of the last try.
	// nextBackOff: the next backoff duration calculated by the inner BackOff
	NextBackOff func(ctx context.Context, re RetryEvent, nextBackOff time.Duration) time.Duration

	Backoff backoff.BackOff
	Notify  backoff.Notify
	Timer   backoff.Timer
}

RetryOption customizes retry behavior of a Step in Workflow.

type Set

type Set[T comparable] map[T]struct{}

func (*Set[T]) Add

func (s *Set[T]) Add(vs ...T)

func (Set[T]) Flatten added in v0.0.7

func (s Set[T]) Flatten() []T

func (Set[T]) Has

func (s Set[T]) Has(v T) bool

func (*Set[T]) Union

func (s *Set[T]) Union(sets ...Set[T])

type State

type State struct {
	StepResult
	Config *StepConfig
	sync.RWMutex
}

State is the internal state of a Step in a Workflow.

It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step. The status could be read / write from different goroutines, so use RWMutex to protect it.

func (*State) AddUpstream

func (s *State) AddUpstream(up Steper)

func (*State) After added in v0.0.3

func (s *State) After(ctx context.Context, step Steper, err error) error

func (*State) Before added in v0.0.3

func (s *State) Before(ctx context.Context, step Steper) (context.Context, error)

func (*State) GetError

func (s *State) GetError() error

func (*State) GetStatus

func (s *State) GetStatus() StepStatus

func (*State) GetStepResult added in v0.1.0

func (s *State) GetStepResult() StepResult

func (*State) MergeConfig

func (s *State) MergeConfig(sc *StepConfig)

func (*State) Option

func (s *State) Option() *StepOption

func (*State) SetError

func (s *State) SetError(err error)

func (*State) SetStatus

func (s *State) SetStatus(ss StepStatus)

func (*State) Upstreams

func (s *State) Upstreams() Set[Steper]

type StepBuilder added in v0.0.4

type StepBuilder struct {
	// contains filtered or unexported fields
}

StepBuilder allows to build the internal Steps when adding into Workflow.

type StepImpl struct {}
func (s *StepImpl) Unwrap() []flow.Steper { return /* internal steps */ }
func (s *StepImpl) Do(ctx context.Context) error { /* ... */ }
func (s *StepImpl) BuildStep() { /* build internal steps */ }

workflow.Add(
	flow.Step(new(StepImpl)), // here will call StepImpl.BuildStep() once implicitly
)

func (*StepBuilder) BuildStep added in v0.0.4

func (sb *StepBuilder) BuildStep(s Steper)

BuildStep calls BuildStep() method of the Steper if it's implemented, and ensure it's called only once for each Steper.

type StepConfig

type StepConfig struct {
	Upstreams Set[Steper]         // Upstreams of the Step, means these Steps should happen-before this Step
	Before    []BeforeStep        // Before callbacks of the Step, will be called before Do
	After     []AfterStep         // After callbacks of the Step, will be called before Do
	Option    []func(*StepOption) // Option customize the Step settings
}

func (*StepConfig) Merge

func (sc *StepConfig) Merge(other *StepConfig)

type StepOption

type StepOption struct {
	RetryOption *RetryOption   // RetryOption customize how the Step should be retried, default (nil) means no retry.
	Condition   Condition      // Condition decides whether Workflow should execute the Step, default to DefaultCondition.
	Timeout     *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout.
}

type StepResult added in v0.1.0

type StepResult struct {
	Status StepStatus
	Err    error
}

StepResult contains the status and error of a Step.

func (StepResult) Error added in v0.1.0

func (e StepResult) Error() string

StatusError will be printed as:

[Status]
	error message

func (StepResult) Unwrap added in v0.1.0

func (e StepResult) Unwrap() error

type StepStatus

type StepStatus string

StepStatus describes the status of a Step.

const (
	Pending   StepStatus = ""
	Running   StepStatus = "Running"
	Failed    StepStatus = "Failed"
	Succeeded StepStatus = "Succeeded"
	Canceled  StepStatus = "Canceled"
	Skipped   StepStatus = "Skipped"
)

func AllSucceeded

func AllSucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus

AllSucceeded: all Upstreams are Succeeded

func AllSucceededOrSkipped added in v0.0.3

func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StepResult) StepStatus

AllSucceededOrSkipped: all Upstreams are Succeeded or Skipped

func Always

Always: as long as all Upstreams are terminated

func AnyFailed

func AnyFailed(ctx context.Context, ups map[Steper]StepResult) StepStatus

AnyFailed: any Upstream is Failed

func AnySucceeded added in v0.0.7

func AnySucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus

AnySucceeded: any Upstream is Succeeded

func BeCanceled

func BeCanceled(ctx context.Context, ups map[Steper]StepResult) StepStatus

BeCanceled: only run when the workflow is canceled

func StatusFromError

func StatusFromError(err error) StepStatus

StatusFromError gets the StepStatus from error.

func (StepStatus) IsTerminated

func (s StepStatus) IsTerminated() bool

func (StepStatus) String

func (s StepStatus) String() string

type Steper

type Steper interface {
	Do(context.Context) error
}

Steper describes the requirement for a Step, which is basic unit of a Workflow.

Implement this interface to allow Workflow orchestrating your Steps.

Notice Steper will be saved in Workflow as map key, so it's supposed to be 'comparable' type like pointer.

func ToSteps

func ToSteps[S Steper](steps []S) []Steper

ToSteps converts []<StepDoer implemention> to []StepDoer.

steps := []someStepImpl{ ... }
flow.Add(
	Steps(ToSteps(steps)...),
)

type StringerNamedStep

type StringerNamedStep struct {
	Name fmt.Stringer
	Steper
}

func (*StringerNamedStep) String

func (sns *StringerNamedStep) String() string

func (*StringerNamedStep) Unwrap

func (sns *StringerNamedStep) Unwrap() Steper

type SwitchBranch added in v0.0.7

type SwitchBranch[T Steper] struct {
	Target       T
	CasesToCheck map[Steper]*BranchCheck[T]
	DefaultStep  []Steper
	Cond         Condition
}

SwitchBranch adds target step, cases and default step to workflow, and check the target step and determine which branch to go.

func Switch added in v0.0.7

func Switch[T Steper](target T) *SwitchBranch[T]

Switch adds a switch branch to the workflow.

Switch(someStep).
	Case(case1, func(ctx context.Context, someStep *SomeStep) (bool, error) {
		// branch condition here, true to select this branch
		// error will fail the case
	}).
	Default(defaultStep), // the step to run if all case checks return false
)

func (*SwitchBranch[T]) AddToWorkflow added in v0.1.0

func (s *SwitchBranch[T]) AddToWorkflow() map[Steper]*StepConfig

func (*SwitchBranch[T]) Case added in v0.0.7

func (s *SwitchBranch[T]) Case(step Steper, check BranchCheckFunc[T]) *SwitchBranch[T]

Case adds a case to the switch branch.

func (*SwitchBranch[T]) Cases added in v0.0.7

func (s *SwitchBranch[T]) Cases(steps []Steper, check BranchCheckFunc[T]) *SwitchBranch[T]

Cases adds multiple cases to the switch branch. The check function will be executed for each case step.

func (*SwitchBranch[T]) Default added in v0.0.7

func (s *SwitchBranch[T]) Default(step ...Steper) *SwitchBranch[T]

Default adds default step(s) to the switch branch.

func (*SwitchBranch[T]) When added in v0.0.7

func (s *SwitchBranch[T]) When(cond Condition) *SwitchBranch[T]

When adds a condition to all case steps and default, not the Target!

type TraverseDecision added in v0.1.0

type TraverseDecision int

func Traverse added in v0.1.0

func Traverse(s Steper, f func(Steper, []Steper) TraverseDecision, walked ...Steper) TraverseDecision

Traverse performs a pre-order traversal of the tree of step.

type Workflow

type Workflow struct {
	MaxConcurrency int         // MaxConcurrency limits the max concurrency of running Steps
	DontPanic      bool        // DontPanic suppress panics, instead return it as error
	SkipAsError    bool        // SkipAsError marks skipped Steps as an error if true, otherwise ignore them
	Clock          clock.Clock // Clock for retry and unit test

	StepBuilder // StepBuilder to call BuildStep() for Steps
	// contains filtered or unexported fields
}

Workflow represents a collection of connected Steps that form a directed acyclic graph (DAG).

The Steps are connected via dependency, use Step(), Steps() or Pipe(), BatchPipe() to add Steps into Workflow.

workflow.Add(
	Step(a),
	Steps(b, c).DependsOn(a),	// a -> b, c
	Pipe(d, e, f),              // d -> e -> f
	BatchPipe(
		Steps(g, h),
		Steps(i, j),
	),                          // g, h -> i, j
)

Workflow will execute Steps in a topological order, each Step will be executed in a separate goroutine.

Workflow guarantees that

Before a Step goroutine starts, all its Upstream Steps are `terminated`.

Check `StepStatus` and `Condition` for details.

Workflow supports Step-level configuration, check Step(), Steps() and Pipe() for details. Workflow supports Composite Steps, check Has(), As() and HasStep() for details.

func (*Workflow) Add

func (w *Workflow) Add(was ...Builder) *Workflow

Add Steps into Workflow in phase Main.

func (*Workflow) Do

func (w *Workflow) Do(ctx context.Context) error

Do starts the Step execution in topological order, and waits until all Steps terminated.

Do will block the current goroutine.

func (*Workflow) Empty

func (w *Workflow) Empty() bool

Empty returns true if the Workflow don't have any Step.

func (*Workflow) IsTerminated

func (w *Workflow) IsTerminated() bool

IsTerminated returns true if all Steps terminated.

func (*Workflow) Reset added in v0.0.7

func (w *Workflow) Reset() error

Reset resets the Workflow to ready for a new run.

func (*Workflow) RootOf

func (w *Workflow) RootOf(step Steper) Steper

RootOf returns the root Step of the given Step.

func (*Workflow) StateOf

func (w *Workflow) StateOf(step Steper) *State

StateOf returns the internal state of the Step. State includes Step's status, error, input, dependency and config.

func (*Workflow) Steps

func (w *Workflow) Steps() []Steper

Steps returns all root Steps in the Workflow.

func (*Workflow) Unwrap

func (w *Workflow) Unwrap() []Steper

func (*Workflow) UpstreamOf

func (w *Workflow) UpstreamOf(step Steper) map[Steper]StepResult

UpstreamOf returns all upstream Steps and their status and error.

Directories

Path Synopsis
visual

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL