Documentation ¶
Index ¶
- Variables
- type CatalogClient
- type CustomState
- type EnqueueOwner
- type EventRecorder
- type Executor
- type ExecutorInitializationParameters
- type ExecutorProperties
- type OutputsResolver
- type TaskContext
- type TaskExecutionID
- type TaskExecutorName
- type TaskOverrides
- type TaskPhase
- type TaskStatus
- type TaskType
- type VarName
- type WorkflowID
Constants ¶
This section is empty.
Variables ¶
var TaskStatusNotReady = TaskStatus{Phase: TaskPhaseNotReady}
var TaskStatusQueued = TaskStatus{Phase: TaskPhaseQueued}
var TaskStatusRunning = TaskStatus{Phase: TaskPhaseRunning}
var TaskStatusSucceeded = TaskStatus{Phase: TaskPhaseSucceeded}
var TaskStatusUndefined = TaskStatus{Phase: TaskPhaseUndefined}
var TaskStatusUnknown = TaskStatus{Phase: TaskPhaseUnknown}
Functions ¶
This section is empty.
Types ¶
type CatalogClient ¶
type CatalogClient interface { Get(ctx context.Context, task *core.TaskTemplate, inputPath storage.DataReference) (*core.LiteralMap, error) Put(ctx context.Context, task *core.TaskTemplate, execID *core.TaskExecutionIdentifier, inputPath storage.DataReference, outputPath storage.DataReference) error }
Defines the Catalog client interface exposed for plugins
type CustomState ¶
type CustomState = map[string]interface{}
Represents a free-form state that allows plugins to store custom information between invocations.
type EnqueueOwner ¶
type EnqueueOwner func(name types.NamespacedName) error
type EventRecorder ¶
type EventRecorder interface {
RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error
}
Defines the exposed interface for plugins to record task events. TODO: Add a link to explain how events are structred and linked together.
type Executor ¶
type Executor interface { // Gets a unique identifier for the executor. No two executors can have the same ID. GetID() TaskExecutorName // Gets optional properties about this executor. These properties are not task-specific. GetProperties() ExecutorProperties // Initializes the executor. The executor should not have any heavy initialization logic in its constructor and should // delay all initialization logic till this method is called. Initialize(ctx context.Context, param ExecutorInitializationParameters) error // Start the task with an initial state that could be empty and return the new state of the task once it started StartTask(ctx context.Context, taskCtx TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) ( status TaskStatus, err error) // ChecksTaskStatus is called every time client needs to know the latest status of a given task this specific // executor launched. It passes the same task context that was used when StartTask was called as well as the last // known state of the task. Note that there is no strict guarantee that the previous state is literally the last // status returned due to the nature of eventual consistency in the system. The system guarantees idempotency as long // as it's within kubernetes boundaries or if external services support idempotency. CheckTaskStatus(ctx context.Context, taskCtx TaskContext, task *core.TaskTemplate) (status TaskStatus, err error) // The engine will ensure kill task is called in abort scenarios. KillTask will not be called in case CheckTaskStatus // ever returned a terminal phase. KillTask(ctx context.Context, taskCtx TaskContext, reason string) error // ResolveOutputs is responsible for retrieving outputs variables from a task. For simple tasks, adding OutputsResolver // in the executor is enough to get a default implementation. ResolveOutputs(ctx context.Context, taskCtx TaskContext, outputVariables ...VarName) ( values map[VarName]*core.Literal, err error) }
Defines a task executor interface.
type ExecutorInitializationParameters ¶
type ExecutorInitializationParameters struct { CatalogClient CatalogClient EventRecorder EventRecorder DataStore *storage.DataStore EnqueueOwner EnqueueOwner OwnerKind string MetricsScope promutils.Scope }
Defines the all-optional initialization parameters passed to plugins.
type ExecutorProperties ¶
type ExecutorProperties struct { // If the executor needs to clean-up external resources that won't automatically be garbage-collected by the fact that // the containing-k8s object is being deleted, it should set this value to true. This ensures that the containing-k8s // object is not deleted until all executors of non-terminal phase tasks report success for KillTask calls. RequiresFinalizer bool // If set, the execution engine will not perform node-level task caching and retrieval. This can be useful for more // fine-grained executors that implement their own logic for caching. DisableNodeLevelCaching bool }
Defines optional properties for the executor.
type OutputsResolver ¶
type OutputsResolver struct {
// contains filtered or unexported fields
}
Provides a default implementation for ResolveOutputs method by reading 'outputs.pb' from task directory into a LiteralMap.
func NewOutputsResolver ¶
func NewOutputsResolver(store storage.ComposedProtobufStore) OutputsResolver
Creates a default outputs resolver that expects a LiteralMap to exist in the task's outputFile location.
func (OutputsResolver) ResolveOutputs ¶
func (r OutputsResolver) ResolveOutputs(ctx context.Context, taskCtx TaskContext, outputVariables ...VarName) ( values map[VarName]*core.Literal, err error)
type TaskContext ¶
type TaskContext interface { GetOwnerID() types.NamespacedName GetTaskExecutionID() TaskExecutionID GetDataDir() storage.DataReference GetInputsFile() storage.DataReference GetOutputsFile() storage.DataReference GetErrorFile() storage.DataReference GetNamespace() string GetOwnerReference() metaV1.OwnerReference GetOverrides() TaskOverrides GetLabels() map[string]string GetAnnotations() map[string]string GetCustomState() CustomState GetK8sServiceAccount() string GetPhase() TaskPhase GetPhaseVersion() uint32 }
TaskContext represents any execution information for a Task. It is used to communicate meta information about the execution or any previously stored information
type TaskExecutionID ¶
type TaskExecutionID interface { GetGeneratedName() string GetID() core.TaskExecutionIdentifier }
Simple Interface to expose the ExecutionID of the running Task
type TaskExecutorName ¶
type TaskExecutorName = string
type TaskOverrides ¶
type TaskOverrides interface { GetResources() *typesv1.ResourceRequirements GetConfig() *typesv1.ConfigMap }
Interface to expose any overrides that have been set for this task (like resource overrides etc)
type TaskPhase ¶
type TaskPhase int
const ( TaskPhaseQueued TaskPhase = iota TaskPhaseRunning TaskPhaseRetryableFailure TaskPhasePermanentFailure TaskPhaseSucceeded TaskPhaseUndefined TaskPhaseNotReady TaskPhaseUnknown )
NOTE: if we add a status here, we should make sure it converts correctly when reporting Task event See events_publisher.go
func (TaskPhase) IsPermanentFailure ¶
func (TaskPhase) IsRetryableFailure ¶
func (TaskPhase) IsTerminal ¶
type TaskStatus ¶
type TaskStatus struct { Phase TaskPhase PhaseVersion uint32 Err error State CustomState OccurredAt time.Time }
func TaskStatusNotReadyFailure ¶
func TaskStatusNotReadyFailure(err error) TaskStatus
This failure can be used to indicate that the task wasn't accepted due to resource quota or similar constraints.
func TaskStatusPermanentFailure ¶
func TaskStatusPermanentFailure(err error) TaskStatus
PermanentFailure should be used to signal that either
- The user wants to signal that the task has failed with something NON-RECOVERABLE
- The plugin writer wants to signal that the task has failed with NON-RECOVERABLE
Essentially a permanent failure will force the statemachine to shutdown and stop the task from being retried further, even if retries exist. If it is desirable to retry the task (a separate execution) then, use RetryableFailure
func TaskStatusRetryableFailure ¶
func TaskStatusRetryableFailure(err error) TaskStatus
This failure can be used to indicate that the task failed with an error that is most probably transient and if the task retries (retry strategy) permits, it is safe to retry this task again. The same task execution will not be retried, but a new task execution will be created.
func (TaskStatus) String ¶
func (t TaskStatus) String() string
func (TaskStatus) WithOccurredAt ¶
func (t TaskStatus) WithOccurredAt(time time.Time) TaskStatus
func (TaskStatus) WithPhaseVersion ¶
func (t TaskStatus) WithPhaseVersion(version uint32) TaskStatus
func (TaskStatus) WithState ¶
func (t TaskStatus) WithState(state CustomState) TaskStatus
type WorkflowID ¶
type WorkflowID = string