pipeline

v0.12.0
Published: Dec 27, 2021 License: Apache-2.0 Imports: 2 Imported by: 30

README

go-command-pipeline


Small Go utility that executes business actions in a pipeline.

Usage

import (
    "fmt"
    "log"

    pipeline "github.com/ccremer/go-command-pipeline"
)

func main() {
	number := 0 // define arbitrary data to pass around in the steps.
	p := pipeline.NewPipeline()
	p.WithContext(&number)
	p.WithSteps(
		pipeline.NewStep("define random number", defineNumber),
		pipeline.NewStepFromFunc("print number", printNumber),
	)
	result := p.Run()
	if !result.IsSuccessful() {
		log.Fatal(result.Err)
	}
}

func defineNumber(ctx pipeline.Context) pipeline.Result {
	*ctx.(*int) = 10 // dereference the asserted pointer before assigning
	return pipeline.Result{}
}

// Let's assume this is a business function that can fail.
// You can enable "automatic" fail-on-first-error pipelines by having more small functions that return errors.
func printNumber(ctx pipeline.Context) error {
	number := ctx.(*int)
	_, err := fmt.Println(*number)
	return err
}

See more usage in the examples dir

Who is it for

This utility is interesting for you if you have many business functions that are executed sequentially, each with its own error handling. Do you grow tired of the tedious error handling in Go when all you do is pass the error "up" the stack in over 90% of the cases, only to log it at the root? This utility helps you focus on the business logic by dividing each failure-prone action into small steps, since the pipeline aborts on the first error.

Consider the following prose example:

func Persist(data Data) error {
    err := database.prepareTransaction()
    if err != nil {
        return err
    }
    err = database.executeQuery("SOME QUERY", data)
    if err != nil {
        return err
    }
    err = database.commit()
    return err
}

We have tons of if err != nil checks that bloat the function with more error handling than interesting business logic.

It could be simplified to something like this:

func Persist(data Data) error {
    p := pipeline.NewPipeline().WithContext(data).WithSteps(
        pipeline.NewStep("prepareTransaction", prepareTransaction()),
        pipeline.NewStep("executeQuery", executeQuery()),
        pipeline.NewStep("commitTransaction", commit()),
    )
    return p.Run().Err
}

func executeQuery() pipeline.ActionFunc {
	return func(ctx pipeline.Context) pipeline.Result {
		data := ctx.(Data)
		err := database.executeQuery("SOME QUERY", data)
		return pipeline.Result{Err: err}
	}
}
...

While setting up a pipeline seems to add more lines, it makes it easy to understand what Persist() does without all the error handling.

Documentation

Constants

This section is empty.

Variables

var ErrAbort = errors.New("abort")

ErrAbort indicates that the pipeline should be terminated immediately without returning an error.
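
The following is a minimal sketch of how a step might request such a graceful stop. It uses only the standard library: errAbort is a local stand-in for ErrAbort, and skipIfEmpty and finalError are hypothetical helpers. It assumes the sentinel is detected the way errors.Is detects it, which this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbort is a local stand-in for pipeline.ErrAbort.
var errAbort = errors.New("abort")

// skipIfEmpty is a hypothetical step body: it asks for a graceful stop
// when there is nothing to do, adding context by wrapping the sentinel.
func skipIfEmpty(items []string) error {
	if len(items) == 0 {
		return fmt.Errorf("nothing to process: %w", errAbort)
	}
	return nil
}

// finalError mimics the documented outcome: an abort ends the run
// but is not reported as a failure.
func finalError(err error) error {
	if errors.Is(err, errAbort) {
		return nil
	}
	return err
}

func main() {
	err := skipIfEmpty(nil)
	fmt.Println(errors.Is(err, errAbort)) // the wrapped sentinel is still detectable
	fmt.Println(finalError(err) == nil)   // the caller sees no error
}
```

Both lines print true: the step's error carries the sentinel, and the caller of the run sees no error.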

Functions

This section is empty.

Types

type ActionFunc

type ActionFunc func(ctx Context) Result

ActionFunc is the func that contains your business logic. The context is user-defined arbitrary data of type interface{} that is provided to every Step, but may be nil if not set.

type Context added in v0.5.0

type Context interface{}

Context contains arbitrary data relevant for the pipeline execution.

type Listener added in v0.7.0

type Listener func(step Step)

Listener is a simple func that listens to Pipeline events.

type Option added in v0.12.0

type Option func(pipeline *Pipeline)

Option configures the given Pipeline with a behaviour-altering setting.

var DisableErrorWrapping Option = func(pipeline *Pipeline) {
	pipeline.disableErrorWrapping = true
}

DisableErrorWrapping disables the wrapping of errors that are emitted from pipeline steps. This effectively causes Result.Err to be exactly the error as returned from a step.
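
Option follows the functional options pattern. The sketch below shows the pattern in isolation with local stand-ins: runner replaces Pipeline, and the message format used when wrapping is an assumption made for illustration, not the library's actual format.

```go
package main

import (
	"errors"
	"fmt"
)

// runner is a local stand-in for Pipeline; only the wrapping flag is modelled.
type runner struct {
	disableErrorWrapping bool
}

// option mirrors the documented Option type, applied to the stand-in.
type option func(r *runner)

// disableWrapping mirrors the documented DisableErrorWrapping variable.
var disableWrapping option = func(r *runner) {
	r.disableErrorWrapping = true
}

// withOptions applies each option immediately and returns the runner for chaining.
func (r *runner) withOptions(opts ...option) *runner {
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// report shows the assumed effect: with wrapping enabled the step name is
// prepended; with wrapping disabled the step's error is passed through as-is.
func (r *runner) report(stepName string, err error) error {
	if err == nil || r.disableErrorWrapping {
		return err
	}
	return fmt.Errorf("step %q failed: %w", stepName, err)
}

var errDisk = errors.New("disk full")

func main() {
	wrapped := (&runner{}).report("save", errDisk)
	plain := (&runner{}).withOptions(disableWrapping).report("save", errDisk)
	fmt.Println(wrapped)          // step "save" failed: disk full
	fmt.Println(plain == errDisk) // true
}
```

With wrapping disabled, callers can compare the returned error directly against their own sentinel values.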

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline holds and runs intermediate actions, called "steps".

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline returns a new quiet Pipeline instance with KeyValueContext.

func NewPipelineWithContext added in v0.5.0

func NewPipelineWithContext(ctx Context) *Pipeline

NewPipelineWithContext returns a new Pipeline instance with the given context.

func (*Pipeline) AddBeforeHook added in v0.7.0

func (p *Pipeline) AddBeforeHook(listener Listener) *Pipeline

AddBeforeHook adds the given listener to the list of hooks. See WithBeforeHooks.

func (*Pipeline) AddStep

func (p *Pipeline) AddStep(step Step) *Pipeline

AddStep appends the given step to the Pipeline at the end and returns itself.

func (*Pipeline) AsNestedStep

func (p *Pipeline) AsNestedStep(name string) Step

AsNestedStep converts the Pipeline instance into a Step that can be used in other pipelines. The properties are passed to the nested pipeline.
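
Conceptually, a nested step is a step whose action runs another list of steps with the same context. The sketch below illustrates that idea with local stand-in types; run, nested, mark and order are hypothetical names, and the code is not the library's implementation.

```go
package main

import "fmt"

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ActionFunc func(ctx Context) Result
type Step struct {
	Name string
	F    ActionFunc
}

// run executes the steps in order and stops at the first error.
func run(ctx Context, steps []Step) Result {
	for _, step := range steps {
		if res := step.F(ctx); res.Err != nil {
			return res
		}
	}
	return Result{}
}

// nested wraps a list of steps into a single step that shares the outer context.
func nested(name string, steps ...Step) Step {
	return Step{Name: name, F: func(ctx Context) Result {
		return run(ctx, steps)
	}}
}

// mark returns a step that appends its name to the []string behind ctx.
func mark(name string) Step {
	return Step{Name: name, F: func(ctx Context) Result {
		log := ctx.(*[]string)
		*log = append(*log, name)
		return Result{}
	}}
}

// order runs an outer pipeline containing a nested one and returns the execution order.
func order() []string {
	var log []string
	run(&log, []Step{
		mark("begin"),
		nested("transaction", mark("insert"), mark("commit")),
		mark("end"),
	})
	return log
}

func main() {
	fmt.Println(order()) // [begin insert commit end]
}
```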

func (*Pipeline) Run

func (p *Pipeline) Run() Result

Run executes the pipeline and returns the result. Steps are executed sequentially as they were added to the Pipeline. If a Step returns a Result with a non-nil error, the Pipeline is aborted and its Result contains the affected step's error. However, if Result.Err is wrapped in ErrAbort, then the pipeline is aborted, but the final Result.Err will be nil.
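
The documented behaviour can be sketched as a simple loop. The types below are local stand-ins that mirror the declarations on this page, and run, record and visited are hypothetical; the real implementation may differ, for example in how it wraps errors or detects ErrAbort.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct {
	Err  error
	Name string
}
type ActionFunc func(ctx Context) Result
type Step struct {
	Name string
	F    ActionFunc
}

var ErrAbort = errors.New("abort")
var errBoom = errors.New("boom")

// run executes steps in order and stops at the first non-nil error.
// An error matching ErrAbort stops the run but is reported as success.
func run(ctx Context, steps []Step) Result {
	for _, step := range steps {
		res := step.F(ctx)
		if res.Err == nil {
			continue
		}
		if errors.Is(res.Err, ErrAbort) {
			return Result{Name: step.Name}
		}
		return Result{Name: step.Name, Err: res.Err}
	}
	return Result{}
}

// visited records which steps ran; it serves as the pipeline context.
type visited []string

// record returns a step that notes its own name and then returns err.
func record(name string, err error) Step {
	return Step{Name: name, F: func(ctx Context) Result {
		v := ctx.(*visited)
		*v = append(*v, name)
		return Result{Err: err}
	}}
}

func main() {
	var v visited
	res := run(&v, []Step{
		record("first", nil),
		record("second", errBoom),
		record("third", nil),
	})
	fmt.Println(v, res.Err) // [first second] boom
}
```

The third step never runs because the second one failed.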

func (*Pipeline) WithBeforeHooks added in v0.7.0

func (p *Pipeline) WithBeforeHooks(listeners []Listener) *Pipeline

WithBeforeHooks takes a list of listeners. Each Listener is called once, in the given order, just before the ActionFunc is invoked. Listeners should return as quickly as possible, as they are not intended to do actual business logic.
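
A rough sketch of the hook ordering, using local stand-in types that mirror the declarations on this page. runWithHooks, noop and trace are hypothetical names, and the loop only illustrates the documented order rather than reproducing the library's code.

```go
package main

import "fmt"

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ActionFunc func(ctx Context) Result
type Step struct {
	Name string
	F    ActionFunc
}
type Listener func(step Step)

// runWithHooks calls every listener, in order, right before each step's action.
func runWithHooks(ctx Context, hooks []Listener, steps []Step) Result {
	for _, step := range steps {
		for _, hook := range hooks {
			hook(step)
		}
		if res := step.F(ctx); res.Err != nil {
			return res
		}
	}
	return Result{}
}

// noop is a step action that always succeeds.
func noop(Context) Result { return Result{} }

// trace runs two no-op steps with two hooks and returns the order of events.
func trace() []string {
	var events []string
	hooks := []Listener{
		func(s Step) { events = append(events, "log:"+s.Name) },
		func(s Step) { events = append(events, "metric:"+s.Name) },
	}
	runWithHooks(nil, hooks, []Step{{Name: "a", F: noop}, {Name: "b", F: noop}})
	return events
}

func main() {
	fmt.Println(trace()) // [log:a metric:a log:b metric:b]
}
```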

func (*Pipeline) WithContext added in v0.5.0

func (p *Pipeline) WithContext(ctx Context) *Pipeline

WithContext returns itself while setting the context for the pipeline steps.

func (*Pipeline) WithFinalizer added in v0.8.0

func (p *Pipeline) WithFinalizer(handler ResultHandler) *Pipeline

WithFinalizer returns itself while setting the finalizer for the pipeline. The finalizer is a handler that gets called after the last step in the pipeline has completed. It is also called if the pipeline aborts early.
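
The sketch below illustrates the "always called" property with local stand-in types. runWithFinalizer is a hypothetical function, and it assumes that the value returned by the finalizer becomes the final error, which this page does not state explicitly.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct {
	Err  error
	Name string
}
type ActionFunc func(ctx Context) Result
type ResultHandler func(ctx Context, result Result) error
type Step struct {
	Name string
	F    ActionFunc
}

var errBoom = errors.New("boom")

// runWithFinalizer stops at the first failing step, then always hands the
// overall result to the finalizer.
// Assumption: the finalizer's return value becomes the final error.
func runWithFinalizer(ctx Context, finalizer ResultHandler, steps []Step) Result {
	final := Result{}
	for _, step := range steps {
		if res := step.F(ctx); res.Err != nil {
			final = Result{Name: step.Name, Err: res.Err}
			break
		}
	}
	if finalizer != nil {
		final.Err = finalizer(ctx, final)
	}
	return final
}

func main() {
	calls := 0
	finalizer := func(_ Context, r Result) error {
		calls++ // e.g. release resources or log the outcome
		return r.Err
	}
	ok := Step{Name: "ok", F: func(Context) Result { return Result{} }}
	fail := Step{Name: "fail", F: func(Context) Result { return Result{Err: errBoom} }}

	runWithFinalizer(nil, finalizer, []Step{ok, ok})
	res := runWithFinalizer(nil, finalizer, []Step{fail, ok})
	fmt.Println(calls, res.Name, res.Err) // 2 fail boom
}
```

The finalizer runs once per pipeline run, whether the run completed or stopped at the failing step.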

func (*Pipeline) WithNestedSteps added in v0.4.0

func (p *Pipeline) WithNestedSteps(name string, steps ...Step) Step

WithNestedSteps is similar to AsNestedStep, but it accepts the steps given directly as parameters.

func (*Pipeline) WithOptions added in v0.12.0

func (p *Pipeline) WithOptions(options ...Option) *Pipeline

WithOptions configures the Pipeline with settings. The options are applied immediately. Options are applied to nested pipelines provided they are set before building the nested pipeline. Nested pipelines can be configured with their own options.

func (*Pipeline) WithSteps

func (p *Pipeline) WithSteps(steps ...Step) *Pipeline

WithSteps appends the given array of steps to the Pipeline at the end and returns itself.

type Result

type Result struct {
	// Err contains the step's returned error, nil otherwise.
	Err error
	// Name is an optional identifier for a result.
	// ActionFunc may set this property before returning to help a ResultHandler with further processing.
	Name string
}

Result is the object that is returned after each step and after running a pipeline.

func (Result) IsFailed added in v0.2.0

func (r Result) IsFailed() bool

IsFailed returns true if the contained error is non-nil.

func (Result) IsSuccessful

func (r Result) IsSuccessful() bool

IsSuccessful returns true if the contained error is nil.

type ResultHandler added in v0.3.0

type ResultHandler func(ctx Context, result Result) error

ResultHandler is a func that gets called when a step's ActionFunc has finished with any Result. Context may be nil.

type Step

type Step struct {
	// Name describes the step's human-readable name.
	// It has no use other than easily identifying a step for debugging or logging.
	Name string
	// F is the ActionFunc assigned to a pipeline Step.
	// This is required.
	F ActionFunc
	// H is the ResultHandler assigned to a pipeline Step.
	// This is optional, and it will be called in any case if it is set after F completed.
	// Use cases could be logging, updating a GUI or handling errors while continuing the pipeline.
	// The function may return nil even if the Result contains an error, in which case the pipeline will continue.
	// This function is called before the next step's F is invoked.
	H ResultHandler
}

Step is an intermediary action and part of a Pipeline.
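
The comments on H describe how a handler can turn a failure into a non-fatal event. A minimal sketch of that interplay follows. The types are local stand-ins mirroring the declarations above, and runSteps, steps and errCacheMiss are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ActionFunc func(ctx Context) Result
type ResultHandler func(ctx Context, result Result) error
type Step struct {
	Name string
	F    ActionFunc
	H    ResultHandler
}

var errCacheMiss = errors.New("cache miss")

// runSteps runs F, then H if it is set; the handler's return value decides
// whether the run continues with the next step.
func runSteps(ctx Context, steps []Step) (ran []string, err error) {
	for _, step := range steps {
		ran = append(ran, step.Name)
		res := step.F(ctx)
		stepErr := res.Err
		if step.H != nil {
			stepErr = step.H(ctx, res)
		}
		if stepErr != nil {
			return ran, stepErr
		}
	}
	return ran, nil
}

// steps builds a failing step whose handler swallows the error,
// followed by a step that succeeds.
func steps() []Step {
	return []Step{
		{
			Name: "warm cache",
			F:    func(Context) Result { return Result{Err: errCacheMiss} },
			// Treat the failure as non-fatal: log it and continue.
			H: func(_ Context, r Result) error {
				fmt.Println("ignored:", r.Err)
				return nil
			},
		},
		{Name: "serve", F: func(Context) Result { return Result{} }},
	}
}

func main() {
	ran, err := runSteps(nil, steps())
	fmt.Println(len(ran), err)
}
```

Both steps run even though the first one failed, because its handler returned nil.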

func NewStep

func NewStep(name string, action ActionFunc) Step

NewStep returns a new Step with given name and action.

func NewStepFromFunc added in v0.5.0

func NewStepFromFunc(name string, fn func(ctx Context) error) Step

NewStepFromFunc returns a new Step with the given name, using a function that returns an error instead of a Result.
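
Such an adapter could look roughly like this. The types are local stand-ins mirroring the declarations on this page, and stepFromFunc and validate are hypothetical names used for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ActionFunc func(ctx Context) Result
type Step struct {
	Name string
	F    ActionFunc
}

var errNegative = errors.New("negative number")

// stepFromFunc adapts an error-returning function to an ActionFunc by
// placing the returned error into a Result.
func stepFromFunc(name string, fn func(ctx Context) error) Step {
	return Step{Name: name, F: func(ctx Context) Result {
		return Result{Err: fn(ctx)}
	}}
}

// validate is a hypothetical business function that expects a *int context.
func validate(ctx Context) error {
	if n := ctx.(*int); *n < 0 {
		return errNegative
	}
	return nil
}

func main() {
	step := stepFromFunc("validate", validate)
	good, bad := 3, -1
	fmt.Println(step.F(&good).Err) // <nil>
	fmt.Println(step.F(&bad).Err)  // negative number
}
```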

func (Step) WithErrorHandler added in v0.10.0

func (s Step) WithErrorHandler(errorHandler func(ctx Context, err error) error) Step

WithErrorHandler wraps the given errorHandler, sets it as the ResultHandler of this specific step, and returns the step itself. Unlike a handler set with WithResultHandler, errorHandler is only called if Result.Err is non-nil.
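
The wrapping described here can be sketched as a small adapter. The types are local stand-ins mirroring the declarations on this page, and onlyOnError is a hypothetical name; the library's own wrapper may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ResultHandler func(ctx Context, result Result) error

var errTimeout = errors.New("timeout")

// onlyOnError turns an error handler into a ResultHandler that is
// skipped for successful results.
func onlyOnError(errorHandler func(ctx Context, err error) error) ResultHandler {
	return func(ctx Context, result Result) error {
		if result.Err == nil {
			return nil
		}
		return errorHandler(ctx, result.Err)
	}
}

func main() {
	calls := 0
	h := onlyOnError(func(_ Context, err error) error {
		calls++
		return fmt.Errorf("handled: %w", err)
	})

	err := h(nil, Result{})
	fmt.Println(err, calls) // <nil> 0

	err = h(nil, Result{Err: errTimeout})
	fmt.Println(err, calls) // handled: timeout 1
}
```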

func (Step) WithResultHandler added in v0.3.0

func (s Step) WithResultHandler(handler ResultHandler) Step

WithResultHandler sets the ResultHandler of this specific step and returns the step itself.

Directories

Path Synopsis
parallel Package parallel extends the command-pipeline core with concurrency steps.
predicate Package predicate provides functions that wrap existing actions but execute them only on conditions ("predicates").
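
The idea behind conditional steps can be sketched without the predicate package. Everything below is hypothetical: condition, when, config, notDryRun and deleteFiles are illustrative names, not the package's API, and the types are local stand-ins mirroring the core declarations.

```go
package main

import "fmt"

// Local stand-ins mirroring the core declarations documented on this page.
type Context interface{}
type Result struct{ Err error }
type ActionFunc func(ctx Context) Result

// condition is a hypothetical predicate type: it decides whether the
// wrapped action should run.
type condition func(ctx Context) bool

// when wraps an action so that it runs only if cond holds; otherwise the
// step succeeds without doing anything.
func when(cond condition, action ActionFunc) ActionFunc {
	return func(ctx Context) Result {
		if !cond(ctx) {
			return Result{}
		}
		return action(ctx)
	}
}

// config is a hypothetical pipeline context.
type config struct {
	DryRun  bool
	Deleted int
}

func notDryRun(ctx Context) bool { return !ctx.(*config).DryRun }

func deleteFiles(ctx Context) Result {
	ctx.(*config).Deleted++
	return Result{}
}

func main() {
	guarded := when(notDryRun, deleteFiles)
	dry, live := &config{DryRun: true}, &config{}
	guarded(dry)
	guarded(live)
	fmt.Println(dry.Deleted, live.Deleted) // 0 1
}
```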
