scheduling

package
v0.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2025 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PopulatePreemptionDescriptions added in v0.15.4

func PopulatePreemptionDescriptions(preemptedJobs []*context.JobSchedulingContext, scheduledJobs []*context.JobSchedulingContext)

func PreemptedJobsFromSchedulerResult

func PreemptedJobsFromSchedulerResult(sr *SchedulerResult) []*jobdb.Job

PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result.

func ScheduledJobsFromSchedulerResult

func ScheduledJobsFromSchedulerResult(sr *SchedulerResult) []*jobdb.Job

ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result.

Types

type CandidateGangIterator

type CandidateGangIterator interface {
	Peek() (*schedulercontext.GangSchedulingContext, float64, error)
	Clear() error
	GetAllocationForQueue(queue string) (internaltypes.ResourceList, bool)
	OnlyYieldEvicted()
	OnlyYieldEvictedForQueue(queue string)
}

type CostBasedCandidateGangIterator added in v0.15.7

type CostBasedCandidateGangIterator struct {
	// contains filtered or unexported fields
}

CostBasedCandidateGangIterator determines which gang to try scheduling next across queues. Specifically, it yields the next gang from the queue with the smallest fraction of its fair share, where the fraction-of-fair-share computation includes the yielded gang.

func NewCostBasedCandidateGangIterator added in v0.15.7

func NewCostBasedCandidateGangIterator(
	pool string,
	queueRepository fairness.QueueRepository,
	fairnessCostProvider fairness.FairnessCostProvider,
	iteratorsByQueue map[string]*QueuedGangIterator,
	considerPriority bool,
	prioritiseLargerJobs bool,
) (*CostBasedCandidateGangIterator, error)

func (*CostBasedCandidateGangIterator) Clear added in v0.15.7

Clear removes the first item in the iterator. If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed.

func (*CostBasedCandidateGangIterator) GetAllocationForQueue added in v0.15.7

func (it *CostBasedCandidateGangIterator) GetAllocationForQueue(queue string) (internaltypes.ResourceList, bool)

func (*CostBasedCandidateGangIterator) OnlyYieldEvicted added in v0.15.7

func (it *CostBasedCandidateGangIterator) OnlyYieldEvicted()

func (*CostBasedCandidateGangIterator) OnlyYieldEvictedForQueue added in v0.15.7

func (it *CostBasedCandidateGangIterator) OnlyYieldEvictedForQueue(queue string)

func (*CostBasedCandidateGangIterator) Peek added in v0.15.7

type Evictor

type Evictor struct {
	// contains filtered or unexported fields
}

func NewFilteredEvictor

func NewFilteredEvictor(
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
	nodeIdsToEvict map[string]bool,
	jobIdsToEvict map[string]bool,
) *Evictor

NewFilteredEvictor returns a new evictor that evicts all jobs for which jobIdsToEvict[jobId] is true on nodes for which nodeIdsToEvict[nodeId] is true.

func NewNodeEvictor

func NewNodeEvictor(
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
	jobFilter func(*armadacontext.Context, *jobdb.Job) bool,
) *Evictor

func NewOversubscribedEvictor

func NewOversubscribedEvictor(
	queueChecker queueChecker,
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
) *Evictor

NewOversubscribedEvictor returns a new evictor that, for each node, evicts all preemptible jobs of any priority class for which at least one job could not be scheduled.

func (*Evictor) Evict

func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*EvictorResult, error)

Evict removes jobs from nodes, returning all affected jobs and nodes. Any node for which nodeFilter returns false is skipped. Any job for which jobFilter returns true is evicted (if the node was not skipped). If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.

type EvictorPerQueueStats added in v0.15.4

type EvictorPerQueueStats struct {
	EvictedJobCount  int
	EvictedResources internaltypes.ResourceList
}

type EvictorResult

type EvictorResult struct {
	// For all evicted jobs, map from job id to the scheduling context for re-scheduling that job.
	EvictedJctxsByJobId map[string]*schedulercontext.JobSchedulingContext
	// Map from node id to node, containing all nodes on which at least one job was evicted.
	AffectedNodesById map[string]*internaltypes.Node
	// For each evicted job, maps the id of the job to the id of the node it was evicted from.
	NodeIdByJobId map[string]string
}

func (*EvictorResult) GetStatsPerQueue added in v0.15.4

func (er *EvictorResult) GetStatsPerQueue() map[string]EvictorPerQueueStats

func (*EvictorResult) SummaryString

func (er *EvictorResult) SummaryString() string

type FairSchedulingAlgo

type FairSchedulingAlgo struct {
	// contains filtered or unexported fields
}

FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler.

func NewFairSchedulingAlgo

func NewFairSchedulingAlgo(
	config configuration.SchedulingConfig,
	maxSchedulingDuration time.Duration,
	executorRepository database.ExecutorRepository,
	queueCache queue.QueueCache,
	schedulingContextRepository *reports.SchedulingContextRepository,
	resourceListFactory *internaltypes.ResourceListFactory,
	floatingResourceTypes *floatingresources.FloatingResourceTypes,
) (*FairSchedulingAlgo, error)

func (*FairSchedulingAlgo) Schedule

func (l *FairSchedulingAlgo) Schedule(
	ctx *armadacontext.Context,
	txn *jobdb.Txn,
) (*SchedulerResult, error)

Schedule assigns jobs to nodes in the same way as the old lease call. It iterates over each executor in turn (using lexicographical order) and assigns the jobs using a LegacyScheduler, before moving on to the next executor. It keeps track of which executors it has already considered and may take multiple Schedule() calls to consider all executors if scheduling is slow. Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.

func (*FairSchedulingAlgo) SchedulePool added in v0.14.0

SchedulePool schedules jobs on nodes that belong to a given pool.

type FairSchedulingAlgoContext added in v0.14.0

type FairSchedulingAlgoContext struct {
	Txn *jobdb.Txn
	// contains filtered or unexported fields
}

type GangScheduler

type GangScheduler struct {
	// contains filtered or unexported fields
}

GangScheduler schedules one gang at a time. GangScheduler is not aware of queues.

func NewGangScheduler

func NewGangScheduler(
	sctx *context.SchedulingContext,
	constraints schedulerconstraints.SchedulingConstraints,
	floatingResourceTypes *floatingresources.FloatingResourceTypes,
	nodeDb *nodedb.NodeDb,
	skipUnsuccessfulSchedulingKeyCheck bool,
) (*GangScheduler, error)

func (*GangScheduler) Schedule

func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *context.GangSchedulingContext) (ok bool, unschedulableReason string, err error)

type InMemoryJobIterator

type InMemoryJobIterator struct {
	// contains filtered or unexported fields
}

func (*InMemoryJobIterator) Next

type InMemoryJobRepository

type InMemoryJobRepository struct {
	// contains filtered or unexported fields
}

func NewInMemoryJobRepository

func NewInMemoryJobRepository(pool string, sortOrder func(a, b *jobdb.Job) int) *InMemoryJobRepository

func (*InMemoryJobRepository) EnqueueMany

func (repo *InMemoryJobRepository) EnqueueMany(jctxs []*schedulercontext.JobSchedulingContext)

func (*InMemoryJobRepository) GetExistingJobsByIds

func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job

func (*InMemoryJobRepository) GetJobIterator

func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator

func (*InMemoryJobRepository) GetQueueJobIds

func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) []string

type JobContextIterator

type JobContextIterator interface {
	Next() (*schedulercontext.JobSchedulingContext, error)
}

type JobRepository

type JobRepository interface {
	QueuedJobs(queueName string, order jobdb.JobSortOrder) jobdb.JobIterator
	GetById(id string) *jobdb.Job
}

type MarketBasedCandidateGangIterator added in v0.15.7

type MarketBasedCandidateGangIterator struct {
	// contains filtered or unexported fields
}

func NewMarketCandidateGangIterator added in v0.15.7

func NewMarketCandidateGangIterator(
	pool string,
	queueRepository fairness.QueueRepository,
	iteratorsByQueue map[string]*QueuedGangIterator,
) (*MarketBasedCandidateGangIterator, error)

func (*MarketBasedCandidateGangIterator) Clear added in v0.15.7

Clear removes the first item in the iterator. If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed.

func (*MarketBasedCandidateGangIterator) GetAllocationForQueue added in v0.15.7

func (it *MarketBasedCandidateGangIterator) GetAllocationForQueue(queue string) (internaltypes.ResourceList, bool)

func (*MarketBasedCandidateGangIterator) OnlyYieldEvicted added in v0.15.7

func (it *MarketBasedCandidateGangIterator) OnlyYieldEvicted()

func (*MarketBasedCandidateGangIterator) OnlyYieldEvictedForQueue added in v0.15.7

func (it *MarketBasedCandidateGangIterator) OnlyYieldEvictedForQueue(queue string)

func (*MarketBasedCandidateGangIterator) Peek added in v0.15.7

type MarketDrivenMultiJobsIterator added in v0.15.7

type MarketDrivenMultiJobsIterator struct {
	// contains filtered or unexported fields
}

MarketDrivenMultiJobsIterator combines two job iterators by price.

func NewMarketDrivenMultiJobsIterator added in v0.15.7

func NewMarketDrivenMultiJobsIterator(it1, it2 JobContextIterator) *MarketDrivenMultiJobsIterator

func (*MarketDrivenMultiJobsIterator) Next added in v0.15.7

type MarketIteratorPQ added in v0.15.7

type MarketIteratorPQ struct {
	// contains filtered or unexported fields
}

func (*MarketIteratorPQ) Len added in v0.15.7

func (pq *MarketIteratorPQ) Len() int

func (*MarketIteratorPQ) Less added in v0.15.7

func (pq *MarketIteratorPQ) Less(i, j int) bool

func (*MarketIteratorPQ) Pop added in v0.15.7

func (pq *MarketIteratorPQ) Pop() any

func (*MarketIteratorPQ) Push added in v0.15.7

func (pq *MarketIteratorPQ) Push(x any)

func (*MarketIteratorPQ) Swap added in v0.15.7

func (pq *MarketIteratorPQ) Swap(i, j int)

type MarketIteratorPQItem added in v0.15.7

type MarketIteratorPQItem struct {
	// contains filtered or unexported fields
}

type MinimalQueue

type MinimalQueue struct {
	// contains filtered or unexported fields
}

func (MinimalQueue) GetAllocation

func (q MinimalQueue) GetAllocation() internaltypes.ResourceList

func (MinimalQueue) GetWeight

func (q MinimalQueue) GetWeight() float64

type MinimalQueueRepository

type MinimalQueueRepository struct {
	// contains filtered or unexported fields
}

func NewMinimalQueueRepositoryFromSchedulingContext

func NewMinimalQueueRepositoryFromSchedulingContext(sctx *schedulercontext.SchedulingContext) *MinimalQueueRepository

func (*MinimalQueueRepository) GetQueue

func (qr *MinimalQueueRepository) GetQueue(name string) (fairness.Queue, bool)

type MultiJobsIterator

type MultiJobsIterator struct {
	// contains filtered or unexported fields
}

MultiJobsIterator chains several JobContextIterators together in the order provided.

func NewMultiJobsIterator

func NewMultiJobsIterator(its ...JobContextIterator) *MultiJobsIterator

func (*MultiJobsIterator) Next

type PerPoolSchedulingStats added in v0.15.4

type PerPoolSchedulingStats struct {
	// scheduling stats per queue
	StatsPerQueue map[string]QueueStats
	// number of loops executed in this cycle
	LoopNumber int
	// Result of any eviction in this cycle
	EvictorResult *EvictorResult
}

type PreemptingQueueScheduler

type PreemptingQueueScheduler struct {
	// contains filtered or unexported fields
}

PreemptingQueueScheduler is a scheduler that makes unified decisions on which jobs to preempt and which to schedule. It uses QueueScheduler as a building block.

func NewPreemptingQueueScheduler

func NewPreemptingQueueScheduler(
	sctx *schedulercontext.SchedulingContext,
	constraints schedulerconstraints.SchedulingConstraints,
	floatingResourceTypes *floatingresources.FloatingResourceTypes,
	preferLargeJobOrdering bool,
	protectedFractionOfFairShare float64,
	maxQueueLookBack uint,
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
	initialNodeIdByJobId map[string]string,
	initialJobIdsByGangId map[string]map[string]bool,
	initialGangIdByJobId map[string]string,
	marketDriven bool,
) *PreemptingQueueScheduler

func (*PreemptingQueueScheduler) Schedule

Schedule (1) preempts jobs belonging to queues with total allocation above their fair share, and (2) schedules new jobs belonging to queues with total allocation less than their fair share.

type QueueCandidateGangIteratorItem

type QueueCandidateGangIteratorItem struct {
	// contains filtered or unexported fields
}

type QueueCandidateGangIteratorPQ

type QueueCandidateGangIteratorPQ struct {
	// contains filtered or unexported fields
}

QueueCandidateGangIteratorPQ is a priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.

func (*QueueCandidateGangIteratorPQ) Len

func (*QueueCandidateGangIteratorPQ) Less

func (pq *QueueCandidateGangIteratorPQ) Less(i, j int) bool

func (*QueueCandidateGangIteratorPQ) Pop

func (*QueueCandidateGangIteratorPQ) Push

func (pq *QueueCandidateGangIteratorPQ) Push(x any)

func (*QueueCandidateGangIteratorPQ) Swap

func (pq *QueueCandidateGangIteratorPQ) Swap(i, j int)

type QueueScheduler

type QueueScheduler struct {
	// contains filtered or unexported fields
}

QueueScheduler is responsible for choosing the order in which to attempt scheduling queued gangs. Relies on GangScheduler for scheduling once a gang is chosen.

func NewQueueScheduler

func NewQueueScheduler(
	sctx *schedulercontext.SchedulingContext,
	constraints schedulerconstraints.SchedulingConstraints,
	floatingResourceTypes *floatingresources.FloatingResourceTypes,
	nodeDb *nodedb.NodeDb,
	jobIteratorByQueue map[string]JobContextIterator,
	skipUnsuccessfulSchedulingKeyCheck bool,
	considerPriorityClassPriority bool,
	prioritiseLargerJobs bool,
	maxQueueLookBack uint,
	marketDriven bool,
) (*QueueScheduler, error)

func (*QueueScheduler) Schedule

func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error)

type QueueStats added in v0.15.4

type QueueStats struct {
	GangsConsidered                  int
	JobsConsidered                   int
	GangsScheduled                   int
	FirstGangConsideredSampleJobId   string
	FirstGangConsideredResult        string
	FirstGangConsideredQueuePosition int
	LastGangScheduledSampleJobId     string
	LastGangScheduledQueuePosition   int
	LastGangScheduledQueueCost       float64
	LastGangScheduledResources       internaltypes.ResourceList
	LastGangScheduledQueueResources  internaltypes.ResourceList
	Time                             time.Duration
}

type QueuedGangIterator

type QueuedGangIterator struct {
	// contains filtered or unexported fields
}

QueuedGangIterator is an iterator over queued gangs. Each gang is yielded once its final member is received from the underlying iterator. Jobs without gangIdAnnotation are considered gangs of cardinality 1.

func NewQueuedGangIterator

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator

func (*QueuedGangIterator) Clear

func (it *QueuedGangIterator) Clear() error

func (*QueuedGangIterator) Next

func (*QueuedGangIterator) Peek

type QueuedJobsIterator

type QueuedJobsIterator struct {
	// contains filtered or unexported fields
}

QueuedJobsIterator is an iterator over all jobs in a queue.

func NewQueuedJobsIterator

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, repo JobRepository, order jobdb.JobSortOrder) *QueuedJobsIterator

func (*QueuedJobsIterator) Next

type SchedulerResult

type SchedulerResult struct {
	// Running jobs that should be preempted.
	PreemptedJobs []*context.JobSchedulingContext
	// Queued jobs that should be scheduled.
	ScheduledJobs []*context.JobSchedulingContext
	// For each preempted job, maps the job id to the id of the node on which the job was running.
	// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
	NodeIdByJobId map[string]string
	// Each result may bundle the result of several scheduling decisions.
	// These are the corresponding scheduling contexts.
	// TODO: This doesn't seem like the right approach.
	SchedulingContexts []*context.SchedulingContext
	// scheduling stats
	PerPoolSchedulingStats map[string]PerPoolSchedulingStats
}

SchedulerResult is returned by Rescheduler.Schedule().

type SchedulingAlgo

type SchedulingAlgo interface {
	// Schedule should assign jobs to nodes.
	// Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided.
	Schedule(*armadacontext.Context, *jobdb.Txn) (*SchedulerResult, error)
}

SchedulingAlgo is the interface between the Pulsar-backed scheduler and the algorithm deciding which jobs to schedule and preempt.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL