Documentation ¶
Index ¶
- Constants
- Variables
- func GetPipelineDefinition(pipeline *Pipeline) string
- type Aggregator
- type BlankOperation
- type Condition
- type Context
- func (context *Context) Del(key string) error
- func (context *Context) Get(key string) (interface{}, error)
- func (context *Context) GetBool(key string) bool
- func (context *Context) GetBytes(key string) []byte
- func (context *Context) GetInt(key string) int
- func (context *Context) GetNode() string
- func (context *Context) GetRequestId() string
- func (context *Context) GetString(key string) string
- func (context *Context) Set(key string, data interface{}) error
- type Dag
- func (this *Dag) AddEdge(from, to string) error
- func (this *Dag) AddVertex(id string, operations []Operation) *Node
- func (this *Dag) Append(dag *Dag) error
- func (this *Dag) GetEndNode() *Node
- func (this *Dag) GetInitialNode() *Node
- func (this *Dag) GetNode(id string) *Node
- func (this *Dag) GetNodes(dynamicOption string) []string
- func (this *Dag) GetParentNode() *Node
- func (this *Dag) HasBranch() bool
- func (this *Dag) HasEdge() bool
- func (this *Dag) IsExecutionFlow() bool
- func (this *Dag) Validate() error
- type DagExporter
- type DataStore
- type EventHandler
- type ForEach
- type Forwarder
- type Logger
- type Node
- func (this *Node) AddAggregator(aggregator Aggregator)
- func (this *Node) AddCondition(condition Condition)
- func (this *Node) AddConditionalDag(condition string, dag *Dag)
- func (this *Node) AddForEach(foreach ForEach)
- func (this *Node) AddForEachDag(subDag *Dag) error
- func (this *Node) AddForwarder(children string, forwarder Forwarder)
- func (this *Node) AddOperation(operation Operation)
- func (this *Node) AddSubAggregator(aggregator Aggregator)
- func (this *Node) AddSubDag(subDag *Dag) error
- func (this *Node) Children() []*Node
- func (this *Node) Dependency() []*Node
- func (this *Node) Dynamic() bool
- func (this *Node) DynamicIndegree() int
- func (this *Node) GetAggregator() Aggregator
- func (this *Node) GetAllConditionalDags() map[string]*Dag
- func (this *Node) GetCondition() Condition
- func (this *Node) GetConditionalDag(condition string) *Dag
- func (this *Node) GetForEach() ForEach
- func (this *Node) GetForwarder(children string) Forwarder
- func (this *Node) GetSubAggregator() Aggregator
- func (this *Node) GetUniqueId() string
- func (this *Node) Indegree() int
- func (this *Node) Operations() []Operation
- func (this *Node) Outdegree() int
- func (this *Node) ParentDag() *Dag
- func (this *Node) SubDag() *Dag
- type NodeExporter
- type Operation
- type OperationExporter
- type Pipeline
- func (pipeline *Pipeline) ApplyState(state string)
- func (pipeline *Pipeline) CountNodes() int
- func (pipeline *Pipeline) GetAllNodesUniqueId() []string
- func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag)
- func (pipeline *Pipeline) GetInitialNodeId() string
- func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string
- func (pipeline *Pipeline) GetState() string
- func (pipeline *Pipeline) SetDag(dag *Dag)
- func (pipeline *Pipeline) UpdatePipelineExecutionPosition(depthAdjustment int, vertex string)
- type PipelineErrorHandler
- type PipelineHandler
- type StateStore
Constants ¶
const ( // StateSuccess denotes success state StateSuccess = "success" // StateFailure denotes failure state StateFailure = "failure" // StateOngoing denotes ongoing state StateOngoing = "ongoing" )
const ( DEPTH_INCREMENT = 1 DEPTH_DECREMENT = -1 DEPTH_SAME = 0 )
Variables ¶
var ( // ERR_NO_VERTEX denotes that dag has no vertex set ERR_NO_VERTEX = fmt.Errorf("dag has no vertex set") // ERR_CYCLIC denotes that dag has a cycle ERR_CYCLIC = fmt.Errorf("dag has cyclic dependency") // ERR_DUPLICATE_EDGE denotes that a dag edge is duplicate ERR_DUPLICATE_EDGE = fmt.Errorf("edge redefined") // ERR_DUPLICATE_VERTEX denotes that a dag vertex is duplicate ERR_DUPLICATE_VERTEX = fmt.Errorf("vertex redefined") // ERR_MULTIPLE_START denotes that a dag has more than one start point ERR_MULTIPLE_START = fmt.Errorf("only one start vertex is allowed") // ERR_RECURSIVE_DEP denotes that dag has a recursive dependency ERR_RECURSIVE_DEP = fmt.Errorf("dag has recursive dependency") // Default forwarder DefaultForwarder = func(data []byte) []byte { return data } )
Functions ¶
func GetPipelineDefinition ¶
GetPipelineDefinition generates the pipeline DAG definition as JSON
Types ¶
type Aggregator ¶
Aggregator definition for the data aggregator of nodes
type BlankOperation ¶
type BlankOperation struct { }
func (*BlankOperation) Encode ¶
func (ops *BlankOperation) Encode() []byte
func (*BlankOperation) Execute ¶
func (ops *BlankOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error)
func (*BlankOperation) GetId ¶
func (ops *BlankOperation) GetId() string
func (*BlankOperation) GetProperties ¶
func (ops *BlankOperation) GetProperties() map[string][]string
type Context ¶
type Context struct { Query url.Values // provides request Query State string // state of the request Name string // name of the faas-flow NodeInput map[string][]byte // stores inputs from each node // contains filtered or unexported fields }
Context execution context and execution state
func CreateContext ¶
CreateContext create request context (used by template)
func (*Context) GetRequestId ¶
GetRequestId returns the request id
type Dag ¶
type Dag struct { Id string // contains filtered or unexported fields }
Dag The whole dag
func (*Dag) AddEdge ¶
AddEdge adds a directed edge as (from)->(to). If the vertices don't exist, it creates them.
func (*Dag) Append ¶
Append appends another dag into an existing dag. It's a way to define and reuse subdags. Append causes a disconnected dag, which must be linked with an edge in order to execute.
func (*Dag) GetInitialNode ¶
GetInitialNode gets the initial node
func (*Dag) GetParentNode ¶
GetParentNode returns parent node for a subdag
func (*Dag) IsExecutionFlow ¶
IsExecutionFlow checks if a dag doesn't use intermediate data
type DagExporter ¶
type DagExporter struct { Id string `json:"id"` StartNode string `json:"start-node"` EndNode string `json:"end-node"` HasBranch bool `json:"has-branch"` HasEdge bool `json:"has-edge"` ExecutionOnlyDag bool `json:"exec-only-dag"` Nodes map[string]*NodeExporter `json:"nodes"` IsValid bool `json:"is-valid"` ValidationError string `json:"validation-error,omitempty"` }
type DataStore ¶
type DataStore interface { // Configure the DataStore with flow name and request ID Configure(flowName string, requestId string) // Initialize the DataStore (called only once in a request span) Init() error // Set stores a value for key, in failure returns error Set(key string, value []byte) error // Get retrieves a value by key, if failure returns error Get(key string) ([]byte, error) // Del deletes a value by a key Del(key string) error // Cleanup all the resources in DataStore Cleanup() error // Copy a DataStore CopyStore() (DataStore, error) }
DataStore for Storing Data
type EventHandler ¶
type EventHandler interface { // Configure the EventHandler with flow name and request ID Configure(flowName string, requestId string) // Initialize an EventHandler (called only once in a request span) Init() error //copy Store Copy() (EventHandler, error) // ReportRequestStart report a start of request ReportRequestStart(requestId string) // ReportRequestEnd reports an end of request ReportRequestEnd(requestId string) // ReportRequestFailure reports a failure of a request with error ReportRequestFailure(requestId string, err error) // ReportExecutionForward report that an execution is forwarded ReportExecutionForward(nodeId string, requestId string) // ReportExecutionContinuation report that an execution is being continued ReportExecutionContinuation(requestId string) // ReportNodeStart report a start of a Node execution ReportNodeStart(nodeId string, requestId string) // ReportNodeEnd reports an end of a node execution ReportNodeEnd(nodeId string, requestId string) // ReportNodeFailure report a Node execution failure with error ReportNodeFailure(nodeId string, requestId string, err error) // ReportOperationStart reports start of an operation ReportOperationStart(operationId string, nodeId string, requestId string) // ReportOperationEnd reports an end of an operation ReportOperationEnd(operationId string, nodeId string, requestId string) // ReportOperationFailure reports failure of an operation with error ReportOperationFailure(operationId string, nodeId string, requestId string, err error) // Flush flush the reports Flush() }
EventHandler handle flow events
type Logger ¶
type Logger interface { // Configure configure a logger with flowname and requestID Configure(flowName string, requestId string) // Init initialize a logger Init() error // Log logs a flow log Log(str string) }
Logger logs the flow logs
type Node ¶
type Node struct { Id string // The id of the vertex // contains filtered or unexported fields }
Node The vertex
func (*Node) AddAggregator ¶
func (this *Node) AddAggregator(aggregator Aggregator)
AddAggregator adds an aggregator to a node
func (*Node) AddCondition ¶
AddCondition adds a condition to a node
func (*Node) AddConditionalDag ¶
AddConditionalDag adds conditional dag to node
func (*Node) AddForEach ¶
AddForEach adds a foreach to a node
func (*Node) AddForEachDag ¶
AddForEachDag adds a foreach subdag to the node
func (*Node) AddForwarder ¶
AddForwarder adds a forwarder for a specific children
func (*Node) AddOperation ¶
AddOperation adds an operation
func (*Node) AddSubAggregator ¶
func (this *Node) AddSubAggregator(aggregator Aggregator)
AddSubAggregator adds a foreach aggregator to a node
func (*Node) Dependency ¶
Dependency gets all dependency nodes for a node
func (*Node) DynamicIndegree ¶
DynamicIndegree returns the number of dynamic inputs in a node
func (*Node) GetAggregator ¶
func (this *Node) GetAggregator() Aggregator
GetAggregator gets an aggregator from a node
func (*Node) GetAllConditionalDags ¶
GetAllConditionalDags gets all the subdags for all conditions
func (*Node) GetCondition ¶
GetCondition gets the condition function
func (*Node) GetConditionalDag ¶
GetConditionalDag gets the subdag for a specific condition
func (*Node) GetForEach ¶
GetForEach gets the foreach function
func (*Node) GetForwarder ¶
GetForwarder gets a forwarder for a children
func (*Node) GetSubAggregator ¶
func (this *Node) GetSubAggregator() Aggregator
GetSubAggregator gets the subaggregator for condition and foreach
func (*Node) GetUniqueId ¶
GetUniqueId returns a unique ID of the node
func (*Node) Operations ¶
Operations provides the ordered list of functions for a node
type NodeExporter ¶
type NodeExporter struct { Id string `json:"id"` Index int `json:"node-index"` UniqueId string `json:"unique-id"` // required to fetch intermediate data and state IsDynamic bool `json:"is-dynamic"` IsCondition bool `json:"is-condition"` IsForeach bool `json:"is-foreach"` HasAggregator bool `json:"has-aggregator"` HasSubAggregator bool `json:"has-sub-aggregator"` HasSubDag bool `json:"has-subdag"` InDegree int `json:"in-degree"` OutDegree int `json:"out-degree"` SubDag *DagExporter `json:"sub-dag,omitempty"` ForeachDag *DagExporter `json:"foreach-dag,omitempty"` ConditionalDags map[string]*DagExporter `json:"conditional-dags,omitempty"` DynamicExecOnly bool `json:"dynamic-exec-only"` Operations []*OperationExporter `json:"operations,omitempty"` Children []string `json:"childrens,omitempty"` ChildrenExecOnly map[string]bool `json:"child-exec-only"` }
type OperationExporter ¶
type Pipeline ¶
type Pipeline struct { Dag *Dag `json:"-"` // Dag that will be executed ExecutionPosition map[string]string `json:"pipeline-execution-position"` // Denotes the node that is executing now ExecutionDepth int `json:"pipeline-execution-depth"` // Denotes the depth of subgraph its executing CurrentDynamicOption map[string]string `json:"pipeline-dynamic-option"` // Denotes the current dynamic option mapped against the dynamic Node UQ id FailureHandler PipelineErrorHandler `json:"-"` Finally PipelineHandler `json:"-"` }
func (*Pipeline) ApplyState ¶
ApplyState applies a state to a pipeline from a JSON-encoded pipeline
func (*Pipeline) CountNodes ¶
CountNodes counts the number of nodes added in the Pipeline Dag. It doesn't count subdag nodes.
func (*Pipeline) GetAllNodesUniqueId ¶
GetAllNodesUniqueId returns a recursive list of all nodes that belong to the pipeline
func (*Pipeline) GetCurrentNodeDag ¶
GetCurrentNodeDag returns the current node and current dag based on execution position
func (*Pipeline) GetInitialNodeId ¶
GetInitialNodeId returns the ID of the very first node of the pipeline
func (*Pipeline) GetNodeExecutionUniqueId ¶
GetNodeExecutionUniqueId provides an ID that is unique in an execution
func (*Pipeline) UpdatePipelineExecutionPosition ¶
UpdatePipelineExecutionPosition updates the pipeline execution position. The specified depthAdjustment and vertex denote how the ExecutionPosition must be altered.
type PipelineErrorHandler ¶
PipelineErrorHandler is the error handler definition for the OnFailure() registration on pipeline
type PipelineHandler ¶
type PipelineHandler func(string)
PipelineHandler definition for the Finally() registration on pipeline
type StateStore ¶
type StateStore interface { // Configure the StateStore with flow name and request ID Configure(flowName string, requestId string) // Initialize the StateStore (called only once in a request span) Init() error // Set a value (override existing, or create one) Set(key string, value string) error // Get a value Get(key string) (string, error) // Increase the value of key with a given increment Incr(key string, value int64) (int64, error) // Compare and Update a value Update(key string, oldValue string, newValue string) error // Cleanup all the resources in StateStore (called only once in a request span) Cleanup() error //copy Store CopyStore() (StateStore, error) }
StateStore for saving execution state