queue

package
v0.6.0-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Key

func Key(q *kueue.LocalQueue) string

Key is the key used to index the queue.

Types

type ClusterQueue

// ClusterQueue is an interface for a cluster queue to store workloads waiting
// to be scheduled.
type ClusterQueue interface {
	// Update updates the properties of this ClusterQueue from the given
	// API object.
	Update(*kueue.ClusterQueue) error
	// Cohort returns the Cohort of this ClusterQueue.
	Cohort() string

	// AddFromLocalQueue pushes all workloads belonging to the given
	// LocalQueue to the ClusterQueue. If at least one workload is added,
	// returns true, otherwise returns false.
	AddFromLocalQueue(*LocalQueue) bool
	// DeleteFromLocalQueue removes all workloads belonging to the given
	// LocalQueue from the ClusterQueue.
	DeleteFromLocalQueue(*LocalQueue)

	// PushOrUpdate pushes the workload to ClusterQueue.
	// If the workload is already present, updates with the new one.
	PushOrUpdate(*workload.Info)
	// Delete removes the workload from ClusterQueue.
	Delete(*kueue.Workload)
	// Pop removes the head of the queue and returns it. It returns nil if the
	// queue is empty.
	Pop() *workload.Info

	// RequeueIfNotPresent inserts a workload that was not
	// admitted back into the ClusterQueue. Depending on the RequeueReason,
	// the workload is either put back in the queue immediately (because we
	// couldn't determine if the workload was admissible in the last cycle),
	// or the implementation might choose to keep it in temporary placeholder
	// stage where it doesn't compete with other workloads, until cluster
	// events free up quota.
	// NOTE(review): the earlier wording described the second argument as a
	// boolean; which reasons map to an immediate requeue is decided by each
	// implementation — confirm against the implementations.
	// The workload should not be reinserted if it's already in the ClusterQueue.
	// Returns true if the workload was inserted.
	RequeueIfNotPresent(*workload.Info, RequeueReason) bool
	// QueueInadmissibleWorkloads moves all workloads put in temporary placeholder stage
	// to the ClusterQueue. If at least one workload is moved,
	// returns true, otherwise returns false.
	QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool

	// Pending returns the total number of pending workloads.
	// NOTE(review): presumably the sum of PendingActive and
	// PendingInadmissible — confirm.
	Pending() int

	// PendingActive returns the number of active pending workloads,
	// workloads that are in the admission queue.
	PendingActive() int
	// PendingInadmissible returns the number of inadmissible pending workloads,
	// workloads that were already tried and are waiting for cluster conditions
	// to change to potentially become admissible.
	PendingInadmissible() int

	// Dump produces a dump of the current workloads in the heap of
	// this ClusterQueue. It returns false if the queue is empty,
	// otherwise returns true.
	Dump() ([]string, bool)
	// DumpInadmissible produces a dump of the inadmissible workloads of
	// this ClusterQueue.
	// NOTE(review): the boolean presumably mirrors Dump (false when there
	// is nothing to dump) — confirm.
	DumpInadmissible() ([]string, bool)
	// Snapshot returns a copy of the current workloads in the heap of
	// this ClusterQueue.
	Snapshot() []*workload.Info
	// Info returns workload.Info for the workload key.
	// Users of this method should not modify the returned object.
	Info(string) *workload.Info

	// Active returns true if the queue is active.
	Active() bool
}

ClusterQueue is an interface for a cluster queue to store workloads waiting to be scheduled.

type ClusterQueueBestEffortFIFO

type ClusterQueueBestEffortFIFO struct {
	// contains filtered or unexported fields
}

ClusterQueueBestEffortFIFO is the implementation for the ClusterQueue for BestEffortFIFO.

func (ClusterQueueBestEffortFIFO) Active added in v0.6.0

func (c ClusterQueueBestEffortFIFO) Active() bool

func (ClusterQueueBestEffortFIFO) AddFromLocalQueue added in v0.3.0

func (c ClusterQueueBestEffortFIFO) AddFromLocalQueue(q *LocalQueue) bool

func (ClusterQueueBestEffortFIFO) Cohort added in v0.3.0

func (c ClusterQueueBestEffortFIFO) Cohort() string

func (ClusterQueueBestEffortFIFO) Delete

func (c ClusterQueueBestEffortFIFO) Delete(w *kueue.Workload)

func (ClusterQueueBestEffortFIFO) DeleteFromLocalQueue added in v0.3.0

func (c ClusterQueueBestEffortFIFO) DeleteFromLocalQueue(q *LocalQueue)

func (ClusterQueueBestEffortFIFO) Dump added in v0.3.0

func (c ClusterQueueBestEffortFIFO) Dump() ([]string, bool)

func (ClusterQueueBestEffortFIFO) DumpInadmissible added in v0.3.0

func (c ClusterQueueBestEffortFIFO) DumpInadmissible() ([]string, bool)

func (ClusterQueueBestEffortFIFO) Info added in v0.3.0

func (c ClusterQueueBestEffortFIFO) Info(key string) *workload.Info

func (ClusterQueueBestEffortFIFO) Pending added in v0.1.1

func (c ClusterQueueBestEffortFIFO) Pending() int

func (ClusterQueueBestEffortFIFO) PendingActive added in v0.3.0

func (c ClusterQueueBestEffortFIFO) PendingActive() int

func (ClusterQueueBestEffortFIFO) PendingInadmissible added in v0.3.0

func (c ClusterQueueBestEffortFIFO) PendingInadmissible() int

func (ClusterQueueBestEffortFIFO) Pop added in v0.3.0

func (c ClusterQueueBestEffortFIFO) Pop() *workload.Info

func (ClusterQueueBestEffortFIFO) PushOrUpdate

func (c ClusterQueueBestEffortFIFO) PushOrUpdate(wInfo *workload.Info)

func (ClusterQueueBestEffortFIFO) QueueInadmissibleWorkloads

func (c ClusterQueueBestEffortFIFO) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool

QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap. If at least one workload is moved, returns true, otherwise returns false.

func (*ClusterQueueBestEffortFIFO) RequeueIfNotPresent

func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool

func (ClusterQueueBestEffortFIFO) Snapshot added in v0.5.0

func (c ClusterQueueBestEffortFIFO) Snapshot() []*workload.Info

func (ClusterQueueBestEffortFIFO) Update added in v0.3.0

func (c ClusterQueueBestEffortFIFO) Update(apiCQ *kueue.ClusterQueue) error

type ClusterQueueStrictFIFO

type ClusterQueueStrictFIFO struct {
	// contains filtered or unexported fields
}

ClusterQueueStrictFIFO is the implementation for the ClusterQueue for StrictFIFO.

func (ClusterQueueStrictFIFO) Active added in v0.6.0

func (c ClusterQueueStrictFIFO) Active() bool

func (ClusterQueueStrictFIFO) AddFromLocalQueue added in v0.3.0

func (c ClusterQueueStrictFIFO) AddFromLocalQueue(q *LocalQueue) bool

func (ClusterQueueStrictFIFO) Cohort added in v0.3.0

func (c ClusterQueueStrictFIFO) Cohort() string

func (ClusterQueueStrictFIFO) Delete added in v0.3.0

func (c ClusterQueueStrictFIFO) Delete(w *kueue.Workload)

func (ClusterQueueStrictFIFO) DeleteFromLocalQueue added in v0.3.0

func (c ClusterQueueStrictFIFO) DeleteFromLocalQueue(q *LocalQueue)

func (ClusterQueueStrictFIFO) Dump added in v0.3.0

func (c ClusterQueueStrictFIFO) Dump() ([]string, bool)

func (ClusterQueueStrictFIFO) DumpInadmissible added in v0.3.0

func (c ClusterQueueStrictFIFO) DumpInadmissible() ([]string, bool)

func (ClusterQueueStrictFIFO) Info added in v0.3.0

func (c ClusterQueueStrictFIFO) Info(key string) *workload.Info

func (ClusterQueueStrictFIFO) Pending added in v0.3.0

func (c ClusterQueueStrictFIFO) Pending() int

func (ClusterQueueStrictFIFO) PendingActive added in v0.3.0

func (c ClusterQueueStrictFIFO) PendingActive() int

func (ClusterQueueStrictFIFO) PendingInadmissible added in v0.3.0

func (c ClusterQueueStrictFIFO) PendingInadmissible() int

func (ClusterQueueStrictFIFO) Pop added in v0.3.0

func (c ClusterQueueStrictFIFO) Pop() *workload.Info

func (ClusterQueueStrictFIFO) PushOrUpdate added in v0.3.0

func (c ClusterQueueStrictFIFO) PushOrUpdate(wInfo *workload.Info)

func (ClusterQueueStrictFIFO) QueueInadmissibleWorkloads added in v0.3.0

func (c ClusterQueueStrictFIFO) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool

QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap. If at least one workload is moved, returns true, otherwise returns false.

func (*ClusterQueueStrictFIFO) RequeueIfNotPresent added in v0.3.0

func (cq *ClusterQueueStrictFIFO) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool

RequeueIfNotPresent requeues if the workload is not present. If the reason for requeue is that the workload doesn't match the CQ's namespace selector, then the requeue is not immediate.

func (ClusterQueueStrictFIFO) Snapshot added in v0.5.0

func (c ClusterQueueStrictFIFO) Snapshot() []*workload.Info

func (ClusterQueueStrictFIFO) Update added in v0.3.0

func (c ClusterQueueStrictFIFO) Update(apiCQ *kueue.ClusterQueue) error

type LocalQueue added in v0.2.0

type LocalQueue struct {
	Key          string
	ClusterQueue string
	// contains filtered or unexported fields
}

LocalQueue is the internal implementation of kueue.LocalQueue.

func (*LocalQueue) AddOrUpdate added in v0.2.0

func (q *LocalQueue) AddOrUpdate(info *workload.Info)

type Manager

type Manager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewManager

func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Manager

func (*Manager) AddClusterQueue

func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error

func (*Manager) AddLocalQueue added in v0.2.0

func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error

func (*Manager) AddOrUpdateWorkload

func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool

AddOrUpdateWorkload adds or updates workload to the corresponding queue. Returns whether the queue existed.

func (*Manager) Broadcast added in v0.2.0

func (m *Manager) Broadcast()

func (*Manager) CleanUpOnContext

func (m *Manager) CleanUpOnContext(ctx context.Context)

CleanUpOnContext tracks the context. When closed, it wakes routines waiting on elements to be available. It should be called before doing any calls to Heads.

func (*Manager) ClusterQueueForWorkload

func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool)

ClusterQueueForWorkload returns the name of the ClusterQueue where the workload should be queued and whether it exists. Returns empty string if the queue doesn't exist.

func (*Manager) ClusterQueueFromLocalQueue added in v0.6.0

func (m *Manager) ClusterQueueFromLocalQueue(lqName string) (string, error)

func (*Manager) DeleteClusterQueue

func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue)

func (*Manager) DeleteLocalQueue added in v0.2.0

func (m *Manager) DeleteLocalQueue(q *kueue.LocalQueue)

func (*Manager) DeleteSnapshot added in v0.5.0

func (m *Manager) DeleteSnapshot(cq *kueue.ClusterQueue)

func (*Manager) DeleteWorkload

func (m *Manager) DeleteWorkload(w *kueue.Workload)

func (*Manager) Dump

func (m *Manager) Dump() map[string][]string

Dump is a dump of the queues and their elements (unordered). Only use for testing purposes.

func (*Manager) DumpInadmissible added in v0.2.0

func (m *Manager) DumpInadmissible() map[string][]string

DumpInadmissible is a dump of the inadmissible workloads list. Only use for testing purposes.

func (*Manager) GetClusterQueueNames added in v0.5.0

func (m *Manager) GetClusterQueueNames() []string

func (*Manager) GetSnapshot added in v0.5.0

func (m *Manager) GetSnapshot(cqName string) []kueue.ClusterQueuePendingWorkload

func (*Manager) Heads

func (m *Manager) Heads(ctx context.Context) []workload.Info

Heads returns the heads of the queues, along with their associated ClusterQueue. It blocks if the queues are empty until they have elements or the context terminates.

func (*Manager) LogDump added in v0.6.0

func (m *Manager) LogDump(log logr.Logger)

LogDump dumps the pending and inadmissible workloads for each ClusterQueue into the log, one line per ClusterQueue.

func (*Manager) Pending

func (m *Manager) Pending(cq *kueue.ClusterQueue) int

func (*Manager) PendingWorkloads

func (m *Manager) PendingWorkloads(q *kueue.LocalQueue) (int32, error)

func (*Manager) PendingWorkloadsInfo added in v0.6.0

func (m *Manager) PendingWorkloadsInfo(cqName string) []*workload.Info

func (*Manager) QueueAssociatedInadmissibleWorkloadsAfter added in v0.3.0

func (m *Manager) QueueAssociatedInadmissibleWorkloadsAfter(ctx context.Context, w *kueue.Workload, action func())

QueueAssociatedInadmissibleWorkloadsAfter requeues into the heaps all previously inadmissible workloads in the same ClusterQueue and cohort (if they exist) as the provided admitted workload. An optional action can be executed at the beginning of the function, while holding the lock, to provide atomicity with the operations in the queues.

func (*Manager) QueueForWorkloadExists

func (m *Manager) QueueForWorkloadExists(wl *kueue.Workload) bool

func (*Manager) QueueInadmissibleWorkloads added in v0.2.0

func (m *Manager) QueueInadmissibleWorkloads(ctx context.Context, cqNames sets.Set[string])

QueueInadmissibleWorkloads moves all inadmissibleWorkloads in the corresponding ClusterQueues to the heap. If at least one workload is queued, the event is broadcast.

func (*Manager) RequeueWorkload

func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, reason RequeueReason) bool

RequeueWorkload requeues the workload, ensuring that the queue and the workload still exist in the client cache and that the workload is not admitted. It won't requeue if the workload is already in the queue (possible if the workload was updated).

func (*Manager) UpdateClusterQueue

func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error

func (*Manager) UpdateLocalQueue added in v0.2.0

func (m *Manager) UpdateLocalQueue(q *kueue.LocalQueue) error

func (*Manager) UpdateSnapshot added in v0.5.0

func (m *Manager) UpdateSnapshot(cqName string, maxCount int32) bool

UpdateSnapshot computes the new snapshot and replaces the previous one if it differs. It returns true if the snapshot was actually updated.

func (*Manager) UpdateWorkload

func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) bool

UpdateWorkload updates the workload to the corresponding queue or adds it if it didn't exist. Returns whether the queue existed.

type Option added in v0.6.0

// Option configures the manager. It is a function applied to the manager's
// options; values are passed to NewManager through its variadic opts argument.
type Option func(*options)

Option configures the manager.

func WithPodsReadyRequeuingTimestamp added in v0.6.0

func WithPodsReadyRequeuingTimestamp(ts config.RequeuingTimestamp) Option

WithPodsReadyRequeuingTimestamp sets the timestamp that is used for ordering workloads that have been requeued due to the PodsReady condition.

type RequeueReason added in v0.2.0

// RequeueReason is a string label describing why a workload is being
// requeued. It is passed to ClusterQueue.RequeueIfNotPresent and
// Manager.RequeueWorkload.
type RequeueReason string
// Requeue reasons known to this package.
const (
	// RequeueReasonFailedAfterNomination
	// NOTE(review): presumably the workload was nominated for admission but
	// the admission attempt failed afterwards — confirm.
	RequeueReasonFailedAfterNomination RequeueReason = "FailedAfterNomination"
	// RequeueReasonNamespaceMismatch is the reason used when the workload
	// doesn't match the ClusterQueue's namespace selector; for
	// ClusterQueueStrictFIFO the requeue is then not immediate.
	RequeueReasonNamespaceMismatch     RequeueReason = "NamespaceMismatch"
	// RequeueReasonGeneric is the empty string, i.e. the zero value of
	// RequeueReason.
	RequeueReasonGeneric               RequeueReason = ""
	// RequeueReasonPendingPreemption
	// NOTE(review): presumably the workload is waiting for preemptions it
	// triggered to complete — confirm.
	RequeueReasonPendingPreemption     RequeueReason = "PendingPreemption"
)

type StatusChecker added in v0.2.0

// StatusChecker checks status of clusterQueue.
// NewManager takes a StatusChecker as its checker argument.
type StatusChecker interface {
	// ClusterQueueActive returns whether the clusterQueue with the given
	// name is active.
	ClusterQueueActive(name string) bool
}

StatusChecker checks status of clusterQueue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL