Documentation ¶
Index ¶
- Variables
- type CacheableNodeHandler
- type EventRecorder
- type HandlerFactory
- type Node
- type NodeExecutionContext
- type NodeExecutionContextBuilder
- type NodeExecutionMetadata
- type NodeExecutor
- type NodeHandler
- type NodePhase
- type NodeStateReader
- type NodeStateWriter
- type NodeStatus
- type SetupContext
- type TaskReader
Constants ¶
This section is empty.
Variables ¶
View Source
var NodeStatusComplete = NodeStatus{NodePhase: NodePhaseComplete}
View Source
var NodeStatusPending = NodeStatus{NodePhase: NodePhasePending}
View Source
var NodeStatusQueued = NodeStatus{NodePhase: NodePhaseQueued}
View Source
var NodeStatusRecovered = NodeStatus{NodePhase: NodePhaseRecovered}
View Source
var NodeStatusRunning = NodeStatus{NodePhase: NodePhaseRunning}
View Source
var NodeStatusSuccess = NodeStatus{NodePhase: NodePhaseSuccess}
View Source
var NodeStatusTimedOut = NodeStatus{NodePhase: NodePhaseTimedOut}
View Source
var NodeStatusUndefined = NodeStatus{NodePhase: NodePhaseUndefined}
Functions ¶
This section is empty.
Types ¶
type CacheableNodeHandler ¶
type CacheableNodeHandler interface { NodeHandler // GetCatalogKey returns the unique key for the node represented by the NodeExecutionContext GetCatalogKey(ctx context.Context, executionContext NodeExecutionContext) (catalog.Key, error) // IsCacheable returns two booleans representing if the node represented by the // NodeExecutionContext is cacheable and cache serializable respectively. IsCacheable(ctx context.Context, executionContext NodeExecutionContext) (bool, bool, error) }
CacheableNodeHandler is a node that supports caching
type EventRecorder ¶
type EventRecorder interface { events.TaskEventRecorder events.NodeEventRecorder }
type HandlerFactory ¶
type HandlerFactory interface { GetHandler(kind v1alpha1.NodeKind) (NodeHandler, error) Setup(ctx context.Context, executor Node, setup SetupContext) error }
type Node ¶
type Node interface { // This method is used specifically to set inputs for start node. This is because start node does not retrieve inputs // from predecessors, but the inputs are inputs to the workflow or inputs to the parent container (workflow) node. SetInputsForStartNode(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructureWithStartNode, nl executors.NodeLookup, inputs *core.LiteralMap) (NodeStatus, error) // This is the main entrypoint to execute a node. It recursively depth-first goes through all ready nodes and starts their execution // This returns either // - 1. It finds a blocking node (not ready, or running) // - 2. A node fails and hence the workflow will fail // - 3. The final/end node has completed and the workflow should be stopped RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error) // This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them AbortHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error FinalizeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) error // This method should be used to initialize Node executor Initialize(ctx context.Context) error // GetNodeExecutionContextBuilder returns the current NodeExecutionContextBuilder GetNodeExecutionContextBuilder() NodeExecutionContextBuilder // WithNodeExecutionContextBuilder returns a new Node with the given NodeExecutionContextBuilder WithNodeExecutionContextBuilder(NodeExecutionContextBuilder) Node }
Core Node Executor that is used to execute a node. This is a recursive node executor and understands node dependencies
type NodeExecutionContext ¶
type NodeExecutionContext interface { // This path is never read by propeller, but allows using some container or prefix in a specific container for all output from tasks // Sandboxes provide exactly once execution semantics and only the successful sandbox wins. Ideally a sandbox should be a path that is // available to the task at High Bandwidth (for example the base path of a sharded s3 bucket. // This with a prefix based sharded strategy, could improve the throughput from S3 manifold) RawOutputPrefix() storage.DataReference // Sharding strategy for the output data for this node execution. OutputShardSelector() ioutils.ShardSelector DataStore() *storage.DataStore InputReader() io.InputReader EventsRecorder() EventRecorder NodeID() v1alpha1.NodeID Node() v1alpha1.ExecutableNode CurrentAttempt() uint32 TaskReader() TaskReader NodeStateReader() NodeStateReader NodeStateWriter() NodeStateWriter NodeExecutionMetadata() NodeExecutionMetadata MaxDatasetSizeBytes() int64 EnqueueOwnerFunc() func() error ContextualNodeLookup() executors.NodeLookup ExecutionContext() executors.ExecutionContext // TODO We should not need to pass NodeStatus, we probably only need it for DataDir, which should actually be sent using an OutputWriter interface // Deprecated NodeStatus() v1alpha1.ExecutableNodeStatus }
type NodeExecutionContextBuilder ¶
type NodeExecutionContextBuilder interface { BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (NodeExecutionContext, error) }
NodeExecutionContextBuilder defines how a NodeExecutionContext is built
type NodeExecutionMetadata ¶
type NodeExecutionMetadata interface { GetOwnerID() types.NamespacedName GetNodeExecutionID() *core.NodeExecutionIdentifier GetNamespace() string GetOwnerReference() v1.OwnerReference GetLabels() map[string]string GetAnnotations() map[string]string GetK8sServiceAccount() string GetSecurityContext() core.SecurityContext IsInterruptible() bool GetInterruptibleFailureThreshold() int32 }
type NodeExecutor ¶
type NodeExecutor interface { HandleNode(ctx context.Context, dag executors.DAGStructure, nCtx NodeExecutionContext, h NodeHandler) (NodeStatus, error) Abort(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext, reason string, finalTransition bool) error Finalize(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext) error }
NodeExecutor defines the interface for handling a single Kozmo Node of any Node type.
type NodeHandler ¶
type NodeHandler interface { // Method to indicate that finalize is required for this handler FinalizeRequired() bool // Setup should be called, before invoking any other methods of this handler in a single thread context Setup(ctx context.Context, setupContext SetupContext) error // Core method that should handle this node Handle(ctx context.Context, executionContext NodeExecutionContext) (handler.Transition, error) // This method should be invoked to indicate the node needs to be aborted. Abort(ctx context.Context, executionContext NodeExecutionContext, reason string) error // This method is always called before completing the node, if FinalizeRequired returns true. // It is guaranteed that Handle -> (happens before) -> Finalize. Abort -> finalize may be repeated multiple times Finalize(ctx context.Context, executionContext NodeExecutionContext) error }
Interface that should be implemented for a node type.
type NodePhase ¶
type NodePhase int
p of the node
const ( // Indicates that the node is not yet ready to be executed and is pending any previous nodes completion NodePhasePending NodePhase = iota // Indicates that the node was queued and will start running soon NodePhaseQueued // Indicates that the payload associated with this node is being executed and is not yet done NodePhaseRunning // Indicates that the nodes payload has been successfully completed, but any downstream nodes from this node may not yet have completed // We could make Success = running, but this enables more granular control NodePhaseSuccess // Complete indicates successful completion of a node. For singular nodes (nodes that have only one execution) success = complete, but, the executor // will always signal completion NodePhaseComplete // Node failed in execution, either this node or anything in the downstream chain NodePhaseFailed // Internal error observed. This state should always be accompanied with an `error`. if not the behavior is undefined NodePhaseUndefined // Finalize node failing due to timeout NodePhaseTimingOut // Node failed because execution timed out NodePhaseTimedOut // Node recovered from a prior execution. NodePhaseRecovered )
type NodeStateReader ¶
type NodeStateReader interface { HasTaskNodeState() bool GetTaskNodeState() handler.TaskNodeState HasBranchNodeState() bool GetBranchNodeState() handler.BranchNodeState HasDynamicNodeState() bool GetDynamicNodeState() handler.DynamicNodeState HasWorkflowNodeState() bool GetWorkflowNodeState() handler.WorkflowNodeState HasGateNodeState() bool GetGateNodeState() handler.GateNodeState HasArrayNodeState() bool GetArrayNodeState() handler.ArrayNodeState }
type NodeStateWriter ¶
type NodeStateWriter interface { PutTaskNodeState(s handler.TaskNodeState) error PutBranchNode(s handler.BranchNodeState) error PutDynamicNodeState(s handler.DynamicNodeState) error PutWorkflowNodeState(s handler.WorkflowNodeState) error PutGateNodeState(s handler.GateNodeState) error PutArrayNodeState(s handler.ArrayNodeState) error ClearNodeStatus() }
type NodeStatus ¶
type NodeStatus struct { NodePhase NodePhase Err *core.ExecutionError }
Helper struct to allow passing of status between functions
func NodeStatusFailed ¶
func NodeStatusFailed(err *core.ExecutionError) NodeStatus
func (*NodeStatus) HasFailed ¶
func (n *NodeStatus) HasFailed() bool
func (*NodeStatus) HasTimedOut ¶
func (n *NodeStatus) HasTimedOut() bool
func (*NodeStatus) IsComplete ¶
func (n *NodeStatus) IsComplete() bool
func (*NodeStatus) PartiallyComplete ¶
func (n *NodeStatus) PartiallyComplete() bool
type SetupContext ¶
type TaskReader ¶
type TaskReader interface { Read(ctx context.Context) (*core.TaskTemplate, error) GetTaskType() v1alpha1.TaskType GetTaskID() *core.Identifier }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.