pipeline

Package pipeline, version v0.20.0. This package is not in the latest version of its module.
Published: Aug 14, 2022 License: Apache-2.0 Imports: 7 Imported by: 30

README

go-command-pipeline


A small Go utility that executes business actions (functions) in a pipeline. It is for you if you feel that Go's error handling, with if err != nil all over the place, distracts from the business logic.

Usage

package main

import (
	"context"
	"fmt"
	"log"

	pipeline "github.com/ccremer/go-command-pipeline"
)

type Data struct {
	context.Context
	Number int
}

func main() {
	data := &Data{context.Background(), 0} // define arbitrary data to pass around in the steps.
	p := pipeline.NewPipeline[*Data]()
	// define business steps neatly in one place:
	p.WithSteps(
		p.NewStep("define random number", defineNumber),
		p.NewStep("print number", printNumber),
	)
	err := p.RunWithContext(data)
	if err != nil {
		log.Fatal(err)
	}
}

func defineNumber(ctx *Data) error {
	ctx.Number = 10
	return nil
}

// Let's assume this is a business function that can fail.
// You can enable "automatic" fail-on-first-error pipelines by having more small functions that return errors.
func printNumber(ctx *Data) error {
	_, err := fmt.Println(ctx.Number)
	return err
}

See the examples directory for more usage.

Who is it for

This utility is interesting for you if you have many business functions that are executed sequentially, each with its own error handling. Are you tired of tedious error handling in Go when, in over 90% of cases, all you do is pass the error up the stack, only to log it at the root? This utility helps you focus on the business logic by dividing each failure-prone action into a small step: since the pipeline aborts on the first error, you no longer check and return each error by hand.

Consider the following illustrative example:

func Persist(data Data) error {
    err := database.prepareTransaction()
    if err != nil {
        return err
    }
    err = database.executeQuery("SOME QUERY", data)
    if err != nil {
        return err
    }
    err = database.commit()
    return err
}

The repeated if err != nil checks bloat the function with more error handling than actual business logic.

It could be simplified to something like this:

func Persist(data *Data) error {
    p := pipeline.NewPipeline[*Data]()
    p.WithSteps(
        p.NewStep("prepareTransaction", prepareTransaction()),
        p.NewStep("executeQuery", executeQuery()),
        p.NewStep("commitTransaction", commit()),
    )
    return p.RunWithContext(data)
}

func executeQuery() pipeline.ActionFunc[*Data] {
	return func(data *Data) error {
		return database.executeQuery("SOME QUERY", data)
	}
}
...

Although setting up a pipeline adds a few lines, it makes it easy to see what Persist() does without wading through error handling. Each small step may also be easier to unit test.

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadFromContext added in v0.16.0

func LoadFromContext(ctx context.Context, key any) (any, bool)

LoadFromContext returns the value from the given context with the given key. It returns the value and true, or nil and false if the key doesn't exist. It returns nil and true if the key exists and the value actually is nil. Use StoreInContext to store values.

Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.

func LoadFromContextOrDefault added in v0.19.0

func LoadFromContextOrDefault(ctx context.Context, key any, defValue any) any

LoadFromContextOrDefault is similar to MustLoadFromContext, except it returns the given default value if the key doesn't exist. Use StoreInContext to store values.

Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.

func MustLoadFromContext added in v0.17.0

func MustLoadFromContext(ctx context.Context, key any) any

MustLoadFromContext is similar to LoadFromContext, except it doesn't return a bool to indicate whether the key exists. It panics if the key doesn't exist. Use StoreInContext to store values.

Note: This method is thread-safe, but panics if the ctx has not been set up with MutableContext first.

func MutableContext added in v0.16.0

func MutableContext(parent context.Context) context.Context

MutableContext adds a map to the given context that can be used to store mutable values in the context. It uses sync.Map under the hood. Repeated calls to MutableContext with the same parent have no effect and return the same context.

See also StoreInContext and LoadFromContext.

Example
type key struct{}

ctx := MutableContext(context.Background())
p := NewPipeline[context.Context]().WithSteps(
	NewStep("store value", func(ctx context.Context) error {
		StoreInContext(ctx, key{}, "value")
		return nil
	}),
	NewStep("retrieve value", func(ctx context.Context) error {
		value, _ := LoadFromContext(ctx, key{})
		fmt.Println(value)
		return nil
	}),
)
_ = p.RunWithContext(ctx)
Output:

value

func StoreInContext added in v0.16.0

func StoreInContext(ctx context.Context, key, value any)

StoreInContext adds the given key and value to ctx. Any keys and values added during pipeline execution are available in the subsequent steps, provided the pipeline runs synchronously. In pipelines executed in parallel you may encounter race conditions. Use LoadFromContext to retrieve values.

Note: This method is thread-safe, but panics if ctx has not been set up with MutableContext first.

Types

type ActionFunc

type ActionFunc[T context.Context] func(ctx T) error

ActionFunc is the func that contains your business logic.

type DependencyError added in v0.20.0

type DependencyError struct {
	// MissingSteps returns a slice of Step or ActionFunc names.
	MissingSteps []string
}

DependencyError is an error that indicates which steps did not satisfy dependency requirements.

func (*DependencyError) Error added in v0.20.0

func (d *DependencyError) Error() string

Error returns a string listing the steps that did not run, identified by Step or ActionFunc name.

type DependencyRecorder added in v0.20.0

type DependencyRecorder[T context.Context] struct {
	// Records contains a slice of Steps that were run.
	// It contains also the last Step that failed with an error.
	Records []Step[T]
}

DependencyRecorder is a Recorder and DependencyResolver that tracks each Step executed and can be used to query if certain steps are in the Records.

func NewDependencyRecorder added in v0.20.0

func NewDependencyRecorder[T context.Context]() *DependencyRecorder[T]

NewDependencyRecorder returns a new instance of DependencyRecorder.

func (*DependencyRecorder[T]) MustRequireDependencyByFuncName added in v0.20.0

func (s *DependencyRecorder[T]) MustRequireDependencyByFuncName(actions ...ActionFunc[T])

MustRequireDependencyByFuncName implements DependencyResolver.MustRequireDependencyByFuncName.

func (*DependencyRecorder[T]) MustRequireDependencyByStepName added in v0.20.0

func (s *DependencyRecorder[T]) MustRequireDependencyByStepName(stepNames ...string)

MustRequireDependencyByStepName implements DependencyResolver.MustRequireDependencyByStepName.

func (*DependencyRecorder[T]) Record added in v0.20.0

func (s *DependencyRecorder[T]) Record(step Step[T])

Record implements Recorder.

func (*DependencyRecorder[T]) RequireDependencyByFuncName added in v0.20.0

func (s *DependencyRecorder[T]) RequireDependencyByFuncName(actions ...ActionFunc[T]) error

RequireDependencyByFuncName implements DependencyResolver.RequireDependencyByFuncName.

Direct function pointers can easily be compared:

func myFunc(ctx context.Context) error {
	return nil
}
...
pipe.AddStepFromFunc("test", myFunc)
...
recorder.RequireDependencyByFuncName(myFunc)

Note that you may experience unexpected behaviour when dealing with generated functions, i.e. closures returned by another function. For example, the following snippet will not work, since the functions generated at two different call sites have different names:

func generateFunc() func(ctx context.Context) error {
	return func(ctx context.Context) error {
		return nil
	}
}
...
pipe.AddStepFromFunc("test", generateFunc())
...
recorder.RequireDependencyByFuncName(generateFunc()) // will end in an error

As an alternative, you may store the generated function in a variable that is accessible from multiple locations:

var genFunc = generateFunc()
...
pipe.AddStepFromFunc("test", genFunc)
...
recorder.RequireDependencyByFuncName(genFunc) // works
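Go function values cannot be compared with == (except against nil), which is why the resolver compares names obtained through reflection. A standalone sketch of one common standard-library way to obtain such a name; `funcName` is a hypothetical helper and the library's exact approach may differ:

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// funcName returns the fully qualified name of the function's code, e.g. "main.myFunc".
func funcName(fn any) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

func myFunc() error { return nil }

func main() {
	fmt.Println(funcName(myFunc))                     // main.myFunc
	fmt.Println(funcName(myFunc) == funcName(myFunc)) // true
	// Closures get compiler-generated names such as "main.main.func1";
	// the exact name depends on the call site and the compiler.
	fmt.Println(funcName(func() error { return nil }))
}
```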
Example
action := func(_ context.Context) error {
	fmt.Println("running step 1")
	return nil
}
recorder := NewDependencyRecorder[context.Context]()
p := NewPipeline[context.Context]().WithBeforeHooks(recorder.Record)
p.WithSteps(
	p.When(Bool[context.Context](false), "step 1", action),
	p.NewStep("step 2", func(_ context.Context) error {
		if err := recorder.RequireDependencyByFuncName(action); err != nil {
			return err
		}
		// this won't run
		fmt.Println("running step 2")
		return nil
	}),
)
err := p.RunWithContext(context.Background())
fmt.Println(err)
Output:

step 'step 2' failed: required steps did not run: [github.com/ccremer/go-command-pipeline.ExampleDependencyRecorder_RequireDependencyByFuncName.func1]

func (*DependencyRecorder[T]) RequireDependencyByStepName added in v0.20.0

func (s *DependencyRecorder[T]) RequireDependencyByStepName(stepNames ...string) error

RequireDependencyByStepName implements DependencyResolver.RequireDependencyByStepName. A DependencyError is returned with a list of names that aren't in the Records. Steps that share the same name are not distinguishable.

Example
recorder := NewDependencyRecorder[context.Context]()
p := NewPipeline[context.Context]().WithBeforeHooks(recorder.Record)
p.WithSteps(
	p.When(Bool[context.Context](false), "step 1", func(_ context.Context) error {
		fmt.Println("running step 1")
		return nil
	}),
	p.NewStep("step 2", func(_ context.Context) error {
		if err := recorder.RequireDependencyByStepName("step 1"); err != nil {
			return err
		}
		// this won't run
		fmt.Println("running step 2")
		return nil
	}),
)
err := p.RunWithContext(context.Background())
fmt.Println(err)
Output:

step 'step 2' failed: required steps did not run: [step 1]

type DependencyResolver added in v0.20.0

type DependencyResolver[T context.Context] interface {
	Recorder[T]
	// RequireDependencyByStepName checks whether all of the given step names are present in the Records.
	// It returns nil if all given step names are in the Records in any order.
	RequireDependencyByStepName(stepNames ...string) error
	// MustRequireDependencyByStepName is RequireDependencyByStepName but any non-nil errors result in a panic.
	MustRequireDependencyByStepName(stepNames ...string)
	// RequireDependencyByFuncName checks whether all of the given action functions are present in the Records.
	// It returns nil if all given functions are in the Records in any order.
	// Since functions aren't comparable for equality, the resolver attempts to compare them by name through reflection.
	RequireDependencyByFuncName(actions ...ActionFunc[T]) error
	// MustRequireDependencyByFuncName is RequireDependencyByFuncName but any non-nil errors result in a panic.
	MustRequireDependencyByFuncName(actions ...ActionFunc[T])
}

DependencyResolver provides means to query if a pipeline Step is satisfied as a dependency for another Step. It is used together with Recorder.
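The record-then-require pattern behind DependencyRecorder can be illustrated with a small standalone type. A hedged sketch under stated assumptions: `nameRecorder` and its methods are hypothetical, records are plain step names rather than Step values, and missing names are reported in the order they were requested:

```go
package main

import (
	"fmt"
)

// nameRecorder remembers which step names have run.
type nameRecorder struct {
	records []string
}

// Record would be registered as a before-hook so it sees every executed step.
func (r *nameRecorder) Record(name string) { r.records = append(r.records, name) }

// Require returns nil only if every given name was recorded, in any order.
func (r *nameRecorder) Require(names ...string) error {
	seen := make(map[string]bool, len(r.records))
	for _, n := range r.records {
		seen[n] = true
	}
	var missing []string
	for _, n := range names {
		if !seen[n] {
			missing = append(missing, n)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("required steps did not run: %v", missing)
	}
	return nil
}

func main() {
	r := &nameRecorder{}
	r.Record("step 2")
	r.Record("step 1")
	fmt.Println(r.Require("step 1", "step 2")) // <nil>
	fmt.Println(r.Require("step 1", "step 3")) // required steps did not run: [step 3]
}
```

As the RequireDependencyByStepName documentation notes for the real recorder, steps sharing a name are indistinguishable here as well.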

type ErrorHandler added in v0.19.0

type ErrorHandler[T context.Context] func(ctx T, err error) error

ErrorHandler is a func that gets called when a step's ActionFunc has finished with an error.

type Listener added in v0.7.0

type Listener[T context.Context] func(step Step[T])

Listener is a simple func that listens to Pipeline events.

type Options added in v0.19.0

type Options struct {
	// DisableErrorWrapping disables the wrapping of errors that are emitted from pipeline steps.
	// This effectively causes the returned error to be exactly the error returned by the step.
	// The step's name is omitted from the error message.
	DisableErrorWrapping bool
}

Options configures the given Pipeline with behaviour-altering settings.

type ParallelResultHandler added in v0.13.0

type ParallelResultHandler[T context.Context] func(ctx T, results map[uint64]error) error

ParallelResultHandler is a callback that receives a map of results and is expected to return a single, combined error. The map key is the zero-based index of the n-th spawned Pipeline, e.g. the third pipeline has index 2. Return nil if you want to ignore errors, or reduce multiple errors into a single one to make the parent Pipeline fail.

type Pipeline

type Pipeline[T context.Context] struct {
	// contains filtered or unexported fields
}

Pipeline holds and runs intermediate actions, called "steps".

func NewPipeline

func NewPipeline[T context.Context]() *Pipeline[T]

NewPipeline returns a new Pipeline instance.

func (*Pipeline[T]) AddStep

func (p *Pipeline[T]) AddStep(step Step[T]) *Pipeline[T]

AddStep appends the given step to the Pipeline at the end and returns itself.

func (*Pipeline[T]) AddStepFromFunc added in v0.15.0

func (p *Pipeline[T]) AddStepFromFunc(name string, fn ActionFunc[T]) *Pipeline[T]

AddStepFromFunc appends the given function to the Pipeline at the end and returns itself.

func (*Pipeline[T]) AsNestedStep

func (p *Pipeline[T]) AsNestedStep(name string) Step[T]

AsNestedStep converts the Pipeline instance into a Step that can be used in other pipelines. The properties are passed to the nested pipeline.

func (*Pipeline[T]) NewStep added in v0.19.0

func (p *Pipeline[T]) NewStep(name string, action ActionFunc[T]) Step[T]

NewStep is syntactic sugar for the package-level NewStep function, but with T already set.

func (*Pipeline[T]) RunWithContext added in v0.13.0

func (p *Pipeline[T]) RunWithContext(ctx T) error

RunWithContext executes the Pipeline. Steps are executed sequentially as they were added to the Pipeline. Upon cancellation of the context, the pipeline does not terminate a currently running step; instead, it skips the remaining steps in the execution order. The context is passed to each Step.Action, and each Step may need to listen for the context's cancellation to truly cancel a long-running step. If the pipeline gets canceled, the context's error is returned.

All non-nil errors, except the error returned from the pipeline's finalizer, are wrapped in Result. This can be used to retrieve the metadata of the step that returned the error with errors.As:

err := p.RunWithContext(ctx)
var result pipeline.Result
if errors.As(err, &result) {
  fmt.Println(result.Name())
}
Example
// prepare pipeline
type exampleContext struct {
	context.Context
	field string
}

p := NewPipeline[*exampleContext]()
p.WithSteps(
	p.NewStep("short step", func(ctx *exampleContext) error {
		fmt.Println(ctx.field)
		return nil
	}),
	p.NewStep("long running step", func(ctx *exampleContext) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	}),
	p.NewStep("canceled step", func(ctx *exampleContext) error {
		return errors.New("shouldn't execute")
	}),
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
pctx := &exampleContext{ctx, "hello world"}
err := p.RunWithContext(pctx)
// inspect the result
fmt.Println(err)
Output:

hello world
step 'canceled step' failed: context deadline exceeded

func (*Pipeline[T]) When added in v0.19.0

func (p *Pipeline[T]) When(predicate Predicate[T], name string, action ActionFunc[T]) Step[T]

When is syntactic sugar for NewStep combined with Step.When.

Example
p := NewPipeline[context.Context]()
p.WithSteps(
	p.When(Bool[context.Context](true), "run", func(ctx context.Context) error {
		return nil
	}),
)
Output:

func (*Pipeline[T]) WithBeforeHooks added in v0.7.0

func (p *Pipeline[T]) WithBeforeHooks(listeners ...Listener[T]) *Pipeline[T]

WithBeforeHooks takes a list of listeners. Each Listener is called once in the given order just before the ActionFunc is invoked. The listeners should return as fast as possible, as they are not intended to do actual business logic.

func (*Pipeline[T]) WithFinalizer added in v0.8.0

func (p *Pipeline[T]) WithFinalizer(handler ErrorHandler[T]) *Pipeline[T]

WithFinalizer returns itself while setting the finalizer for the pipeline. The finalizer is a handler that gets called after the last step in the pipeline has completed. It is also called if the pipeline aborts early or gets canceled.

func (*Pipeline[T]) WithNestedSteps added in v0.4.0

func (p *Pipeline[T]) WithNestedSteps(name string, predicate Predicate[T], steps ...Step[T]) Step[T]

WithNestedSteps is similar to AsNestedStep, but it accepts the steps given directly as parameters. When predicate is non-nil then the steps are only executed if it evaluates to `true`.

func (*Pipeline[T]) WithOptions added in v0.12.0

func (p *Pipeline[T]) WithOptions(options Options) *Pipeline[T]

WithOptions configures the Pipeline with settings. The Options are applied immediately. Options are applied to nested pipelines provided they are set before building the nested pipeline. Nested pipelines can be configured with their own Options.

func (*Pipeline[T]) WithSteps

func (p *Pipeline[T]) WithSteps(steps ...Step[T]) *Pipeline[T]

WithSteps appends the given steps to the Pipeline at the end and returns itself.

type Predicate

type Predicate[T context.Context] func(ctx T) bool

Predicate is a function that returns true if an ActionFunc should run. It is evaluated lazily, i.e. only when needed. A Predicate should be idempotent: multiple invocations return the same result and have no side effects.

func And

func And[T context.Context](p1, p2 Predicate[T]) Predicate[T]

And returns a Predicate that does a logical AND of the given predicates. p2 is not evaluated if p1 already evaluates to false.

func Bool

func Bool[T context.Context](v bool) Predicate[T]

Bool returns a Predicate that simply returns v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.

func BoolPtr added in v0.13.0

func BoolPtr[T context.Context](v *bool) Predicate[T]

BoolPtr returns a Predicate that returns *v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.

func Not

func Not[T context.Context](predicate Predicate[T]) Predicate[T]

Not returns a Predicate that evaluates the given Predicate and negates its result.

func Or added in v0.13.0

func Or[T context.Context](p1, p2 Predicate[T]) Predicate[T]

Or returns a Predicate that does a logical OR of the given predicates. p2 is not evaluated if p1 already evaluates to true.

type Recorder added in v0.20.0

type Recorder[T context.Context] interface {
	// Record adds the step to the execution Records.
	Record(step Step[T])
}

Recorder records the steps executed in a pipeline.

type Result

type Result interface {
	error
	// Name retrieves the name of the (last) step that has been executed.
	Name() string
}

Result is the object that is returned after each step and after running a pipeline.

type Step

type Step[T context.Context] struct {
	// Name describes the step's human-readable name.
	// It has no use other than easily identifying a step for debugging or logging.
	Name string
	// Action is the ActionFunc assigned to a pipeline Step.
	// This is required.
	Action ActionFunc[T]
	// Handler is the ErrorHandler assigned to a pipeline Step.
	// This is optional; if set, it is called after Action has completed.
	// Use cases could be logging, updating a GUI or handling errors while continuing the pipeline.
	// The function may return nil even if the given error is non-nil, in which case the pipeline will continue.
	// This function is called before the next step's Action is invoked.
	Handler ErrorHandler[T]
	// Condition determines if the Step's Action is actually going to be executed in the pipeline.
	// When nil, the Action is executed.
	Condition Predicate[T]
}

Step is an intermediary action and part of a Pipeline.

func NewFanOutStep added in v0.13.0

func NewFanOutStep[T context.Context](name string, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T]

NewFanOutStep creates a pipeline step that runs nested pipelines in their own goroutines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished. If the given ParallelResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.

If the context is canceled, no new pipelines are retrieved from the channel and the Supplier is expected to stop supplying new instances. Also, once canceled, the step waits for the remaining child pipelines and collects their results via the given ParallelResultHandler. However, the error returned from ParallelResultHandler is wrapped in context.Canceled.

Example
p := NewPipeline[context.Context]()
fanout := NewFanOutStep[context.Context]("fanout", func(ctx context.Context, pipelines chan *Pipeline[context.Context]) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		select {
		case <-ctx.Done():
			return // parent pipeline has been canceled, let's not create more pipelines.
		default:
			child := NewPipeline[context.Context]()
			pipelines <- child.AddStep(child.NewStep(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
				time.Sleep(time.Duration(n) * 10 * time.Millisecond) // fake some load
				fmt.Printf("I am worker %d\n", n)
				return nil
			}))
		}
	}
}, func(ctx context.Context, results map[uint64]error) error {
	for worker, result := range results {
		if result != nil {
			fmt.Printf("Worker %d failed: %v\n", worker, result)
		}
	}
	return nil
})
p.AddStep(fanout)
p.RunWithContext(context.Background())
Output:

I am worker 0
I am worker 1
I am worker 2

func NewStep

func NewStep[T context.Context](name string, action ActionFunc[T]) Step[T]

NewStep returns a new Step with given name and action.

func NewStepIf added in v0.19.0

func NewStepIf[T context.Context](predicate Predicate[T], name string, actionFunc ActionFunc[T]) Step[T]

NewStepIf is syntactic sugar for NewStep with Step.When.

func NewWorkerPoolStep added in v0.13.0

func NewWorkerPoolStep[T context.Context](name string, size int, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T]

NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a pool of goroutines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished.

  • If the given ParallelResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.
  • The pipelines are executed by a pool of size goroutines.
  • If size is 1, the pipelines are effectively run in sequence.
  • If size is 0 or less, the function panics.
Example
p := NewPipeline[*testContext]()
pool := NewWorkerPoolStep[*testContext]("pool", 2, func(ctx *testContext, pipelines chan *Pipeline[*testContext]) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		select {
		case <-ctx.Done():
			return // parent pipeline has been canceled, let's not create more pipelines.
		default:
			newP := NewPipeline[*testContext]()
			pipelines <- newP.AddStep(newP.NewStep(fmt.Sprintf("i = %d", n), func(_ *testContext) error {
				time.Sleep(time.Duration(n) * 100 * time.Millisecond) // fake some load
				fmt.Printf("This is job item %d\n", n)
				return nil
			}))
		}
	}
}, func(ctx *testContext, results map[uint64]error) error {
	for jobIndex, err := range results {
		var result Result
		if errors.As(err, &result) {
			fmt.Printf("Job %d failed: %v\n", jobIndex, result)
		}
	}
	return nil
})
p.AddStep(pool)
p.RunWithContext(&testContext{Context: context.Background()})
Output:

This is job item 0
This is job item 1
This is job item 2

func (Step[T]) When added in v0.19.0

func (s Step[T]) When(predicate Predicate[T]) Step[T]

When sets Step.Condition. When the given predicate returns false, the step is skipped without error.

func (Step[T]) WithErrorHandler added in v0.10.0

func (s Step[T]) WithErrorHandler(errorHandler ErrorHandler[T]) Step[T]

WithErrorHandler sets the ErrorHandler of this specific step and returns the step itself.

type Supplier added in v0.13.0

type Supplier[T context.Context] func(ctx T, pipelinesChan chan *Pipeline[T])

Supplier is a function that spawns Pipeline instances for consumption. Supply new pipelines by putting new Pipeline instances into the given channel. The function must close the channel once all pipelines are spawned (`defer close()` recommended).

The parent pipeline may get canceled, so the given context is provided to stop putting more Pipeline instances into the channel. Use

select { case <-ctx.Done(): return; default: pipelinesChan <- ... }

to cancel the supply, otherwise you may leak an orphaned goroutine.

func SupplierFromSlice added in v0.13.0

func SupplierFromSlice[T context.Context](pipelines []*Pipeline[T]) Supplier[T]

SupplierFromSlice returns a Supplier that accepts the given slice of Pipeline and iterates over it to feed the channel.

Context cancellation is only effective if the channel is limited in size. All pipelines may get executed even if the parent pipeline has been canceled, unless each child Pipeline listens for context.Done() in their steps.
