flow

package
v0.0.0-...-3cc3ff2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2021 License: MIT Imports: 16 Imported by: 0

README

Flow manager

Flow managers manage nodes within a given flow. Each node represents an action to be executed (ex: request, rollback). Nodes are executed concurrently from one another. Dependencies are based on references within the given node or if a node dependency is defined. A manager keeps track of all the processes being executed and tracks all the nodes which have been called. If an error is thrown inside one of the processes during the execution of the flow stopped and a rollback initiated.

Branches

Nodes are executed concurrently from one another. When a node is executed a check is performed to check whether the dependencies have been met. Only if all of the dependencies have been met is the node executed.

+------------+
|            |
|    Node    +------------+
|            |            |
+------+-----+            |
       |                  |
       |                  |
+------v-----+     +------v-----+
|            |     |            |
|    Node    |     |    Node    |
|            |     |            |
+------+-----+     +------+-----+
       |                  |
       |                  |
+------v-----+            |
|            |            |
|    Node    <------------+
|            |
+------------+

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAbortFlow = errors.New("abort flow")

ErrAbortFlow represents the error thrown when a flow has to be aborted

Functions

func ConstructBranches

func ConstructBranches(nodes []*Node)

ConstructBranches constructs the node branches based on the made references

func ConstructDependency

func ConstructDependency(node *Node, target string, nodes []*Node)

ConstructDependency constructs a dependency for the given node

func ExecuteFunctions

func ExecuteFunctions(stack functions.Stack, store references.Store) error

ExecuteFunctions executes the given functions and writes the results to the given store

Types

type AfterManager

type AfterManager func(ctx context.Context, manager *Manager, store references.Store) (context.Context, error)

AfterManager is called after a manager is called

type AfterManagerHandler

type AfterManagerHandler func(AfterManager) AfterManager

AfterManagerHandler wraps the after call function to allow middleware to be chained

type AfterNode

type AfterNode func(ctx context.Context, node *Node, tracker Tracker, processes *Processes, store references.Store) (context.Context, error)

AfterNode is called after a node is executed

type AfterNodeHandler

type AfterNodeHandler func(AfterNode) AfterNode

AfterNodeHandler wraps the after node function to allow middleware to be chained

type BeforeManager

type BeforeManager func(ctx context.Context, manager *Manager, store references.Store) (context.Context, error)

BeforeManager is called before a manager get's calles

type BeforeManagerHandler

type BeforeManagerHandler func(BeforeManager) BeforeManager

BeforeManagerHandler wraps the before call function to allow middleware to be chained

type BeforeNode

type BeforeNode func(ctx context.Context, node *Node, tracker Tracker, processes *Processes, store references.Store) (context.Context, error)

BeforeNode is called before a node is executed

type BeforeNodeHandler

type BeforeNodeHandler func(BeforeNode) BeforeNode

BeforeNodeHandler wraps the before node function to allow middleware to be chained

type Call

type Call interface {
	Do(context.Context, references.Store) error
}

Call represents a transport caller implementation

func NewCall

func NewCall(parent *broker.Context, node *specs.Node, options *CallOptions) Call

NewCall constructs a new flow caller from the given transport caller and options

type CallOptions

type CallOptions struct {
	Transport      transport.Call
	Method         transport.Method
	Request        *Request
	Response       *Request
	Err            *OnError
	ExpectedStatus []int
}

CallOptions represents the available options that could be used to construct a new flow caller

type Condition

type Condition struct {
	// contains filtered or unexported fields
}

Condition represents a condition which could be evaluated and results in a boolean

func NewCondition

func NewCondition(stack functions.Stack, expression Expression) *Condition

NewCondition constructs a new condition of the given functions stack and specs condition

func (*Condition) Eval

func (condition *Condition) Eval(ctx *broker.Context, store references.Store) (bool, error)

Eval evaluates the given condition with the given reference store

type Expression

type Expression interface {
	specs.Evaluable

	GetParameters() *specs.ParameterMap
}

Expression represents expression that contains the list of parameters and can be evaluated

type Manager

type Manager struct {
	BeforeDo       BeforeManager
	BeforeRollback BeforeManager

	Name       string
	Starting   []*Node
	References int
	Nodes      []*Node
	Ends       int
	Error      specs.ErrorHandle

	AfterFunctions functions.Stack
	AfterDo        AfterManager
	AfterRollback  AfterManager
	// contains filtered or unexported fields
}

Manager is responsible for the handling of a flow and its steps

func NewManager

func NewManager(parent *broker.Context, name string, nodes []*Node, err specs.ErrorHandle, after functions.Stack, middleware *ManagerMiddleware) *Manager

NewManager constructs a new manager for the given flow. Branches are constructed for the constructed nodes to optimalise performance. Various variables such as the amount of nodes, references and loose ends are collected to optimalise allocations during runtime.

func (*Manager) Do

func (manager *Manager) Do(ctx context.Context, refs references.Store) error

Do calls all the nodes inside the manager if a error is returned is a rollback of all the already executed steps triggered. Nodes are executed concurrently to one another.

func (*Manager) Errors

func (manager *Manager) Errors() []transport.Error

Errors returns the available error objects within the given flow

func (*Manager) GetName

func (manager *Manager) GetName() string

GetName returns the name of the given flow manager

func (*Manager) NewStore

func (manager *Manager) NewStore() references.Store

NewStore constructs a new reference store for the given manager

func (*Manager) Revert

func (manager *Manager) Revert(executed Tracker, refs references.Store)

Revert reverts the executed nodes found inside the given tracker. All nodes that have not been executed will be ignored.

func (*Manager) Wait

func (manager *Manager) Wait()

Wait awaits till all calls and rollbacks are completed

type ManagerMiddleware

type ManagerMiddleware struct {
	BeforeDo       BeforeManager
	AfterDo        AfterManager
	BeforeRollback BeforeManager
	AfterRollback  AfterManager
}

ManagerMiddleware holds the available middleware options for a flow manager

type Node

type Node struct {
	NodeMiddleware
	Condition *Condition

	Name       string
	Previous   Nodes
	Functions  functions.Stack
	Call       Call
	Revert     Call
	DependsOn  specs.Dependencies
	References map[string]*specs.PropertyReference
	Next       Nodes
	OnError    specs.ErrorHandle
	// contains filtered or unexported fields
}

Node represents a collection of callers and rollbacks which could be executed parallel.

func FetchStarting

func FetchStarting(nodes []*Node) (result []*Node)

FetchStarting constructs the starting seeds for the given nodes

func NewNode

func NewNode(parent *broker.Context, node *specs.Node, opts ...NodeOption) *Node

NewNode constructs a new node for the given call. The service called inside the call endpoint is retrieved from the services collection. The call, codec and rollback are defined inside the node and used while processing requests.

func (*Node) Do

func (node *Node) Do(ctx context.Context, tracker Tracker, processes *Processes, refs references.Store)

Do executes the given node an calls the next nodes. If one of the nodes fails is the error marked and are the processes aborted.

func (*Node) Rollback

func (node *Node) Rollback(ctx context.Context, tracker Tracker, processes *Processes, refs references.Store)

Rollback executes the given node rollback an calls the previous nodes. If one of the nodes fails is the error marked but execution is not aborted.

func (*Node) Skip

func (node *Node) Skip(ctx context.Context, tracker Tracker)

Skip skips the given node and all it's dependencies and nested conditions

func (*Node) Walk

func (node *Node) Walk(result map[string]*Node, fn func(node *Node))

Walk iterates over all nodes and returns the lose ends nodes

type NodeArguments

type NodeArguments []NodeOption

NodeArguments represents a collection of node options

func (*NodeArguments) Set

func (arguments *NodeArguments) Set(option NodeOption)

Set sets the given option inside the given arguments

type NodeMiddleware

type NodeMiddleware struct {
	BeforeDo       BeforeNode
	AfterDo        AfterNode
	BeforeRollback BeforeNode
	AfterRollback  AfterNode
}

NodeMiddleware holds all the available

type NodeOption

type NodeOption func(*NodeOptions)

NodeOption is a wrapper function

func WithCall

func WithCall(call Call) NodeOption

WithCall sets the given call

func WithCondition

func WithCondition(condition *Condition) NodeOption

WithCondition sets the given condition

func WithFunctions

func WithFunctions(functions functions.Stack) NodeOption

WithFunctions sets the given functions stack

func WithNodeMiddleware

func WithNodeMiddleware(middleware NodeMiddleware) NodeOption

WithNodeMiddleware sets the given middleware

func WithRollback

func WithRollback(call Call) NodeOption

WithRollback sets the given call as rollback

type NodeOptions

type NodeOptions struct {
	// contains filtered or unexported fields
}

NodeOptions represent a set of options that could be set through option functions.

func NewNodeOptions

func NewNodeOptions(opts ...NodeOption) NodeOptions

NewNodeOptions constructs a new node options object and collects the options

type Nodes

type Nodes []*Node

Nodes represents a node collection

func (Nodes) Has

func (nodes Nodes) Has(name string) bool

Has checks whether the given node collection has a node with the given name inside

type OnError

type OnError struct {
	// contains filtered or unexported fields
}

OnError represents a error codec and metadata manager

func NewOnError

func NewOnError(stack functions.Stack, codec codec.Manager, metadata *metadata.Manager, err *specs.OnError) *OnError

NewOnError constructs a new error for the given codec and header manager

type Processes

type Processes struct {
	// contains filtered or unexported fields
}

Processes tracks processes

func NewProcesses

func NewProcesses(delta int) *Processes

NewProcesses constructs a new processes tracker. The given delta will be added to the wait group counter.

func (*Processes) Add

func (processes *Processes) Add(delta int)

Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.

func (*Processes) Done

func (processes *Processes) Done()

Done marks a given process as done

func (*Processes) Err

func (processes *Processes) Err() transport.Error

Err returns the thrown error if thrown

func (*Processes) Fatal

func (processes *Processes) Fatal(err transport.Error)

Fatal marks the given error and is returned on Err()

func (*Processes) Wait

func (processes *Processes) Wait()

Wait awaits till all processes are completed

type Request

type Request struct {
	Functions functions.Stack
	Codec     codec.Manager
	Metadata  *metadata.Manager
}

Request represents a codec and metadata manager

type Tracker

type Tracker interface {
	// Flow returns the flow name of the assigned tracker
	Flow() string
	// Mark marks the given node as called
	Mark(node *Node)
	// Skip marks the given node as marked and flag the given node as skipped
	Skip(node *Node)
	// Skipped returns a boolean representing whether the given node has been skipped
	Skipped(node *Node) bool
	// Reached checks whether the required dependencies counter have been reached
	Reached(node *Node, nodes int) bool
	// Met checks whether the given nodes have been called
	Met(nodes ...*Node) bool
	// Lock locks the given node
	Lock(node *Node)
	// Unlock unlocks the given node
	Unlock(node *Node)
}

Tracker represents a structure responsible of tracking nodes

func NewTracker

func NewTracker(flow string, nodes int) Tracker

NewTracker constructs a new tracker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL