Documentation ¶
Index ¶
- Constants
- Variables
- func GetK8sClient(config ClusterConfig) (client.Client, error)
- func GetNewExecutorPlugin(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)
- func IsResourceConfigSet(resourceConfig ResourceConfig) bool
- func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, ...) (newState *arrayCore.State, externalResources []*core.ExternalResource, ...)
- func NewKubeClientObj(c client.Client) core.KubeClient
- func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error)
- func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, ...) (*arrayCore.State, []*core.ExternalResource, error)
- func TerminateSubTasksOnAbort(ctx context.Context, tCtx core.TaskExecutionContext, ...) error
- type Auth
- type ClusterConfig
- type Config
- type Executor
- func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (e Executor) GetID() string
- func (Executor) GetProperties() core.PluginProperties
- func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
- func (e Executor) Start(ctx context.Context) error
- type KubeClientObj
- type LogConfig
- type ResourceConfig
- type SubTaskExecutionContext
- func (s SubTaskExecutionContext) InputReader() io.InputReader
- func (s SubTaskExecutionContext) OutputWriter() io.OutputWriter
- func (s SubTaskExecutionContext) PluginStateReader() pluginsCore.PluginStateReader
- func (s SubTaskExecutionContext) TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
- func (s SubTaskExecutionContext) TaskReader() pluginsCore.TaskReader
- type SubTaskExecutionID
- type SubTaskExecutionMetadata
- type SubTaskReader
Constants ¶
Variables ¶
var LogTemplateRegexes = struct { ExecutionIndex *regexp.Regexp ParentName *regexp.Regexp RetryAttempt *regexp.Regexp ParentRetryAttempt *regexp.Regexp }{ tasklog.MustCreateRegex("subtaskExecutionIndex"), tasklog.MustCreateRegex("subtaskParentName"), tasklog.MustCreateRegex("subtaskRetryAttempt"), tasklog.MustCreateRegex("subtaskParentRetryAttempt"), }
Functions ¶
func GetK8sClient ¶
func GetK8sClient(config ClusterConfig) (client.Client, error)
func GetNewExecutorPlugin ¶
func IsResourceConfigSet ¶
func IsResourceConfigSet(resourceConfig ResourceConfig) bool
func LaunchAndCheckSubTasksState ¶
func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config, dataStore *storage.DataStore, outputPrefix, baseOutputDataSandbox storage.DataReference, currentState *arrayCore.State) ( newState *arrayCore.State, externalResources []*core.ExternalResource, err error)
LaunchAndCheckSubTasksState iterates over each subtask performing operations to transition them to a terminal state. This may include creating new k8s resources, monitoring existing k8s resources, retrying failed attempts, or declaring a permanent failure among others.
func NewKubeClientObj ¶
func NewKubeClientObj(c client.Client) core.KubeClient
func RemoteClusterConfig ¶
func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error)
RemoteClusterConfig reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. TODO: Move logic to flytestdlib
func TerminateSubTasks ¶
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config, terminateFunction func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error, currentState *arrayCore.State) (*arrayCore.State, []*core.ExternalResource, error)
TerminateSubTasks performs operations to gracefully terminate all subtasks. This may include aborting and finalizing active k8s resources.
func TerminateSubTasksOnAbort ¶ added in v1.10.7
func TerminateSubTasksOnAbort(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config, terminateFunction func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error, currentState *arrayCore.State) error
Types ¶
type Auth ¶
type ClusterConfig ¶
type ClusterConfig struct { Name string `json:"name" pflag:",Friendly name of the remote cluster"` Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"` Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"` Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"` }
type Config ¶
type Config struct { DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."` MaxErrorStringLength int `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."` MaxArrayJobSize int64 `json:"maxArrayJobSize" pflag:",Maximum size of array job."` ResourceConfig ResourceConfig `json:"resourceConfig" pflag:"-,ResourceConfiguration to limit number of resources used by k8s-array."` RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"-,Configuration of remote K8s cluster for array jobs"` NodeSelector map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."` Tolerations []v1.Toleration `json:"tolerations" pflag:"-,Tolerations to be applied for k8s-array pods"` NamespaceTemplate string `json:"namespaceTemplate" pflag:"-,Namespace pattern to spawn array-jobs in. Defaults to parent namespace if not set"` OutputAssembler workqueue.Config ErrorAssembler workqueue.Config LogConfig LogConfig `json:"logs" pflag:",Config for log links for k8s array jobs."` }
Config defines custom config for K8s Array plugin
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func (Executor) GetProperties ¶
func (Executor) GetProperties() core.PluginProperties
func (Executor) Handle ¶
func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
type KubeClientObj ¶
type KubeClientObj struct {
// contains filtered or unexported fields
}
func (KubeClientObj) GetCache ¶
func (k KubeClientObj) GetCache() cache.Cache
func (KubeClientObj) GetClient ¶
func (k KubeClientObj) GetClient() client.Client
type ResourceConfig ¶
type SubTaskExecutionContext ¶
type SubTaskExecutionContext struct { pluginsCore.TaskExecutionContext // contains filtered or unexported fields }
SubTaskExecutionContext wraps the core TaskExecutionContext so that the k8s array task context can be used within the pod plugin
func NewSubTaskExecutionContext ¶
func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, taskTemplate *core.TaskTemplate, executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionContext, error)
NewSubtaskExecutionContext constructs a SubTaskExecutionContext using the provided parameters
func (SubTaskExecutionContext) InputReader ¶
func (s SubTaskExecutionContext) InputReader() io.InputReader
InputReader overrides the base TaskExecutionContext to return a custom InputReader
func (SubTaskExecutionContext) OutputWriter ¶
func (s SubTaskExecutionContext) OutputWriter() io.OutputWriter
OutputWriter overrides the base TaskExecutionContext to return a custom OutputWriter
func (SubTaskExecutionContext) PluginStateReader ¶
func (s SubTaskExecutionContext) PluginStateReader() pluginsCore.PluginStateReader
PluginStateReader overrides the default behavior to return a custom pluginStateReader.
func (SubTaskExecutionContext) TaskExecutionMetadata ¶
func (s SubTaskExecutionContext) TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
TaskExecutionMetadata overrides the base TaskExecutionContext to return custom TaskExecutionMetadata
func (SubTaskExecutionContext) TaskReader ¶
func (s SubTaskExecutionContext) TaskReader() pluginsCore.TaskReader
TaskReader overrides the base TaskExecutionContext to return a custom TaskReader
type SubTaskExecutionID ¶
type SubTaskExecutionID struct { pluginsCore.TaskExecutionID // contains filtered or unexported fields }
SubTaskExecutionID wraps the core TaskExecutionID to customize the generated pod name
func NewSubTaskExecutionID ¶
func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex int, retryAttempt uint64) SubTaskExecutionID
NewSubtaskExecutionID constructs a SubTaskExecutionID using the provided parameters
func (SubTaskExecutionID) GetGeneratedName ¶
func (s SubTaskExecutionID) GetGeneratedName() string
GetGeneratedName overrides the base TaskExecutionID to append the subtask index and retryAttempt
func (SubTaskExecutionID) GetLogSuffix ¶
func (s SubTaskExecutionID) GetLogSuffix() string
GetLogSuffix returns the suffix which should be appended to subtask log names
func (SubTaskExecutionID) TemplateVarsByScheme ¶
func (s SubTaskExecutionID) TemplateVarsByScheme() []tasklog.TemplateVar
type SubTaskExecutionMetadata ¶
type SubTaskExecutionMetadata struct { pluginsCore.TaskExecutionMetadata // contains filtered or unexported fields }
SubTaskExecutionMetadata wraps the core TaskExecutionMetadata to customize the TaskExecutionID
func NewSubTaskExecutionMetadata ¶
func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskTemplate *core.TaskTemplate, executionIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error)
NewSubtaskExecutionMetadata constructs a SubTaskExecutionMetadata using the provided parameters
func (SubTaskExecutionMetadata) GetAnnotations ¶
func (s SubTaskExecutionMetadata) GetAnnotations() map[string]string
GetAnnotations overrides the base TaskExecutionMetadata to return a custom map
func (SubTaskExecutionMetadata) GetLabels ¶
func (s SubTaskExecutionMetadata) GetLabels() map[string]string
GetLabels overrides the base TaskExecutionMetadata to return a custom map
func (SubTaskExecutionMetadata) GetTaskExecutionID ¶
func (s SubTaskExecutionMetadata) GetTaskExecutionID() pluginsCore.TaskExecutionID
GetTaskExecutionID overrides the base TaskExecutionMetadata to return a custom TaskExecutionID
func (SubTaskExecutionMetadata) IsInterruptible ¶
func (s SubTaskExecutionMetadata) IsInterruptible() bool
IsInterruptbile overrides the base NodeExecutionMetadata to return a subtask specific identifier
type SubTaskReader ¶
type SubTaskReader struct { pluginsCore.TaskReader // contains filtered or unexported fields }
SubTaskReader wraps the core TaskReader to customize the task template task type and version
func (SubTaskReader) Read ¶
func (s SubTaskReader) Read(ctx context.Context) (*core.TaskTemplate, error)
Read overrides the base TaskReader to return a custom TaskTemplate