flytek8s

Version: v0.1.10

Warning: This package is not in the latest version of its module.
Published: Oct 11, 2019 License: Apache-2.0 Imports: 37 Imported by: 0

Constants

const DefaultInformerResyncDuration = 30 * time.Second
const Gigabytes = 1024 * Megabytes
const Kilobytes = 1024 * 1
const MaxMetadataPayloadSizeBytes = 10 * Megabytes
const Megabytes = 1024 * Kilobytes
const PodKind = "pod"
const ResourceNvidiaGPU = "nvidia.com/gpu"

ResourceNvidiaGPU is the name of the Nvidia GPU resource. Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
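
To show how the size constants compose, the sketch below redeclares them locally with the values listed above and checks a size against MaxMetadataPayloadSizeBytes. The helper fitsMetadataLimit is hypothetical and not part of this package, and treating the limit as inclusive is an assumption.

```go
package main

import "fmt"

// Local copies of the constants listed above, so the sketch compiles on its own.
const (
	Kilobytes                   = 1024 * 1
	Megabytes                   = 1024 * Kilobytes
	Gigabytes                   = 1024 * Megabytes
	MaxMetadataPayloadSizeBytes = 10 * Megabytes
)

// fitsMetadataLimit is a hypothetical helper: it reports whether a payload of
// sizeBytes bytes is no larger than MaxMetadataPayloadSizeBytes.
// Whether the package treats the limit as inclusive is an assumption.
func fitsMetadataLimit(sizeBytes int) bool {
	return sizeBytes <= MaxMetadataPayloadSizeBytes
}

func main() {
	fmt.Println(MaxMetadataPayloadSizeBytes)     // 10485760
	fmt.Println(fitsMetadataLimit(4 * Kilobytes)) // true
	fmt.Println(fitsMetadataLimit(11 * Megabytes)) // false
}
```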

Variables

This section is empty.

Functions

func AddObjectMetadata

func AddObjectMetadata(taskCtx types.TaskContext, o K8sResource)

func ApplyResourceOverrides

func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements

func BuildIdentityPod

func BuildIdentityPod() *v1.Pod

func BuildPodWithSpec

func BuildPodWithSpec(podSpec *v1.PodSpec) *v1.Pod

func DecorateEnvVars

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, id types.TaskExecutionID) []v1.EnvVar

func DemystifyPending

func DemystifyPending(status v1.PodStatus) (types.TaskStatus, error)

Important considerations: a Pod can be in Pending status for various reasons, and sometimes this signals a problem.

Case I: Pending because the image pull is failing and is backing off.

This could be transient, so we can rely on the failure reason.
The failure transitions from ErrImagePull -> ImagePullBackOff.

Case II: Not enough resources are available. This is tricky. It could be that the total amount of resources requested is beyond the capability of the system. For this we rely on configuration and hence input gates: bad requests that ask for a large amount of resources should not be let through. If such a request does make it through, we fail after a timeout.
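
The two cases above can be sketched with plain strings and durations instead of v1.PodStatus. This is only an illustration, not the package's implementation: pendingVerdict, classifyPending, and the timeout parameter are hypothetical, and the policy of tolerating ErrImagePull while failing on ImagePullBackOff is an assumption about how "rely on the failure reason" might be applied.

```go
package main

import (
	"fmt"
	"time"
)

// pendingVerdict is a hypothetical stand-in for the task status that the
// real function returns.
type pendingVerdict string

const (
	verdictWaiting pendingVerdict = "waiting"
	verdictFailed  pendingVerdict = "failed"
)

// classifyPending sketches the two cases described above.
// waitingReason is the container's waiting reason, pendingFor is how long the
// pod has been pending, and timeout is an assumed configured limit.
func classifyPending(waitingReason string, pendingFor, timeout time.Duration) pendingVerdict {
	// Case II: whatever the reason, give up once the pod has been pending
	// longer than the timeout (for example, resources never became available).
	if pendingFor > timeout {
		return verdictFailed
	}
	switch waitingReason {
	case "ImagePullBackOff":
		// Case I (assumed policy): repeated pull failures count as a failure.
		return verdictFailed
	default:
		// ErrImagePull or no reason yet: possibly transient, keep waiting.
		return verdictWaiting
	}
}

func main() {
	fmt.Println(classifyPending("ErrImagePull", time.Second, time.Minute))     // waiting
	fmt.Println(classifyPending("ImagePullBackOff", time.Second, time.Minute)) // failed
	fmt.Println(classifyPending("", 2*time.Minute, time.Minute))               // failed
}
```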

func GetContextEnvVars

func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar

func GetExecutionEnvVars

func GetExecutionEnvVars(id types.TaskExecutionID) []v1.EnvVar

func GetTolerationsForResources

func GetTolerationsForResources(resourceRequirements ...v1.ResourceRequirements) []v1.Toleration

func Initialize

func Initialize(ctx context.Context, watchNamespace string, resyncPeriod time.Duration) (err error)

func InitializeFake

func InitializeFake() client.Client

func InjectCache

func InjectCache(c cache.Cache) error

func InjectClient

func InjectClient(c client.Client) error

func IsK8sObjectNotExists

func IsK8sObjectNotExists(err error) bool

func RegisterResource

func RegisterResource(_ context.Context, resourceToWatch runtime.Object, handler Handler, metricsScope promutils.Scope) error

func ToK8sContainer

func ToK8sContainer(ctx context.Context, taskCtx types.TaskContext, taskContainer *core.Container, inputs *core.LiteralMap) (*v1.Container, error)

Returns a K8s Container for the execution

func ToK8sEnvVar

func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar

func ToK8sPod

func ToK8sPod(ctx context.Context, taskCtx types.TaskContext, taskContainer *core.Container, inputs *core.LiteralMap) (*v1.PodSpec, error)

func UnionMaps

func UnionMaps(maps ...map[string]string) map[string]string

This function unions a list of maps (each can be nil or populated) by allocating a new map. Conflicting keys will always defer to the later input map's corresponding value.
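
The behavior described above (nil inputs tolerated, a new map allocated, later maps winning on conflicting keys) can be illustrated with a small re-implementation. The lowercase unionMaps below is a sketch written for this example, not the package's source.

```go
package main

import "fmt"

// unionMaps is an illustrative re-implementation of the documented behavior:
// it always allocates a new map, skips nothing explicitly (ranging over a nil
// map is a no-op), and lets later maps overwrite earlier ones.
func unionMaps(maps ...map[string]string) map[string]string {
	size := 0
	for _, m := range maps {
		size += len(m)
	}
	out := make(map[string]string, size)
	for _, m := range maps {
		for k, v := range m {
			out[k] = v
		}
	}
	return out
}

func main() {
	defaults := map[string]string{"team": "data", "env": "dev"}
	overrides := map[string]string{"env": "prod"}

	merged := unionMaps(defaults, nil, overrides)
	fmt.Println(merged["team"], merged["env"]) // data prod
	fmt.Println(defaults["env"])               // dev (inputs are not modified)
}
```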

Types

type Handler

type Handler interface {
	Handle(context.Context, runtime.Object) error
}

type K8sObjectState added in v0.1.10

type K8sObjectState struct {
	Status        K8sObjectStatus `json:"s"`
	TerminalPhase types.TaskPhase `json:"tp"`
}

K8sObjectState is the internal state of the object; it is not read or updated by upstream components (e.g. Node manager).
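
The short JSON keys in the struct tags ("s" and "tp") keep the serialized state compact. The sketch below shows the resulting encoding using local stand-in types: objectStatus, taskPhase, and objectState are hypothetical, and the numeric values are made up for the example rather than taken from the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// objectStatus and taskPhase are hypothetical stand-ins for K8sObjectStatus
// and types.TaskPhase; the values used below are invented for illustration.
type objectStatus int
type taskPhase int

// objectState mirrors the shape and JSON tags of K8sObjectState shown above.
type objectState struct {
	Status        objectStatus `json:"s"`
	TerminalPhase taskPhase    `json:"tp"`
}

// encodeState serializes the state to its compact JSON form.
func encodeState(s objectState) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

// decodeState parses the compact JSON form back into a state value.
func decodeState(raw string) (objectState, error) {
	var s objectState
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

func main() {
	raw, err := encodeState(objectState{Status: 1, TerminalPhase: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(raw) // {"s":1,"tp":3}

	back, err := decodeState(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.Status, back.TerminalPhase) // 1 3
}
```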

type K8sObjectStatus added in v0.1.10

type K8sObjectStatus int

K8sObjectStatus is the internal status of the object; it is not read or updated by upstream components (e.g. Node manager).

func (K8sObjectStatus) String added in v0.1.10

func (q K8sObjectStatus) String() string

type K8sResource

type K8sResource interface {
	runtime.Object
	metav1.Object
	schema.ObjectKind
}

type K8sResourceHandler

type K8sResourceHandler interface {
	// Defines a func to create the full resource object that will be posted to k8s.
	BuildResource(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (K8sResource, error)

	// Defines a func to create a query object (typically just object and type meta portions) that's used to query k8s
	// resources.
	BuildIdentityResource(ctx context.Context, taskCtx types.TaskContext) (K8sResource, error)

	// Analyses the k8s resource and reports the status as TaskPhase.
	GetTaskStatus(ctx context.Context, taskCtx types.TaskContext, resource K8sResource) (types.TaskStatus, *events.TaskEventInfo, error)

	GetProperties() types.ExecutorProperties
}

Defines an interface that deals with k8s resources. Combined with K8sTaskExecutor, this provides an easier and more consistent way to write TaskExecutors that create k8s resources.

type K8sTaskExecutor

type K8sTaskExecutor struct {
	types.OutputsResolver
	// contains filtered or unexported fields
}

A generic task executor for k8s-resource reliant tasks.

func NewK8sTaskExecutorForResource

func NewK8sTaskExecutorForResource(id string, resourceToWatch runtime.Object, handler K8sResourceHandler,
	resyncPeriod time.Duration) *K8sTaskExecutor

Creates a K8s generic task executor. This provides an easier way to build task executors that create K8s resources.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/lyft/flyteplugins/go/tasks/v1/events"
	"github.com/lyft/flyteplugins/go/tasks/v1/flytek8s"
	"github.com/lyft/flyteplugins/go/tasks/v1/types"
	k8sBatch "k8s.io/api/batch/v1"
)

// k8sSampleHandler is a stub K8sResourceHandler; every method panics until implemented.
type k8sSampleHandler struct{}

func (k8sSampleHandler) BuildResource(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (flytek8s.K8sResource, error) {
	panic("implement me")
}

func (k8sSampleHandler) BuildIdentityResource(ctx context.Context, taskCtx types.TaskContext) (flytek8s.K8sResource, error) {
	panic("implement me")
}

func (k8sSampleHandler) GetTaskStatus(ctx context.Context, taskCtx types.TaskContext, resource flytek8s.K8sResource) (types.TaskStatus, *events.TaskEventInfo, error) {
	panic("implement me")
}

func (k8sSampleHandler) GetProperties() types.ExecutorProperties {
	panic("implement me")
}

func main() {
	// Watch batch Jobs with the stub handler and a one-second resync period.
	exec := flytek8s.NewK8sTaskExecutorForResource("SampleHandler", &k8sBatch.Job{}, k8sSampleHandler{}, time.Second)
	fmt.Printf("Created executor: %v\n", exec.GetID())
}
Output:

Created executor: SampleHandler

func (*K8sTaskExecutor) CheckTaskStatus

func (e *K8sTaskExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate) (
	types.TaskStatus, error)

func (*K8sTaskExecutor) ClearFinalizers

func (e *K8sTaskExecutor) ClearFinalizers(ctx context.Context, o K8sResource) error

func (*K8sTaskExecutor) GetID

func (*K8sTaskExecutor) GetProperties

func (e *K8sTaskExecutor) GetProperties() types.ExecutorProperties

func (K8sTaskExecutor) HandleTaskSuccess

func (e K8sTaskExecutor) HandleTaskSuccess(ctx context.Context, taskCtx types.TaskContext) (types.TaskStatus, error)

func (*K8sTaskExecutor) Initialize

func (*K8sTaskExecutor) KillTask

func (e *K8sTaskExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext, reason string) error

func (*K8sTaskExecutor) StartTask

func (e *K8sTaskExecutor) StartTask(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (
	types.TaskStatus, error)

type K8sTaskExecutorMetrics

type K8sTaskExecutorMetrics struct {
	Scope           promutils.Scope
	GetCacheMiss    labeled.StopWatch
	GetCacheHit     labeled.StopWatch
	GetAPILatency   labeled.StopWatch
	ResourceDeleted labeled.Counter
}
