chain

v1.0.0
Warning

This package is not in the latest version of its module.
Published: Nov 26, 2024 License: MIT Imports: 5 Imported by: 0

README

Chain: A Flexible Action-Pipeline Library

The chain package provides a flexible framework for building and executing sequential workflows (pipelines) of actions. It allows you to define a series of steps, each represented by an Action, which processes inputs and produces outputs. The package supports conditional branching, customizable execution paths, and error handling, enabling complex workflows with minimal boilerplate.

Features

  • Action and Pipeline Composition
    Create modular, reusable units of work (Action) and orchestrate them into robust workflows using Pipeline.

  • DAG-based Execution Plans
    Ensure predictable execution and robust validation with built-in cycle detection to enforce acyclic workflows.

  • Nested Pipelines
    Use pipelines as actions within other pipelines, enabling modular and hierarchical workflow designs.

  • Conditional Branching
    Support for Success, Error, Abort, and custom direction-based branching within your execution flows.

  • AggregateAction Support
    Simplify the orchestration of complex workflows by combining Actions or Pipelines of different types into a unified control flow using AggregateAction.

Getting Started

Installation

To install the chain package, use the following command:

go get github.com/JSYoo5B/chain

Key Concepts

Action

An Action represents a single task in the pipeline. Each action can process input data and return output or an error.

type Action[T any] interface {
    Name() string
    Run(ctx context.Context, input T) (output T, err error)
}

BranchAction

A BranchAction extends Action and supports conditional branching. It can change the execution flow based on the results of the action, allowing for multiple execution paths.

type BranchAction[T any] interface {
    Name() string
    Run(ctx context.Context, input T) (output T, err error)
    Directions() []string
    NextDirection(ctx context.Context, output T) (direction string, err error)
}

Pipeline

A Pipeline is a sequence of Actions executed in order. It orchestrates the flow of data between actions and handles branching, success, error, and abort conditions.

ActionPlan

An ActionPlan is a map that associates a direction (e.g., success, error, abort) with the next Action to execute, defining the flow of a pipeline.

type ActionPlan[T any] map[string]Action[T]

Examples

Practical examples for using the chain package will be added in future updates. Stay tuned!

Documentation

Constants

const (
	// Success represents the direction indicating that the action completed successfully
	// and the pipeline should continue.
	Success = "success"
	// Error represents the direction indicating that an error occurred,
	// and the pipeline should handle it accordingly.
	Error = "error"
	// Abort represents the direction indicating that
	// the pipeline execution should be aborted immediately.
	// This can occur due to a specific Abort condition or
	// in cases of unexpected errors or panics that cause the pipeline to halt.
	Abort = "abort"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action[T any] interface {
	// Name provides the identifier of this Action.
	Name() string

	// Run executes the Action, processing the input and returning output or an error.
	Run(ctx context.Context, input T) (output T, err error)
}

Action is the basic unit of execution in this package. It represents a single task that processes input and produces output.
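
As an illustration, a struct can satisfy this interface directly. The sketch below redeclares Action locally so that it compiles on its own; trimAction and runTrim are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Action mirrors the documented interface. It is redeclared here only so the
// sketch is self-contained; real code would use chain.Action instead.
type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

// trimAction is a hypothetical Action that trims surrounding whitespace.
type trimAction struct{}

func (trimAction) Name() string { return "trim" }

func (trimAction) Run(ctx context.Context, input string) (string, error) {
	// Return early if the context was already cancelled.
	if err := ctx.Err(); err != nil {
		return input, err
	}
	return strings.TrimSpace(input), nil
}

// runTrim runs the action once with a background context.
func runTrim(s string) (string, error) {
	var a Action[string] = trimAction{}
	return a.Run(context.Background(), s)
}

func main() {
	out, err := runTrim("  hello  ")
	fmt.Printf("%q %v\n", out, err) // prints "hello" <nil>
}
```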

func NewAggregateAction

func NewAggregateAction[T any, U any](
	action Action[U],
	getter AggregateGetter[T, U],
	setter AggregateSetter[T, U],
) Action[T]

NewAggregateAction creates an Action that works with a composite data structure (T), where T is a complex type (e.g., a struct with multiple fields) and U is the data type that the Action operates on. The AggregateGetter and AggregateSetter functions are used to extract U from T and re-integrate the processed result back into T.
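
The getter/setter idea can be sketched as a small wrapper. This is not the package's implementation: the aggregate type, the order struct, and the choice to return the composite unchanged on error are all assumptions made for illustration, and the interface and function types are redeclared locally.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types, so the sketch compiles on its own.
type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

type AggregateGetter[T any, U any] func(T) U
type AggregateSetter[T any, U any] func(T, U) T

// incrementAction is a hypothetical Action[int] that adds one.
type incrementAction struct{}

func (incrementAction) Name() string { return "increment" }
func (incrementAction) Run(_ context.Context, n int) (int, error) {
	return n + 1, nil
}

// aggregate is a hypothetical wrapper that lifts an Action[U] to an Action[T].
type aggregate[T any, U any] struct {
	inner  Action[U]
	getter AggregateGetter[T, U]
	setter AggregateSetter[T, U]
}

func (a aggregate[T, U]) Name() string { return a.inner.Name() }

func (a aggregate[T, U]) Run(ctx context.Context, input T) (T, error) {
	// Extract the sub-part, run the inner action on it, then write it back.
	part, err := a.inner.Run(ctx, a.getter(input))
	if err != nil {
		// Assumption: on failure the composite is returned unchanged.
		return input, err
	}
	return a.setter(input, part), nil
}

// order is a hypothetical composite type.
type order struct {
	ID    string
	Count int
}

// bumpCount applies incrementAction to the Count field of an order.
func bumpCount(o order) (order, error) {
	var act Action[order] = aggregate[order, int]{
		inner:  incrementAction{},
		getter: func(o order) int { return o.Count },
		setter: func(o order, n int) order { o.Count = n; return o },
	}
	return act.Run(context.Background(), o)
}

func main() {
	out, err := bumpCount(order{ID: "A-1", Count: 2})
	fmt.Println(out.ID, out.Count, err) // prints: A-1 3 <nil>
}
```

Because the setter receives the composite by value and returns a new one, the caller's original order is left untouched in this sketch.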

func NewSimpleAction

func NewSimpleAction[T any](name string, runFunc RunFunc[T]) Action[T]

NewSimpleAction creates a new Action with a custom Run function, which can be a pure function or closure. The provided runFunc must match the RunFunc signature, where T is a generic type representing the input and output types for the Action's execution.

This allows for the creation of simple Actions without manually defining a separate struct that implements the Action interface.
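
One way such a constructor could work is a small adapter that stores the name and the function. The sketch below is an assumption about the general pattern, not the package's source; simpleAction and newSimpleAction are hypothetical local names, and the types are redeclared so the example is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types.
type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

type RunFunc[T any] func(ctx context.Context, input T) (output T, err error)

// simpleAction is a hypothetical adapter from a RunFunc to an Action.
type simpleAction[T any] struct {
	name string
	run  RunFunc[T]
}

func (s simpleAction[T]) Name() string { return s.name }

func (s simpleAction[T]) Run(ctx context.Context, input T) (T, error) {
	return s.run(ctx, input)
}

// newSimpleAction is a local stand-in with the same shape as NewSimpleAction.
func newSimpleAction[T any](name string, run RunFunc[T]) Action[T] {
	return simpleAction[T]{name: name, run: run}
}

// multiplyBy builds an action from a closure that captures factor.
func multiplyBy(factor, n int) (int, error) {
	act := newSimpleAction[int]("multiply", func(_ context.Context, in int) (int, error) {
		return in * factor, nil
	})
	return act.Run(context.Background(), n)
}

func main() {
	out, err := multiplyBy(2, 21)
	fmt.Println(out, err) // prints: 42 <nil>
}
```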

func Terminate

func Terminate[T any]() Action[T]

Terminate explicitly ends execution in a Pipeline by returning nil. It signals that no further actions will be executed.

Use it in an ActionPlan to clearly indicate termination intent.

type ActionPlan

type ActionPlan[T any] map[string]Action[T]

ActionPlan represents a map that associates a direction (Success, Error, Abort, and other custom branching directions) with the next Action to execute. It is used to define the flow of actions in a pipeline based on the direction of execution.
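
Since an ActionPlan is an ordinary map, choosing the next step is a map lookup. The sketch below uses local copies of the documented types and constants; named, examplePlan, and nextName are hypothetical helpers, and the nil entry for Abort relies on the documented statement that Terminate returns nil.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented constants and types.
const (
	Success = "success"
	Error   = "error"
	Abort   = "abort"
)

type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

type ActionPlan[T any] map[string]Action[T]

// named is a hypothetical pass-through Action identified by its string value.
type named string

func (n named) Name() string { return string(n) }
func (n named) Run(_ context.Context, in int) (int, error) {
	return in, nil
}

// examplePlan routes Success and Error to actions and Abort to termination.
func examplePlan() ActionPlan[int] {
	return ActionPlan[int]{
		Success: named("save"),
		Error:   named("rollback"),
		Abort:   nil, // nil marks termination, matching what Terminate is documented to return
	}
}

// nextName reports the name of the action chosen for a direction,
// or "" when the direction is missing or leads to termination.
func nextName(plan ActionPlan[int], direction string) string {
	next, ok := plan[direction]
	if !ok || next == nil {
		return ""
	}
	return next.Name()
}

func main() {
	plan := examplePlan()
	fmt.Println(nextName(plan, Success), nextName(plan, Error), nextName(plan, Abort) == "")
	// prints: save rollback true
}
```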

func DefaultPlan

func DefaultPlan[T any](success, error Action[T]) ActionPlan[T]

DefaultPlan returns a standard ActionPlan with valid next actions for Success and Error, and termination for Abort.

func DefaultPlanWithAbort

func DefaultPlanWithAbort[T any](success, error, abort Action[T]) ActionPlan[T]

DefaultPlanWithAbort returns an ActionPlan with valid next actions for Success, Error, and Abort.

func SuccessOnlyPlan

func SuccessOnlyPlan[T any](success Action[T]) ActionPlan[T]

SuccessOnlyPlan returns an ActionPlan where only a success direction has a valid next action, and Error and Abort both lead to termination.

func TerminationPlan

func TerminationPlan[T any]() ActionPlan[T]

TerminationPlan returns an ActionPlan with all directions leading to termination immediately, providing a clear indication of termination rather than using nil.

type AggregateGetter

type AggregateGetter[T any, U any] func(T) U

AggregateGetter extracts a subpart (U) from a composite data structure (T). It allows access to a part of the structure without modifying the entire one.

type AggregateSetter

type AggregateSetter[T any, U any] func(T, U) T

AggregateSetter updates a composite data structure (T) with a new subpart (U). It reintegrates the modified part back into the structure and returns the updated structure.

type BranchAction

type BranchAction[T any] interface {
	// Name returns the name of the BranchAction.
	Name() string

	// Run executes the branch action, optionally modifying the input and returning an output.
	// If the input doesn't need changes, it can be passed through as output. The method also
	// returns an error if the action cannot be executed successfully.
	Run(ctx context.Context, input T) (output T, err error)

	// Directions returns a list of possible directions that the pipeline can take.
	// These directions are used for validation and must include all possible values that
	// NextDirection can return.
	Directions() []string

	// NextDirection determines the next execution path based on the result of Run.
	// It is called only if Run succeeds (err == nil).
	// The method returns a direction from the list defined by Directions.
	NextDirection(ctx context.Context, output T) (direction string, err error)
}

BranchAction is an interface for actions that control branching in the execution flow of a Pipeline. It extends the Action interface and adds methods for handling conditional branching based on the execution results.
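
A minimal implementation might look like the following sketch. The interface is redeclared locally; sizeCheck, classify, and the "small" and "large" directions are hypothetical and exist only to show how Directions and NextDirection relate.

```go
package main

import (
	"context"
	"fmt"
)

// BranchAction mirrors the documented interface, redeclared for a standalone sketch.
type BranchAction[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
	Directions() []string
	NextDirection(ctx context.Context, output T) (direction string, err error)
}

// Hypothetical custom directions.
const (
	small = "small"
	large = "large"
)

// sizeCheck is a hypothetical BranchAction that routes on a numeric limit.
type sizeCheck struct{ limit int }

func (sizeCheck) Name() string { return "size-check" }

// Run passes the input through unchanged.
func (sizeCheck) Run(_ context.Context, in int) (int, error) { return in, nil }

// Directions lists every value NextDirection can return.
func (sizeCheck) Directions() []string { return []string{small, large} }

func (s sizeCheck) NextDirection(_ context.Context, out int) (string, error) {
	if out > s.limit {
		return large, nil
	}
	return small, nil
}

// classify runs the action and, only if Run succeeds, asks for the direction.
func classify(n int) (string, error) {
	var b BranchAction[int] = sizeCheck{limit: 10}
	ctx := context.Background()
	out, err := b.Run(ctx, n)
	if err != nil {
		return "", err
	}
	return b.NextDirection(ctx, out)
}

func main() {
	fmt.Println(classify(3))  // prints: small <nil>
	fmt.Println(classify(42)) // prints: large <nil>
}
```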

func NewSimpleBranchAction

func NewSimpleBranchAction[T any](name string, runFunc RunFunc[T], directions []string, branchFunc BranchFunc[T]) BranchAction[T]

NewSimpleBranchAction creates a new BranchAction with customizable directions. It accepts a name for the action, a slice of directions that define the possible control flow, and a BranchFunc that contains the branching logic, which dictates the next direction based on the action's output.

Additionally, a custom runFunc can be provided to define the execution logic of the action. This function must match the RunFunc signature, where T is the generic type representing the input and output types for the action. If no specific execution logic is needed, the runFunc can be provided as `nil`. In this case, the action will simply pass the input through to the output without modification.

This allows for the creation of simple BranchActions without manually defining a separate struct that implements the BranchAction interface.

type BranchFunc

type BranchFunc[T any] func(ctx context.Context, output T) (direction string, err error)

BranchFunc represents the signature for the function that defines the branching logic for a BranchAction in the package. It takes the running context and output as input, and returns the direction for the next step in the process along with any potential error.

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline represents a sequence of Actions that are executed in a structured flow. It runs its constituent Actions one after another by calling each Action's Run method. The flow follows the structure defined for the Pipeline, so workflows can be as simple or as complex as needed.

Pipeline implements the Action interface, meaning it can be treated as an Action itself. This allows Pipelines to be composed hierarchically, enabling more complex workflows by nesting Pipelines within other Pipelines.

func NewPipeline

func NewPipeline[T any](name string, memberActions ...Action[T]) *Pipeline[T]

NewPipeline creates a new Pipeline by taking a series of Actions as its members. These Actions will be executed sequentially in the order they are provided, with the output of one Action being passed as input to the next, forming a unidirectional flow of execution.

func (*Pipeline[T]) Name

func (p *Pipeline[T]) Name() string

Name provides the identifier of this Pipeline.

func (*Pipeline[T]) Run

func (p *Pipeline[T]) Run(ctx context.Context, input T) (output T, err error)

Run executes the Pipeline by running Actions in the order they were configured, starting from the initAction, which is the first one of the memberActions provided by the constructor such as NewPipeline. The actions are executed in order, passing the output of one action as input to the next.
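
The data flow described here, where each output becomes the next input, can be sketched with a plain loop. This is a deliberate simplification and not the package's implementation: it stops at the first error instead of consulting an ActionPlan, and runSequential, step, and addThenDouble are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Action mirrors the documented interface, redeclared for a standalone sketch.
type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

// step is a hypothetical Action built from a plain function.
type step struct {
	name string
	fn   func(int) int
}

func (s step) Name() string { return s.name }
func (s step) Run(_ context.Context, in int) (int, error) {
	return s.fn(in), nil
}

// runSequential feeds the output of each action into the next one.
// Simplification: it returns at the first error rather than following a plan.
func runSequential[T any](ctx context.Context, input T, actions ...Action[T]) (T, error) {
	current := input
	for _, a := range actions {
		out, err := a.Run(ctx, current)
		if err != nil {
			return out, fmt.Errorf("%s: %w", a.Name(), err)
		}
		current = out
	}
	return current, nil
}

// addThenDouble computes (n + 1) * 2 through two chained actions.
func addThenDouble(n int) (int, error) {
	steps := []Action[int]{
		step{name: "add-one", fn: func(v int) int { return v + 1 }},
		step{name: "double", fn: func(v int) int { return v * 2 }},
	}
	return runSequential[int](context.Background(), n, steps...)
}

func main() {
	out, err := addThenDouble(3)
	fmt.Println(out, err) // prints: 8 <nil>
}
```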

func (*Pipeline[T]) RunAt

func (p *Pipeline[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)

RunAt starts the execution of the pipeline from a given Action (initAction). It follows the action plan, executing actions sequentially based on the specified directions. If an action returns an error, the pipeline will proceed to the next action according to the defined plan, potentially directing the flow to an action mapped for the Error direction. The Abort direction, when encountered, will immediately halt the pipeline execution unless the plan specifies otherwise. If no action plan is found for a given direction, the pipeline will terminate with the appropriate error.
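
The plan-following behaviour can be approximated by a loop that picks a direction after each action and looks up the next action. The sketch below makes several assumptions that the documentation does not confirm: plans are keyed by action name, a failed action leaves the carried value unchanged, and a missing plan entry simply ends the run. Branch directions, Abort, and panic handling are left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented constants and types.
const (
	Success = "success"
	Error   = "error"
)

type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}

type ActionPlan[T any] map[string]Action[T]

// step is a hypothetical Action built from a plain function.
type step struct {
	name string
	run  func(int) (int, error)
}

func (s step) Name() string                               { return s.name }
func (s step) Run(_ context.Context, in int) (int, error) { return s.run(in) }

// runAt is a hypothetical stand-in for the plan-following loop.
// Assumption: plans are stored per action name.
func runAt[T any](ctx context.Context, start Action[T], plans map[string]ActionPlan[T], input T) (T, error) {
	value := input
	var lastErr error
	for current := start; current != nil; {
		out, err := current.Run(ctx, value)
		direction := Success
		if err != nil {
			// Assumption: keep the previous value and follow the Error direction.
			direction, lastErr = Error, err
		} else {
			value = out
		}
		// A missing plan or direction yields nil, which ends the loop.
		current = plans[current.Name()][direction]
	}
	return value, lastErr
}

// demo validates n, then doubles it on success or falls back to 0 on error.
func demo(n int) (int, error) {
	validate := step{"validate", func(in int) (int, error) {
		if in < 0 {
			return in, errors.New("negative input")
		}
		return in, nil
	}}
	double := step{"double", func(in int) (int, error) { return in * 2, nil }}
	fallback := step{"fallback", func(int) (int, error) { return 0, nil }}
	plans := map[string]ActionPlan[int]{
		"validate": {Success: double, Error: fallback},
	}
	return runAt[int](context.Background(), validate, plans, n)
}

func main() {
	fmt.Println(demo(5))  // prints: 10 <nil>
	fmt.Println(demo(-1)) // prints: 0 negative input
}
```

In this sketch the last error is returned even though the fallback action ran, which loosely mirrors the lastErr result in the RunAt signature.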

func (*Pipeline[T]) SetRunPlan

func (p *Pipeline[T]) SetRunPlan(currentAction Action[T], plan ActionPlan[T])

SetRunPlan updates the execution flow for the given currentAction in the pipeline, by associating it with a specified ActionPlan. The currentAction will be validated to ensure it is a member of the pipeline. The ActionPlan defines the directions (such as Success, Error, Abort) and their corresponding next actions in the execution flow.

If the currentAction is nil or not part of the pipeline, a panic will occur. The plan can be nil, in which case the currentAction will be set to terminate for any direction not explicitly specified in the plan. If a direction is encountered in the plan that is not valid for the currentAction, or if it leads to an invalid action, another panic will occur.

Additionally, self-loops are not allowed in the plan. If the next action for a direction is the current action itself, a panic will be triggered.

func (*Pipeline[T]) ValidateGraph

func (p *Pipeline[T]) ValidateGraph() error

ValidateGraph ensures the pipeline's graph is connected and acyclic. It checks for cycles first, then verifies that all nodes are connected as a single graph.
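
The two checks can be sketched on a plain adjacency map. This is a generic illustration, not the package's code: representing the graph as map[string][]string keyed by action name is an assumption, the cycle check is a standard three-colour depth-first search, and connectivity is tested by ignoring edge direction.

```go
package main

import (
	"errors"
	"fmt"
)

// hasCycle runs a three-colour depth-first search over a directed graph.
func hasCycle(g map[string][]string) bool {
	const (
		white = iota // not visited yet
		gray         // on the current DFS path
		black        // fully explored
	)
	color := map[string]int{}
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = gray
		for _, m := range g[n] {
			switch color[m] {
			case gray:
				return true // back edge: m is on the current path
			case white:
				if visit(m) {
					return true
				}
			}
		}
		color[n] = black
		return false
	}
	for n := range g {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

// isConnected reports whether all nodes form one component when
// edge direction is ignored.
func isConnected(g map[string][]string) bool {
	undirected := map[string][]string{}
	for n, outs := range g {
		if _, ok := undirected[n]; !ok {
			undirected[n] = nil // make sure isolated nodes are counted
		}
		for _, m := range outs {
			undirected[n] = append(undirected[n], m)
			undirected[m] = append(undirected[m], n)
		}
	}
	if len(undirected) == 0 {
		return true
	}
	var start string
	for n := range undirected {
		start = n
		break
	}
	seen := map[string]bool{start: true}
	queue := []string{start}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		for _, m := range undirected[n] {
			if !seen[m] {
				seen[m] = true
				queue = append(queue, m)
			}
		}
	}
	return len(seen) == len(undirected)
}

// validateGraph checks for cycles first, then for connectivity.
func validateGraph(g map[string][]string) error {
	if hasCycle(g) {
		return errors.New("graph contains a cycle")
	}
	if !isConnected(g) {
		return errors.New("graph is not connected")
	}
	return nil
}

func main() {
	dag := map[string][]string{"a": {"b", "c"}, "b": {"c"}}
	loop := map[string][]string{"a": {"b"}, "b": {"a"}}
	split := map[string][]string{"a": {"b"}, "c": {"d"}}
	fmt.Println(validateGraph(dag))   // prints: <nil>
	fmt.Println(validateGraph(loop))  // prints: graph contains a cycle
	fmt.Println(validateGraph(split)) // prints: graph is not connected
}
```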

type RunFunc

type RunFunc[T any] func(ctx context.Context, input T) (output T, err error)

RunFunc defines the signature of a function used to implement an Action's execution logic. It is a function that takes an input of type T (a generic type) and returns an output of type T along with any error encountered during execution.
