README ¶
Flow manager
A flow manager manages the nodes within a given flow. Each node represents an action to be executed (e.g. a request or a rollback). Nodes are executed concurrently with one another. A dependency between nodes is derived from the references within a node or from an explicitly defined node dependency. The manager keeps track of all processes being executed and of all nodes that have been called. If an error is thrown inside one of the processes, execution of the flow is stopped and a rollback is initiated.
Branches
Nodes are executed concurrently with one another. Before a node is executed, a check is performed to determine whether its dependencies have been met. The node is executed only once all of its dependencies have been met.
+------------+
|            |
|    Node    +------------+
|            |            |
+------+-----+            |
       |                  |
       |                  |
+------v-----+     +------v-----+
|            |     |            |
|    Node    |     |    Node    |
|            |     |            |
+------+-----+     +------+-----+
       |                  |
       |                  |
+------v-----+            |
|            |            |
|    Node    <------------+
|            |
+------------+
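The dependency check pictured above can be sketched roughly as follows. This is a minimal illustration and not the package's implementation: the `node` type, `link`, `met`, `run` and `example` are hypothetical names introduced here, and the actual work of a node is reduced to recording its name.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical, simplified stand-in for a flow node.
type node struct {
	name     string
	previous []*node
	next     []*node
}

// link registers to as a dependent of from.
func link(from, to *node) {
	from.next = append(from.next, to)
	to.previous = append(to.previous, from)
}

// met reports whether all dependencies of n have completed.
func met(done map[*node]bool, n *node) bool {
	for _, dep := range n.previous {
		if !done[dep] {
			return false
		}
	}
	return true
}

// run executes the graph from the given seeds, each node in its own
// goroutine, and returns the order in which the nodes completed.
func run(seeds ...*node) []string {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		done    = map[*node]bool{}
		started = map[*node]bool{}
		order   []string
	)

	var exec func(n *node)
	exec = func(n *node) {
		defer wg.Done()

		// A real node would perform its call here, outside the lock.

		mu.Lock()
		done[n] = true
		order = append(order, n.name)

		// Start only those next nodes whose dependencies are all met.
		var ready []*node
		for _, next := range n.next {
			if started[next] || !met(done, next) {
				continue
			}
			started[next] = true
			ready = append(ready, next)
		}
		wg.Add(len(ready))
		mu.Unlock()

		for _, next := range ready {
			go exec(next)
		}
	}

	for _, seed := range seeds {
		started[seed] = true
	}
	wg.Add(len(seeds))
	for _, seed := range seeds {
		go exec(seed)
	}

	wg.Wait()
	return order
}

// example builds the four-node graph from the diagram and runs it.
func example() []string {
	a, b, c, d := &node{name: "a"}, &node{name: "b"}, &node{name: "c"}, &node{name: "d"}
	link(a, b)
	link(a, c)
	link(b, d)
	link(c, d)
	return run(a)
}

func main() {
	fmt.Println(example())
}
```

The two middle nodes may complete in either order, but the last node only starts after both of them have finished.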
Documentation ¶
Index ¶
- Variables
- func ConstructBranches(nodes []*Node)
- func ConstructDependency(node *Node, target string, nodes []*Node)
- func ExecuteFunctions(stack functions.Stack, store references.Store) error
- type AfterManager
- type AfterManagerHandler
- type AfterNode
- type AfterNodeHandler
- type BeforeManager
- type BeforeManagerHandler
- type BeforeNode
- type BeforeNodeHandler
- type Call
- type CallOptions
- type Condition
- type Expression
- type Manager
- func (manager *Manager) Do(ctx context.Context, refs references.Store) error
- func (manager *Manager) Errors() []transport.Error
- func (manager *Manager) GetName() string
- func (manager *Manager) NewStore() references.Store
- func (manager *Manager) Revert(executed Tracker, refs references.Store)
- func (manager *Manager) Wait()
- type ManagerMiddleware
- type Node
- func (node *Node) Do(ctx context.Context, tracker Tracker, processes *Processes, ...)
- func (node *Node) Rollback(ctx context.Context, tracker Tracker, processes *Processes, ...)
- func (node *Node) Skip(ctx context.Context, tracker Tracker)
- func (node *Node) Walk(result map[string]*Node, fn func(node *Node))
- type NodeArguments
- type NodeMiddleware
- type NodeOption
- type NodeOptions
- type Nodes
- type OnError
- type Processes
- type Request
- type Tracker
Variables ¶
var ErrAbortFlow = errors.New("abort flow")
ErrAbortFlow represents the error thrown when a flow has to be aborted
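A sentinel error such as this one is usually compared with `errors.Is`, which also matches wrapped errors. The sketch below uses a local `errAbortFlow` that mirrors the declaration above; `step` and `aborted` are hypothetical helpers, and how the package itself returns the error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbortFlow is a local stand-in mirroring ErrAbortFlow.
var errAbortFlow = errors.New("abort flow")

// step is a hypothetical unit of work that aborts the flow on request.
func step(abort bool) error {
	if abort {
		// %w keeps the sentinel reachable through errors.Is.
		return fmt.Errorf("step failed: %w", errAbortFlow)
	}
	return nil
}

// aborted reports whether err signals that the flow has to be aborted.
func aborted(err error) bool {
	return errors.Is(err, errAbortFlow)
}

func main() {
	err := step(true)
	fmt.Println(aborted(err), err) // prints "true step failed: abort flow"
}
```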
Functions ¶
func ConstructBranches ¶
func ConstructBranches(nodes []*Node)
ConstructBranches constructs the node branches based on the references made between the given nodes
func ConstructDependency ¶
func ConstructDependency(node *Node, target string, nodes []*Node)
ConstructDependency constructs a dependency for the given node
func ExecuteFunctions ¶
func ExecuteFunctions(stack functions.Stack, store references.Store) error
ExecuteFunctions executes the given functions and writes the results to the given store
Types ¶
type AfterManager ¶
type AfterManager func(ctx context.Context, manager *Manager, store references.Store) (context.Context, error)
AfterManager is called after a manager is called
type AfterManagerHandler ¶
type AfterManagerHandler func(AfterManager) AfterManager
AfterManagerHandler wraps the after call function to allow middleware to be chained
type AfterNode ¶
type AfterNode func(ctx context.Context, node *Node, tracker Tracker, processes *Processes, store references.Store) (context.Context, error)
AfterNode is called after a node is executed
type AfterNodeHandler ¶
type AfterNodeHandler func(AfterNode) AfterNode
AfterNodeHandler wraps the after node function to allow middleware to be chained
type BeforeManager ¶
type BeforeManager func(ctx context.Context, manager *Manager, store references.Store) (context.Context, error)
BeforeManager is called before a manager gets called
type BeforeManagerHandler ¶
type BeforeManagerHandler func(BeforeManager) BeforeManager
BeforeManagerHandler wraps the before call function to allow middleware to be chained
type BeforeNode ¶
type BeforeNode func(ctx context.Context, node *Node, tracker Tracker, processes *Processes, store references.Store) (context.Context, error)
BeforeNode is called before a node is executed
type BeforeNodeHandler ¶
type BeforeNodeHandler func(BeforeNode) BeforeNode
BeforeNodeHandler wraps the before node function to allow middleware to be chained
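The handler types above all share the shape `func(T) T`, which allows middleware to be layered around a base function. A minimal sketch of that chaining, assuming simplified types: `before` stands in for BeforeNode with most arguments omitted, and `chain`, `record` and `trace` are hypothetical helpers that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// before is a simplified stand-in for BeforeNode; the node, tracker,
// processes and store arguments are replaced by a plain name.
type before func(ctx context.Context, name string) (context.Context, error)

// beforeHandler mirrors the shape of BeforeNodeHandler.
type beforeHandler func(before) before

// chain wraps base with the given handlers. The first handler ends up
// outermost and is therefore called first.
func chain(base before, handlers ...beforeHandler) before {
	for i := len(handlers) - 1; i >= 0; i-- {
		base = handlers[i](base)
	}
	return base
}

// record returns a handler that appends label to log before calling next.
func record(log *[]string, label string) beforeHandler {
	return func(next before) before {
		return func(ctx context.Context, name string) (context.Context, error) {
			*log = append(*log, label)
			return next(ctx, name)
		}
	}
}

// trace runs a chained function once and returns the recorded call order.
func trace() []string {
	var log []string
	base := func(ctx context.Context, name string) (context.Context, error) {
		log = append(log, "base:"+name)
		return ctx, nil
	}

	fn := chain(base, record(&log, "outer"), record(&log, "inner"))
	_, _ = fn(context.Background(), "node")
	return log
}

func main() {
	fmt.Println(trace()) // prints "[outer inner base:node]"
}
```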
type Call ¶
type Call interface {
    Do(context.Context, references.Store) error
}
Call represents a transport caller implementation
type CallOptions ¶
type CallOptions struct {
    Transport      transport.Call
    Method         transport.Method
    Request        *Request
    Response       *Request
    Err            *OnError
    ExpectedStatus []int
}
CallOptions represents the available options that could be used to construct a new flow caller
type Condition ¶
type Condition struct {
    // contains filtered or unexported fields
}
Condition represents a condition which could be evaluated and results in a boolean
func NewCondition ¶
func NewCondition(stack functions.Stack, expression Expression) *Condition
NewCondition constructs a new condition from the given functions stack and expression
type Expression ¶
type Expression interface {
    specs.Evaluable
    GetParameters() *specs.ParameterMap
}
Expression represents an expression that contains a list of parameters and can be evaluated
type Manager ¶
type Manager struct {
    BeforeDo       BeforeManager
    BeforeRollback BeforeManager
    Name           string
    Starting       []*Node
    References     int
    Nodes          []*Node
    Ends           int
    Error          specs.ErrorHandle
    AfterFunctions functions.Stack
    AfterDo        AfterManager
    AfterRollback  AfterManager
    // contains filtered or unexported fields
}
Manager is responsible for the handling of a flow and its steps
func NewManager ¶
func NewManager(parent *broker.Context, name string, nodes []*Node, err specs.ErrorHandle, after functions.Stack, middleware *ManagerMiddleware) *Manager
NewManager constructs a new manager for the given flow. Branches are constructed for the given nodes to optimise performance. Various values such as the number of nodes, references and loose ends are collected to optimise allocations during runtime.
func (*Manager) Do ¶
func (manager *Manager) Do(ctx context.Context, refs references.Store) error
Do calls all the nodes inside the manager. If an error is returned, a rollback of all the already executed steps is triggered. Nodes are executed concurrently with one another.
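The rollback behaviour described for Do can be illustrated with a small sketch. It is not the manager's implementation: steps run sequentially here for brevity, the reverse rollback order is an assumption, and `step`, `execute`, `demo` and `errFailed` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailed is a hypothetical error returned by a failing step.
var errFailed = errors.New("failed")

// step is a hypothetical, simplified stand-in for a node with a call
// and a rollback.
type step struct {
	name   string
	do     func() error
	revert func()
}

// execute runs the steps in order. When a step fails, the steps that
// already succeeded are reverted in reverse order and the error is returned.
func execute(steps []step) error {
	var executed []step
	for _, s := range steps {
		if err := s.do(); err != nil {
			for i := len(executed) - 1; i >= 0; i-- {
				executed[i].revert()
			}
			return fmt.Errorf("%s: %w", s.name, err)
		}
		executed = append(executed, s)
	}
	return nil
}

// demo executes three steps, the last of which fails, and returns a log
// of the performed calls and rollbacks.
func demo() ([]string, error) {
	var log []string
	mk := func(name string, fail bool) step {
		return step{
			name: name,
			do: func() error {
				log = append(log, "do:"+name)
				if fail {
					return errFailed
				}
				return nil
			},
			revert: func() { log = append(log, "revert:"+name) },
		}
	}

	err := execute([]step{mk("a", false), mk("b", false), mk("c", true)})
	return log, err
}

func main() {
	log, err := demo()
	fmt.Println(log, err) // prints "[do:a do:b do:c revert:b revert:a] c: failed"
}
```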
func (*Manager) NewStore ¶
func (manager *Manager) NewStore() references.Store
NewStore constructs a new reference store for the given manager
type ManagerMiddleware ¶
type ManagerMiddleware struct {
    BeforeDo       BeforeManager
    AfterDo        AfterManager
    BeforeRollback BeforeManager
    AfterRollback  AfterManager
}
ManagerMiddleware holds the available middleware options for a flow manager
type Node ¶
type Node struct {
    NodeMiddleware
    Condition  *Condition
    Name       string
    Previous   Nodes
    Functions  functions.Stack
    Call       Call
    Revert     Call
    DependsOn  specs.Dependencies
    References map[string]*specs.PropertyReference
    Next       Nodes
    OnError    specs.ErrorHandle
    // contains filtered or unexported fields
}
Node represents a collection of callers and rollbacks which could be executed in parallel.
func FetchStarting ¶
FetchStarting constructs the starting seeds for the given nodes
func NewNode ¶
NewNode constructs a new node for the given call. The service called inside the call endpoint is retrieved from the services collection. The call, codec and rollback are defined inside the node and used while processing requests.
func (*Node) Do ¶
func (node *Node) Do(ctx context.Context, tracker Tracker, processes *Processes, refs references.Store)
Do executes the given node and calls the next nodes. If one of the nodes fails, the error is marked and the processes are aborted.
func (*Node) Rollback ¶
func (node *Node) Rollback(ctx context.Context, tracker Tracker, processes *Processes, refs references.Store)
Rollback executes the rollback of the given node and calls the previous nodes. If one of the nodes fails, the error is marked but execution is not aborted.
type NodeArguments ¶
type NodeArguments []NodeOption
NodeArguments represents a collection of node options
func (*NodeArguments) Set ¶
func (arguments *NodeArguments) Set(option NodeOption)
Set sets the given option inside the given arguments
type NodeMiddleware ¶
type NodeMiddleware struct {
    BeforeDo       BeforeNode
    AfterDo        AfterNode
    BeforeRollback BeforeNode
    AfterRollback  AfterNode
}
NodeMiddleware holds all the available middleware options for a node
type NodeOption ¶
type NodeOption func(*NodeOptions)
NodeOption is a function that sets an option on the given NodeOptions
func WithCondition ¶
func WithCondition(condition *Condition) NodeOption
WithCondition sets the given condition
func WithFunctions ¶
func WithFunctions(functions functions.Stack) NodeOption
WithFunctions sets the given functions stack
func WithNodeMiddleware ¶
func WithNodeMiddleware(middleware NodeMiddleware) NodeOption
WithNodeMiddleware sets the given middleware
func WithRollback ¶
func WithRollback(call Call) NodeOption
WithRollback sets the given call as rollback
type NodeOptions ¶
type NodeOptions struct {
    // contains filtered or unexported fields
}
NodeOptions represents a set of options that can be set through option functions.
func NewNodeOptions ¶
func NewNodeOptions(opts ...NodeOption) NodeOptions
NewNodeOptions constructs a new node options object and collects the options
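NodeOption, the With* functions and NewNodeOptions together follow the functional options pattern. A minimal sketch of that pattern, assuming a hypothetical `options` struct: the fields `label` and `retries` and the helpers `withLabel` and `withRetries` are invented for illustration and do not exist in the package.

```go
package main

import "fmt"

// options is a hypothetical stand-in for NodeOptions.
type options struct {
	label   string
	retries int
}

// option mirrors the shape of NodeOption.
type option func(*options)

// withLabel returns an option that sets the label.
func withLabel(label string) option {
	return func(o *options) { o.label = label }
}

// withRetries returns an option that sets the number of retries.
func withRetries(n int) option {
	return func(o *options) { o.retries = n }
}

// newOptions applies the given options in order to a zero value and
// returns the result. Nil options are skipped.
func newOptions(opts ...option) options {
	result := options{}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(&result)
	}
	return result
}

func main() {
	fmt.Printf("%+v\n", newOptions(withLabel("fetch"), withRetries(3))) // prints "{label:fetch retries:3}"
}
```

The benefit of this shape is that new options can be added later without changing the constructor signature.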
type OnError ¶
type OnError struct {
    // contains filtered or unexported fields
}
OnError represents an error codec and metadata manager
type Processes ¶
type Processes struct {
    // contains filtered or unexported fields
}
Processes tracks processes
func NewProcesses ¶
NewProcesses constructs a new processes tracker. The given delta will be added to the wait group counter.
func (*Processes) Add ¶
Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.
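The counter semantics described here match those of `sync.WaitGroup`. The sketch below wraps a WaitGroup in a hypothetical `processes` type to show an initial delta, waiting for workers, and the panic on a negative counter. The type, its `Done` and `Wait` methods, and the helpers `runWorkers` and `addPanics` are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processes is a hypothetical wrapper around sync.WaitGroup.
type processes struct {
	wg sync.WaitGroup
}

// newProcesses constructs a tracker and adds delta to its counter.
func newProcesses(delta int) *processes {
	p := &processes{}
	p.wg.Add(delta)
	return p
}

// Add adds delta, which may be negative, to the counter.
func (p *processes) Add(delta int) { p.wg.Add(delta) }

// Done decrements the counter by one.
func (p *processes) Done() { p.wg.Done() }

// Wait blocks until the counter is zero.
func (p *processes) Wait() { p.wg.Wait() }

// runWorkers starts n goroutines, waits for all of them, and returns
// how many have finished.
func runWorkers(n int) int64 {
	p := newProcesses(n)
	var finished int64
	for i := 0; i < n; i++ {
		go func() {
			defer p.Done()
			atomic.AddInt64(&finished, 1)
		}()
	}
	p.Wait()
	return atomic.LoadInt64(&finished)
}

// addPanics reports whether driving the counter below zero panics.
func addPanics() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	newProcesses(0).Add(-1)
	return false
}

func main() {
	fmt.Println(runWorkers(3), addPanics()) // prints "3 true"
}
```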
type Tracker ¶
type Tracker interface {
    // Flow returns the flow name of the assigned tracker
    Flow() string
    // Mark marks the given node as called
    Mark(node *Node)
    // Skip marks the given node as called and flags it as skipped
    Skip(node *Node)
    // Skipped returns a boolean representing whether the given node has been skipped
    Skipped(node *Node) bool
    // Reached checks whether the required dependencies counter has been reached
    Reached(node *Node, nodes int) bool
    // Met checks whether the given nodes have been called
    Met(nodes ...*Node) bool
    // Lock locks the given node
    Lock(node *Node)
    // Unlock unlocks the given node
    Unlock(node *Node)
}
Tracker represents a structure responsible for tracking nodes
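To make the intent of a few of these methods concrete, here is a minimal sketch of a tracker backed by mutex-guarded maps. It covers only Mark, Skip, Skipped and Met, uses a hypothetical `node` type in place of Node, and is an assumption about how such a tracker could work rather than the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for Node.
type node struct {
	name string
}

// tracker is a hypothetical, partial tracker implementation.
type tracker struct {
	mu      sync.Mutex
	called  map[*node]bool
	skipped map[*node]bool
}

// newTracker constructs an empty tracker.
func newTracker() *tracker {
	return &tracker{
		called:  map[*node]bool{},
		skipped: map[*node]bool{},
	}
}

// Mark marks the given node as called.
func (t *tracker) Mark(n *node) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.called[n] = true
}

// Skip marks the given node as called and flags it as skipped.
func (t *tracker) Skip(n *node) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.called[n] = true
	t.skipped[n] = true
}

// Skipped reports whether the given node has been skipped.
func (t *tracker) Skipped(n *node) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.skipped[n]
}

// Met reports whether all of the given nodes have been called.
func (t *tracker) Met(nodes ...*node) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, n := range nodes {
		if !t.called[n] {
			return false
		}
	}
	return true
}

func main() {
	a, b := &node{name: "a"}, &node{name: "b"}
	t := newTracker()

	t.Mark(a)
	fmt.Println(t.Met(a, b)) // prints "false"

	t.Skip(b)
	fmt.Println(t.Met(a, b), t.Skipped(b)) // prints "true true"
}
```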
func NewTracker ¶
NewTracker constructs a new tracker