scheduler

package
v0.3.94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2023 License: Apache-2.0 Imports: 77 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NAMESPACE = "armada"
	SUBSYSTEM = "scheduler"
)

Variables

This section is empty.

Functions

func AppendEventSequencesFromPreemptedJobs added in v0.3.90

func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error)

func AppendEventSequencesFromScheduledJobs added in v0.3.90

func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error)

func EventsFromSchedulerResult added in v0.3.90

func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error)

EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.

func GangIdAndCardinalityFromAnnotations

func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, int, bool, error)

GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).

func GangIdAndCardinalityFromLegacySchedulerJob added in v0.3.47

func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, int, bool, error)

GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).

func JavaStringHash

func JavaStringHash(s string) uint32

JavaStringHash is the default hashing algorithm used by Pulsar, copied from https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/hash.go

func JobsSummary added in v0.3.50

func JobsSummary(jobs []interfaces.LegacySchedulerJob) string

JobsSummary returns a string giving an overview of the provided jobs meant for logging. For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]".

func PreemptedJobsFromSchedulerResult added in v0.3.54

func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T

PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result, cast to type T.

func Run

Run sets up a Scheduler application and runs it until a SIGTERM is received

func ScheduledJobsFromSchedulerResult added in v0.3.54

func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T

ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result, cast to type T.

Types

type CandidateGangIterator

type CandidateGangIterator struct {
	// contains filtered or unexported fields
}

CandidateGangIterator determines which gang to try scheduling next across queues. Specifically, it yields the next gang in the queue with the smallest fraction of its fair share, where the fraction of fair share computation includes the yielded gang.

func NewCandidateGangIterator added in v0.3.47

func NewCandidateGangIterator(
	queueRepository fairness.QueueRepository,
	fairnessCostProvider fairness.FairnessCostProvider,
	iteratorsByQueue map[string]*QueuedGangIterator,
) (*CandidateGangIterator, error)

func (*CandidateGangIterator) Clear added in v0.3.47

func (it *CandidateGangIterator) Clear() error

Clear removes the first item in the iterator. If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed.

func (*CandidateGangIterator) OnlyYieldEvicted added in v0.3.66

func (it *CandidateGangIterator) OnlyYieldEvicted()

func (*CandidateGangIterator) OnlyYieldEvictedForQueue added in v0.3.92

func (it *CandidateGangIterator) OnlyYieldEvictedForQueue(queue string)

func (*CandidateGangIterator) Peek added in v0.3.47

type DefaultPoolAssigner added in v0.3.54

type DefaultPoolAssigner struct {
	// contains filtered or unexported fields
}

func NewPoolAssigner added in v0.3.54

func NewPoolAssigner(executorTimeout time.Duration,
	schedulingConfig configuration.SchedulingConfig,
	executorRepository database.ExecutorRepository,
) (*DefaultPoolAssigner, error)

func (*DefaultPoolAssigner) AssignPool added in v0.3.54

func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error)

AssignPool returns the pool associated with the job or the empty string if no pool is valid

func (*DefaultPoolAssigner) Refresh added in v0.3.54

Refresh updates executor state

type Evictor added in v0.3.54

type Evictor struct {
	// contains filtered or unexported fields
}

func NewFilteredEvictor added in v0.3.56

func NewFilteredEvictor(
	jobRepo JobRepository,
	priorityClasses map[string]types.PriorityClass,
	nodeIdsToEvict map[string]bool,
	jobIdsToEvict map[string]bool,
) *Evictor

NewFilteredEvictor returns a new evictor that evicts all jobs for which jobIdsToEvict[jobId] is true on nodes for which nodeIdsToEvict[nodeId] is true.

func NewNodeEvictor added in v0.3.54

func NewNodeEvictor(
	jobRepo JobRepository,
	priorityClasses map[string]types.PriorityClass,
	perNodeEvictionProbability float64,
	jobFilter func(*armadacontext.Context, interfaces.LegacySchedulerJob) bool,
	random *rand.Rand,
) *Evictor

func NewOversubscribedEvictor added in v0.3.54

func NewOversubscribedEvictor(
	jobRepo JobRepository,
	priorityClasses map[string]types.PriorityClass,
	defaultPriorityClass string,
	perNodeEvictionProbability float64,
	random *rand.Rand,
) *Evictor

NewOversubscribedEvictor returns a new evictor that for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled with probability perNodeEvictionProbability.

func (*Evictor) Evict added in v0.3.54

Evict removes jobs from nodes, returning all affected jobs and nodes. Any node for which nodeFilter returns false is skipped. Any job for which jobFilter returns true is evicted (if the node was not skipped). If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.

type EvictorResult added in v0.3.54

type EvictorResult struct {
	// Map from job id to job, containing all evicted jobs.
	EvictedJobsById map[string]interfaces.LegacySchedulerJob
	// Map from node id to node, containing all nodes on which at least one job was evicted.
	AffectedNodesById map[string]*nodedb.Node
	// For each evicted job, maps the id of the job to the id of the node it was evicted from.
	NodeIdByJobId map[string]string
}

type ExecutorApi

type ExecutorApi struct {
	// contains filtered or unexported fields
}

ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler.

func NewExecutorApi

func NewExecutorApi(producer pulsar.Producer,
	jobRepository database.JobRepository,
	executorRepository database.ExecutorRepository,
	legacyExecutorRepository database.ExecutorRepository,
	allowedPriorities []int32,
	nodeIdLabel string,
	priorityClassNameOverride *string,
	maxPulsarMessageSizeBytes uint,
) (*ExecutorApi, error)

func (*ExecutorApi) LeaseJobRuns

func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error

LeaseJobRuns reconciles the state of the executor with that of the scheduler. Specifically, it: 1. Stores job and capacity information received from the executor to make it available to the scheduler. 2. Notifies the executor if any of its jobs are no longer active, e.g., due to being preempted by the scheduler. 3. Transfers any jobs scheduled on this executor cluster that the executor doesn't already have.

func (*ExecutorApi) ReportEvents

func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error)

ReportEvents publishes all events to Pulsar. The events are compacted for more efficient publishing.

type FairSchedulingAlgo added in v0.3.63

type FairSchedulingAlgo struct {
	// contains filtered or unexported fields
}

FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler.

func NewFairSchedulingAlgo added in v0.3.63

func NewFairSchedulingAlgo(
	config configuration.SchedulingConfig,
	maxSchedulingDuration time.Duration,
	executorRepository database.ExecutorRepository,
	queueRepository database.QueueRepository,
	schedulingContextRepository *SchedulingContextRepository,
) (*FairSchedulingAlgo, error)

func (*FairSchedulingAlgo) Schedule added in v0.3.63

func (l *FairSchedulingAlgo) Schedule(
	ctx *armadacontext.Context,
	txn *jobdb.Txn,
	jobDb *jobdb.JobDb,
) (*SchedulerResult, error)

Schedule assigns jobs to nodes in the same way as the old lease call. It iterates over each executor in turn (using lexicographical order) and assigns the jobs using a LegacyScheduler, before moving onto the next executor. It maintains state of which executors it has considered already and may take multiple Schedule() calls to consider all executors if scheduling is slow. Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.

type GangScheduler added in v0.3.63

type GangScheduler struct {
	// contains filtered or unexported fields
}

GangScheduler schedules one gang at a time. GangScheduler is not aware of queues.

func NewGangScheduler added in v0.3.63

func (*GangScheduler) Schedule added in v0.3.63

func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error)

func (*GangScheduler) SkipUnsuccessfulSchedulingKeyCheck added in v0.3.65

func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck()

type InMemoryJobIterator added in v0.3.50

type InMemoryJobIterator struct {
	// contains filtered or unexported fields
}

func NewInMemoryJobIterator added in v0.3.50

func NewInMemoryJobIterator[S ~[]E, E interfaces.LegacySchedulerJob](jobs S) *InMemoryJobIterator

func (*InMemoryJobIterator) Next added in v0.3.50

type InMemoryJobRepository added in v0.3.50

type InMemoryJobRepository struct {
	// contains filtered or unexported fields
}

func NewInMemoryJobRepository added in v0.3.50

func NewInMemoryJobRepository(priorityClasses map[string]types.PriorityClass) *InMemoryJobRepository

func (*InMemoryJobRepository) Enqueue added in v0.3.50

func (*InMemoryJobRepository) EnqueueMany added in v0.3.50

func (repo *InMemoryJobRepository) EnqueueMany(jobs []interfaces.LegacySchedulerJob)

func (*InMemoryJobRepository) GetExistingJobsByIds added in v0.3.50

func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]interfaces.LegacySchedulerJob, error)

func (*InMemoryJobRepository) GetJobIterator added in v0.3.50

func (repo *InMemoryJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) (JobIterator, error)

func (*InMemoryJobRepository) GetQueueJobIds added in v0.3.50

func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)

type JobIterator

type JobIterator interface {
	Next() (interfaces.LegacySchedulerJob, error)
}

type JobQueueIteratorAdapter added in v0.3.47

type JobQueueIteratorAdapter struct {
	// contains filtered or unexported fields
}

func (*JobQueueIteratorAdapter) Next added in v0.3.47

type JobRepository

type JobRepository interface {
	GetQueueJobIds(queueName string) ([]string, error)
	GetExistingJobsByIds(ids []string) ([]interfaces.LegacySchedulerJob, error)
}

type KubernetesLeaderController

type KubernetesLeaderController struct {
	// contains filtered or unexported fields
}

KubernetesLeaderController uses the Kubernetes leader election mechanism to determine who is leader. This allows multiple instances of the scheduler to be run for high availability.

TODO: Move into package in common.

func (*KubernetesLeaderController) GetLeaderReport added in v0.3.80

func (lc *KubernetesLeaderController) GetLeaderReport() LeaderReport

func (*KubernetesLeaderController) GetToken

func (lc *KubernetesLeaderController) GetToken() LeaderToken

func (*KubernetesLeaderController) RegisterListener added in v0.3.79

func (lc *KubernetesLeaderController) RegisterListener(listener LeaseListener)

func (*KubernetesLeaderController) Run

Run starts the controller. This is a blocking call that returns when the provided context is cancelled.

func (*KubernetesLeaderController) ValidateToken

func (lc *KubernetesLeaderController) ValidateToken(tok LeaderToken) bool

type LeaderClientConnectionProvider added in v0.3.80

type LeaderClientConnectionProvider interface {
	GetCurrentLeaderClientConnection() (bool, *grpc.ClientConn, error)
}

type LeaderConnectionProvider added in v0.3.80

type LeaderConnectionProvider struct {
	// contains filtered or unexported fields
}

func NewLeaderConnectionProvider added in v0.3.80

func NewLeaderConnectionProvider(leaderController LeaderController, leaderConfig configuration.LeaderConfig) *LeaderConnectionProvider

func (*LeaderConnectionProvider) GetCurrentLeaderClientConnection added in v0.3.80

func (l *LeaderConnectionProvider) GetCurrentLeaderClientConnection() (bool, *grpc.ClientConn, error)

type LeaderController

type LeaderController interface {
	// GetToken returns a LeaderToken which allows you to determine if you are leader or not
	GetToken() LeaderToken
	// ValidateToken allows a caller to determine whether a previously obtained token is still valid.
	// Returns true if the token is a leader and false otherwise
	ValidateToken(tok LeaderToken) bool
	// Run starts the controller.  This is a blocking call which will return when the provided context is cancelled
	Run(ctx *armadacontext.Context) error
	// GetLeaderReport returns a report about the current leader
	GetLeaderReport() LeaderReport
}

LeaderController is an interface to be implemented by structs that control which scheduler is leader

type LeaderProxyingSchedulingReportsServer added in v0.3.80

type LeaderProxyingSchedulingReportsServer struct {
	// contains filtered or unexported fields
}

func NewLeaderProxyingSchedulingReportsServer added in v0.3.80

func NewLeaderProxyingSchedulingReportsServer(
	schedulingReportsRepository schedulerobjects.SchedulerReportingServer,
	leaderClientProvider LeaderClientConnectionProvider,
) *LeaderProxyingSchedulingReportsServer

func (*LeaderProxyingSchedulingReportsServer) GetJobReport added in v0.3.80

func (*LeaderProxyingSchedulingReportsServer) GetQueueReport added in v0.3.80

func (*LeaderProxyingSchedulingReportsServer) GetSchedulingReport added in v0.3.80

type LeaderReport added in v0.3.80

type LeaderReport struct {
	IsCurrentProcessLeader bool
	LeaderName             string
}

type LeaderStatusMetricsCollector added in v0.3.79

type LeaderStatusMetricsCollector struct {
	// contains filtered or unexported fields
}

func NewLeaderStatusMetricsCollector added in v0.3.79

func NewLeaderStatusMetricsCollector(currentInstanceName string) *LeaderStatusMetricsCollector

func (*LeaderStatusMetricsCollector) Collect added in v0.3.79

func (l *LeaderStatusMetricsCollector) Collect(metrics chan<- prometheus.Metric)

func (*LeaderStatusMetricsCollector) Describe added in v0.3.79

func (l *LeaderStatusMetricsCollector) Describe(desc chan<- *prometheus.Desc)

type LeaderToken

type LeaderToken struct {
	// contains filtered or unexported fields
}

LeaderToken is a token handed out to schedulers which they can use to determine if they are leader

func InvalidLeaderToken

func InvalidLeaderToken() LeaderToken

InvalidLeaderToken returns a LeaderToken indicating this instance is not leader.

func NewLeaderToken

func NewLeaderToken() LeaderToken

NewLeaderToken returns a LeaderToken indicating this instance is the leader.

type LeaseListener

type LeaseListener interface {
	// contains filtered or unexported methods
}

LeaseListener allows clients to listen for lease events.

type MetricsCollector added in v0.3.54

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector is a Prometheus Collector that handles scheduler metrics. The metrics themselves are calculated asynchronously every refreshPeriod

func NewMetricsCollector added in v0.3.54

func NewMetricsCollector(
	jobDb *jobdb.JobDb,
	queueRepository database.QueueRepository,
	executorRepository database.ExecutorRepository,
	poolAssigner PoolAssigner,
	refreshPeriod time.Duration,
) *MetricsCollector

func (*MetricsCollector) Collect added in v0.3.54

func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric)

Collect returns the current state of all metrics of the collector.

func (*MetricsCollector) Describe added in v0.3.54

func (c *MetricsCollector) Describe(out chan<- *prometheus.Desc)

Describe returns all descriptions of the collector.

func (*MetricsCollector) Run added in v0.3.54

Run enters a loop which updates the metrics every refreshPeriod until the supplied context is cancelled.

type MinimalQueue added in v0.3.90

type MinimalQueue struct {
	// contains filtered or unexported fields
}

func (MinimalQueue) GetAllocation added in v0.3.90

func (q MinimalQueue) GetAllocation() schedulerobjects.ResourceList

func (MinimalQueue) GetWeight added in v0.3.90

func (q MinimalQueue) GetWeight() float64

type MinimalQueueRepository added in v0.3.90

type MinimalQueueRepository struct {
	// contains filtered or unexported fields
}

func NewMinimalQueueRepositoryFromSchedulingContext added in v0.3.90

func NewMinimalQueueRepositoryFromSchedulingContext(sctx *schedulercontext.SchedulingContext) *MinimalQueueRepository

func (*MinimalQueueRepository) GetQueue added in v0.3.90

func (qr *MinimalQueueRepository) GetQueue(name string) (fairness.Queue, bool)

type MultiJobsIterator added in v0.3.47

type MultiJobsIterator struct {
	// contains filtered or unexported fields
}

MultiJobsIterator chains several JobIterators together, emptying them in the order provided.

func NewMultiJobsIterator added in v0.3.47

func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator

func (*MultiJobsIterator) Next added in v0.3.47

type PoolAssigner added in v0.3.54

type PoolAssigner interface {
	Refresh(ctx *armadacontext.Context) error
	AssignPool(j *jobdb.Job) (string, error)
}

PoolAssigner allows jobs to be assigned to a pool. Note that this is intended only for use with metrics calculation.

type PreemptingQueueScheduler added in v0.3.63

type PreemptingQueueScheduler struct {
	// contains filtered or unexported fields
}

PreemptingQueueScheduler is a scheduler that makes unified decisions on which jobs to preempt and schedule. It uses QueueScheduler as a building block.

func NewPreemptingQueueScheduler added in v0.3.63

func NewPreemptingQueueScheduler(
	sctx *schedulercontext.SchedulingContext,
	constraints schedulerconstraints.SchedulingConstraints,
	nodeEvictionProbability float64,
	nodeOversubscriptionEvictionProbability float64,
	protectedFractionOfFairShare float64,
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
	initialNodeIdByJobId map[string]string,
	initialJobIdsByGangId map[string]map[string]bool,
	initialGangIdByJobId map[string]string,
) *PreemptingQueueScheduler

func (*PreemptingQueueScheduler) EnableAssertions added in v0.3.63

func (sch *PreemptingQueueScheduler) EnableAssertions()

func (*PreemptingQueueScheduler) EnableNewPreemptionStrategy added in v0.3.90

func (sch *PreemptingQueueScheduler) EnableNewPreemptionStrategy()

func (*PreemptingQueueScheduler) Schedule added in v0.3.63

Schedule (1) preempts jobs belonging to queues with total allocation above their fair share, and (2) schedules new jobs belonging to queues with total allocation less than their fair share.

func (*PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck added in v0.3.65

func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck()

type ProxyingSchedulingReportsServer added in v0.3.80

type ProxyingSchedulingReportsServer struct {
	// contains filtered or unexported fields
}

func NewProxyingSchedulingReportsServer added in v0.3.80

func NewProxyingSchedulingReportsServer(client schedulerobjects.SchedulerReportingClient) *ProxyingSchedulingReportsServer

func (*ProxyingSchedulingReportsServer) GetJobReport added in v0.3.80

func (*ProxyingSchedulingReportsServer) GetQueueReport added in v0.3.80

func (*ProxyingSchedulingReportsServer) GetSchedulingReport added in v0.3.80

type Publisher

type Publisher interface {
	// PublishMessages will publish the supplied messages. A shouldPublish callback is provided and the
	// implementor may decide whether to publish based on the value it returns
	PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error

	// PublishMarkers publishes a single marker message for each Pulsar partition.  Each marker
	// message contains the supplied group id, which allows all marker messages for a given call
	// to be identified.  The uint32 returned is the number of messages published
	PublishMarkers(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error)
}

Publisher is an interface to be implemented by structs that handle publishing messages to pulsar

type PulsarPublisher

type PulsarPublisher struct {
	// contains filtered or unexported fields
}

PulsarPublisher is the default implementation of Publisher

func NewPulsarPublisher

func NewPulsarPublisher(
	pulsarClient pulsar.Client,
	producerOptions pulsar.ProducerOptions,
	pulsarSendTimeout time.Duration,
) (*PulsarPublisher, error)

func (*PulsarPublisher) PublishMarkers

func (p *PulsarPublisher) PublishMarkers(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error)

PublishMarkers sends one pulsar message (containing an armadaevents.PartitionMarker) to each partition of the producer's Pulsar topic.

func (*PulsarPublisher) PublishMessages

func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error

PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into single event sequences up to maxMessageBatchSize.

type QueueCandidateGangIteratorItem added in v0.3.47

type QueueCandidateGangIteratorItem struct {
	// contains filtered or unexported fields
}

type QueueCandidateGangIteratorPQ added in v0.3.47

type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem

Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.

func (QueueCandidateGangIteratorPQ) Len added in v0.3.47

func (QueueCandidateGangIteratorPQ) Less added in v0.3.47

func (pq QueueCandidateGangIteratorPQ) Less(i, j int) bool

func (*QueueCandidateGangIteratorPQ) Pop added in v0.3.47

func (*QueueCandidateGangIteratorPQ) Push added in v0.3.47

func (pq *QueueCandidateGangIteratorPQ) Push(x any)

func (QueueCandidateGangIteratorPQ) Swap added in v0.3.47

func (pq QueueCandidateGangIteratorPQ) Swap(i, j int)

type QueueScheduler added in v0.3.63

type QueueScheduler struct {
	// contains filtered or unexported fields
}

QueueScheduler is responsible for choosing the order in which to attempt scheduling queued gangs. Relies on GangScheduler for scheduling once a gang is chosen.

func NewQueueScheduler added in v0.3.63

func NewQueueScheduler(
	sctx *schedulercontext.SchedulingContext,
	constraints schedulerconstraints.SchedulingConstraints,
	nodeDb *nodedb.NodeDb,
	jobIteratorByQueue map[string]JobIterator,
) (*QueueScheduler, error)

func (*QueueScheduler) Schedule added in v0.3.63

func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error)

func (*QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck added in v0.3.65

func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck()

type QueuedGangIterator

type QueuedGangIterator struct {
	// contains filtered or unexported fields
}

QueuedGangIterator is an iterator over queued gangs. Each gang is yielded once its final member is received from the underlying iterator. Jobs without gangIdAnnotation are considered gangs of cardinality 1.

func NewQueuedGangIterator

func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobIterator, maxLookback uint) *QueuedGangIterator

func (*QueuedGangIterator) Clear added in v0.3.47

func (it *QueuedGangIterator) Clear() error

func (*QueuedGangIterator) Next

func (*QueuedGangIterator) Peek added in v0.3.47

type QueuedJobsIterator

type QueuedJobsIterator struct {
	// contains filtered or unexported fields
}

QueuedJobsIterator is an iterator over all jobs in a queue. It lazily and asynchronously loads jobs in batches from Redis.

func NewQueuedJobsIterator

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) (*QueuedJobsIterator, error)

func (*QueuedJobsIterator) Next

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the main Armada scheduler. It periodically performs the following cycle: 1. Update state from postgres (via the jobRepository). 2. Determine if leader and exit if not. 3. Generate any necessary events resulting from the state update. 4. Expire any jobs assigned to clusters that have timed out. 5. Schedule jobs. 6. Publish any Armada events resulting from the scheduling cycle.

func NewScheduler

func NewScheduler(
	jobRepository database.JobRepository,
	executorRepository database.ExecutorRepository,
	schedulingAlgo SchedulingAlgo,
	leaderController LeaderController,
	publisher Publisher,
	stringInterner *stringinterner.StringInterner,
	submitChecker SubmitScheduleChecker,
	cyclePeriod time.Duration,
	schedulePeriod time.Duration,
	executorTimeout time.Duration,
	maxAttemptedRuns uint,
	nodeIdLabel string,
	schedulerMetrics *SchedulerMetrics,
) (*Scheduler, error)

func (*Scheduler) Run

func (s *Scheduler) Run(ctx *armadacontext.Context) error

Run enters the scheduling loop, which will continue until ctx is cancelled.

type SchedulerJobRepositoryAdapter added in v0.3.90

type SchedulerJobRepositoryAdapter struct {
	// contains filtered or unexported fields
}

Adapter to make jobDb implement the JobRepository interface.

TODO: Pass JobDb into the scheduler instead of using this shim to convert to a JobRepo.

func NewSchedulerJobRepositoryAdapter added in v0.3.90

func NewSchedulerJobRepositoryAdapter(db *jobdb.JobDb, txn *jobdb.Txn) *SchedulerJobRepositoryAdapter

func (*SchedulerJobRepositoryAdapter) GetExistingJobsByIds added in v0.3.90

func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([]interfaces.LegacySchedulerJob, error)

GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the old to new scheduler.

func (*SchedulerJobRepositoryAdapter) GetQueueJobIds added in v0.3.90

func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) ([]string, error)

GetQueueJobIds is necessary to implement the JobRepository interface, which we need while transitioning from the old to new scheduler.

type SchedulerMetrics added in v0.3.90

type SchedulerMetrics struct {
	// contains filtered or unexported fields
}

func NewSchedulerMetrics added in v0.3.91

func NewSchedulerMetrics(config configuration.SchedulerMetricsConfig) *SchedulerMetrics

func (*SchedulerMetrics) ReportReconcileCycleTime added in v0.3.90

func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duration)

func (*SchedulerMetrics) ReportScheduleCycleTime added in v0.3.90

func (metrics *SchedulerMetrics) ReportScheduleCycleTime(cycleTime time.Duration)

func (*SchedulerMetrics) ReportSchedulerResult added in v0.3.90

func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result SchedulerResult)

func (*SchedulerMetrics) ResetGaugeMetrics added in v0.3.92

func (metrics *SchedulerMetrics) ResetGaugeMetrics()

type SchedulerResult added in v0.3.54

type SchedulerResult struct {
	// Whether the scheduler failed to create a result for some reason
	EmptyResult bool
	// Running jobs that should be preempted.
	PreemptedJobs []interfaces.LegacySchedulerJob
	// Queued jobs that should be scheduled.
	ScheduledJobs []interfaces.LegacySchedulerJob
	// For each preempted job, maps the job id to the id of the node on which the job was running.
	// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
	NodeIdByJobId map[string]string
	// The Scheduling Context. Being passed up for metrics decisions made in scheduler.go and scheduler_metrics.go.
	// Passing a pointer as the structure is enormous
	SchedulingContexts []*schedulercontext.SchedulingContext
}

SchedulerResult is returned by Rescheduler.Schedule().

func NewSchedulerResult added in v0.3.54

func NewSchedulerResult[S ~[]T, T interfaces.LegacySchedulerJob](
	preemptedJobs S,
	scheduledJobs S,
	nodeIdByJobId map[string]string,
) *SchedulerResult

type SchedulingAlgo

type SchedulingAlgo interface {
	// Schedule should assign jobs to nodes.
	// Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided.
	Schedule(ctx *armadacontext.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error)
}

SchedulingAlgo is the interface between the Pulsar-backed scheduler and the algorithm deciding which jobs to schedule and preempt.

type SchedulingContextByExecutor added in v0.3.62

type SchedulingContextByExecutor map[string]*schedulercontext.SchedulingContext

func (SchedulingContextByExecutor) String added in v0.3.62

type SchedulingContextRepository added in v0.3.62

type SchedulingContextRepository struct {
	// contains filtered or unexported fields
}

SchedulingContextRepository stores scheduling contexts associated with recent scheduling attempts. On adding a context, a map is cloned, then mutated, and then swapped for the previous map using atomic pointers. Hence, reads concurrent with writes are safe and don't need locking. A mutex protects against concurrent writes.

func NewSchedulingContextRepository added in v0.3.62

func NewSchedulingContextRepository(jobCacheSize uint) (*SchedulingContextRepository, error)

func (*SchedulingContextRepository) AddSchedulingContext added in v0.3.62

func (repo *SchedulingContextRepository) AddSchedulingContext(sctx *schedulercontext.SchedulingContext) error

AddSchedulingContext adds a scheduling context to the repo. It also extracts the queue and job scheduling contexts it contains and stores those separately.

It's safe to call this method concurrently with itself and with methods getting contexts from the repo. It's not safe to mutate contexts once they've been provided to this method.

Job contexts are stored first, then queue contexts, and finally the scheduling context itself. This avoids having a stored scheduling (queue) context referring to a queue (job) context that isn't stored yet.

func (*SchedulingContextRepository) GetJobReport added in v0.3.62

GetJobReport is a gRPC endpoint for querying job reports. TODO: Further separate this from internal contexts.

func (*SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutor added in v0.3.71

func (repo *SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutor() SchedulingContextByExecutor

func (*SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutorForQueue added in v0.3.75

func (repo *SchedulingContextRepository) GetMostRecentPreemptingSchedulingContextByExecutorForQueue(queue string) (SchedulingContextByExecutor, bool)

func (*SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor added in v0.3.62

func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor() SchedulingContextByExecutor

func (*SchedulingContextRepository) GetMostRecentSchedulingContextByExecutorForJob added in v0.3.75

func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutorForJob(jobId string) (SchedulingContextByExecutor, bool)

func (*SchedulingContextRepository) GetMostRecentSchedulingContextByExecutorForQueue added in v0.3.75

func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutorForQueue(queue string) (SchedulingContextByExecutor, bool)

func (*SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutor added in v0.3.62

func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutor() SchedulingContextByExecutor

func (*SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutorForQueue added in v0.3.75

func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutorForQueue(queue string) (SchedulingContextByExecutor, bool)

func (*SchedulingContextRepository) GetQueueReport added in v0.3.62

GetQueueReport is a gRPC endpoint for querying queue reports. TODO: Further separate this from internal contexts.

func (*SchedulingContextRepository) GetSchedulingReport added in v0.3.62

GetSchedulingReport is a gRPC endpoint for querying scheduler reports. TODO: Further separate this from internal contexts.

func (*SchedulingContextRepository) GetSortedExecutorIds added in v0.3.62

func (repo *SchedulingContextRepository) GetSortedExecutorIds() []string

type StandaloneLeaderController

type StandaloneLeaderController struct {
	// contains filtered or unexported fields
}

StandaloneLeaderController returns a token that always indicates you are leader. This can be used when only a single instance of the scheduler is needed.

func NewStandaloneLeaderController

func NewStandaloneLeaderController() *StandaloneLeaderController

func (*StandaloneLeaderController) GetLeaderReport added in v0.3.80

func (lc *StandaloneLeaderController) GetLeaderReport() LeaderReport

func (*StandaloneLeaderController) GetToken

func (lc *StandaloneLeaderController) GetToken() LeaderToken

func (*StandaloneLeaderController) Run added in v0.3.47

func (*StandaloneLeaderController) ValidateToken

func (lc *StandaloneLeaderController) ValidateToken(tok LeaderToken) bool

type SubmitChecker

type SubmitChecker struct {
	ExecutorUpdateFrequency time.Duration
	// contains filtered or unexported fields
}

func NewSubmitChecker

func NewSubmitChecker(
	executorTimeout time.Duration,
	schedulingConfig configuration.SchedulingConfig,
	executorRepository database.ExecutorRepository,
) *SubmitChecker

func (*SubmitChecker) CheckApiJobs

func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string)

func (*SubmitChecker) CheckJobDbJobs added in v0.3.78

func (srv *SubmitChecker) CheckJobDbJobs(jobs []*jobdb.Job) (bool, string)

func (*SubmitChecker) Run added in v0.3.49

func (srv *SubmitChecker) Run(ctx *armadacontext.Context) error

type SubmitScheduleChecker added in v0.3.63

type SubmitScheduleChecker interface {
	CheckApiJobs(jobs []*api.Job) (bool, string)
	CheckJobDbJobs(jobs []*jobdb.Job) (bool, string)
}

Directories

Path Synopsis
kubernetesobjects
Package schedulermocks is a generated GoMock package.
Package schedulermocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL