Documentation ¶
Overview ¶
Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind This module implements the core Nodes executor. This executor is the starting point for executing any node in the workflow. Since Nodes in a workflow are composable, i.e., one node may contain other nodes, the Node Handler is recursive in nature. This executor handles the core logic for all nodes, but specific logic for handling different kinds of nodes is delegated to the respective node handlers
Available node handlers are
- Task: Arguably the most important handler as it handles all tasks. These include all plugins. The goal of the workflow is is to run tasks, thus every workflow will contain atleast one TaskNode (except for the case, where the workflow is purely a meta-workflow and can run other workflows
- SubWorkflow: This is one of the most important handlers. It can execute Workflows that are nested inside a workflow
- DynamicTask Handler: This is just a decorator on the Task Handler. It handles cases, in which the Task returns a futures file. Every Task is actually executed through the DynamicTaskHandler
- Branch Handler: This handler is used to execute branches
- Start & End Node handler: these are nominal handlers for the start and end node and do no really carry a lot of logic
Index ¶
- Constants
- func CreateAliasMap(aliases []v1alpha1.Alias) map[string]string
- func GetParentNodeMaxEndTime(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, ...) (t v1.Time, err error)
- func IsMaxParallelismAchieved(ctx context.Context, currentNode v1alpha1.ExecutableNode, ...) bool
- func IsTerminalNodePhase(p core.NodeExecution_Phase) bool
- func IsTerminalTaskPhase(p core.TaskExecution_Phase) bool
- func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, ...) (executors.Node, error)
- func ParseVarName(varName string) (idx *int, name string, err error)
- func Resolve(ctx context.Context, outputResolver OutputResolver, nl executors.NodeLookup, ...) (*core.LiteralMap, error)
- func ResolveBindingData(ctx context.Context, outputResolver OutputResolver, nl executors.NodeLookup, ...) (*core.Literal, error)
- func ToK8sTime(t time.Time) v1.Time
- func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase
- func ToNodeExecOutput(info *handler.OutputInfo) *event.NodeExecutionEvent_OutputUri
- func ToNodeExecTaskNodeMetadata(info *handler.TaskNodeInfo) *event.NodeExecutionEvent_TaskNodeMetadata
- func ToNodeExecWorkflowNodeMetadata(info *handler.WorkflowNodeInfo) *event.NodeExecutionEvent_WorkflowNodeMetadata
- func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, info handler.PhaseInfo, ...) (*event.NodeExecutionEvent, error)
- func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error)
- func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateManager, ...)
- type HandlerFactory
- type OutputResolver
- type PredicatePhase
- type VarName
Constants ¶
const NodeIDLabel = "node-id"
const NodeInterruptibleLabel = "interruptible"
const TaskNameLabel = "task-name"
Variables ¶
This section is empty.
Functions ¶
func CreateAliasMap ¶ added in v0.1.13
func GetParentNodeMaxEndTime ¶
func IsMaxParallelismAchieved ¶ added in v0.15.4
func IsMaxParallelismAchieved(ctx context.Context, currentNode v1alpha1.ExecutableNode, currentPhase v1alpha1.NodePhase, execContext executors.ExecutionContext) bool
IsMaxParallelismAchieved checks if we have already achieved max parallelism. It returns true, if the desired max parallelism value is achieved, false otherwise MaxParallelism is defined as the maximum number of TaskNodes and LaunchPlans (together) that can be executed concurrently by one workflow execution. A setting of `0` indicates that it is disabled.
func IsTerminalNodePhase ¶ added in v0.12.10
func IsTerminalNodePhase(p core.NodeExecution_Phase) bool
IsTerminalNodePhase returns true if node phase is one of the terminal phases, else false
func IsTerminalTaskPhase ¶ added in v0.12.10
func IsTerminalTaskPhase(p core.TaskExecution_Phase) bool
IsTerminalTaskPhase returns true if task phase is terminal, else false
func NewExecutor ¶
func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, scope promutils.Scope) (executors.Node, error)
func ParseVarName ¶ added in v0.1.13
Parses var names
func Resolve ¶
func Resolve(ctx context.Context, outputResolver OutputResolver, nl executors.NodeLookup, nodeID v1alpha1.NodeID, bindings []*v1alpha1.Binding) (*core.LiteralMap, error)
func ResolveBindingData ¶
func ResolveBindingData(ctx context.Context, outputResolver OutputResolver, nl executors.NodeLookup, bindingData *core.BindingData) (*core.Literal, error)
func ToNodeExecEventPhase ¶ added in v0.1.13
func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase
func ToNodeExecOutput ¶ added in v0.1.13
func ToNodeExecOutput(info *handler.OutputInfo) *event.NodeExecutionEvent_OutputUri
func ToNodeExecTaskNodeMetadata ¶ added in v0.7.0
func ToNodeExecTaskNodeMetadata(info *handler.TaskNodeInfo) *event.NodeExecutionEvent_TaskNodeMetadata
func ToNodeExecWorkflowNodeMetadata ¶ added in v0.7.0
func ToNodeExecWorkflowNodeMetadata(info *handler.WorkflowNodeInfo) *event.NodeExecutionEvent_WorkflowNodeMetadata
func ToNodeExecutionEvent ¶ added in v0.1.13
func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, info handler.PhaseInfo, inputPath string, status v1alpha1.ExecutableNodeStatus, eventVersion v1alpha1.EventVersion, parentInfo executors.ImmutableParentInfo, node v1alpha1.ExecutableNode, clusterID string) (*event.NodeExecutionEvent, error)
func UpdateNodeStatus ¶ added in v0.1.13
Types ¶
type HandlerFactory ¶
type HandlerFactory interface { GetHandler(kind v1alpha1.NodeKind) (handler.Node, error) Setup(ctx context.Context, setup handler.SetupContext) error }
func NewHandlerFactory ¶
func NewHandlerFactory(ctx context.Context, executor executors.Node, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, kubeClient executors.Client, client catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, scope promutils.Scope) (HandlerFactory, error)
type OutputResolver ¶ added in v0.1.13
type OutputResolver interface { // Extracts a subset of node outputs to literals. ExtractOutput(ctx context.Context, nl executors.NodeLookup, n v1alpha1.ExecutableNode, bindToVar VarName) (values *core.Literal, err error) }
func NewRemoteFileOutputResolver ¶ added in v0.1.13
func NewRemoteFileOutputResolver(store *storage.DataStore) OutputResolver
Creates a simple output resolver that expects an outputs.pb at the data directory of the node.
type PredicatePhase ¶
type PredicatePhase int
Special enum to indicate if the node under consideration is ready to be executed or should be skipped
const ( // Indicates node is not yet ready to be executed PredicatePhaseNotReady PredicatePhase = iota // Indicates node is ready to be executed - execution should proceed PredicatePhaseReady // Indicates that the node execution should be skipped as one of its parents was skipped or the branch was not taken PredicatePhaseSkip // Indicates failure during Predicate check PredicatePhaseUndefined )
func CanExecute ¶
func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.BaseNode) ( PredicatePhase, error)
CanExecute method informs the callee if the given node can begin execution. This is dependent on primarily that all nodes upstream to the given node are successful and the results are available.
func (PredicatePhase) String ¶ added in v0.1.13
func (p PredicatePhase) String() string