Documentation ¶
Index ¶
- func LoadFromContext(ctx context.Context, key any) (any, bool)
- func LoadFromContextOrDefault(ctx context.Context, key any, defValue any) any
- func MustLoadFromContext(ctx context.Context, key any) any
- func MutableContext(parent context.Context) context.Context
- func StoreInContext(ctx context.Context, key, value any)
- type ActionFunc
- type DependencyError
- type DependencyRecorder
- func (s *DependencyRecorder[T]) MustRequireDependencyByFuncName(actions ...ActionFunc[T])
- func (s *DependencyRecorder[T]) MustRequireDependencyByStepName(stepNames ...string)
- func (s *DependencyRecorder[T]) Record(step Step[T])
- func (s *DependencyRecorder[T]) RequireDependencyByFuncName(actions ...ActionFunc[T]) error
- func (s *DependencyRecorder[T]) RequireDependencyByStepName(stepNames ...string) error
- type DependencyResolver
- type ErrorHandler
- type Listener
- type Options
- type ParallelResultHandler
- type Pipeline
- func (p *Pipeline[T]) AddStep(step Step[T]) *Pipeline[T]
- func (p *Pipeline[T]) AddStepFromFunc(name string, fn ActionFunc[T]) *Pipeline[T]
- func (p *Pipeline[T]) AsNestedStep(name string) Step[T]
- func (p *Pipeline[T]) NewStep(name string, action ActionFunc[T]) Step[T]
- func (p *Pipeline[T]) RunWithContext(ctx T) error
- func (p *Pipeline[T]) When(predicate Predicate[T], name string, action ActionFunc[T]) Step[T]
- func (p *Pipeline[T]) WithBeforeHooks(listeners ...Listener[T]) *Pipeline[T]
- func (p *Pipeline[T]) WithFinalizer(handler ErrorHandler[T]) *Pipeline[T]
- func (p *Pipeline[T]) WithNestedSteps(name string, predicate Predicate[T], steps ...Step[T]) Step[T]
- func (p *Pipeline[T]) WithOptions(options Options) *Pipeline[T]
- func (p *Pipeline[T]) WithSteps(steps ...Step[T]) *Pipeline[T]
- type Predicate
- type Recorder
- type Result
- type Step
- func NewFanOutStep[T context.Context](name string, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T]
- func NewStep[T context.Context](name string, action ActionFunc[T]) Step[T]
- func NewStepIf[T context.Context](predicate Predicate[T], name string, actionFunc ActionFunc[T]) Step[T]
- func NewWorkerPoolStep[T context.Context](name string, size int, pipelineSupplier Supplier[T], ...) Step[T]
- type Supplier
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LoadFromContext ¶ added in v0.16.0
LoadFromContext returns the value from the given context with the given key. It returns the value and true, or nil and false if the key doesn't exist. It returns nil and true if the key exists and the value actually is nil. Use StoreInContext to store values.
Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.
func LoadFromContextOrDefault ¶ added in v0.19.0
LoadFromContextOrDefault is similar to MustLoadFromContext, except it returns the given default value if the key doesn't exist. Use StoreInContext to store values.
Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.
func MustLoadFromContext ¶ added in v0.17.0
MustLoadFromContext is similar to LoadFromContext, except it doesn't return a bool to indicate whether the key exists. It panics if the key doesn't exist. Use StoreInContext to store values.
Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.
func MutableContext ¶ added in v0.16.0
MutableContext adds a map to the given context that can be used to store mutable values in the context. It uses sync.Map under the hood. Repeated calls to MutableContext with the same parent have no effect and return the same context.
See also StoreInContext and LoadFromContext.
Example ¶
    type key struct{}

    ctx := MutableContext(context.Background())
    p := NewPipeline[context.Context]().WithSteps(
        NewStep("store value", func(ctx context.Context) error {
            StoreInContext(ctx, key{}, "value")
            return nil
        }),
        NewStep("retrieve value", func(ctx context.Context) error {
            value, _ := LoadFromContext(ctx, key{})
            fmt.Println(value)
            return nil
        }),
    )
    _ = p.RunWithContext(ctx)
Output: value
func StoreInContext ¶ added in v0.16.0
StoreInContext adds the given key and value to ctx. Any keys or values added during pipeline execution are available in the following steps, provided the pipeline runs synchronously. In pipelines executed in parallel you may encounter race conditions. Use LoadFromContext to retrieve values.
Note: This method is thread-safe, but panics if ctx has not been set up with MutableContext first.
Types ¶
type ActionFunc ¶
ActionFunc is the func that contains your business logic.
type DependencyError ¶ added in v0.20.0
    type DependencyError struct {
        // MissingSteps contains a slice of Step or ActionFunc names.
        MissingSteps []string
    }
DependencyError is an error that indicates which steps did not satisfy dependency requirements.
func (*DependencyError) Error ¶ added in v0.20.0
func (d *DependencyError) Error() string
Error returns a string that lists the steps that did not run, identified either by Step name or by ActionFunc name.
type DependencyRecorder ¶ added in v0.20.0
    type DependencyRecorder[T context.Context] struct {
        // Records contains a slice of Steps that were run.
        // It also contains the last Step that failed with an error.
        Records []Step[T]
    }
DependencyRecorder is a Recorder and DependencyResolver that tracks each Step executed and can be used to query if certain steps are in the Records.
func NewDependencyRecorder ¶ added in v0.20.0
func NewDependencyRecorder[T context.Context]() *DependencyRecorder[T]
NewDependencyRecorder returns a new instance of DependencyRecorder.
func (*DependencyRecorder[T]) MustRequireDependencyByFuncName ¶ added in v0.20.0
func (s *DependencyRecorder[T]) MustRequireDependencyByFuncName(actions ...ActionFunc[T])
MustRequireDependencyByFuncName implements DependencyResolver.MustRequireDependencyByFuncName.
func (*DependencyRecorder[T]) MustRequireDependencyByStepName ¶ added in v0.20.0
func (s *DependencyRecorder[T]) MustRequireDependencyByStepName(stepNames ...string)
MustRequireDependencyByStepName implements DependencyResolver.MustRequireDependencyByStepName.
func (*DependencyRecorder[T]) Record ¶ added in v0.20.0
func (s *DependencyRecorder[T]) Record(step Step[T])
Record implements Recorder.
func (*DependencyRecorder[T]) RequireDependencyByFuncName ¶ added in v0.20.0
func (s *DependencyRecorder[T]) RequireDependencyByFuncName(actions ...ActionFunc[T]) error
RequireDependencyByFuncName implements DependencyResolver.RequireDependencyByFuncName.
Direct function pointers can easily be compared:
    func myFunc(ctx context.Context) error { return nil }
    ...
    pipe.AddStepFromFunc("test", myFunc)
    ...
    recorder.RequireDependencyByFuncName(myFunc)
Note that you may experience unexpected behaviour with functions that generate other functions. For example, the following snippet will not work, since the function names resolved at the two different call locations differ:
    func generateFunc() func(ctx context.Context) error {
        return func(ctx context.Context) error { return nil }
    }
    ...
    pipe.AddStepFromFunc("test", generateFunc())
    ...
    recorder.RequireDependencyByFuncName(generateFunc()) // will end in an error
As an alternative, you may store the generated function in a variable that is accessible from multiple locations:
    var genFunc = generateFunc()
    ...
    pipe.AddStepFromFunc("test", genFunc)
    ...
    recorder.RequireDependencyByFuncName(genFunc) // works
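Comparing functions by name through reflection can be approximated with reflect and runtime.FuncForPC. The sketch below only shows the general technique for named, top-level functions; funcName, sameFunc, stepA and stepB are hypothetical names, and the recorder's actual lookup may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"runtime"
)

// funcName resolves the runtime name of fn through reflection.
func funcName(fn func(context.Context) error) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// sameFunc reports whether a and b resolve to the same runtime name.
func sameFunc(a, b func(context.Context) error) bool {
	return funcName(a) == funcName(b)
}

func stepA(ctx context.Context) error { return nil }
func stepB(ctx context.Context) error { return errors.New("b") }

func main() {
	fmt.Println(sameFunc(stepA, stepA)) // true
	fmt.Println(sameFunc(stepA, stepB)) // false
}
```

Names resolved for anonymous functions and closures depend on where the compiler places them, which is why a shared variable is the safer choice for generated functions.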
Example ¶
    action := func(_ context.Context) error {
        fmt.Println("running step 1")
        return nil
    }
    recorder := NewDependencyRecorder[context.Context]()
    p := NewPipeline[context.Context]().WithBeforeHooks(recorder.Record)
    p.WithSteps(
        p.When(Bool[context.Context](false), "step 1", action),
        p.NewStep("step 2", func(_ context.Context) error {
            if err := recorder.RequireDependencyByFuncName(action); err != nil {
                return err
            }
            // this won't run
            fmt.Println("running step 2")
            return nil
        }),
    )
    err := p.RunWithContext(context.Background())
    fmt.Println(err)
Output: step 'step 2' failed: required steps did not run: [github.com/ccremer/go-command-pipeline.ExampleDependencyRecorder_RequireDependencyByFuncName.func1]
func (*DependencyRecorder[T]) RequireDependencyByStepName ¶ added in v0.20.0
func (s *DependencyRecorder[T]) RequireDependencyByStepName(stepNames ...string) error
RequireDependencyByStepName implements DependencyResolver.RequireDependencyByStepName. A DependencyError is returned with a list of names that aren't in the Records. Steps that share the same name are not distinguishable.
Example ¶
    recorder := NewDependencyRecorder[context.Context]()
    p := NewPipeline[context.Context]().WithBeforeHooks(recorder.Record)
    p.WithSteps(
        p.When(Bool[context.Context](false), "step 1", func(_ context.Context) error {
            fmt.Println("running step 1")
            return nil
        }),
        p.NewStep("step 2", func(_ context.Context) error {
            if err := recorder.RequireDependencyByStepName("step 1"); err != nil {
                return err
            }
            // this won't run
            fmt.Println("running step 2")
            return nil
        }),
    )
    err := p.RunWithContext(context.Background())
    fmt.Println(err)
Output: step 'step 2' failed: required steps did not run: [step 1]
type DependencyResolver ¶ added in v0.20.0
    type DependencyResolver[T context.Context] interface {
        Recorder[T]
        // RequireDependencyByStepName checks whether all of the given step names are present in the Records.
        // It returns nil if all given step names are in the Records, in any order.
        RequireDependencyByStepName(stepNames ...string) error
        // MustRequireDependencyByStepName is RequireDependencyByStepName but any non-nil errors result in a panic.
        MustRequireDependencyByStepName(stepNames ...string)
        // RequireDependencyByFuncName checks whether all of the given action functions are present in the Records.
        // It returns nil if all given functions are in the Records, in any order.
        // Since functions aren't comparable for equality, the resolver attempts to compare them by name through reflection.
        RequireDependencyByFuncName(actions ...ActionFunc[T]) error
        // MustRequireDependencyByFuncName is RequireDependencyByFuncName but any non-nil errors result in a panic.
        MustRequireDependencyByFuncName(actions ...ActionFunc[T])
    }
DependencyResolver provides means to query if a pipeline Step is satisfied as a dependency for another Step. It is used together with Recorder.
type ErrorHandler ¶ added in v0.19.0
ErrorHandler is a func that gets called when a step's ActionFunc has finished with an error.
type Options ¶ added in v0.19.0
    type Options struct {
        // DisableErrorWrapping disables the wrapping of errors that are emitted from pipeline steps.
        // This effectively causes the returned error to be exactly the error returned from a step.
        // The step's name is omitted from the error message.
        DisableErrorWrapping bool
    }
Options configures the given Pipeline with behaviour-altering settings.
type ParallelResultHandler ¶ added in v0.13.0
ParallelResultHandler is a callback that receives a map of results and is expected to return a single, combined result. The map key is the zero-based index of the n-th Pipeline spawned, e.g. pipeline number 3 has index 2. Return a nil error if you want to ignore errors, or reduce multiple errors into a single one to make the parent Pipeline fail.
type Pipeline ¶
Pipeline holds and runs intermediate actions, called "steps".
func NewPipeline ¶
NewPipeline returns a new Pipeline instance.
func (*Pipeline[T]) AddStep ¶
AddStep appends the given step to the Pipeline at the end and returns itself.
func (*Pipeline[T]) AddStepFromFunc ¶ added in v0.15.0
func (p *Pipeline[T]) AddStepFromFunc(name string, fn ActionFunc[T]) *Pipeline[T]
AddStepFromFunc appends the given function to the Pipeline at the end and returns itself.
func (*Pipeline[T]) AsNestedStep ¶
AsNestedStep converts the Pipeline instance into a Step that can be used in other pipelines. The properties are passed to the nested pipeline.
func (*Pipeline[T]) NewStep ¶ added in v0.19.0
func (p *Pipeline[T]) NewStep(name string, action ActionFunc[T]) Step[T]
NewStep is syntactic sugar for the package-level NewStep function, but with T already set.
func (*Pipeline[T]) RunWithContext ¶ added in v0.13.0
RunWithContext executes the Pipeline. Steps are executed sequentially in the order they were added to the Pipeline. Upon cancellation of the context, the pipeline does not terminate a currently running step; instead, it skips the remaining steps in the execution order. The context is passed to each Step.Action, and each Step may need to listen to the context cancellation event to truly cancel a long-running step. If the pipeline gets canceled, the context's error is returned.
All non-nil errors, except the error returned from the pipeline's finalizer, are wrapped in Result. This can be used to retrieve the metadata of the step that returned the error with errors.As:
    err := p.RunWithContext(ctx)
    var result pipeline.Result
    if errors.As(err, &result) {
        fmt.Println(result.Name())
    }
Example ¶
    // prepare pipeline
    type exampleContext struct {
        context.Context
        field string
    }
    p := NewPipeline[*exampleContext]()
    p.WithSteps(
        p.NewStep("short step", func(ctx *exampleContext) error {
            fmt.Println(ctx.field)
            return nil
        }),
        p.NewStep("long running step", func(ctx *exampleContext) error {
            time.Sleep(100 * time.Millisecond)
            return nil
        }),
        p.NewStep("canceled step", func(ctx *exampleContext) error {
            return errors.New("shouldn't execute")
        }),
    )
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()
    pctx := &exampleContext{ctx, "hello world"}
    err := p.RunWithContext(pctx)
    // inspect the result
    fmt.Println(err)
Output:
hello world
step 'canceled step' failed: context deadline exceeded
func (*Pipeline[T]) When ¶ added in v0.19.0
func (p *Pipeline[T]) When(predicate Predicate[T], name string, action ActionFunc[T]) Step[T]
When is syntactic sugar for NewStep combined with Step.When.
Example ¶
    p := NewPipeline[context.Context]()
    p.WithSteps(
        p.When(Bool[context.Context](true), "run", func(ctx context.Context) error {
            return nil
        }),
    )
Output:
func (*Pipeline[T]) WithBeforeHooks ¶ added in v0.7.0
WithBeforeHooks takes a list of listeners. Each Listener is called once in the given order just before the ActionFunc is invoked. The listeners should return as fast as possible, as they are not intended to do actual business logic.
func (*Pipeline[T]) WithFinalizer ¶ added in v0.8.0
func (p *Pipeline[T]) WithFinalizer(handler ErrorHandler[T]) *Pipeline[T]
WithFinalizer returns itself while setting the finalizer for the pipeline. The finalizer is a handler that gets called after the last step in the pipeline has completed. It is also called if a pipeline aborts early or gets canceled.
func (*Pipeline[T]) WithNestedSteps ¶ added in v0.4.0
func (p *Pipeline[T]) WithNestedSteps(name string, predicate Predicate[T], steps ...Step[T]) Step[T]
WithNestedSteps is similar to AsNestedStep, but it accepts the steps given directly as parameters. When predicate is non-nil then the steps are only executed if it evaluates to `true`.
func (*Pipeline[T]) WithOptions ¶ added in v0.12.0
WithOptions configures the Pipeline with settings. The Options are applied immediately. Options are applied to nested pipelines provided they are set before building the nested pipeline. Nested pipelines can be configured with their own Options.
type Predicate ¶
Predicate is a function that returns true if an ActionFunc should run. It is evaluated lazily, that is, only when needed. A Predicate should be idempotent, meaning multiple invocations return the same result and have no side effects.
func And ¶
And returns a Predicate that performs a logical AND of the given predicates. p2 is not evaluated if p1 already evaluates to false.
func Bool ¶
Bool returns a Predicate that simply returns v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.
func BoolPtr ¶ added in v0.13.0
BoolPtr returns a Predicate that returns *v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.
type Recorder ¶ added in v0.20.0
    type Recorder[T context.Context] interface {
        // Record adds the step to the execution Records.
        Record(step Step[T])
    }
Recorder records the steps executed in a pipeline.
type Result ¶
    type Result interface {
        error
        // Name retrieves the name of the (last) step that has been executed.
        Name() string
    }
Result is the object that is returned after each step and after running a pipeline.
type Step ¶
    type Step[T context.Context] struct {
        // Name describes the step's human-readable name.
        // It has no use other than easily identifying a step for debugging or logging.
        Name string
        // Action is the ActionFunc assigned to a pipeline Step.
        // This is required.
        Action ActionFunc[T]
        // Handler is the ErrorHandler assigned to a pipeline Step.
        // This is optional, and if set it will be called after Action has completed.
        // Use cases could be logging, updating a GUI or handling errors while continuing the pipeline.
        // The function may return nil even if the given error is non-nil, in which case the pipeline will continue.
        // This function is called before the next step's Action is invoked.
        Handler ErrorHandler[T]
        // Condition determines if the Step's Action is actually going to be executed in the pipeline.
        // When nil, the Action is executed.
        Condition Predicate[T]
    }
Step is an intermediary action and part of a Pipeline.
func NewFanOutStep ¶ added in v0.13.0
func NewFanOutStep[T context.Context](name string, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T]
NewFanOutStep creates a pipeline step that runs nested pipelines in their own goroutines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished. If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
If the context is canceled, no new pipelines will be retrieved from the channel and the Supplier is expected to stop supplying new instances. Also, once canceled, the step waits for the remaining child pipelines and collects their results via the given ParallelResultHandler. However, the error returned from ParallelResultHandler is wrapped in context.Canceled.
Example ¶
    p := NewPipeline[context.Context]()
    fanout := NewFanOutStep[context.Context]("fanout", func(ctx context.Context, pipelines chan *Pipeline[context.Context]) {
        defer close(pipelines)
        // create some pipelines
        for i := 0; i < 3; i++ {
            n := i
            select {
            case <-ctx.Done():
                return // parent pipeline has been canceled, let's not create more pipelines.
            default:
                p := NewPipeline[context.Context]()
                pipelines <- p.AddStep(p.NewStep(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
                    time.Sleep(time.Duration(n) * 10 * time.Millisecond) // fake some load
                    fmt.Printf("I am worker %d\n", n)
                    return nil
                }))
            }
        }
    }, func(ctx context.Context, results map[uint64]error) error {
        for worker, result := range results {
            if result != nil {
                fmt.Printf("Worker %d failed: %v\n", worker, result)
            }
        }
        return nil
    })
    p.AddStep(fanout)
    p.RunWithContext(context.Background())
Output:
I am worker 0
I am worker 1
I am worker 2
func NewStep ¶
func NewStep[T context.Context](name string, action ActionFunc[T]) Step[T]
NewStep returns a new Step with given name and action.
func NewStepIf ¶ added in v0.19.0
func NewStepIf[T context.Context](predicate Predicate[T], name string, actionFunc ActionFunc[T]) Step[T]
NewStepIf is syntactic sugar for NewStep with Step.When.
func NewWorkerPoolStep ¶ added in v0.13.0
func NewWorkerPoolStep[T context.Context](name string, size int, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T]
NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a thread pool. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished.
- If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
- The pipelines are executed in a pool of goroutines whose number is given by size.
- If size is 1, the pipelines are effectively run in sequence.
- If size is 0 or less, the function panics.
Example ¶
    p := NewPipeline[*testContext]()
    pool := NewWorkerPoolStep[*testContext]("pool", 2, func(ctx *testContext, pipelines chan *Pipeline[*testContext]) {
        defer close(pipelines)
        // create some pipelines
        for i := 0; i < 3; i++ {
            n := i
            select {
            case <-ctx.Done():
                return // parent pipeline has been canceled, let's not create more pipelines.
            default:
                newP := NewPipeline[*testContext]()
                pipelines <- newP.AddStep(newP.NewStep(fmt.Sprintf("i = %d", n), func(_ *testContext) error {
                    time.Sleep(time.Duration(n) * 100 * time.Millisecond) // fake some load
                    fmt.Printf("This is job item %d\n", n)
                    return nil
                }))
            }
        }
    }, func(ctx *testContext, results map[uint64]error) error {
        for jobIndex, err := range results {
            var result Result
            if errors.As(err, &result) {
                fmt.Printf("Job %d failed: %v\n", jobIndex, result)
            }
        }
        return nil
    })
    p.AddStep(pool)
    p.RunWithContext(&testContext{Context: context.Background()})
Output:
This is job item 0
This is job item 1
This is job item 2
func (Step[T]) When ¶ added in v0.19.0
When sets Step.Condition. When the given predicate returns false, the step is skipped without error.
func (Step[T]) WithErrorHandler ¶ added in v0.10.0
func (s Step[T]) WithErrorHandler(errorHandler ErrorHandler[T]) Step[T]
WithErrorHandler sets the ErrorHandler of this specific step and returns the step itself.
type Supplier ¶ added in v0.13.0
Supplier is a function that spawns Pipeline for consumption. Supply new pipelines by putting new Pipeline instances into the given channel. The function must close the channel once all pipelines are spawned (`defer close()` recommended).
The parent pipeline may get canceled, thus the given context is provided to stop putting more Pipeline instances into the channel. Use
    select {
    case <-ctx.Done():
        return
    default:
        pipelinesChan <- ...
    }
to cancel the supply, otherwise you may leak an orphaned goroutine.
func SupplierFromSlice ¶ added in v0.13.0
SupplierFromSlice returns a Supplier that accepts the given slice of Pipeline and iterates over it to feed the channel.
Context cancellation is only effective if the channel is limited in size. All pipelines may get executed even if the parent pipeline has been canceled, unless each child Pipeline listens for context.Done() in their steps.