jobframework

package
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: Apache-2.0 Imports: 62 Imported by: 2

Documentation

Index

Constants

View Source
const (
	ReasonStarted               = "Started"
	ReasonSuspended             = "Suspended"
	ReasonStopped               = "Stopped"
	ReasonCreatedWorkload       = "CreatedWorkload"
	ReasonDeletedWorkload       = "DeletedWorkload"
	ReasonUpdatedWorkload       = "UpdatedWorkload"
	ReasonFinishedWorkload      = "FinishedWorkload"
	ReasonErrWorkloadCompose    = "ErrWorkloadCompose"
	ReasonUpdatedAdmissionCheck = "UpdatedAdmissionCheck"
)

JobReconciler event reason list

View Source
const (
	FailedToStartFinishedReason = "FailedToStart"
)

Variables

View Source
var (
	ErrUnknownWorkloadOwner     = errors.New("workload owner is unknown")
	ErrWorkloadOwnerNotFound    = errors.New("workload owner not found")
	ErrNoMatchingWorkloads      = errors.New("no matching workloads")
	ErrExtraWorkloads           = errors.New("extra workloads")
	ErrPrebuildWorkloadNotFound = errors.New("prebuild workload not found")
)

Functions

func ApplyDefaultForSuspend

func ApplyDefaultForSuspend(ctx context.Context, job GenericJob, k8sClient client.Client,
	manageJobsWithoutQueueName bool, managedJobsNamespaceSelector labels.Selector) error

func ApplyDefaultLocalQueue added in v0.10.0

func ApplyDefaultLocalQueue(jobObj client.Object, defaultQueueExist func(string) bool)

func BaseWebhookFactory added in v0.9.0

func BaseWebhookFactory(job GenericJob, fromObject func(runtime.Object) GenericJob) func(ctrl.Manager, ...Option) error

func EnableIntegration added in v0.8.0

func EnableIntegration(name string)

EnableIntegration marks the integration identified by name as enabled.

func EnableIntegrationsForTest added in v0.8.0

func EnableIntegrationsForTest(tb testing.TB, names ...string) func()

EnableIntegrationsForTest - should be used only in tests Mark the frameworks identified by names and return a revert function.

func FindMatchingWorkloads added in v0.6.0

func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob) (match *kueue.Workload, toDelete []*kueue.Workload, err error)

func ForEachIntegration added in v0.4.0

func ForEachIntegration(f func(name string, cb IntegrationCallbacks) error) error

ForEachIntegration loops through the registered list of frameworks calling f, if at any point f returns an error the loop is stopped and that error is returned.

func GetEmptyOwnerObject added in v0.4.0

func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object

GetEmptyOwnerObject returns an empty object of the owner's type, returns nil if the owner is not manageable by kueue.

func GetIntegrationsList added in v0.4.0

func GetIntegrationsList() []string

GetIntegrationsList returns the list of currently registered frameworks.

func GetMultiKueueAdapters added in v0.8.0

func GetMultiKueueAdapters(enabledIntegrations sets.Set[string]) (map[string]MultiKueueAdapter, error)

GetMultiKueueAdapters returns the map containing the MultiKueue adapters for the registered and enabled integrations. An error is returned if more then one adapter is registers for one object type.

func GetOwnerKey added in v0.6.0

func GetOwnerKey(ownerGVK schema.GroupVersionKind) string

func GetPodSetsInfoFromWorkload added in v0.6.0

func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo

GetPodSetsInfoFromWorkload retrieve the podSetsInfo slice from the provided workload's spec

func GetWorkloadNameForOwnerWithGVK

func GetWorkloadNameForOwnerWithGVK(ownerName string, ownerUID types.UID, ownerGVK schema.GroupVersionKind) string

func IsOwnerIntegrationEnabled added in v0.10.0

func IsOwnerIntegrationEnabled(owner *metav1.OwnerReference) bool

IsOwnerIntegrationEnabled returns true if the provided owner is managed by an enabled integration.

func IsOwnerManagedByKueue added in v0.4.0

func IsOwnerManagedByKueue(owner *metav1.OwnerReference) bool

IsOwnerManagedByKueue returns true if the provided owner can be managed by kueue.

func IsUnretryableError added in v0.6.0

func IsUnretryableError(e error) bool

func MaximumExecutionTimeSeconds added in v0.9.0

func MaximumExecutionTimeSeconds(job GenericJob) *int32

func PodSetTopologyRequest added in v0.9.0

func PodSetTopologyRequest(meta *metav1.ObjectMeta, podIndexLabel *string, subGroupIndexLabel *string, subGroupCount *int32) *kueue.PodSetTopologyRequest

func PrebuiltWorkloadFor added in v0.6.0

func PrebuiltWorkloadFor(job GenericJob) (string, bool)

func QueueName

func QueueName(job GenericJob) string

func QueueNameForObject added in v0.3.2

func QueueNameForObject(object client.Object) string

func RegisterExternalJobType added in v0.7.0

func RegisterExternalJobType(kindArg string) error

RegisterExternalJobType registers a new externally-managed Kind, returns an error if kindArg cannot be parsed as a Kind.version.group.

func RegisterIntegration added in v0.4.0

func RegisterIntegration(name string, cb IntegrationCallbacks) error

RegisterIntegration registers a new framework, returns an error when attempting to register multiple frameworks with the same name or if a mandatory callback is missing.

func SetupControllers added in v0.6.0

func SetupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error

SetupControllers setups all controllers and webhooks for integrations. When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, they can easily setup controllers and webhooks for the in-house custom jobs.

Note that the first argument, "mgr" must be initialized on the outside of this function. In addition, if the manager uses the kueue's internal cert management for the webhooks, this function needs to be called after the certs get ready because the controllers won't work until the webhooks are operating, and the webhook won't work until the certs are all in place.

func SetupIndexes added in v0.6.0

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error

SetupIndexes setups the indexers for integrations. When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, they can easily setup indexers for the in-house custom jobs.

Note that the second argument, "indexer" needs to be the fieldIndexer obtained from the Manager.

func SetupWorkloadOwnerIndex

func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error

func UnretryableError added in v0.6.0

func UnretryableError(msg string) error

UnretryableError is an error that doesn't require reconcile retry and will not be returned by the JobReconciler.

func ValidateAnnotationAsCRDName

func ValidateAnnotationAsCRDName(obj client.Object, crdNameAnnotation string) field.ErrorList

func ValidateJobOnCreate added in v0.7.0

func ValidateJobOnCreate(job GenericJob) field.ErrorList

ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation

func ValidateJobOnUpdate added in v0.7.0

func ValidateJobOnUpdate(oldJob, newJob GenericJob) field.ErrorList

ValidateJobOnUpdate encapsulates all GenericJob validations that must be performed on a Update operation

func ValidateLabelAsCRDName added in v0.6.0

func ValidateLabelAsCRDName(obj client.Object, crdNameLabel string) field.ErrorList

func ValidateQueueName added in v0.9.2

func ValidateQueueName(obj client.Object) field.ErrorList

func ValidateTASPodSetRequest added in v0.9.0

func ValidateTASPodSetRequest(replicaPath *field.Path, replicaMetadata *metav1.ObjectMeta) field.ErrorList

func WorkloadShouldBeSuspended added in v0.10.0

func WorkloadShouldBeSuspended(ctx context.Context, jobObj client.Object, k8sClient client.Client,
	manageJobsWithoutQueueName bool, managedJobsNamespaceSelector labels.Selector) (bool, error)

WorkloadShouldBeSuspended determines whether jobObj should be default suspended on creation

Types

type BaseWebhook added in v0.8.2

type BaseWebhook struct {
	Client                       client.Client
	ManageJobsWithoutQueueName   bool
	ManagedJobsNamespaceSelector labels.Selector
	FromObject                   func(runtime.Object) GenericJob
	Queues                       *queue.Manager
}

BaseWebhook applies basic defaulting and validation for jobs.

func (*BaseWebhook) Default added in v0.8.2

func (w *BaseWebhook) Default(ctx context.Context, obj runtime.Object) error

Default implements webhook.CustomDefaulter so a webhook will be registered for the type

func (*BaseWebhook) ValidateCreate added in v0.8.2

func (w *BaseWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error)

ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type

func (*BaseWebhook) ValidateDelete added in v0.8.2

ValidateDelete implements webhook.CustomValidator so a webhook will be registered for the type

func (*BaseWebhook) ValidateUpdate added in v0.8.2

func (w *BaseWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error)

ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type

type ComposableJob added in v0.6.0

type ComposableJob interface {
	// Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed.
	Load(ctx context.Context, c client.Client, key *types.NamespacedName) (removeFinalizers bool, err error)
	// Run unsuspends all members of the ComposableJob and injects the node affinity with podSet
	// counts extracting from workload to all members of the ComposableJob.
	Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r record.EventRecorder, msg string) error
	// ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob.
	ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error)
	// ListChildWorkloads returns all workloads related to the composable job.
	ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error)
	// FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted.
	FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
	// Stop implements the custom stop procedure for ComposableJob.
	Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error)
	// ForEach calls f on each member of the ComposableJob.
	ForEach(f func(obj runtime.Object))
}

ComposableJob interface should be implemented by generic jobs that are composed out of multiple API objects.

type GenericJob

type GenericJob interface {
	// Object returns the job instance.
	Object() client.Object
	// IsSuspended returns whether the job is suspended or not.
	IsSuspended() bool
	// Suspend will suspend the job.
	Suspend()
	// RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it.
	RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error
	// RestorePodSetsInfo will restore the original node affinity and podSet counts of the job.
	// Returns whether any change was done.
	RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool
	// Finished means whether the job is completed/failed or not,
	// condition represents the workload finished condition.
	// Observed generation of the workload is set by the jobframework.
	Finished() (message string, success, finished bool)
	// PodSets will build workload podSets corresponding to the job.
	PodSets() []kueue.PodSet
	// IsActive returns true if there are any running pods.
	IsActive() bool
	// PodsReady instructs whether job derived pods are all ready now.
	PodsReady() bool
	// GVK returns GVK (Group Version Kind) for the job.
	GVK() schema.GroupVersionKind
}

GenericJob if the interface which needs to be implemented by all jobs managed by the kueue's jobframework.

type IntegrationCallbacks added in v0.4.0

type IntegrationCallbacks struct {
	// NewJob creates a new instance of job
	NewJob func() GenericJob
	// GVK holds the schema information for the job
	// (this callback is optional)
	GVK schema.GroupVersionKind
	// NewReconciler creates a new reconciler
	NewReconciler ReconcilerFactory
	// SetupWebhook sets up the framework's webhook with the controllers manager
	SetupWebhook func(mgr ctrl.Manager, opts ...Option) error
	// JobType holds an object of the type managed by the integration's webhook
	JobType runtime.Object
	// SetupIndexes registers any additional indexes with the controllers manager
	// (this callback is optional)
	SetupIndexes func(ctx context.Context, indexer client.FieldIndexer) error
	// AddToScheme adds any additional types to the controllers manager's scheme
	// (this callback is optional)
	AddToScheme func(s *runtime.Scheme) error
	// Returns true if the provided owner reference identifies an object
	// managed by this integration
	// (this callback is optional)
	IsManagingObjectsOwner func(ref *metav1.OwnerReference) bool
	// CanSupportIntegration returns true if the integration meets any additional condition
	// like the Kubernetes version.
	CanSupportIntegration func(opts ...Option) (bool, error)
	// The job's MultiKueue adapter (optional)
	MultiKueueAdapter MultiKueueAdapter
	// The list of integration that need to be enabled along with the current one.
	DependencyList []string
}

IntegrationCallbacks groups a set of callbacks used to integrate a new framework.

func GetIntegration added in v0.4.0

func GetIntegration(name string) (IntegrationCallbacks, bool)

GetIntegration looks-up the framework identified by name in the currently registered list of frameworks returning its callbacks and true if found.

func GetIntegrationByGVK added in v0.9.0

func GetIntegrationByGVK(gvk schema.GroupVersionKind) (IntegrationCallbacks, bool)

GetIntegrationByGVK looks-up the framework identified by GroupVersionKind in the currently registered list of frameworks returning its callbacks and true if found.

type JobReconciler

type JobReconciler struct {
	// contains filtered or unexported fields
}

JobReconciler reconciles a GenericJob object

func NewReconciler

func NewReconciler(
	client client.Client,
	record record.EventRecorder,
	opts ...Option) *JobReconciler

func (*JobReconciler) IsParentJobManaged added in v0.4.0

func (r *JobReconciler) IsParentJobManaged(ctx context.Context, jobObj client.Object, namespace string) (bool, error)

IsParentJobManaged checks whether the parent job is managed by kueue.

func (*JobReconciler) ReconcileGenericJob

func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error)

type JobReconcilerInterface added in v0.4.0

type JobReconcilerInterface interface {
	reconcile.Reconciler
	SetupWithManager(mgr ctrl.Manager) error
}

type JobWithCustomStop added in v0.4.0

type JobWithCustomStop interface {
	// Stop implements a custom stop procedure.
	// The function should be idempotent: not do any API calls if the job is already stopped.
	// Returns whether the Job stopped with this call or an error
	Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) (bool, error)
}

type JobWithCustomValidation added in v0.9.1

type JobWithCustomValidation interface {
	// ValidateOnCreate returns list of webhook create validation errors.
	ValidateOnCreate() field.ErrorList
	// ValidateOnUpdate returns list of webhook update validation errors.
	ValidateOnUpdate(oldJob GenericJob) field.ErrorList
}

JobWithCustomValidation optional interface that allows custom webhook validation for Jobs that use BaseWebhook.

type JobWithCustomWorkloadConditions added in v0.7.0

type JobWithCustomWorkloadConditions interface {
	// CustomWorkloadConditions return custom workload conditions and status changed or not.
	CustomWorkloadConditions(wl *kueue.Workload) ([]metav1.Condition, bool)
}

JobWithCustomWorkloadConditions interface should be implemented by generic jobs, when custom workload conditions should be updated after ensure that the workload exists.

type JobWithFinalize added in v0.5.0

type JobWithFinalize interface {
	Finalize(ctx context.Context, c client.Client) error
}

JobWithFinalize interface should be implemented by generic jobs, when custom finalization logic is needed for a job, after it's finished.

type JobWithPodLabelSelector added in v0.9.0

type JobWithPodLabelSelector interface {
	// PodLabelSelector returns the label selector used by pods for the job.
	PodLabelSelector() string
}

type JobWithPriorityClass added in v0.4.0

type JobWithPriorityClass interface {
	// PriorityClass returns the job's priority class name.
	PriorityClass() string
}

type JobWithReclaimablePods added in v0.4.0

type JobWithReclaimablePods interface {
	// ReclaimablePods returns the list of reclaimable pods.
	ReclaimablePods() ([]kueue.ReclaimablePod, error)
}

type JobWithSkip added in v0.5.0

type JobWithSkip interface {
	Skip() bool
}

JobWithSkip interface should be implemented by generic jobs, when reconciliation should be skipped depending on the job's state

type MultiKueueAdapter added in v0.8.0

type MultiKueueAdapter interface {
	// SyncJob creates the Job object in the worker cluster using remote client, if not already created.
	// Copy the status from the remote job if already exists.
	SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error
	// DeleteRemoteObject deletes the Job in the worker cluster.
	DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error
	// IsJobManagedByKueue returns:
	// - a bool indicating if the job object identified by key is managed by kueue and can be delegated.
	// - a reason indicating why the job is not managed by Kueue
	// - any API error encountered during the check
	IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
	// KeepAdmissionCheckPending returns true if the state of the multikueue admission check should be
	// kept Pending while the job runs in a worker. This might be needed to keep the managers job
	// suspended and not start the execution locally.
	KeepAdmissionCheckPending() bool
	// GVK returns GVK (Group Version Kind) for the job.
	GVK() schema.GroupVersionKind
}

MultiKueueAdapter interface needed for MultiKueue job delegation.

type MultiKueueWatcher added in v0.8.0

type MultiKueueWatcher interface {
	// GetEmptyList returns an empty list of objects
	GetEmptyList() client.ObjectList
	// WorkloadKeyFor returns the key of the workload of interest
	// - the object name for workloads
	// - the prebuilt workload for job types
	WorkloadKeyFor(runtime.Object) (types.NamespacedName, error)
}

MultiKueueWatcher optional interface that can be implemented by a MultiKueueAdapter to receive job related watch events from the worker cluster. If not implemented, MultiKueue will only receive events related to the job's workload.

type NoopReconciler added in v0.9.0

type NoopReconciler struct {
	// contains filtered or unexported fields
}

func (NoopReconciler) Reconcile added in v0.9.0

func (NoopReconciler) SetupWithManager added in v0.9.0

func (r NoopReconciler) SetupWithManager(ctrl.Manager) error

type Option

type Option func(*Options)

Option configures the reconciler.

func WithCache added in v0.7.0

func WithCache(c *cache.Cache) Option

WithCache adds the cache manager.

func WithClock added in v0.9.0

func WithClock(_ testing.TB, c clock.Clock) Option

WithClock sets the clock of the reconciler. It default to system's clock and should only be changed in testing.

func WithEnabledExternalFrameworks added in v0.7.0

func WithEnabledExternalFrameworks(exFrameworks []string) Option

WithEnabledExternalFrameworks adds framework names managed by external controller in the Config API.

func WithEnabledFrameworks added in v0.6.0

func WithEnabledFrameworks(frameworks []string) Option

WithEnabledFrameworks adds framework names enabled in the ConfigAPI.

func WithIntegrationOptions added in v0.6.0

func WithIntegrationOptions(integrationName string, opts any) Option

WithIntegrationOptions adds integrations options like podOptions. The second arg, `opts` should be recognized as any option struct.

func WithKubeServerVersion added in v0.5.0

func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option

func WithLabelKeysToCopy added in v0.7.0

func WithLabelKeysToCopy(n []string) Option

WithLabelKeysToCopy adds the label keys

func WithManageJobsWithoutQueueName

func WithManageJobsWithoutQueueName(f bool) Option

WithManageJobsWithoutQueueName indicates if the controller should reconcile jobs that don't set the queue name annotation.

func WithManagedJobsNamespaceSelector added in v0.10.0

func WithManagedJobsNamespaceSelector(ls labels.Selector) Option

WithManagedJobsNamespaceSelector is used for namespace-based filtering of ManagedJobsWithoutQueueName

func WithManagerName added in v0.6.0

func WithManagerName(n string) Option

WithManagerName adds the kueue's manager name.

func WithQueues added in v0.7.0

func WithQueues(q *queue.Manager) Option

WithQueues adds the queue manager.

func WithWaitForPodsReady

func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option

WithWaitForPodsReady indicates if the controller should add the PodsReady condition to the workload when the corresponding job has all pods ready or succeeded.

type Options

type Options struct {
	ManageJobsWithoutQueueName   bool
	ManagedJobsNamespaceSelector labels.Selector
	WaitForPodsReady             bool
	KubeServerVersion            *kubeversion.ServerVersionFetcher
	IntegrationOptions           map[string]any // IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND".
	EnabledFrameworks            sets.Set[string]
	EnabledExternalFrameworks    sets.Set[string]
	ManagerName                  string
	LabelKeysToCopy              []string
	Queues                       *queue.Manager
	Cache                        *cache.Cache
	Clock                        clock.Clock
}

func ProcessOptions added in v0.6.0

func ProcessOptions(opts ...Option) Options

type ReconcilerFactory added in v0.4.0

type ReconcilerFactory func(client client.Client, record record.EventRecorder, opts ...Option) JobReconcilerInterface

func NewGenericReconcilerFactory added in v0.6.0

func NewGenericReconcilerFactory(newJob func() GenericJob, setup ...ReconcilerSetup) ReconcilerFactory

NewGenericReconcilerFactory creates a new reconciler factory for a concrete GenericJob type. newJob should return a new empty job.

func NewNoopReconcilerFactory added in v0.9.0

func NewNoopReconcilerFactory(gvk schema.GroupVersionKind) ReconcilerFactory

type ReconcilerSetup added in v0.6.0

type ReconcilerSetup func(*builder.Builder, client.Client) *builder.Builder

type StopReason added in v0.6.0

type StopReason string
const (
	StopReasonWorkloadDeleted    StopReason = "WorkloadDeleted"
	StopReasonWorkloadEvicted    StopReason = "WorkloadEvicted"
	StopReasonNoMatchingWorkload StopReason = "NoMatchingWorkload"
	StopReasonNotAdmitted        StopReason = "NotAdmitted"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL