Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool)
- func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
- func ForEachIntegration(f func(name string, cb IntegrationCallbacks) error) error
- func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object
- func GetIntegrationsList() []string
- func GetOwnerKey(ownerGVK schema.GroupVersionKind) string
- func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo
- func GetWorkloadNameForOwnerRef(owner *metav1.OwnerReference) (string, error)
- func GetWorkloadNameForOwnerWithGVK(ownerName string, ownerGVK schema.GroupVersionKind) string
- func IsOwnerManagedByKueue(owner *metav1.OwnerReference) bool
- func IsUnretryableError(e error) bool
- func ParentWorkloadName(job GenericJob) string
- func PrebuiltWorkloadFor(job GenericJob) (string, bool)
- func QueueName(job GenericJob) string
- func QueueNameForObject(object client.Object) string
- func RegisterIntegration(name string, cb IntegrationCallbacks) error
- func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error
- func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error
- func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error
- func UnretryableError(msg string) error
- func ValidateAnnotationAsCRDName(job GenericJob, crdNameAnnotation string) field.ErrorList
- func ValidateCreateForParentWorkload(job GenericJob) field.ErrorList
- func ValidateCreateForQueueName(job GenericJob) field.ErrorList
- func ValidateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList
- func ValidateUpdateForParentWorkload(oldJob, newJob GenericJob) field.ErrorList
- func ValidateUpdateForQueueName(oldJob, newJob GenericJob) field.ErrorList
- func ValidateUpdateForWorkloadPriorityClassName(oldJob, newJob GenericJob) field.ErrorList
- type ComposableJob
- type GenericJob
- type IntegrationCallbacks
- type JobReconciler
- type JobReconcilerInterface
- type JobWithCustomStop
- type JobWithFinalize
- type JobWithPriorityClass
- type JobWithReclaimablePods
- type JobWithSkip
- type Option
- func WithEnabledFrameworks(i *configapi.Integrations) Option
- func WithIntegrationOptions(integrationName string, opts any) Option
- func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option
- func WithManageJobsWithoutQueueName(f bool) Option
- func WithManagerName(n string) Option
- func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option
- type Options
- type ReconcilerFactory
- type ReconcilerSetup
- type StopReason
Constants ¶
const ( ReasonStarted = "Started" ReasonSuspended = "Suspended" ReasonStopped = "Stopped" ReasonCreatedWorkload = "CreatedWorkload" ReasonDeletedWorkload = "DeletedWorkload" ReasonUpdatedWorkload = "UpdatedWorkload" ReasonFinishedWorkload = "FinishedWorkload" ReasonErrWorkloadCompose = "ErrWorkloadCompose" )
JobReconciler event reason list
const (
FailedToStartFinishedReason = "FailedToStart"
)
Variables ¶
var ( ErrChildJobOwnerNotFound = fmt.Errorf("owner isn't set even though %s annotation is set", controllerconsts.ParentWorkloadAnnotation) ErrUnknownWorkloadOwner = errors.New("workload owner is unknown") ErrWorkloadOwnerNotFound = errors.New("workload owner not found") ErrNoMatchingWorkloads = errors.New("no matching workloads") ErrExtraWorkloads = errors.New("extra workloads") )
Functions ¶
func ApplyDefaultForSuspend ¶
func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool)
func FindMatchingWorkloads ¶ added in v0.6.0
func ForEachIntegration ¶ added in v0.4.0
func ForEachIntegration(f func(name string, cb IntegrationCallbacks) error) error
ForEachIntegration loops through the registered list of frameworks, calling f for each one. If at any point f returns an error, the loop is stopped and that error is returned.
func GetEmptyOwnerObject ¶ added in v0.4.0
func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object
GetEmptyOwnerObject returns an empty object of the owner's type, or nil if the owner is not manageable by kueue.
func GetIntegrationsList ¶ added in v0.4.0
func GetIntegrationsList() []string
GetIntegrationsList returns the list of currently registered frameworks.
func GetOwnerKey ¶ added in v0.6.0
func GetOwnerKey(ownerGVK schema.GroupVersionKind) string
func GetPodSetsInfoFromWorkload ¶ added in v0.6.0
func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo
GetPodSetsInfoFromWorkload retrieves the podSetsInfo slice from the provided workload's spec.
func GetWorkloadNameForOwnerRef ¶
func GetWorkloadNameForOwnerRef(owner *metav1.OwnerReference) (string, error)
func GetWorkloadNameForOwnerWithGVK ¶
func GetWorkloadNameForOwnerWithGVK(ownerName string, ownerGVK schema.GroupVersionKind) string
func IsOwnerManagedByKueue ¶ added in v0.4.0
func IsOwnerManagedByKueue(owner *metav1.OwnerReference) bool
IsOwnerManagedByKueue returns true if the provided owner can be managed by kueue.
func IsUnretryableError ¶ added in v0.6.0
func ParentWorkloadName ¶
func ParentWorkloadName(job GenericJob) string
func PrebuiltWorkloadFor ¶ added in v0.6.0
func PrebuiltWorkloadFor(job GenericJob) (string, bool)
func QueueName ¶
func QueueName(job GenericJob) string
func QueueNameForObject ¶ added in v0.3.2
func RegisterIntegration ¶ added in v0.4.0
func RegisterIntegration(name string, cb IntegrationCallbacks) error
RegisterIntegration registers a new framework. It returns an error when attempting to register multiple frameworks with the same name or if a mandatory callback is missing.
func SetupControllers ¶ added in v0.6.0
SetupControllers sets up all controllers and webhooks for integrations. When platform developers implement a separate kueue-manager to manage in-house custom jobs, they can use this function to easily set up controllers and webhooks for those custom jobs.
Note that the first argument, "mgr", must be initialized outside of this function. In addition, if the manager uses kueue's internal cert management for the webhooks, this function needs to be called after the certs are ready, because the controllers won't work until the webhooks are operating, and the webhooks won't work until the certs are all in place.
func SetupIndexes ¶ added in v0.6.0
SetupIndexes sets up the indexers for integrations. When platform developers implement a separate kueue-manager to manage in-house custom jobs, they can use this function to easily set up indexers for those custom jobs.
Note that the second argument, "indexer", needs to be the FieldIndexer obtained from the Manager.
func SetupWorkloadOwnerIndex ¶
func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error
func UnretryableError ¶ added in v0.6.0
UnretryableError returns an error that doesn't require a reconcile retry and will not be returned by the JobReconciler.
func ValidateAnnotationAsCRDName ¶
func ValidateAnnotationAsCRDName(job GenericJob, crdNameAnnotation string) field.ErrorList
func ValidateCreateForParentWorkload ¶ added in v0.4.0
func ValidateCreateForParentWorkload(job GenericJob) field.ErrorList
func ValidateCreateForQueueName ¶
func ValidateCreateForQueueName(job GenericJob) field.ErrorList
func ValidateLabelAsCRDName ¶ added in v0.6.0
func ValidateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList
func ValidateUpdateForParentWorkload ¶
func ValidateUpdateForParentWorkload(oldJob, newJob GenericJob) field.ErrorList
func ValidateUpdateForQueueName ¶
func ValidateUpdateForQueueName(oldJob, newJob GenericJob) field.ErrorList
func ValidateUpdateForWorkloadPriorityClassName ¶ added in v0.5.0
func ValidateUpdateForWorkloadPriorityClassName(oldJob, newJob GenericJob) field.ErrorList
Types ¶
type ComposableJob ¶ added in v0.6.0
type ComposableJob interface { // Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed. Load(ctx context.Context, c client.Client, key *types.NamespacedName) (removeFinalizers bool, err error) // Run unsuspends all members of the ComposableJob and injects the node affinity with podSet // counts extracting from workload to all members of the ComposableJob. Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r record.EventRecorder, msg string) error // ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob. ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error) // ListChildWorkloads returns all workloads related to the composable job ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error) // FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted. FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error) // Stop implements the custom stop procedure for ComposableJob Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error) // Ensure all members of the ComposableJob are owning the workload EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error }
ComposableJob interface should be implemented by generic jobs that are composed out of multiple API objects.
type GenericJob ¶
type GenericJob interface { // Object returns the job instance. Object() client.Object // IsSuspended returns whether the job is suspended or not. IsSuspended() bool // Suspend will suspend the job. Suspend() // RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it. RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error // RestorePodSetsInfo will restore the original node affinity and podSet counts of the job. // Returns whether any change was done. RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool // Finished means whether the job is completed/failed or not, // condition represents the workload finished condition. Finished() (condition metav1.Condition, finished bool) // PodSets will build workload podSets corresponding to the job. PodSets() []kueue.PodSet // IsActive returns true if there are any running pods. IsActive() bool // PodsReady instructs whether job derived pods are all ready now. PodsReady() bool // GVK returns GVK (Group Version Kind) for the job. GVK() schema.GroupVersionKind }
GenericJob is the interface that needs to be implemented by all jobs managed by kueue's jobframework.
type IntegrationCallbacks ¶ added in v0.4.0
type IntegrationCallbacks struct { // NewReconciler creates a new reconciler NewReconciler ReconcilerFactory // SetupWebhook sets up the framework's webhook with the controllers manager SetupWebhook func(mgr ctrl.Manager, opts ...Option) error // JobType holds an object of the type managed by the integration's webhook JobType runtime.Object // SetupIndexes registers any additional indexes with the controllers manager // (this callback is optional) SetupIndexes func(ctx context.Context, indexer client.FieldIndexer) error // AddToScheme adds any additional types to the controllers manager's scheme // (this callback is optional) AddToScheme func(s *runtime.Scheme) error // Returns true if the provided owner reference identifies an object // managed by this integration // (this callback is optional) IsManagingObjectsOwner func(ref *metav1.OwnerReference) bool // CanSupportIntegration returns true if the integration meets any additional condition // like the Kubernetes version. CanSupportIntegration func(opts ...Option) (bool, error) }
IntegrationCallbacks groups a set of callbacks used to integrate a new framework.
func GetIntegration ¶ added in v0.4.0
func GetIntegration(name string) (IntegrationCallbacks, bool)
GetIntegration looks up the framework identified by name in the currently registered list of frameworks. It returns the framework's callbacks and true if found.
type JobReconciler ¶
type JobReconciler struct {
// contains filtered or unexported fields
}
JobReconciler reconciles a GenericJob object
func NewReconciler ¶
func NewReconciler( client client.Client, record record.EventRecorder, opts ...Option) *JobReconciler
func (*JobReconciler) IsParentJobManaged ¶ added in v0.4.0
func (r *JobReconciler) IsParentJobManaged(ctx context.Context, jobObj client.Object, namespace string) (bool, error)
IsParentJobManaged checks whether the parent job is managed by kueue.
func (*JobReconciler) ReconcileGenericJob ¶
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error)
type JobReconcilerInterface ¶ added in v0.4.0
type JobReconcilerInterface interface { reconcile.Reconciler SetupWithManager(mgr ctrl.Manager) error }
type JobWithCustomStop ¶ added in v0.4.0
type JobWithCustomStop interface { // Stop implements a custom stop procedure. // The function should be idempotent: not do any API calls if the job is already stopped. // Returns whether the Job stopped with this call or an error Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) (bool, error) }
type JobWithFinalize ¶ added in v0.5.0
JobWithFinalize interface should be implemented by generic jobs that need custom finalization logic to run after the job has finished.
type JobWithPriorityClass ¶ added in v0.4.0
type JobWithPriorityClass interface { // PriorityClass returns the job's priority class name. PriorityClass() string }
type JobWithReclaimablePods ¶ added in v0.4.0
type JobWithReclaimablePods interface { // ReclaimablePods returns the list of reclaimable pods. ReclaimablePods() ([]kueue.ReclaimablePod, error) }
type JobWithSkip ¶ added in v0.5.0
type JobWithSkip interface {
Skip() bool
}
JobWithSkip interface should be implemented by generic jobs whose reconciliation should be skipped depending on the job's state.
type Option ¶
type Option func(*Options)
Option configures the reconciler.
func WithEnabledFrameworks ¶ added in v0.6.0
func WithEnabledFrameworks(i *configapi.Integrations) Option
WithEnabledFrameworks adds framework names enabled in the ConfigAPI.
func WithIntegrationOptions ¶ added in v0.6.0
WithIntegrationOptions adds integration options, such as podOptions. The second argument, `opts`, may be any options struct.
func WithKubeServerVersion ¶ added in v0.5.0
func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option
func WithManageJobsWithoutQueueName ¶
WithManageJobsWithoutQueueName indicates if the controller should reconcile jobs that don't set the queue name annotation.
func WithManagerName ¶ added in v0.6.0
WithManagerName adds kueue's manager name.
func WithWaitForPodsReady ¶
func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option
WithWaitForPodsReady indicates if the controller should add the PodsReady condition to the workload when the corresponding job has all pods ready or succeeded.
type Options ¶
type Options struct { ManageJobsWithoutQueueName bool WaitForPodsReady bool KubeServerVersion *kubeversion.ServerVersionFetcher // IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND". IntegrationOptions map[string]any EnabledFrameworks sets.Set[string] ManagerName string }
func ProcessOptions ¶ added in v0.6.0
type ReconcilerFactory ¶ added in v0.4.0
type ReconcilerFactory func(client client.Client, record record.EventRecorder, opts ...Option) JobReconcilerInterface
func NewGenericReconcilerFactory ¶ added in v0.6.0
func NewGenericReconcilerFactory(newJob func() GenericJob, setup ...ReconcilerSetup) ReconcilerFactory
NewGenericReconcilerFactory creates a new reconciler factory for a concrete GenericJob type. newJob should return a new empty job.
type ReconcilerSetup ¶ added in v0.6.0
type StopReason ¶ added in v0.6.0
type StopReason int
const ( StopReasonWorkloadDeleted StopReason = iota StopReasonWorkloadEvicted StopReasonNoMatchingWorkload StopReasonNotAdmitted )