Documentation ¶
Index ¶
- Constants
- func ComputePreviousCheckpointPath(ctx context.Context, length int, nCtx interfaces.NodeExecutionContext, ...) (storage.DataReference, error)
- func ComputeRawOutputPrefix(ctx context.Context, length int, nCtx interfaces.NodeExecutionContext, ...) (io.RawOutputPaths, string, error)
- func GetTaskExecutionIdentifier(nCtx interfaces.NodeExecutionContext) *core.TaskExecutionIdentifier
- func ToTaskEventPhase(p pluginCore.Phase) core.TaskExecution_Phase
- func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error)
- func ToTransitionType(ttype pluginCore.TransitionType) handler.TransitionType
- func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface, ...) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, ...)
- type CacheContents
- type CodecVersion
- type FutureFileReader
- func (f FutureFileReader) Cache(ctx context.Context, wf *v1alpha1.NebulaWorkflow, ...) error
- func (f FutureFileReader) CacheExists(ctx context.Context) (bool, error)
- func (f FutureFileReader) Exists(ctx context.Context) (bool, error)
- func (f FutureFileReader) GetLoc() storage.DataReference
- func (f FutureFileReader) Read(ctx context.Context) (*core.DynamicJobSpec, error)
- func (f FutureFileReader) RetrieveCache(ctx context.Context) (CacheContents, error)
- type Handler
- func (t Handler) Abort(ctx context.Context, nCtx interfaces.NodeExecutionContext, reason string) error
- func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext) error
- func (t *Handler) FinalizeRequired() bool
- func (t *Handler) GetCatalogKey(ctx context.Context, nCtx interfaces.NodeExecutionContext) (catalog.Key, error)
- func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContext) (handler.Transition, error)
- func (t *Handler) IsCacheable(ctx context.Context, nCtx interfaces.NodeExecutionContext) (bool, bool, error)
- func (t Handler) ResolvePlugin(ctx context.Context, ttype string, executionConfig v1alpha1.ExecutionConfig) (pluginCore.Plugin, error)
- func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error
- func (t *Handler) ValidateOutput(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, ...) (*io.ExecutionError, error)
- type MetricKey
- type PluginRegistryIface
- type RemoteFileWorkflowStore
- func (r RemoteFileWorkflowStore) Exists(ctx context.Context, path storage.DataReference) (bool, error)
- func (r RemoteFileWorkflowStore) GetCompiledWorkflow(ctx context.Context, source storage.DataReference) (*core.CompiledWorkflowClosure, error)
- func (r RemoteFileWorkflowStore) GetWorkflowCRD(ctx context.Context, source storage.DataReference) (*v1alpha1.NebulaWorkflow, error)
- func (r RemoteFileWorkflowStore) PutCompiledNebulaWorkflow(ctx context.Context, workflow *core.CompiledWorkflowClosure, ...) error
- func (r RemoteFileWorkflowStore) PutNebulaWorkflowCRD(ctx context.Context, wf *v1alpha1.NebulaWorkflow, target storage.DataReference) error
- type ToTaskExecutionEventInputs
Constants ¶
View Source
const IDMaxLength = 50
View Source
const MaxPluginStateSizeBytes = 256
TODO: Should this limit be configurable?
Variables ¶
This section is empty.
Functions ¶
func ComputePreviousCheckpointPath ¶
func ComputePreviousCheckpointPath(ctx context.Context, length int, nCtx interfaces.NodeExecutionContext, currentNodeUniqueID v1alpha1.NodeID, currentAttempt uint32) (storage.DataReference, error)
ComputePreviousCheckpointPath returns the checkpoint path for the previous attempt. If this is the first attempt, it returns an empty path.
func ComputeRawOutputPrefix ¶
func ComputeRawOutputPrefix(ctx context.Context, length int, nCtx interfaces.NodeExecutionContext, currentNodeUniqueID v1alpha1.NodeID, currentAttempt uint32) (io.RawOutputPaths, string, error)
ComputeRawOutputPrefix constructs the output directory where the raw outputs of a task can be stored by the task. NebulaPropeller may not have access to this location, and the location can be passed in per execution. The function also returns the generated uniqueID.
func GetTaskExecutionIdentifier ¶
func GetTaskExecutionIdentifier(nCtx interfaces.NodeExecutionContext) *core.TaskExecutionIdentifier
func ToTaskEventPhase ¶
func ToTaskEventPhase(p pluginCore.Phase) core.TaskExecution_Phase
func ToTaskExecutionEvent ¶
func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error)
func ToTransitionType ¶
func ToTransitionType(ttype pluginCore.TransitionType) handler.TransitionType
func WranglePluginsAndGenerateFinalList ¶
func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface, kubeClientset kubernetes.Interface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error)
Types ¶
type CacheContents ¶
type CacheContents struct { WorkflowCRD *v1alpha1.NebulaWorkflow CompiledWorkflow *core.CompiledWorkflowClosure }
type FutureFileReader ¶
type FutureFileReader struct { RemoteFileWorkflowStore // contains filtered or unexported fields }
func NewRemoteFutureFileReader ¶
func NewRemoteFutureFileReader(ctx context.Context, dataDir storage.DataReference, store *storage.DataStore) (FutureFileReader, error)
func (FutureFileReader) Cache ¶
func (f FutureFileReader) Cache(ctx context.Context, wf *v1alpha1.NebulaWorkflow, workflowClosure *core.CompiledWorkflowClosure) error
func (FutureFileReader) CacheExists ¶
func (f FutureFileReader) CacheExists(ctx context.Context) (bool, error)
func (FutureFileReader) Exists ¶
func (f FutureFileReader) Exists(ctx context.Context) (bool, error)
func (FutureFileReader) GetLoc ¶
func (f FutureFileReader) GetLoc() storage.DataReference
func (FutureFileReader) Read ¶
func (f FutureFileReader) Read(ctx context.Context) (*core.DynamicJobSpec, error)
func (FutureFileReader) RetrieveCache ¶
func (f FutureFileReader) RetrieveCache(ctx context.Context) (CacheContents, error)
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func New ¶
func New(ctx context.Context, kubeClient executors.Client, kubeClientset kubernetes.Interface, client catalog.Client, eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error)
func (Handler) Abort ¶
func (t Handler) Abort(ctx context.Context, nCtx interfaces.NodeExecutionContext, reason string) error
func (Handler) Finalize ¶
func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext) error
func (*Handler) FinalizeRequired ¶
func (t *Handler) FinalizeRequired() bool
func (*Handler) GetCatalogKey ¶
func (t *Handler) GetCatalogKey(ctx context.Context, nCtx interfaces.NodeExecutionContext) (catalog.Key, error)
func (Handler) Handle ¶
func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContext) (handler.Transition, error)
func (*Handler) IsCacheable ¶
func (t *Handler) IsCacheable(ctx context.Context, nCtx interfaces.NodeExecutionContext) (bool, bool, error)
func (Handler) ResolvePlugin ¶
func (t Handler) ResolvePlugin(ctx context.Context, ttype string, executionConfig v1alpha1.ExecutionConfig) (pluginCore.Plugin, error)
func (*Handler) Setup ¶
func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error
func (*Handler) ValidateOutput ¶
func (t *Handler) ValidateOutput(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, tr ioutils.SimpleTaskReader) (*io.ExecutionError, error)
type PluginRegistryIface ¶
type PluginRegistryIface interface { GetCorePlugins() []pluginCore.PluginEntry GetK8sPlugins() []pluginK8s.PluginEntry }
PluginRegistryIface is the plugin interface, available especially for testing.
type RemoteFileWorkflowStore ¶
type RemoteFileWorkflowStore struct {
// contains filtered or unexported fields
}
func NewRemoteWorkflowStore ¶
func NewRemoteWorkflowStore(store *storage.DataStore) RemoteFileWorkflowStore
func (RemoteFileWorkflowStore) Exists ¶
func (r RemoteFileWorkflowStore) Exists(ctx context.Context, path storage.DataReference) (bool, error)
func (RemoteFileWorkflowStore) GetCompiledWorkflow ¶
func (r RemoteFileWorkflowStore) GetCompiledWorkflow(ctx context.Context, source storage.DataReference) (*core.CompiledWorkflowClosure, error)
func (RemoteFileWorkflowStore) GetWorkflowCRD ¶
func (r RemoteFileWorkflowStore) GetWorkflowCRD(ctx context.Context, source storage.DataReference) (*v1alpha1.NebulaWorkflow, error)
func (RemoteFileWorkflowStore) PutCompiledNebulaWorkflow ¶
func (r RemoteFileWorkflowStore) PutCompiledNebulaWorkflow(ctx context.Context, workflow *core.CompiledWorkflowClosure, target storage.DataReference) error
func (RemoteFileWorkflowStore) PutNebulaWorkflowCRD ¶
func (r RemoteFileWorkflowStore) PutNebulaWorkflowCRD(ctx context.Context, wf *v1alpha1.NebulaWorkflow, target storage.DataReference) error
type ToTaskExecutionEventInputs ¶
type ToTaskExecutionEventInputs struct { TaskExecContext pluginCore.TaskExecutionContext InputReader io.InputFilePaths Inputs *core.LiteralMap EventConfig *config.EventConfig OutputWriter io.OutputFilePaths Info pluginCore.PhaseInfo NodeExecutionMetadata interfaces.NodeExecutionMetadata ExecContext executors.ExecutionContext TaskType string PluginID string ResourcePoolInfo []*event.ResourcePoolInfo ClusterID string OccurredAt time.Time }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.