Documentation ¶
Index ¶
- Constants
- func AddObjectMetadata(taskCtx types.TaskContext, o K8sResource)
- func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements
- func BuildIdentityPod() *v1.Pod
- func BuildPodWithSpec(podSpec *v1.PodSpec) *v1.Pod
- func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, id types.TaskExecutionID) []v1.EnvVar
- func DemystifyPending(status v1.PodStatus) (types.TaskStatus, error)
- func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar
- func GetExecutionEnvVars(id types.TaskExecutionID) []v1.EnvVar
- func GetTolerationsForResources(resourceRequirements ...v1.ResourceRequirements) []v1.Toleration
- func Initialize(ctx context.Context, watchNamespace string, resyncPeriod time.Duration) (err error)
- func InitializeFake() client.Client
- func InjectCache(c cache.Cache) error
- func InjectClient(c client.Client) error
- func IsK8sObjectNotExists(err error) bool
- func RegisterResource(ctx context.Context, resourceToWatch runtime.Object, handler Handler) error
- func ToK8sContainer(ctx context.Context, taskCtx types.TaskContext, taskContainer *core.Container, ...) (*v1.Container, error)
- func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar
- func ToK8sPod(ctx context.Context, taskCtx types.TaskContext, taskContainer *core.Container, ...) (*v1.PodSpec, error)
- func UnionMaps(maps ...map[string]string) map[string]string
- type Handler
- type K8sResource
- type K8sResourceHandler
- type K8sTaskExecutor
- func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate) (types.TaskStatus, error)
- func (e *K8sTaskExecutor) ClearFinalizers(ctx context.Context, o K8sResource) error
- func (e *K8sTaskExecutor) GetID() types.TaskExecutorName
- func (e *K8sTaskExecutor) GetProperties() types.ExecutorProperties
- func (e K8sTaskExecutor) HandleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error)
- func (e *K8sTaskExecutor) Initialize(ctx context.Context, params types.ExecutorInitializationParameters) error
- func (e *K8sTaskExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext, reason string) error
- func (e *K8sTaskExecutor) StartTask(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, ...) (types.TaskStatus, error)
- type K8sTaskExecutorMetrics
Examples ¶
Constants ¶
const DefaultInformerResyncDuration = 30 * time.Second
const Gigabytes = 1024 * Megabytes
const Kilobytes = 1024 * 1
const MaxMetadataPayloadSizeBytes = 10 * Megabytes
const Megabytes = 1024 * Kilobytes
const PodKind = "pod"
const ResourceNvidiaGPU = "nvidia.com/gpu"
ResourceNvidiaGPU is the name of the Nvidia GPU resource. Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
Variables ¶
This section is empty.
Functions ¶
func AddObjectMetadata ¶
func AddObjectMetadata(taskCtx types.TaskContext, o K8sResource)
func ApplyResourceOverrides ¶
func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements
func BuildIdentityPod ¶
func DecorateEnvVars ¶
func DemystifyPending ¶
func DemystifyPending(status v1.PodStatus) (types.TaskStatus, error)
Important considerations: a Pod can be in the Pending status for various reasons, and sometimes this signals a problem. Case I: Pending because the image pull is failing and it is backing off.
This could be transient, so we can actually rely on the failure reason. The failure transitions from ErrImagePull -> ImagePullBackOff.
Case II: Not enough resources are available. This is tricky. It could be that the total number of
resources requested is beyond the capability of the system. For this we will rely on configuration and hence input gates. We should not allow bad requests that ask for a large number of resources through. If such a request does make it through, we will fail after a timeout.
func GetExecutionEnvVars ¶
func GetExecutionEnvVars(id types.TaskExecutionID) []v1.EnvVar
func GetTolerationsForResources ¶
func GetTolerationsForResources(resourceRequirements ...v1.ResourceRequirements) []v1.Toleration
func Initialize ¶
func InitializeFake ¶
func InjectCache ¶
func InjectClient ¶
func IsK8sObjectNotExists ¶
func RegisterResource ¶
func ToK8sContainer ¶
func ToK8sContainer(ctx context.Context, taskCtx types.TaskContext, taskContainer *core.Container, inputs *core.LiteralMap) (*v1.Container, error)
Returns a K8s Container for the execution
func ToK8sEnvVar ¶
func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar
Types ¶
type K8sResource ¶
type K8sResourceHandler ¶
type K8sResourceHandler interface { // Defines a func to create the full resource object that will be posted to k8s. BuildResource(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (K8sResource, error) // Defines a func to create a query object (typically just object and type meta portions) that's used to query k8s // resources. BuildIdentityResource(ctx context.Context, taskCtx types.TaskContext) (K8sResource, error) // Analyses the k8s resource and reports the status as TaskPhase. GetTaskStatus(ctx context.Context, taskCtx types.TaskContext, resource K8sResource) (types.TaskStatus, *events.TaskEventInfo, error) }
Defines an interface that deals with k8s resources. Combined with K8sTaskExecutor, this provides an easier and more consistent way to write TaskExecutors that create k8s resources.
type K8sTaskExecutor ¶
type K8sTaskExecutor struct { types.OutputsResolver // contains filtered or unexported fields }
K8sTaskExecutor is a generic task executor for tasks that rely on k8s resources.
func NewK8sTaskExecutorForResource ¶
func NewK8sTaskExecutorForResource(id string, resourceToWatch runtime.Object, handler K8sResourceHandler, resyncPeriod time.Duration) *K8sTaskExecutor
NewK8sTaskExecutorForResource creates a generic K8s task executor. This provides an easier way to build task executors that create K8s resources.
Example ¶
package main import ( "fmt" "time" "github.com/lyft/flyteplugins/go/tasks/v1/events" "github.com/lyft/flyteplugins/go/tasks/v1/flytek8s" "context" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteplugins/go/tasks/v1/types" k8sBatch "k8s.io/api/batch/v1" ) type k8sSampleHandler struct { } func (k8sSampleHandler) BuildResource(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (flytek8s.K8sResource, error) { panic("implement me") } func (k8sSampleHandler) BuildIdentityResource(ctx context.Context, taskCtx types.TaskContext) (flytek8s.K8sResource, error) { panic("implement me") } func (k8sSampleHandler) GetTaskStatus(ctx context.Context, taskCtx types.TaskContext, resource flytek8s.K8sResource) (types.TaskStatus, *events.TaskEventInfo, error) { panic("implement me") } func main() { exec := flytek8s.NewK8sTaskExecutorForResource("SampleHandler", &k8sBatch.Job{}, k8sSampleHandler{}, time.Second*1) fmt.Printf("Created executor: %v\n", exec.GetID()) }
Output: Created executor: SampleHandler
func (*K8sTaskExecutor) CheckTaskStatus ¶
func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate) ( types.TaskStatus, error)
func (*K8sTaskExecutor) ClearFinalizers ¶
func (e *K8sTaskExecutor) ClearFinalizers(ctx context.Context, o K8sResource) error
func (*K8sTaskExecutor) GetID ¶
func (e *K8sTaskExecutor) GetID() types.TaskExecutorName
func (*K8sTaskExecutor) GetProperties ¶
func (e *K8sTaskExecutor) GetProperties() types.ExecutorProperties
func (K8sTaskExecutor) HandleTaskSuccess ¶
func (e K8sTaskExecutor) HandleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error)
func (*K8sTaskExecutor) Initialize ¶
func (e *K8sTaskExecutor) Initialize(ctx context.Context, params types.ExecutorInitializationParameters) error
func (*K8sTaskExecutor) KillTask ¶
func (e *K8sTaskExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext, reason string) error
func (*K8sTaskExecutor) StartTask ¶
func (e *K8sTaskExecutor) StartTask(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) ( types.TaskStatus, error)