k8s

package
v1.9.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2023 License: Apache-2.0 Imports: 43 Imported by: 2

Documentation

Index

Constants

View Source
const (
	ErrBuildPodTemplate       stdErrors.ErrorCode = "POD_TEMPLATE_FAILED"
	ErrReplaceCmdTemplate     stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED"
	FlyteK8sArrayIndexVarName string              = "FLYTE_K8S_ARRAY_INDEX"

	JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)

Variables

This section is empty.

Functions

func GetK8sClient

func GetK8sClient(config ClusterConfig) (client.Client, error)

func GetNewExecutorPlugin

func GetNewExecutorPlugin(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)

func IsResourceConfigSet

func IsResourceConfigSet(resourceConfig ResourceConfig) bool

func LaunchAndCheckSubTasksState

func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient,
	config *Config, dataStore *storage.DataStore, outputPrefix, baseOutputDataSandbox storage.DataReference, currentState *arrayCore.State) (
	newState *arrayCore.State, externalResources []*core.ExternalResource, err error)

LaunchAndCheckSubTasksState iterates over each subtask performing operations to transition them to a terminal state. This may include creating new k8s resources, monitoring existing k8s resources, retrying failed attempts, or declaring a permanent failure among others.

func NewKubeClientObj

func NewKubeClientObj(c client.Client) core.KubeClient

func RemoteClusterConfig

func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error)

RemoteClusterConfig reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. TODO: Move logic to flytestdlib

func TerminateSubTasks

func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config,
	terminateFunction func(context.Context, SubTaskExecutionContext, *Config, core.KubeClient) error, currentState *arrayCore.State) error

TerminateSubTasks performs operations to gracefully terminate all subtasks. This may include aborting and finalizing active k8s resources.

Types

type Auth

type Auth struct {
	Type      string `json:"type" pflag:", Authentication type"`
	TokenPath string `json:"tokenPath" pflag:", Token path"`
	CertPath  string `json:"certPath" pflag:", Certificate path"`
}

func (Auth) GetCA

func (auth Auth) GetCA() ([]byte, error)

func (Auth) GetToken

func (auth Auth) GetToken() (string, error)

type ClusterConfig

type ClusterConfig struct {
	Name     string `json:"name" pflag:",Friendly name of the remote cluster"`
	Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
	Auth     Auth   `json:"auth" pflag:"-, Auth setting for the cluster"`
	Enabled  bool   `json:"enabled" pflag:", Boolean flag to enable or disable"`
}

type Config

type Config struct {
	DefaultScheduler     string            `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
	MaxErrorStringLength int               `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."`
	MaxArrayJobSize      int64             `json:"maxArrayJobSize" pflag:",Maximum size of array job."`
	ResourceConfig       ResourceConfig    `json:"resourceConfig" pflag:"-,ResourceConfiguration to limit number of resources used by k8s-array."`
	RemoteClusterConfig  ClusterConfig     `json:"remoteClusterConfig" pflag:"-,Configuration of remote K8s cluster for array jobs"`
	NodeSelector         map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."`
	Tolerations          []v1.Toleration   `json:"tolerations"  pflag:"-,Tolerations to be applied for k8s-array pods"`
	NamespaceTemplate    string            `json:"namespaceTemplate"  pflag:"-,Namespace pattern to spawn array-jobs in. Defaults to parent namespace if not set"`
	OutputAssembler      workqueue.Config
	ErrorAssembler       workqueue.Config
	LogConfig            LogConfig `json:"logs" pflag:",Config for log links for k8s array jobs."`
}

Config defines custom config for K8s Array plugin

func GetConfig

func GetConfig() *Config

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet will return strongly types pflags for all fields in Config and its nested types. The format of the flags is json-name.json-sub-name... etc.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(kubeClient core.KubeClient, cfg *Config, scope promutils.Scope) (Executor, error)

func (Executor) Abort

func (Executor) Finalize

func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error

func (Executor) GetID

func (e Executor) GetID() string

func (Executor) GetProperties

func (Executor) GetProperties() core.PluginProperties

func (Executor) Handle

func (Executor) Start

func (e Executor) Start(ctx context.Context) error

type KubeClientObj

type KubeClientObj struct {
	// contains filtered or unexported fields
}

func (KubeClientObj) GetCache

func (k KubeClientObj) GetCache() cache.Cache

func (KubeClientObj) GetClient

func (k KubeClientObj) GetClient() client.Client

type LogConfig

type LogConfig struct {
	Config logs.LogConfig `json:"config" pflag:",Defines the log config for k8s logs."`
}

type ResourceConfig

type ResourceConfig struct {
	PrimaryLabel string `json:"primaryLabel" pflag:",PrimaryLabel of a given service cluster"`
	Limit        int    `json:"limit" pflag:",Resource quota (in the number of outstanding requests) for the cluster"`
}

type SubTaskExecutionContext

type SubTaskExecutionContext struct {
	pluginsCore.TaskExecutionContext
	// contains filtered or unexported fields
}

SubTaskExecutionContext wraps the core TaskExecutionContext so that the k8s array task context can be used within the pod plugin

func NewSubTaskExecutionContext

func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, taskTemplate *core.TaskTemplate,
	executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionContext, error)

NewSubtaskExecutionContext constructs a SubTaskExecutionContext using the provided parameters

func (SubTaskExecutionContext) InputReader

func (s SubTaskExecutionContext) InputReader() io.InputReader

InputReader overrides the base TaskExecutionContext to return a custom InputReader

func (SubTaskExecutionContext) OutputWriter

func (s SubTaskExecutionContext) OutputWriter() io.OutputWriter

OutputWriter overrides the base TaskExecutionContext to return a custom OutputWriter

func (SubTaskExecutionContext) PluginStateReader

PluginStateReader overrides the default behavior to return a custom pluginStateReader.

func (SubTaskExecutionContext) TaskExecutionMetadata

func (s SubTaskExecutionContext) TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

TaskExecutionMetadata overrides the base TaskExecutionContext to return custom TaskExecutionMetadata

func (SubTaskExecutionContext) TaskReader

TaskReader overrides the base TaskExecutionContext to return a custom TaskReader

type SubTaskExecutionID

type SubTaskExecutionID struct {
	pluginsCore.TaskExecutionID
	// contains filtered or unexported fields
}

SubTaskExecutionID wraps the core TaskExecutionID to customize the generated pod name

func NewSubTaskExecutionID

func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex int, retryAttempt uint64) SubTaskExecutionID

NewSubtaskExecutionID constructs a SubTaskExecutionID using the provided parameters

func (SubTaskExecutionID) GetGeneratedName

func (s SubTaskExecutionID) GetGeneratedName() string

GetGeneratedName overrides the base TaskExecutionID to append the subtask index and retryAttempt

func (SubTaskExecutionID) GetLogSuffix

func (s SubTaskExecutionID) GetLogSuffix() string

GetLogSuffix returns the suffix which should be appended to subtask log names

func (SubTaskExecutionID) TemplateVarsByScheme

func (s SubTaskExecutionID) TemplateVarsByScheme() *tasklog.TemplateVarsByScheme

type SubTaskExecutionMetadata

type SubTaskExecutionMetadata struct {
	pluginsCore.TaskExecutionMetadata
	// contains filtered or unexported fields
}

SubTaskExecutionMetadata wraps the core TaskExecutionMetadata to customize the TaskExecutionID

func NewSubTaskExecutionMetadata

func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskTemplate *core.TaskTemplate,
	executionIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error)

NewSubtaskExecutionMetadata constructs a SubTaskExecutionMetadata using the provided parameters

func (SubTaskExecutionMetadata) GetAnnotations

func (s SubTaskExecutionMetadata) GetAnnotations() map[string]string

GetAnnotations overrides the base TaskExecutionMetadata to return a custom map

func (SubTaskExecutionMetadata) GetLabels

func (s SubTaskExecutionMetadata) GetLabels() map[string]string

GetLabels overrides the base TaskExecutionMetadata to return a custom map

func (SubTaskExecutionMetadata) GetTaskExecutionID

func (s SubTaskExecutionMetadata) GetTaskExecutionID() pluginsCore.TaskExecutionID

GetTaskExecutionID overrides the base TaskExecutionMetadata to return a custom TaskExecutionID

func (SubTaskExecutionMetadata) IsInterruptible

func (s SubTaskExecutionMetadata) IsInterruptible() bool

IsInterruptbile overrides the base NodeExecutionMetadata to return a subtask specific identifier

type SubTaskReader

type SubTaskReader struct {
	pluginsCore.TaskReader
	// contains filtered or unexported fields
}

SubTaskReader wraps the core TaskReader to customize the task template task type and version

func (SubTaskReader) Read

Read overrides the base TaskReader to return a custom TaskTemplate

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL