Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool)
- func EnableIntegration(name string)
- func EnableIntegrationsForTest(tb testing.TB, names ...string) func()
- func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
- func ForEachIntegration(f func(name string, cb IntegrationCallbacks) error) error
- func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object
- func GetIntegrationsList() []string
- func GetMultiKueueAdapters() (map[string]MultiKueueAdapter, error)
- func GetOwnerKey(ownerGVK schema.GroupVersionKind) string
- func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo
- func GetWorkloadNameForOwnerWithGVK(ownerName string, ownerUID types.UID, ownerGVK schema.GroupVersionKind) string
- func IsOwnerManagedByKueue(owner *metav1.OwnerReference) bool
- func IsUnretryableError(e error) bool
- func PrebuiltWorkloadFor(job GenericJob) (string, bool)
- func QueueName(job GenericJob) string
- func QueueNameForObject(object client.Object) string
- func RegisterExternalJobType(kindArg string) error
- func RegisterIntegration(name string, cb IntegrationCallbacks) error
- func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error
- func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error
- func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error
- func UnretryableError(msg string) error
- func ValidateAnnotationAsCRDName(job GenericJob, crdNameAnnotation string) field.ErrorList
- func ValidateJobOnCreate(job GenericJob) field.ErrorList
- func ValidateJobOnUpdate(oldJob, newJob GenericJob) field.ErrorList
- func ValidateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList
- type ComposableJob
- type GenericJob
- type IntegrationCallbacks
- type JobReconciler
- type JobReconcilerInterface
- type JobWithCustomStop
- type JobWithCustomWorkloadConditions
- type JobWithFinalize
- type JobWithPriorityClass
- type JobWithReclaimablePods
- type JobWithSkip
- type MultiKueueAdapter
- type MultiKueueWatcher
- type Option
- func WithCache(c *cache.Cache) Option
- func WithEnabledExternalFrameworks(exFrameworks []string) Option
- func WithEnabledFrameworks(frameworks []string) Option
- func WithIntegrationOptions(integrationName string, opts any) Option
- func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option
- func WithLabelKeysToCopy(n []string) Option
- func WithManageJobsWithoutQueueName(f bool) Option
- func WithManagerName(n string) Option
- func WithQueues(q *queue.Manager) Option
- func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option
- type Options
- type ReconcilerFactory
- type ReconcilerSetup
- type StopReason
Constants ¶
const ( ReasonStarted = "Started" ReasonSuspended = "Suspended" ReasonStopped = "Stopped" ReasonCreatedWorkload = "CreatedWorkload" ReasonDeletedWorkload = "DeletedWorkload" ReasonUpdatedWorkload = "UpdatedWorkload" ReasonFinishedWorkload = "FinishedWorkload" ReasonErrWorkloadCompose = "ErrWorkloadCompose" ReasonUpdatedAdmissionCheck = "UpdatedAdmissionCheck" )
JobReconciler event reason list
const (
FailedToStartFinishedReason = "FailedToStart"
)
Variables ¶
Functions ¶
func ApplyDefaultForSuspend ¶
func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool)
func EnableIntegration ¶ added in v0.8.0
func EnableIntegration(name string)
EnableIntegration marks the integration identified by name as enabled.
func EnableIntegrationsForTest ¶ added in v0.8.0
EnableIntegrationsForTest should be used only in tests. It marks the frameworks identified by names as enabled and returns a revert function.
func FindMatchingWorkloads ¶ added in v0.6.0
func ForEachIntegration ¶ added in v0.4.0
func ForEachIntegration(f func(name string, cb IntegrationCallbacks) error) error
ForEachIntegration loops through the registered list of frameworks calling f. If at any point f returns an error, the loop is stopped and that error is returned.
func GetEmptyOwnerObject ¶ added in v0.4.0
func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object
GetEmptyOwnerObject returns an empty object of the owner's type, or nil if the owner is not manageable by kueue.
func GetIntegrationsList ¶ added in v0.4.0
func GetIntegrationsList() []string
GetIntegrationsList returns the list of currently registered frameworks.
func GetMultiKueueAdapters ¶ added in v0.8.0
func GetMultiKueueAdapters() (map[string]MultiKueueAdapter, error)
GetMultiKueueAdapters returns the map containing the MultiKueue adapters for the registered integrations. An error is returned if more than one adapter is registered for one object type.
func GetOwnerKey ¶ added in v0.6.0
func GetOwnerKey(ownerGVK schema.GroupVersionKind) string
func GetPodSetsInfoFromWorkload ¶ added in v0.6.0
func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo
GetPodSetsInfoFromWorkload retrieves the podSetsInfo slice from the provided workload's spec.
func IsOwnerManagedByKueue ¶ added in v0.4.0
func IsOwnerManagedByKueue(owner *metav1.OwnerReference) bool
IsOwnerManagedByKueue returns true if the provided owner can be managed by kueue.
func IsUnretryableError ¶ added in v0.6.0
func PrebuiltWorkloadFor ¶ added in v0.6.0
func PrebuiltWorkloadFor(job GenericJob) (string, bool)
func QueueName ¶
func QueueName(job GenericJob) string
func QueueNameForObject ¶ added in v0.3.2
func RegisterExternalJobType ¶ added in v0.7.0
RegisterExternalJobType registers a new externally-managed Kind. It returns an error if kindArg cannot be parsed as a Kind.version.group.
func RegisterIntegration ¶ added in v0.4.0
func RegisterIntegration(name string, cb IntegrationCallbacks) error
RegisterIntegration registers a new framework. It returns an error when attempting to register multiple frameworks with the same name or if a mandatory callback is missing.
func SetupControllers ¶ added in v0.6.0
SetupControllers sets up all controllers and webhooks for integrations. When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, they can easily set up controllers and webhooks for the in-house custom jobs.
Note that the first argument, "mgr", must be initialized outside of this function. In addition, if the manager uses kueue's internal cert management for the webhooks, this function needs to be called after the certs are ready, because the controllers won't work until the webhooks are operating, and the webhooks won't work until the certs are all in place.
func SetupIndexes ¶ added in v0.6.0
SetupIndexes sets up the indexers for integrations. When the platform developers implement a separate kueue-manager to manage the in-house custom jobs, they can easily set up indexers for the in-house custom jobs.
Note that the second argument, "indexer", needs to be the fieldIndexer obtained from the Manager.
func SetupWorkloadOwnerIndex ¶
func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error
func UnretryableError ¶ added in v0.6.0
UnretryableError is an error that doesn't require reconcile retry and will not be returned by the JobReconciler.
func ValidateAnnotationAsCRDName ¶
func ValidateAnnotationAsCRDName(job GenericJob, crdNameAnnotation string) field.ErrorList
func ValidateJobOnCreate ¶ added in v0.7.0
func ValidateJobOnCreate(job GenericJob) field.ErrorList
ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation
func ValidateJobOnUpdate ¶ added in v0.7.0
func ValidateJobOnUpdate(oldJob, newJob GenericJob) field.ErrorList
ValidateJobOnUpdate encapsulates all GenericJob validations that must be performed on an Update operation.
func ValidateLabelAsCRDName ¶ added in v0.6.0
func ValidateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList
Types ¶
type ComposableJob ¶ added in v0.6.0
type ComposableJob interface { // Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed. Load(ctx context.Context, c client.Client, key *types.NamespacedName) (removeFinalizers bool, err error) // Run unsuspends all members of the ComposableJob and injects the node affinity with podSet // counts extracting from workload to all members of the ComposableJob. Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r record.EventRecorder, msg string) error // ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob. ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error) // ListChildWorkloads returns all workloads related to the composable job. ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error) // FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted. FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error) // Stop implements the custom stop procedure for ComposableJob. Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error) // ForEach calls f on each member of the ComposableJob. ForEach(f func(obj runtime.Object)) }
ComposableJob interface should be implemented by generic jobs that are composed out of multiple API objects.
type GenericJob ¶
type GenericJob interface { // Object returns the job instance. Object() client.Object // IsSuspended returns whether the job is suspended or not. IsSuspended() bool // Suspend will suspend the job. Suspend() // RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it. RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error // RestorePodSetsInfo will restore the original node affinity and podSet counts of the job. // Returns whether any change was done. RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool // Finished means whether the job is completed/failed or not, // condition represents the workload finished condition. // Observed generation of the workload is set by the jobframework. Finished() (message string, success, finished bool) // PodSets will build workload podSets corresponding to the job. PodSets() []kueue.PodSet // IsActive returns true if there are any running pods. IsActive() bool // PodsReady instructs whether job derived pods are all ready now. PodsReady() bool // GVK returns GVK (Group Version Kind) for the job. GVK() schema.GroupVersionKind }
GenericJob is the interface which needs to be implemented by all jobs managed by the kueue's jobframework.
type IntegrationCallbacks ¶ added in v0.4.0
type IntegrationCallbacks struct { // NewReconciler creates a new reconciler NewReconciler ReconcilerFactory // SetupWebhook sets up the framework's webhook with the controllers manager SetupWebhook func(mgr ctrl.Manager, opts ...Option) error // JobType holds an object of the type managed by the integration's webhook JobType runtime.Object // SetupIndexes registers any additional indexes with the controllers manager // (this callback is optional) SetupIndexes func(ctx context.Context, indexer client.FieldIndexer) error // AddToScheme adds any additional types to the controllers manager's scheme // (this callback is optional) AddToScheme func(s *runtime.Scheme) error // Returns true if the provided owner reference identifies an object // managed by this integration // (this callback is optional) IsManagingObjectsOwner func(ref *metav1.OwnerReference) bool // CanSupportIntegration returns true if the integration meets any additional condition // like the Kubernetes version. CanSupportIntegration func(opts ...Option) (bool, error) // The job's MultiKueue adapter (optional) MultiKueueAdapter MultiKueueAdapter }
IntegrationCallbacks groups a set of callbacks used to integrate a new framework.
func GetIntegration ¶ added in v0.4.0
func GetIntegration(name string) (IntegrationCallbacks, bool)
GetIntegration looks up the framework identified by name in the currently registered list of frameworks, returning its callbacks and true if found.
type JobReconciler ¶
type JobReconciler struct {
// contains filtered or unexported fields
}
JobReconciler reconciles a GenericJob object
func NewReconciler ¶
func NewReconciler( client client.Client, record record.EventRecorder, opts ...Option) *JobReconciler
func (*JobReconciler) IsParentJobManaged ¶ added in v0.4.0
func (r *JobReconciler) IsParentJobManaged(ctx context.Context, jobObj client.Object, namespace string) (bool, error)
IsParentJobManaged checks whether the parent job is managed by kueue.
func (*JobReconciler) ReconcileGenericJob ¶
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error)
type JobReconcilerInterface ¶ added in v0.4.0
type JobReconcilerInterface interface { reconcile.Reconciler SetupWithManager(mgr ctrl.Manager) error }
type JobWithCustomStop ¶ added in v0.4.0
type JobWithCustomStop interface { // Stop implements a custom stop procedure. // The function should be idempotent: not do any API calls if the job is already stopped. // Returns whether the Job stopped with this call or an error Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) (bool, error) }
type JobWithCustomWorkloadConditions ¶ added in v0.7.0
type JobWithCustomWorkloadConditions interface { // CustomWorkloadConditions return custom workload conditions and status changed or not. CustomWorkloadConditions(wl *kueue.Workload) ([]metav1.Condition, bool) }
JobWithCustomWorkloadConditions interface should be implemented by generic jobs, when custom workload conditions should be updated after ensuring that the workload exists.
type JobWithFinalize ¶ added in v0.5.0
JobWithFinalize interface should be implemented by generic jobs, when custom finalization logic is needed for a job, after it's finished.
type JobWithPriorityClass ¶ added in v0.4.0
type JobWithPriorityClass interface { // PriorityClass returns the job's priority class name. PriorityClass() string }
type JobWithReclaimablePods ¶ added in v0.4.0
type JobWithReclaimablePods interface { // ReclaimablePods returns the list of reclaimable pods. ReclaimablePods() ([]kueue.ReclaimablePod, error) }
type JobWithSkip ¶ added in v0.5.0
type JobWithSkip interface {
Skip() bool
}
JobWithSkip interface should be implemented by generic jobs, when reconciliation should be skipped depending on the job's state
type MultiKueueAdapter ¶ added in v0.8.0
type MultiKueueAdapter interface { // SyncJob creates the Job object in the worker cluster using remote client, if not already created. // Copy the status from the remote job if already exists. SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error // DeleteRemoteObject deletes the Job in the worker cluster. DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error // IsJobManagedByKueue returns: // - a bool indicating if the job object identified by key is managed by kueue and can be delegated. // - a reason indicating why the job is not managed by Kueue // - any API error encountered during the check IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error) // KeepAdmissionCheckPending returns true if the state of the multikueue admission check should be // kept Pending while the job runs in a worker. This might be needed to keep the managers job // suspended and not start the execution locally. KeepAdmissionCheckPending() bool // GVK returns GVK (Group Version Kind) for the job. GVK() schema.GroupVersionKind }
MultiKueueAdapter is the interface needed for MultiKueue job delegation.
type MultiKueueWatcher ¶ added in v0.8.0
type MultiKueueWatcher interface { // GetEmptyList returns an empty list of objects GetEmptyList() client.ObjectList // WorkloadKeyFor returns the key of the workload of interest // - the object name for workloads // - the prebuilt workload for job types WorkloadKeyFor(runtime.Object) (types.NamespacedName, error) }
MultiKueueWatcher is an optional interface that can be implemented by a MultiKueueAdapter to receive job-related watch events from the worker cluster. If not implemented, MultiKueue will only receive events related to the job's workload.
type Option ¶
type Option func(*Options)
Option configures the reconciler.
func WithEnabledExternalFrameworks ¶ added in v0.7.0
WithEnabledExternalFrameworks adds framework names managed by external controllers in the ConfigAPI.
func WithEnabledFrameworks ¶ added in v0.6.0
WithEnabledFrameworks adds framework names enabled in the ConfigAPI.
func WithIntegrationOptions ¶ added in v0.6.0
WithIntegrationOptions adds integration options like podOptions. The second arg, `opts`, should be recognized as any option struct.
func WithKubeServerVersion ¶ added in v0.5.0
func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option
func WithLabelKeysToCopy ¶ added in v0.7.0
WithLabelKeysToCopy adds the label keys to copy.
func WithManageJobsWithoutQueueName ¶
WithManageJobsWithoutQueueName indicates if the controller should reconcile jobs that don't set the queue name annotation.
func WithManagerName ¶ added in v0.6.0
WithManagerName adds the kueue's manager name.
func WithQueues ¶ added in v0.7.0
WithQueues adds the queue manager.
func WithWaitForPodsReady ¶
func WithWaitForPodsReady(w *configapi.WaitForPodsReady) Option
WithWaitForPodsReady indicates if the controller should add the PodsReady condition to the workload when the corresponding job has all pods ready or succeeded.
type Options ¶
type Options struct { ManageJobsWithoutQueueName bool WaitForPodsReady bool KubeServerVersion *kubeversion.ServerVersionFetcher // IntegrationOptions key is "$GROUP/$VERSION, Kind=$KIND". IntegrationOptions map[string]any EnabledFrameworks sets.Set[string] EnabledExternalFrameworks sets.Set[string] ManagerName string LabelKeysToCopy []string Queues *queue.Manager Cache *cache.Cache }
func ProcessOptions ¶ added in v0.6.0
type ReconcilerFactory ¶ added in v0.4.0
type ReconcilerFactory func(client client.Client, record record.EventRecorder, opts ...Option) JobReconcilerInterface
func NewGenericReconcilerFactory ¶ added in v0.6.0
func NewGenericReconcilerFactory(newJob func() GenericJob, setup ...ReconcilerSetup) ReconcilerFactory
NewGenericReconcilerFactory creates a new reconciler factory for a concrete GenericJob type. newJob should return a new empty job.
type ReconcilerSetup ¶ added in v0.6.0
type StopReason ¶ added in v0.6.0
type StopReason string
const ( StopReasonWorkloadDeleted StopReason = "WorkloadDeleted" StopReasonWorkloadEvicted StopReason = "WorkloadEvicted" StopReasonNoMatchingWorkload StopReason = "NoMatchingWorkload" StopReasonNotAdmitted StopReason = "NotAdmitted" )