interfaces

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NodeStatusComplete = NodeStatus{NodePhase: NodePhaseComplete}
View Source
var NodeStatusPending = NodeStatus{NodePhase: NodePhasePending}
View Source
var NodeStatusQueued = NodeStatus{NodePhase: NodePhaseQueued}
View Source
var NodeStatusRecovered = NodeStatus{NodePhase: NodePhaseRecovered}
View Source
var NodeStatusRunning = NodeStatus{NodePhase: NodePhaseRunning}
View Source
var NodeStatusSuccess = NodeStatus{NodePhase: NodePhaseSuccess}
View Source
var NodeStatusTimedOut = NodeStatus{NodePhase: NodePhaseTimedOut}
View Source
var NodeStatusUndefined = NodeStatus{NodePhase: NodePhaseUndefined}

Functions

This section is empty.

Types

type CacheableNodeHandler

type CacheableNodeHandler interface {
	NodeHandler

	// GetCatalogKey returns the unique key for the node represented by the NodeExecutionContext
	GetCatalogKey(ctx context.Context, executionContext NodeExecutionContext) (catalog.Key, error)

	// IsCacheable returns two booleans representing if the node represented by the
	// NodeExecutionContext is cacheable and cache serializable respectively.
	IsCacheable(ctx context.Context, executionContext NodeExecutionContext) (bool, bool, error)
}

CacheableNodeHandler is a node that supports caching

type EventRecorder

type EventRecorder interface {
	events.TaskEventRecorder
	events.NodeEventRecorder
}

type HandlerFactory

type HandlerFactory interface {
	GetHandler(kind v1alpha1.NodeKind) (NodeHandler, error)
	Setup(ctx context.Context, executor Node, setup SetupContext) error
}

type Node

type Node interface {
	// This method is used specifically to set inputs for start node. This is because start node does not retrieve inputs
	// from predecessors, but the inputs are inputs to the workflow or inputs to the parent container (workflow) node.
	SetInputsForStartNode(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructureWithStartNode,
		nl executors.NodeLookup, inputs *core.LiteralMap) (NodeStatus, error)

	// This is the main entrypoint to execute a node. It recursively depth-first goes through all ready nodes and starts their execution
	// This returns either
	// - 1. It finds a blocking node (not ready, or running)
	// - 2. A node fails and hence the workflow will fail
	// - 3. The final/end node has completed and the workflow should be stopped
	RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
		nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)

	// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
	AbortHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
		nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error

	FinalizeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure,
		nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) error

	// This method should be used to initialize Node executor
	Initialize(ctx context.Context) error

	// GetNodeExecutionContextBuilder returns the current NodeExecutionContextBuilder
	GetNodeExecutionContextBuilder() NodeExecutionContextBuilder

	// WithNodeExecutionContextBuilder returns a new Node with the given NodeExecutionContextBuilder
	WithNodeExecutionContextBuilder(NodeExecutionContextBuilder) Node
}

Core Node Executor that is used to execute a node. This is a recursive node executor and understands node dependencies

type NodeExecutionContext

type NodeExecutionContext interface {
	// This path is never read by propeller, but allows using some container or prefix in a specific container for all output from tasks
	// Sandboxes provide exactly once execution semantics and only the successful sandbox wins. Ideally a sandbox should be a path that is
	// available to the task at High Bandwidth (for example the base path of a sharded s3 bucket.
	// This with a prefix based sharded strategy, could improve the throughput from S3 manifold)
	RawOutputPrefix() storage.DataReference

	// Sharding strategy for the output data for this node execution.
	OutputShardSelector() ioutils.ShardSelector

	DataStore() *storage.DataStore
	InputReader() io.InputReader
	EventsRecorder() EventRecorder
	NodeID() v1alpha1.NodeID
	Node() v1alpha1.ExecutableNode
	CurrentAttempt() uint32
	TaskReader() TaskReader

	NodeStateReader() NodeStateReader
	NodeStateWriter() NodeStateWriter

	NodeExecutionMetadata() NodeExecutionMetadata
	MaxDatasetSizeBytes() int64

	EnqueueOwnerFunc() func() error

	ContextualNodeLookup() executors.NodeLookup
	ExecutionContext() executors.ExecutionContext
	// TODO We should not need to pass NodeStatus, we probably only need it for DataDir, which should actually be sent using an OutputWriter interface
	// Deprecated
	NodeStatus() v1alpha1.ExecutableNodeStatus
}

type NodeExecutionContextBuilder

type NodeExecutionContextBuilder interface {
	BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext,
		nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (NodeExecutionContext, error)
}

NodeExecutionContextBuilder defines how a NodeExecutionContext is built

type NodeExecutionMetadata

type NodeExecutionMetadata interface {
	GetOwnerID() types.NamespacedName
	GetNodeExecutionID() *core.NodeExecutionIdentifier
	GetNamespace() string
	GetOwnerReference() v1.OwnerReference
	GetLabels() map[string]string
	GetAnnotations() map[string]string
	GetK8sServiceAccount() string
	GetSecurityContext() core.SecurityContext
	IsInterruptible() bool
	GetInterruptibleFailureThreshold() int32
}

type NodeExecutor

type NodeExecutor interface {
	HandleNode(ctx context.Context, dag executors.DAGStructure, nCtx NodeExecutionContext, h NodeHandler) (NodeStatus, error)
	Abort(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext, reason string, finalTransition bool) error
	Finalize(ctx context.Context, h NodeHandler, nCtx NodeExecutionContext) error
}

NodeExecutor defines the interface for handling a single Kozmo Node of any Node type.

type NodeHandler

type NodeHandler interface {
	// Method to indicate that finalize is required for this handler
	FinalizeRequired() bool

	// Setup should be called, before invoking any other methods of this handler in a single thread context
	Setup(ctx context.Context, setupContext SetupContext) error

	// Core method that should handle this node
	Handle(ctx context.Context, executionContext NodeExecutionContext) (handler.Transition, error)

	// This method should be invoked to indicate the node needs to be aborted.
	Abort(ctx context.Context, executionContext NodeExecutionContext, reason string) error

	// This method is always called before completing the node, if FinalizeRequired returns true.
	// It is guaranteed that Handle -> (happens before) -> Finalize. Abort -> finalize may be repeated multiple times
	Finalize(ctx context.Context, executionContext NodeExecutionContext) error
}

Interface that should be implemented for a node type.

type NodePhase

type NodePhase int

p of the node

const (
	// Indicates that the node is not yet ready to be executed and is pending any previous nodes completion
	NodePhasePending NodePhase = iota
	// Indicates that the node was queued and will start running soon
	NodePhaseQueued
	// Indicates that the payload associated with this node is being executed and is not yet done
	NodePhaseRunning
	// Indicates that the nodes payload has been successfully completed, but any downstream nodes from this node may not yet have completed
	// We could make Success = running, but this enables more granular control
	NodePhaseSuccess
	// Complete indicates successful completion of a node. For singular nodes (nodes that have only one execution) success = complete, but, the executor
	// will always signal completion
	NodePhaseComplete
	// Node failed in execution, either this node or anything in the downstream chain
	NodePhaseFailed
	// Internal error observed. This state should always be accompanied with an `error`. if not the behavior is undefined
	NodePhaseUndefined
	// Finalize node failing due to timeout
	NodePhaseTimingOut
	// Node failed because execution timed out
	NodePhaseTimedOut
	// Node recovered from a prior execution.
	NodePhaseRecovered
)

func (NodePhase) String

func (p NodePhase) String() string

type NodeStateReader

type NodeStateReader interface {
	HasTaskNodeState() bool
	GetTaskNodeState() handler.TaskNodeState
	HasBranchNodeState() bool
	GetBranchNodeState() handler.BranchNodeState
	HasDynamicNodeState() bool
	GetDynamicNodeState() handler.DynamicNodeState
	HasWorkflowNodeState() bool
	GetWorkflowNodeState() handler.WorkflowNodeState
	HasGateNodeState() bool
	GetGateNodeState() handler.GateNodeState
	HasArrayNodeState() bool
	GetArrayNodeState() handler.ArrayNodeState
}

type NodeStateWriter

type NodeStateWriter interface {
	PutTaskNodeState(s handler.TaskNodeState) error
	PutBranchNode(s handler.BranchNodeState) error
	PutDynamicNodeState(s handler.DynamicNodeState) error
	PutWorkflowNodeState(s handler.WorkflowNodeState) error
	PutGateNodeState(s handler.GateNodeState) error
	PutArrayNodeState(s handler.ArrayNodeState) error
	ClearNodeStatus()
}

type NodeStatus

type NodeStatus struct {
	NodePhase NodePhase
	Err       *core.ExecutionError
}

Helper struct to allow passing of status between functions

func NodeStatusFailed

func NodeStatusFailed(err *core.ExecutionError) NodeStatus

func (*NodeStatus) HasFailed

func (n *NodeStatus) HasFailed() bool

func (*NodeStatus) HasTimedOut

func (n *NodeStatus) HasTimedOut() bool

func (*NodeStatus) IsComplete

func (n *NodeStatus) IsComplete() bool

func (*NodeStatus) PartiallyComplete

func (n *NodeStatus) PartiallyComplete() bool

type SetupContext

type SetupContext interface {
	EnqueueOwner() func(string)
	OwnerKind() string
	MetricsScope() promutils.Scope
}

type TaskReader

type TaskReader interface {
	Read(ctx context.Context) (*core.TaskTemplate, error)
	GetTaskType() v1alpha1.TaskType
	GetTaskID() *core.Identifier
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL