Documentation ¶
Index ¶
- Constants
- func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, ...) ([]*armadaevents.EventSequence, error)
- func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, ...) ([]*armadaevents.EventSequence, error)
- func AppendEventSequencesFromUnschedulableJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, ...) ([]*armadaevents.EventSequence, error)
- func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error)
- func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
- func JavaStringHash(s string) uint32
- func JobsSummary(jctxs []*schedulercontext.JobSchedulingContext) string
- func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
- func Run(config schedulerconfig.Configuration) error
- func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
- type CandidateGangIterator
- type DefaultPoolAssigner
- type Evictor
- type EvictorResult
- type ExecutorApi
- type FairSchedulingAlgo
- type GangScheduler
- type InMemoryJobIterator
- type InMemoryJobRepository
- func (repo *InMemoryJobRepository) EnqueueMany(jctxs []*schedulercontext.JobSchedulingContext)
- func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]interfaces.LegacySchedulerJob, error)
- func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator
- func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)
- type JobIterator
- type JobQueueIteratorAdapter
- type JobRepository
- type MetricsCollector
- type MinimalQueue
- type MinimalQueueRepository
- type MultiJobsIterator
- type PoolAssigner
- type PreemptingQueueScheduler
- type Publisher
- type PulsarPublisher
- type QueueCandidateGangIteratorItem
- type QueueCandidateGangIteratorPQ
- type QueueScheduler
- type QueuedGangIterator
- type QueuedJobsIterator
- type Scheduler
- type SchedulerJobRepositoryAdapter
- type SchedulerMetrics
- func (m *SchedulerMetrics) Collect(metrics chan<- prometheus.Metric)
- func (m *SchedulerMetrics) Describe(desc chan<- *prometheus.Desc)
- func (m *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duration)
- func (m *SchedulerMetrics) ReportScheduleCycleTime(cycleTime time.Duration)
- func (m *SchedulerMetrics) ReportSchedulerResult(result SchedulerResult)
- type SchedulerResult
- type SchedulingAlgo
- type SubmitChecker
- type SubmitScheduleChecker
Constants ¶
const ( NAMESPACE = "armada" SUBSYSTEM = "scheduler" )
Variables ¶
This section is empty.
Functions ¶
func AppendEventSequencesFromPreemptedJobs ¶ added in v0.3.90
func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error)
func AppendEventSequencesFromScheduledJobs ¶ added in v0.3.90
func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext, additionalAnnotationsByJobId map[string]map[string]string) ([]*armadaevents.EventSequence, error)
func AppendEventSequencesFromUnschedulableJobs ¶ added in v0.3.95
func AppendEventSequencesFromUnschedulableJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error)
func EventsFromSchedulerResult ¶ added in v0.3.90
func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error)
EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.
func FailedJobsFromSchedulerResult ¶ added in v0.3.95
func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
FailedJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T.
func JavaStringHash ¶
JavaStringHash is the default hashing algorithm used by Pulsar copied from https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/hash.go
func JobsSummary ¶ added in v0.3.50
func JobsSummary(jctxs []*schedulercontext.JobSchedulingContext) string
JobsSummary returns a string giving an overview of the provided jobs meant for logging. For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]".
func PreemptedJobsFromSchedulerResult ¶ added in v0.3.54
func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result cast to type T.
func Run ¶
func Run(config schedulerconfig.Configuration) error
Run sets up a Scheduler application and runs it until a SIGTERM is received
func ScheduledJobsFromSchedulerResult ¶ added in v0.3.54
func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T
ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T.
Types ¶
type CandidateGangIterator ¶
type CandidateGangIterator struct {
// contains filtered or unexported fields
}
CandidateGangIterator determines which gang to try scheduling next across queues. Specifically, it yields the next gang in the queue with smallest fraction of its fair share, where the fraction of fair share computation includes the yielded gang.
func NewCandidateGangIterator ¶ added in v0.3.47
func NewCandidateGangIterator( queueRepository fairness.QueueRepository, fairnessCostProvider fairness.FairnessCostProvider, iteratorsByQueue map[string]*QueuedGangIterator, ) (*CandidateGangIterator, error)
func (*CandidateGangIterator) Clear ¶ added in v0.3.47
func (it *CandidateGangIterator) Clear() error
Clear removes the first item in the iterator. If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed.
func (*CandidateGangIterator) OnlyYieldEvicted ¶ added in v0.3.66
func (it *CandidateGangIterator) OnlyYieldEvicted()
func (*CandidateGangIterator) OnlyYieldEvictedForQueue ¶ added in v0.3.92
func (it *CandidateGangIterator) OnlyYieldEvictedForQueue(queue string)
func (*CandidateGangIterator) Peek ¶ added in v0.3.47
func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error)
type DefaultPoolAssigner ¶ added in v0.3.54
type DefaultPoolAssigner struct {
// contains filtered or unexported fields
}
func NewPoolAssigner ¶ added in v0.3.54
func NewPoolAssigner(executorTimeout time.Duration, schedulingConfig configuration.SchedulingConfig, executorRepository database.ExecutorRepository, ) (*DefaultPoolAssigner, error)
func (*DefaultPoolAssigner) AssignPool ¶ added in v0.3.54
func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error)
AssignPool returns the pool associated with the job or the empty string if no pool is valid
func (*DefaultPoolAssigner) Refresh ¶ added in v0.3.54
func (p *DefaultPoolAssigner) Refresh(ctx *armadacontext.Context) error
Refresh updates executor state
type Evictor ¶ added in v0.3.54
type Evictor struct {
// contains filtered or unexported fields
}
func NewFilteredEvictor ¶ added in v0.3.56
func NewFilteredEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, priorityClasses map[string]types.PriorityClass, nodeIdsToEvict map[string]bool, jobIdsToEvict map[string]bool, ) *Evictor
NewFilteredEvictor returns a new evictor that evicts all jobs for which jobIdsToEvict[jobId] is true on nodes for which nodeIdsToEvict[nodeId] is true.
func NewNodeEvictor ¶ added in v0.3.54
func NewNodeEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, priorityClasses map[string]types.PriorityClass, perNodeEvictionProbability float64, jobFilter func(*armadacontext.Context, interfaces.LegacySchedulerJob) bool, random *rand.Rand, ) *Evictor
func NewOversubscribedEvictor ¶ added in v0.3.54
func NewOversubscribedEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, priorityClasses map[string]types.PriorityClass, defaultPriorityClassName string, perNodeEvictionProbability float64, random *rand.Rand, ) *Evictor
NewOversubscribedEvictor returns a new evictor that for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled with probability perNodeEvictionProbability.
func (*Evictor) Evict ¶ added in v0.3.54
func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*EvictorResult, error)
Evict removes jobs from nodes, returning all affected jobs and nodes. Any node for which nodeFilter returns false is skipped. Any job for which jobFilter returns true is evicted (if the node was not skipped). If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.
type EvictorResult ¶ added in v0.3.54
type EvictorResult struct { // For all evicted jobs, map from job id to the scheduling context for re-scheduling that job. EvictedJctxsByJobId map[string]*schedulercontext.JobSchedulingContext // Map from node id to node, containing all nodes on which at least one job was evicted. AffectedNodesById map[string]*nodedb.Node // For each evicted job, maps the id of the job to the id of the node it was evicted from. NodeIdByJobId map[string]string }
type ExecutorApi ¶
type ExecutorApi struct {
// contains filtered or unexported fields
}
ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler.
func NewExecutorApi ¶
func NewExecutorApi(producer pulsar.Producer, jobRepository database.JobRepository, executorRepository database.ExecutorRepository, legacyExecutorRepository database.ExecutorRepository, allowedPriorities []int32, nodeIdLabel string, priorityClassNameOverride *string, maxPulsarMessageSizeBytes uint, ) (*ExecutorApi, error)
func (*ExecutorApi) LeaseJobRuns ¶
func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error
LeaseJobRuns reconciles the state of the executor with that of the scheduler. Specifically it: 1. Stores job and capacity information received from the executor to make it available to the scheduler. 2. Notifies the executor if any of its jobs are no longer active, e.g., due to being preempted by the scheduler. 3. Transfers any jobs scheduled on this executor cluster that the executor don't already have.
func (*ExecutorApi) ReportEvents ¶
func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error)
ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing.
type FairSchedulingAlgo ¶ added in v0.3.63
type FairSchedulingAlgo struct {
// contains filtered or unexported fields
}
FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler.
func NewFairSchedulingAlgo ¶ added in v0.3.63
func NewFairSchedulingAlgo( config configuration.SchedulingConfig, maxSchedulingDuration time.Duration, executorRepository database.ExecutorRepository, queueRepository repository.QueueRepository, schedulingContextRepository *reports.SchedulingContextRepository, nodeQuarantiner *quarantine.NodeQuarantiner, queueQuarantiner *quarantine.QueueQuarantiner, ) (*FairSchedulingAlgo, error)
func (*FairSchedulingAlgo) Schedule ¶ added in v0.3.63
func (l *FairSchedulingAlgo) Schedule( ctx *armadacontext.Context, txn *jobdb.Txn, ) (*SchedulerResult, error)
Schedule assigns jobs to nodes in the same way as the old lease call. It iterates over each executor in turn (using lexicographical order) and assigns the jobs using a LegacyScheduler, before moving onto the next executor. It maintains state of which executors it has considered already and may take multiple Schedule() calls to consider all executors if scheduling is slow. Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.
type GangScheduler ¶ added in v0.3.63
type GangScheduler struct {
// contains filtered or unexported fields
}
GangScheduler schedules one gang at a time. GangScheduler is not aware of queues.
func NewGangScheduler ¶ added in v0.3.63
func NewGangScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, nodeDb *nodedb.NodeDb, ) (*GangScheduler, error)
func (*GangScheduler) Schedule ¶ added in v0.3.63
func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error)
func (*GangScheduler) SkipUnsuccessfulSchedulingKeyCheck ¶ added in v0.3.65
func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck()
type InMemoryJobIterator ¶ added in v0.3.50
type InMemoryJobIterator struct {
// contains filtered or unexported fields
}
func NewInMemoryJobIterator ¶ added in v0.3.50
func NewInMemoryJobIterator(jctxs []*schedulercontext.JobSchedulingContext) *InMemoryJobIterator
func (*InMemoryJobIterator) Next ¶ added in v0.3.50
func (it *InMemoryJobIterator) Next() (*schedulercontext.JobSchedulingContext, error)
type InMemoryJobRepository ¶ added in v0.3.50
type InMemoryJobRepository struct {
// contains filtered or unexported fields
}
func NewInMemoryJobRepository ¶ added in v0.3.50
func NewInMemoryJobRepository() *InMemoryJobRepository
func (*InMemoryJobRepository) EnqueueMany ¶ added in v0.3.50
func (repo *InMemoryJobRepository) EnqueueMany(jctxs []*schedulercontext.JobSchedulingContext)
func (*InMemoryJobRepository) GetExistingJobsByIds ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]interfaces.LegacySchedulerJob, error)
Should only be used in testing.
func (*InMemoryJobRepository) GetJobIterator ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator
func (*InMemoryJobRepository) GetQueueJobIds ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)
Should only be used in testing.
type JobIterator ¶
type JobIterator interface {
Next() (*schedulercontext.JobSchedulingContext, error)
}
type JobQueueIteratorAdapter ¶ added in v0.3.47
type JobQueueIteratorAdapter struct {
// contains filtered or unexported fields
}
func (*JobQueueIteratorAdapter) Next ¶ added in v0.3.47
func (it *JobQueueIteratorAdapter) Next() (interfaces.LegacySchedulerJob, error)
type JobRepository ¶
type JobRepository interface { GetQueueJobIds(queueName string) ([]string, error) GetExistingJobsByIds(ids []string) ([]interfaces.LegacySchedulerJob, error) }
type MetricsCollector ¶ added in v0.3.54
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector is a Prometheus Collector that handles scheduler metrics. The metrics themselves are calculated asynchronously every refreshPeriod
func NewMetricsCollector ¶ added in v0.3.54
func NewMetricsCollector( jobDb *jobdb.JobDb, queueRepository repository.QueueRepository, executorRepository database.ExecutorRepository, poolAssigner PoolAssigner, refreshPeriod time.Duration, ) *MetricsCollector
func (*MetricsCollector) Collect ¶ added in v0.3.54
func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric)
Collect returns the current state of all metrics of the collector.
func (*MetricsCollector) Describe ¶ added in v0.3.54
func (c *MetricsCollector) Describe(out chan<- *prometheus.Desc)
Describe returns all descriptions of the collector.
func (*MetricsCollector) Run ¶ added in v0.3.54
func (c *MetricsCollector) Run(ctx *armadacontext.Context) error
Run enters s a loop which updates the metrics every refreshPeriod until the supplied context is cancelled
type MinimalQueue ¶ added in v0.3.90
type MinimalQueue struct {
// contains filtered or unexported fields
}
func (MinimalQueue) GetAllocation ¶ added in v0.3.90
func (q MinimalQueue) GetAllocation() schedulerobjects.ResourceList
func (MinimalQueue) GetWeight ¶ added in v0.3.90
func (q MinimalQueue) GetWeight() float64
type MinimalQueueRepository ¶ added in v0.3.90
type MinimalQueueRepository struct {
// contains filtered or unexported fields
}
func NewMinimalQueueRepositoryFromSchedulingContext ¶ added in v0.3.90
func NewMinimalQueueRepositoryFromSchedulingContext(sctx *schedulercontext.SchedulingContext) *MinimalQueueRepository
type MultiJobsIterator ¶ added in v0.3.47
type MultiJobsIterator struct {
// contains filtered or unexported fields
}
MultiJobsIterator chains several JobIterators together in the order provided.
func NewMultiJobsIterator ¶ added in v0.3.47
func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator
func (*MultiJobsIterator) Next ¶ added in v0.3.47
func (it *MultiJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error)
type PoolAssigner ¶ added in v0.3.54
type PoolAssigner interface { Refresh(ctx *armadacontext.Context) error AssignPool(j *jobdb.Job) (string, error) }
PoolAssigner allows jobs to be assigned to a pool Note that this is intended only for use with metrics calculation
type PreemptingQueueScheduler ¶ added in v0.3.63
type PreemptingQueueScheduler struct {
// contains filtered or unexported fields
}
PreemptingQueueScheduler is a scheduler that makes a unified decisions on which jobs to preempt and schedule. Uses QueueScheduler as a building block.
func NewPreemptingQueueScheduler ¶ added in v0.3.63
func NewPreemptingQueueScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, nodeEvictionProbability float64, nodeOversubscriptionEvictionProbability float64, protectedFractionOfFairShare float64, jobRepo JobRepository, nodeDb *nodedb.NodeDb, initialNodeIdByJobId map[string]string, initialJobIdsByGangId map[string]map[string]bool, initialGangIdByJobId map[string]string, ) *PreemptingQueueScheduler
func (*PreemptingQueueScheduler) EnableAssertions ¶ added in v0.3.63
func (sch *PreemptingQueueScheduler) EnableAssertions()
func (*PreemptingQueueScheduler) Schedule ¶ added in v0.3.63
func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error)
Schedule - preempts jobs belonging to queues with total allocation above their fair share and - schedules new jobs belonging to queues with total allocation less than their fair share.
func (*PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck ¶ added in v0.3.65
func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck()
type Publisher ¶
type Publisher interface { // PublishMessages will publish the supplied messages. A LeaderToken is provided and the // implementor may decide whether to publish based on the status of this token PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error // PublishMarkers publishes a single marker message for each Pulsar partition. Each marker // massage contains the supplied group id, which allows all marker messages for a given call // to be identified. The uint32 returned is the number of messages published PublishMarkers(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error) }
Publisher is an interface to be implemented by structs that handle publishing messages to pulsar
type PulsarPublisher ¶
type PulsarPublisher struct {
// contains filtered or unexported fields
}
PulsarPublisher is the default implementation of Publisher
func NewPulsarPublisher ¶
func NewPulsarPublisher( pulsarClient pulsar.Client, producerOptions pulsar.ProducerOptions, pulsarSendTimeout time.Duration, ) (*PulsarPublisher, error)
func (*PulsarPublisher) PublishMarkers ¶
func (p *PulsarPublisher) PublishMarkers(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error)
PublishMarkers sends one pulsar message (containing an armadaevents.PartitionMarker) to each partition of the producer's Pulsar topic.
func (*PulsarPublisher) PublishMessages ¶
func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error
PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into single event sequences up to maxMessageBatchSize.
type QueueCandidateGangIteratorItem ¶ added in v0.3.47
type QueueCandidateGangIteratorItem struct {
// contains filtered or unexported fields
}
type QueueCandidateGangIteratorPQ ¶ added in v0.3.47
type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem
Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.
func (QueueCandidateGangIteratorPQ) Len ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Len() int
func (QueueCandidateGangIteratorPQ) Less ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Less(i, j int) bool
func (*QueueCandidateGangIteratorPQ) Pop ¶ added in v0.3.47
func (pq *QueueCandidateGangIteratorPQ) Pop() any
func (*QueueCandidateGangIteratorPQ) Push ¶ added in v0.3.47
func (pq *QueueCandidateGangIteratorPQ) Push(x any)
func (QueueCandidateGangIteratorPQ) Swap ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Swap(i, j int)
type QueueScheduler ¶ added in v0.3.63
type QueueScheduler struct {
// contains filtered or unexported fields
}
QueueScheduler is responsible for choosing the order in which to attempt scheduling queued gangs. Relies on GangScheduler for scheduling once a gang is chosen.
func NewQueueScheduler ¶ added in v0.3.63
func NewQueueScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, nodeDb *nodedb.NodeDb, jobIteratorByQueue map[string]JobIterator, ) (*QueueScheduler, error)
func (*QueueScheduler) Schedule ¶ added in v0.3.63
func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error)
func (*QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck ¶ added in v0.3.65
func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck()
type QueuedGangIterator ¶
type QueuedGangIterator struct {
// contains filtered or unexported fields
}
QueuedGangIterator is an iterator over queued gangs. Each gang is yielded once its final member is received from the underlying iterator. Jobs without gangIdAnnotation are considered gangs of cardinality 1.
func NewQueuedGangIterator ¶
func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator
func (*QueuedGangIterator) Clear ¶ added in v0.3.47
func (it *QueuedGangIterator) Clear() error
func (*QueuedGangIterator) Next ¶
func (it *QueuedGangIterator) Next() (*schedulercontext.GangSchedulingContext, error)
func (*QueuedGangIterator) Peek ¶ added in v0.3.47
func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error)
type QueuedJobsIterator ¶
type QueuedJobsIterator struct {
// contains filtered or unexported fields
}
QueuedJobsIterator is an iterator over all jobs in a queue. It loads jobs asynchronously in batches from the underlying database. This is necessary for good performance when jobs are stored in Redis.
func NewQueuedJobsIterator ¶
func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository, priorityClasses map[string]types.PriorityClass) (*QueuedJobsIterator, error)
func (*QueuedJobsIterator) Next ¶
func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the main Armada scheduler. It periodically performs the following cycle: 1. Update state from postgres (via the jobRepository). 2. Determine if leader and exit if not. 3. Generate any necessary eventSequences resulting from the state update. 4. Expire any jobs assigned to clusters that have timed out. 5. Schedule jobs. 6. Publish any Armada eventSequences resulting from the scheduling cycle.
func NewScheduler ¶
func NewScheduler( jobDb *jobdb.JobDb, jobRepository database.JobRepository, executorRepository database.ExecutorRepository, schedulingAlgo SchedulingAlgo, leaderController leader.LeaderController, publisher Publisher, submitChecker SubmitScheduleChecker, cyclePeriod time.Duration, schedulePeriod time.Duration, executorTimeout time.Duration, maxAttemptedRuns uint, nodeIdLabel string, metrics *SchedulerMetrics, schedulerMetrics *metrics.Metrics, failureEstimator *failureestimator.FailureEstimator, ) (*Scheduler, error)
func (*Scheduler) EnableAssertions ¶ added in v0.4.12
func (s *Scheduler) EnableAssertions()
type SchedulerJobRepositoryAdapter ¶ added in v0.3.90
type SchedulerJobRepositoryAdapter struct {
// contains filtered or unexported fields
}
Adapter to make jobDb implement the JobRepository interface.
TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo.
func NewSchedulerJobRepositoryAdapter ¶ added in v0.3.90
func NewSchedulerJobRepositoryAdapter(txn *jobdb.Txn) *SchedulerJobRepositoryAdapter
func (*SchedulerJobRepositoryAdapter) GetExistingJobsByIds ¶ added in v0.3.90
func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]interfaces.LegacySchedulerJob, error)
GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the old to new scheduler.
func (*SchedulerJobRepositoryAdapter) GetQueueJobIds ¶ added in v0.3.90
func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error)
GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old to new scheduler.
type SchedulerMetrics ¶ added in v0.3.90
type SchedulerMetrics struct {
// contains filtered or unexported fields
}
func NewSchedulerMetrics ¶ added in v0.3.91
func NewSchedulerMetrics(config configuration.SchedulerMetricsConfig) *SchedulerMetrics
func (*SchedulerMetrics) Collect ¶ added in v0.4.20
func (m *SchedulerMetrics) Collect(metrics chan<- prometheus.Metric)
func (*SchedulerMetrics) Describe ¶ added in v0.4.20
func (m *SchedulerMetrics) Describe(desc chan<- *prometheus.Desc)
func (*SchedulerMetrics) ReportReconcileCycleTime ¶ added in v0.3.90
func (m *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duration)
func (*SchedulerMetrics) ReportScheduleCycleTime ¶ added in v0.3.90
func (m *SchedulerMetrics) ReportScheduleCycleTime(cycleTime time.Duration)
func (*SchedulerMetrics) ReportSchedulerResult ¶ added in v0.3.90
func (m *SchedulerMetrics) ReportSchedulerResult(result SchedulerResult)
type SchedulerResult ¶ added in v0.3.54
type SchedulerResult struct { // Running jobs that should be preempted. PreemptedJobs []*schedulercontext.JobSchedulingContext // Queued jobs that should be scheduled. ScheduledJobs []*schedulercontext.JobSchedulingContext // Queued jobs that could not be scheduled. // This is used to fail jobs that could not schedule above `minimumGangCardinality`. FailedJobs []*schedulercontext.JobSchedulingContext // For each preempted job, maps the job id to the id of the node on which the job was running. // For each scheduled job, maps the job id to the id of the node on which the job should be scheduled. NodeIdByJobId map[string]string // Each result may bundle the result of several scheduling decisions. // These are the corresponding scheduling contexts. // TODO: This doesn't seem like the right approach. SchedulingContexts []*schedulercontext.SchedulingContext // Additional annotations to be appended to the PodSpec. // Format: JobId -> AnnotationName -> AnnotationValue. AdditionalAnnotationsByJobId map[string]map[string]string }
SchedulerResult is returned by Rescheduler.Schedule().
type SchedulingAlgo ¶
type SchedulingAlgo interface { // Schedule should assign jobs to nodes. // Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided. Schedule(*armadacontext.Context, *jobdb.Txn) (*SchedulerResult, error) }
SchedulingAlgo is the interface between the Pulsar-backed scheduler and the algorithm deciding which jobs to schedule and preempt.
type SubmitChecker ¶
type SubmitChecker struct { ExecutorUpdateFrequency time.Duration // contains filtered or unexported fields }
func NewSubmitChecker ¶
func NewSubmitChecker( executorTimeout time.Duration, schedulingConfig configuration.SchedulingConfig, executorRepository database.ExecutorRepository, ) *SubmitChecker
func (*SubmitChecker) CheckApiJobs ¶
func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string)
func (*SubmitChecker) CheckJobDbJobs ¶ added in v0.3.78
func (srv *SubmitChecker) CheckJobDbJobs(jobs []*jobdb.Job) (bool, string)
func (*SubmitChecker) Run ¶ added in v0.3.49
func (srv *SubmitChecker) Run(ctx *armadacontext.Context) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
kubernetesobjects
|
|
Package schedulermocks is a generated GoMock package.
|
Package schedulermocks is a generated GoMock package. |