Documentation ¶
Index ¶
- Constants
- Variables
- func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
- func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
- func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, bool, error)
- func GangIdAndCardinalityFromLegacySchedulerJob(job LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (string, int, bool, error)
- func GroupJobsByAnnotation(annotation string, jobs []*api.Job) map[string][]*api.Job
- func JavaStringHash(s string) uint32
- func JobIdFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)
- func JobsSummary(jobs []LegacySchedulerJob) string
- func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*schedulerobjects.Node, map[string]*schedulerobjects.Node, error)
- func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements
- func PodRequirementFromLegacySchedulerJob[E LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements
- func PodRequirementsFromJobSchedulingInfos(infos []*schedulerobjects.JobSchedulingInfo) []*schedulerobjects.PodRequirements
- func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements
- func PreemptedJobsFromSchedulerResult[T LegacySchedulerJob](sr *SchedulerResult) []T
- func QueueFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)
- func ResourceListAsWeightedApproximateFloat64(resourceScarcity map[string]float64, rl schedulerobjects.ResourceList) float64
- func Run(config Configuration) error
- func ScheduledJobsFromSchedulerResult[T LegacySchedulerJob](sr *SchedulerResult) []T
- func UnbindPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
- func UnbindPodsFromNode(reqs []*schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
- func UpdateUsage[S ~[]E, E LegacySchedulerJob](usage map[string]schedulerobjects.QuantityByPriorityAndResourceType, jobs S, ...) map[string]schedulerobjects.QuantityByPriorityAndResourceType
- type AddOrSubtract
- type CandidateGangIterator
- type Configuration
- type DefaultPoolAssigner
- type Evictor
- func NewFilteredEvictor(jobRepo JobRepository, priorityClasses map[string]configuration.PriorityClass, ...) *Evictor
- func NewOversubscribedEvictor(jobRepo JobRepository, priorityClasses map[string]configuration.PriorityClass, ...) *Evictor
- func NewPreemptibleEvictor(jobRepo JobRepository, priorityClasses map[string]configuration.PriorityClass, ...) *Evictor
- func NewStochasticEvictor(jobRepo JobRepository, priorityClasses map[string]configuration.PriorityClass, ...) *Evictor
- type EvictorResult
- type ExecutorApi
- type InMemoryJobIterator
- type InMemoryJobRepository
- func (repo *InMemoryJobRepository) Enqueue(job LegacySchedulerJob)
- func (repo *InMemoryJobRepository) EnqueueMany(jobs []LegacySchedulerJob)
- func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]LegacySchedulerJob, error)
- func (repo *InMemoryJobRepository) GetJobIterator(ctx context.Context, queue string) (JobIterator, error)
- func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)
- type JobIterator
- type JobQueueIteratorAdapter
- type JobRepository
- type JobSchedulingContext
- type JobSchedulingContextByExecutor
- type KubernetesLeaderController
- type LeaderConfig
- type LeaderController
- type LeaderToken
- type LeaseListener
- type LegacyScheduler
- type LegacySchedulerJob
- type LegacySchedulingAlgo
- type MetricsCollector
- type MultiJobsIterator
- type NodeAvailableResourceIndex
- type NodeDb
- func (nodeDb *NodeDb) ClearAllocated() error
- func (nodeDb *NodeDb) GetNode(id string) (*schedulerobjects.Node, error)
- func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*schedulerobjects.Node, error)
- func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)
- func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingContext, bool, error)
- func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingContext, bool, error)
- func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
- func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
- func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
- func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
- func (nodeDb *NodeDb) String() string
- func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn
- func (nodeDb *NodeDb) Upsert(node *schedulerobjects.Node) error
- func (nodeDb *NodeDb) UpsertMany(nodes []*schedulerobjects.Node) error
- func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*schedulerobjects.Node) error
- func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *schedulerobjects.Node) error
- type NodeDominantQueueIndex
- type NodeIterator
- type NodePairIterator
- type NodePairIteratorItem
- type NodeTypeResourceIterator
- type NodeTypesResourceIterator
- type NodeTypesResourceIteratorItem
- type NodeTypesResourceIteratorPQ
- type NodesIterator
- type PodSchedulingContext
- type PoolAssigner
- type Publisher
- type PulsarPublisher
- type Queue
- type QueueCandidateGangIterator
- type QueueCandidateGangIteratorItem
- type QueueCandidateGangIteratorPQ
- type QueueSchedulingContext
- type QueueSchedulingContextByExecutor
- type QueuedGangIterator
- type QueuedJobsIterator
- type Rescheduler
- type Scheduler
- type SchedulerResult
- type SchedulingAlgo
- type SchedulingConstraints
- type SchedulingContext
- type SchedulingContextByExecutor
- type SchedulingContextRepository
- func (repo *SchedulingContextRepository) AddSchedulingContext(sctx *SchedulingContext) error
- func (repo *SchedulingContextRepository) GetJobReport(_ context.Context, jobId *schedulerobjects.JobId) (*schedulerobjects.JobReport, error)
- func (repo *SchedulingContextRepository) GetMostRecentJobSchedulingContextByExecutor(jobId string) (JobSchedulingContextByExecutor, bool)
- func (repo *SchedulingContextRepository) GetMostRecentQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool)
- func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor() SchedulingContextByExecutor
- func (repo *SchedulingContextRepository) GetMostRecentSuccessfulQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool)
- func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutor() SchedulingContextByExecutor
- func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, queue *schedulerobjects.Queue) (*schedulerobjects.QueueReport, error)
- func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context, _ *types.Empty) (*schedulerobjects.SchedulingReport, error)
- func (repo *SchedulingContextRepository) GetSortedExecutorIds() []string
- type StandaloneLeaderController
- type SubmitChecker
Constants ¶
const (
    // TargetNodeIdAnnotation if set on a pod, the value of this annotation is interpreted as the id of a node
    // and only the node with that id will be considered for scheduling the pod.
    TargetNodeIdAnnotation = "armadaproject.io/targetNodeId"
    // IsEvictedAnnotation, indicates a pod was evicted in this round and is currently running.
    // Used by the scheduler to differentiate between pods from running and queued jobs.
    IsEvictedAnnotation = "armadaproject.io/isEvicted"
    // JobIdAnnotation if set on a pod, indicates which job this pod is part of.
    JobIdAnnotation = "armadaproject.io/jobId"
    // QueueAnnotation if set on a pod, indicates which queue this pod is part of.
    QueueAnnotation = "armadaproject.io/queue"
)
const NodeDominantQueueWildcard = "*"
Variables ¶
var ArmadaSchedulerManagedAnnotations = []string{
    TargetNodeIdAnnotation,
    IsEvictedAnnotation,
    JobIdAnnotation,
    QueueAnnotation,
}
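A minimal in-package sketch of reading these annotations off a pod's annotation map. The map literal stands in for schedulerobjects.PodRequirements.Annotations and the values shown are illustrative only; requires "fmt".

    // Sketch: inspecting the scheduler-managed annotations on a pod.
    annotations := map[string]string{
        JobIdAnnotation: "example-job-id",
        QueueAnnotation: "example-queue",
    }
    if _, ok := annotations[IsEvictedAnnotation]; ok {
        fmt.Println("pod belongs to a job evicted in this round")
    }
    fmt.Println("job:", annotations[JobIdAnnotation], "queue:", annotations[QueueAnnotation])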
Functions ¶
func BindPodToNode ¶ added in v0.3.50
func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
BindPodToNode returns a copy of node with req bound to it.
func EvictPodFromNode ¶ added in v0.3.54
func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
EvictPodFromNode returns a copy of node with req evicted from it. Specifically:

- The job is marked as evicted on the node.
- AllocatedByJobId and AllocatedByQueue are not updated.
- Resources requested by the evicted pod are marked as allocated at priority evictedPriority.
func GangIdAndCardinalityFromLegacySchedulerJob ¶ added in v0.3.47
func GangIdAndCardinalityFromLegacySchedulerJob(job LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (string, int, bool, error)
func GroupJobsByAnnotation ¶ added in v0.3.49
func GroupJobsByAnnotation(annotation string, jobs []*api.Job) map[string][]*api.Job
func JavaStringHash ¶
func JavaStringHash(s string) uint32
JavaStringHash is the default hashing algorithm used by Pulsar, copied from https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/hash.go
func JobIdFromPodRequirements ¶ added in v0.3.50
func JobIdFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)
func JobsSummary ¶ added in v0.3.50
func JobsSummary(jobs []LegacySchedulerJob) string
func NodeJobDiff ¶ added in v0.3.50
func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*schedulerobjects.Node, map[string]*schedulerobjects.Node, error)
NodeJobDiff compares two snapshots of the NodeDb memdb and returns, for the changes that happened between the two snapshots:

- a map from job ids of all preempted jobs to the node they used to be on, and
- a map from job ids of all scheduled jobs to the node they were scheduled on.
func PodRequirementFromJobSchedulingInfo ¶ added in v0.3.47
func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements
func PodRequirementFromLegacySchedulerJob ¶ added in v0.3.50
func PodRequirementFromLegacySchedulerJob[E LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements
func PodRequirementsFromJobSchedulingInfos ¶ added in v0.3.47
func PodRequirementsFromJobSchedulingInfos(infos []*schedulerobjects.JobSchedulingInfo) []*schedulerobjects.PodRequirements
func PodRequirementsFromLegacySchedulerJobs ¶ added in v0.3.47
func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements
func PreemptedJobsFromSchedulerResult ¶ added in v0.3.54
func PreemptedJobsFromSchedulerResult[T LegacySchedulerJob](sr *SchedulerResult) []T
PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result, cast to type T.
func QueueFromPodRequirements ¶ added in v0.3.50
func QueueFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)
func ResourceListAsWeightedApproximateFloat64 ¶
func ResourceListAsWeightedApproximateFloat64(resourceScarcity map[string]float64, rl schedulerobjects.ResourceList) float64
func Run ¶
func Run(config Configuration) error
Run sets up a Scheduler application and runs it until a SIGTERM is received.
func ScheduledJobsFromSchedulerResult ¶ added in v0.3.54
func ScheduledJobsFromSchedulerResult[T LegacySchedulerJob](sr *SchedulerResult) []T
ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result, cast to type T.
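A hedged sketch of using the two helpers above together after a scheduling round. The generic wrapper below is illustrative; T is whatever job type the caller stored in the result and must implement LegacySchedulerJob. Requires "fmt".

    // Sketch: splitting a SchedulerResult back into typed job slices.
    func logSchedulerResult[T LegacySchedulerJob](result *SchedulerResult) {
        for _, job := range PreemptedJobsFromSchedulerResult[T](result) {
            fmt.Printf("preempt job %s (was on node %s)\n", job.GetId(), result.NodeIdByJobId[job.GetId()])
        }
        for _, job := range ScheduledJobsFromSchedulerResult[T](result) {
            fmt.Printf("schedule job %s on node %s\n", job.GetId(), result.NodeIdByJobId[job.GetId()])
        }
    }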
func UnbindPodFromNode ¶ added in v0.3.50
func UnbindPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
UnbindPodFromNode returns a copy of node with req unbound from it.
func UnbindPodsFromNode ¶ added in v0.3.54
func UnbindPodsFromNode(reqs []*schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error)
UnbindPodsFromNode returns a node with all reqs unbound from it.
func UpdateUsage ¶ added in v0.3.50
func UpdateUsage[S ~[]E, E LegacySchedulerJob](
    usage map[string]schedulerobjects.QuantityByPriorityAndResourceType,
    jobs S,
    priorityClasses map[string]configuration.PriorityClass,
    addOrSubtract AddOrSubtract,
) map[string]schedulerobjects.QuantityByPriorityAndResourceType
Types ¶
type AddOrSubtract ¶ added in v0.3.50
type AddOrSubtract int
const (
    Add AddOrSubtract = iota
    Subtract
)
type CandidateGangIterator ¶
type CandidateGangIterator struct {
    SchedulingConstraints
    SchedulingContext *SchedulingContext
    // contains filtered or unexported fields
}
CandidateGangIterator multiplexes between queues. Responsible for maintaining fair share and enforcing cross-queue scheduling constraints.
func NewCandidateGangIterator ¶ added in v0.3.47
func NewCandidateGangIterator(
    schedulingConstraints SchedulingConstraints,
    schedulingContext *SchedulingContext,
    ctx context.Context,
    iteratorsByQueue map[string]*QueueCandidateGangIterator,
    priorityFactorByQueue map[string]float64,
) (*CandidateGangIterator, error)
func (*CandidateGangIterator) Clear ¶ added in v0.3.47
func (it *CandidateGangIterator) Clear() error
func (*CandidateGangIterator) Next ¶
func (it *CandidateGangIterator) Next() ([]*JobSchedulingContext, error)
func (*CandidateGangIterator) Peek ¶ added in v0.3.47
func (it *CandidateGangIterator) Peek() ([]*JobSchedulingContext, error)
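A hedged sketch of draining this iterator. It assumes Next returns a nil slice once the iterator is exhausted, following the usual Go iterator convention; that termination condition is an assumption, not documented behaviour. Requires "fmt".

    // Sketch: consuming gangs from a CandidateGangIterator.
    func drainGangs(it *CandidateGangIterator) error {
        for {
            jctxs, err := it.Next()
            if err != nil {
                return err
            }
            if jctxs == nil {
                return nil // assumed: no more gangs
            }
            for _, jctx := range jctxs {
                fmt.Printf("considering job %s from queue %s\n", jctx.JobId, jctx.Job.GetQueue())
            }
        }
    }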
type Configuration ¶
type Configuration struct {
    // Database configuration
    Postgres configuration.PostgresConfig
    // Redis configuration
    Redis config.RedisConfig
    // General Pulsar configuration
    Pulsar configuration.PulsarConfig
    // Configuration controlling leader election
    Leader LeaderConfig
    // Configuration controlling metrics
    Metrics configuration.MetricsConfig
    // Scheduler configuration (this is shared with the old scheduler)
    Scheduling configuration.SchedulingConfig
    Auth       authconfig.AuthConfig
    Grpc       grpcconfig.GrpcConfig
    // Maximum number of strings that should be cached at any one time
    InternedStringsCacheSize uint32 `validate:"required"`
    // How often the scheduling cycle should run
    CyclePeriod time.Duration `validate:"required"`
    // How long after a heartbeat an executor will be considered lost
    ExecutorTimeout time.Duration `validate:"required"`
    // Maximum number of rows to fetch in a given query
    DatabaseFetchSize int `validate:"required"`
    // Timeout to use when sending messages to pulsar
    PulsarSendTimeout time.Duration `validate:"required"`
}
type DefaultPoolAssigner ¶ added in v0.3.54
type DefaultPoolAssigner struct {
// contains filtered or unexported fields
}
func NewPoolAssigner ¶ added in v0.3.54
func NewPoolAssigner(
    executorTimeout time.Duration,
    schedulingConfig configuration.SchedulingConfig,
    executorRepository database.ExecutorRepository,
) (*DefaultPoolAssigner, error)
func (*DefaultPoolAssigner) AssignPool ¶ added in v0.3.54
func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error)
AssignPool returns the pool associated with the job, or the empty string if no pool is valid.
type Evictor ¶ added in v0.3.54
type Evictor struct {
// contains filtered or unexported fields
}
func NewFilteredEvictor ¶ added in v0.3.56
func NewFilteredEvictor(
    jobRepo JobRepository,
    priorityClasses map[string]configuration.PriorityClass,
    nodeIdsToEvict map[string]bool,
    jobIdsToEvict map[string]bool,
) *Evictor
NewFilteredEvictor returns a new evictor that evicts all jobs for which jobIdsToEvict[jobId] is true on nodes for which nodeIdsToEvict[nodeId] is true.
func NewOversubscribedEvictor ¶ added in v0.3.54
func NewOversubscribedEvictor(
    jobRepo JobRepository,
    priorityClasses map[string]configuration.PriorityClass,
    perNodeEvictionProbability float64,
) *Evictor
NewOversubscribedEvictor returns a new evictor that, with probability perNodeEvictionProbability per node, evicts all preemptible jobs of any priority class for which at least one job could not be scheduled.
func NewPreemptibleEvictor ¶ added in v0.3.54
func NewPreemptibleEvictor(
    jobRepo JobRepository,
    priorityClasses map[string]configuration.PriorityClass,
    defaultPriorityClass string,
    nodeFilter func(context.Context, *schedulerobjects.Node) bool,
) *Evictor
NewPreemptibleEvictor returns a new evictor that evicts all preemptible jobs on nodes for which nodeFilter returns true.
func NewStochasticEvictor ¶ added in v0.3.54
func NewStochasticEvictor(
    jobRepo JobRepository,
    priorityClasses map[string]configuration.PriorityClass,
    defaultPriorityClass string,
    perNodeEvictionProbability float64,
) *Evictor
NewStochasticEvictor returns a new evictor that for each node evicts all preemptible jobs from that node with probability perNodeEvictionProbability.
func (*Evictor) Evict ¶ added in v0.3.54
func (evi *Evictor) Evict(ctx context.Context, it NodeIterator) (*EvictorResult, error)
Evict removes jobs from nodes, returning all affected jobs and nodes. Any node for which nodeFilter returns false is skipped. Any job for which jobFilter returns true is evicted (if the node was not skipped). If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.
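A minimal sketch of driving an evictor over the nodes in a NodeDb. NewStochasticEvictor, NewNodesIterator, and NodeDb.Txn are from this package; the "default" priority class name and the 0.1 eviction probability are illustrative assumptions.

    // Sketch: evict preemptible jobs from a random subset of nodes.
    func evictSome(
        ctx context.Context,
        jobRepo JobRepository,
        priorityClasses map[string]configuration.PriorityClass,
        nodeDb *NodeDb,
    ) (*EvictorResult, error) {
        evictor := NewStochasticEvictor(jobRepo, priorityClasses, "default", 0.1) // values are illustrative
        txn := nodeDb.Txn(false) // read-only transaction over the node db
        it, err := NewNodesIterator(txn)
        if err != nil {
            return nil, err
        }
        return evictor.Evict(ctx, it)
    }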
type EvictorResult ¶ added in v0.3.54
type EvictorResult struct {
    // Map from job id to job, containing all evicted jobs.
    EvictedJobsById map[string]LegacySchedulerJob
    // Map from node id to node, containing all nodes on which at least one job was evicted.
    AffectedNodesById map[string]*schedulerobjects.Node
    // For each evicted job, maps the id of the job to the id of the node it was evicted from.
    NodeIdByJobId map[string]string
}
type ExecutorApi ¶
type ExecutorApi struct {
// contains filtered or unexported fields
}
ExecutorApi is a gRPC service that exposes functionality required by the Armada executors.
func NewExecutorApi ¶
func NewExecutorApi(
    producer pulsar.Producer,
    jobRepository database.JobRepository,
    executorRepository database.ExecutorRepository,
    legacyExecutorRepository database.ExecutorRepository,
    allowedPriorities []int32,
    maxJobsPerCall uint,
    nodeIdLabel string,
) (*ExecutorApi, error)
func (*ExecutorApi) LeaseJobRuns ¶
func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error
LeaseJobRuns performs the following actions:
- Stores the request in postgres so that the scheduler can use the job + capacity information in the next scheduling round
- Determines if any of the job runs in the request are no longer active and should be cancelled
- Determines if any new job runs should be leased to the executor
func (*ExecutorApi) ReportEvents ¶
func (srv *ExecutorApi) ReportEvents(ctx context.Context, list *executorapi.EventList) (*types.Empty, error)
ReportEvents publishes all events to Pulsar. The events are compacted for more efficient publishing.
type InMemoryJobIterator ¶ added in v0.3.50
type InMemoryJobIterator struct {
// contains filtered or unexported fields
}
func NewInMemoryJobIterator ¶ added in v0.3.50
func NewInMemoryJobIterator[S ~[]E, E LegacySchedulerJob](jobs S) *InMemoryJobIterator
func (*InMemoryJobIterator) Next ¶ added in v0.3.50
func (it *InMemoryJobIterator) Next() (LegacySchedulerJob, error)
type InMemoryJobRepository ¶ added in v0.3.50
type InMemoryJobRepository struct {
// contains filtered or unexported fields
}
func NewInMemoryJobRepository ¶ added in v0.3.50
func NewInMemoryJobRepository(priorityClasses map[string]configuration.PriorityClass) *InMemoryJobRepository
func (*InMemoryJobRepository) Enqueue ¶ added in v0.3.50
func (repo *InMemoryJobRepository) Enqueue(job LegacySchedulerJob)
func (*InMemoryJobRepository) EnqueueMany ¶ added in v0.3.50
func (repo *InMemoryJobRepository) EnqueueMany(jobs []LegacySchedulerJob)
func (*InMemoryJobRepository) GetExistingJobsByIds ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]LegacySchedulerJob, error)
func (*InMemoryJobRepository) GetJobIterator ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetJobIterator(ctx context.Context, queue string) (JobIterator, error)
func (*InMemoryJobRepository) GetQueueJobIds ¶ added in v0.3.50
func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)
type JobIterator ¶
type JobIterator interface {
Next() (LegacySchedulerJob, error)
}
type JobQueueIteratorAdapter ¶ added in v0.3.47
type JobQueueIteratorAdapter struct {
// contains filtered or unexported fields
}
func (*JobQueueIteratorAdapter) Next ¶ added in v0.3.47
func (it *JobQueueIteratorAdapter) Next() (LegacySchedulerJob, error)
type JobRepository ¶
type JobSchedulingContext ¶ added in v0.3.62
type JobSchedulingContext struct {
    // Time at which this context was created.
    Created time.Time
    // Executor this job was attempted to be assigned to.
    ExecutorId string
    // Total number of nodes in the cluster when trying to schedule.
    NumNodes int
    // Id of the job this pod corresponds to.
    JobId string
    // Job spec.
    Job LegacySchedulerJob
    // Scheduling requirements of this job.
    // We currently require that each job contains exactly one pod spec.
    Req *schedulerobjects.PodRequirements
    // Reason for why the job could not be scheduled.
    // Empty if the job was scheduled successfully.
    UnschedulableReason string
    // Pod scheduling contexts for the individual pods that make up the job.
    PodSchedulingContexts []*PodSchedulingContext
}
JobSchedulingContext is created by the scheduler and contains information about the decision made by the scheduler for a particular job.
func (*JobSchedulingContext) String ¶ added in v0.3.62
func (jctx *JobSchedulingContext) String() string
type JobSchedulingContextByExecutor ¶ added in v0.3.62
type JobSchedulingContextByExecutor map[string]*JobSchedulingContext
func (JobSchedulingContextByExecutor) String ¶ added in v0.3.62
func (m JobSchedulingContextByExecutor) String() string
type KubernetesLeaderController ¶
type KubernetesLeaderController struct {
// contains filtered or unexported fields
}
KubernetesLeaderController uses the Kubernetes leader election mechanism to determine who is leader. This allows multiple instances of the scheduler to be run for high availability.
TODO: Move into package in common.
func NewKubernetesLeaderController ¶
func NewKubernetesLeaderController(config LeaderConfig, client coordinationv1client.LeasesGetter) *KubernetesLeaderController
func (*KubernetesLeaderController) GetToken ¶
func (lc *KubernetesLeaderController) GetToken() LeaderToken
func (*KubernetesLeaderController) Run ¶
func (lc *KubernetesLeaderController) Run(ctx context.Context) error
Run starts the controller. This is a blocking call that returns when the provided context is cancelled.
func (*KubernetesLeaderController) ValidateToken ¶
func (lc *KubernetesLeaderController) ValidateToken(tok LeaderToken) bool
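A hedged sketch of the token protocol implied by GetToken and ValidateToken: obtain a token before doing leader-only work, then re-validate it before acting on the results, so work started by a deposed leader is discarded. The leaderOnlyWork callback is a hypothetical stand-in for a scheduling cycle. Requires "context" and "fmt".

    // Sketch: guarding a leader-only action with a LeaderToken.
    func runIfLeader(ctx context.Context, lc LeaderController, leaderOnlyWork func(context.Context) error) error {
        token := lc.GetToken()
        if !lc.ValidateToken(token) {
            return nil // not leader; skip this cycle
        }
        if err := leaderOnlyWork(ctx); err != nil {
            return err
        }
        // Re-check before committing results in case leadership was lost mid-cycle.
        if !lc.ValidateToken(token) {
            return fmt.Errorf("leadership lost during cycle; discarding results")
        }
        return nil
    }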
type LeaderConfig ¶
type LeaderConfig struct {
    // Valid modes are "standalone" or "kubernetes"
    Mode string `validate:"required"`
    // Name of the K8s Lock Object
    LeaseLockName string
    // Namespace of the K8s Lock Object
    LeaseLockNamespace string
    // The name of the pod
    PodName string
    // How long the lease is held for.
    // Non-leaders must wait this long before trying to acquire the lease
    LeaseDuration time.Duration
    // RenewDeadline is the duration that the acting leader will retry refreshing leadership before giving up.
    RenewDeadline time.Duration
    // RetryPeriod is the duration the LeaderElector clients should wait between retries of actions.
    RetryPeriod time.Duration
}
type LeaderController ¶
type LeaderController interface {
    // GetToken returns a LeaderToken which allows you to determine if you are leader or not
    GetToken() LeaderToken
    // ValidateToken allows a caller to determine whether a previously obtained token is still valid.
    // Returns true if the token is a valid leader token and false otherwise
    ValidateToken(tok LeaderToken) bool
    // Run starts the controller. This is a blocking call which will return when the provided context is cancelled
    Run(ctx context.Context) error
}
LeaderController is an interface to be implemented by structs that control which scheduler is leader.
type LeaderToken ¶
type LeaderToken struct {
// contains filtered or unexported fields
}
LeaderToken is a token handed out to schedulers which they can use to determine if they are leader
func InvalidLeaderToken ¶
func InvalidLeaderToken() LeaderToken
InvalidLeaderToken returns a LeaderToken indicating this instance is not leader.
func NewLeaderToken ¶
func NewLeaderToken() LeaderToken
NewLeaderToken returns a LeaderToken indicating this instance is the leader.
type LeaseListener ¶
type LeaseListener interface {
// contains filtered or unexported methods
}
LeaseListener allows clients to listen for lease events.
type LegacyScheduler ¶
type LegacyScheduler struct {
    SchedulingConstraints
    SchedulingContext *SchedulingContext
    // contains filtered or unexported fields
}
func NewLegacyScheduler ¶
func NewLegacyScheduler(
    ctx context.Context,
    constraints SchedulingConstraints,
    nodeDb *NodeDb,
    queues []*Queue,
    initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
) (*LegacyScheduler, error)
func (*LegacyScheduler) Schedule ¶
func (sch *LegacyScheduler) Schedule(ctx context.Context) (*SchedulerResult, error)
func (*LegacyScheduler) String ¶
func (sched *LegacyScheduler) String() string
type LegacySchedulerJob ¶
type LegacySchedulerJob interface {
    GetId() string
    GetQueue() string
    GetJobSet() string
    GetAnnotations() map[string]string
    GetRequirements(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
}
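A minimal sketch of a type satisfying LegacySchedulerJob, useful in tests or adapters. The type name, its fields, and the stored scheduling info are illustrative assumptions, not part of this package.

    // Sketch: a stub LegacySchedulerJob implementation.
    type stubJob struct {
        id, queue, jobSet string
        annotations       map[string]string
        schedulingInfo    *schedulerobjects.JobSchedulingInfo
    }

    func (j *stubJob) GetId() string                     { return j.id }
    func (j *stubJob) GetQueue() string                  { return j.queue }
    func (j *stubJob) GetJobSet() string                 { return j.jobSet }
    func (j *stubJob) GetAnnotations() map[string]string { return j.annotations }
    func (j *stubJob) GetRequirements(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo {
        return j.schedulingInfo
    }

    // Compile-time check that the stub satisfies the interface.
    var _ LegacySchedulerJob = (*stubJob)(nil)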
type LegacySchedulingAlgo ¶
type LegacySchedulingAlgo struct {
// contains filtered or unexported fields
}
LegacySchedulingAlgo is a SchedulingAlgo that schedules jobs in the same way as the old lease call.
func NewLegacySchedulingAlgo ¶
func NewLegacySchedulingAlgo(
    config configuration.SchedulingConfig,
    executorRepository database.ExecutorRepository,
    queueRepository database.QueueRepository,
) *LegacySchedulingAlgo
func (*LegacySchedulingAlgo) Schedule ¶
func (l *LegacySchedulingAlgo) Schedule(
    ctx context.Context,
    txn *jobdb.Txn,
    jobDb *jobdb.JobDb,
) (*SchedulerResult, error)
Schedule assigns jobs to nodes in the same way as the old lease call. It iterates over each executor in turn (using a random order) and assigns the jobs using a LegacyScheduler, before moving on to the next executor. Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.
type MetricsCollector ¶ added in v0.3.54
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector is a Prometheus Collector that handles scheduler metrics. The metrics themselves are calculated asynchronously every refreshPeriod.
func NewMetricsCollector ¶ added in v0.3.54
func NewMetricsCollector(
    jobDb *jobdb.JobDb,
    queueRepository database.QueueRepository,
    poolAssigner PoolAssigner,
    refreshPeriod time.Duration,
) *MetricsCollector
func (*MetricsCollector) Collect ¶ added in v0.3.54
func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric)
Collect returns the current state of all metrics of the collector.
func (*MetricsCollector) Describe ¶ added in v0.3.54
func (c *MetricsCollector) Describe(out chan<- *prometheus.Desc)
Describe returns all descriptions of the collector.
type MultiJobsIterator ¶ added in v0.3.47
type MultiJobsIterator struct {
// contains filtered or unexported fields
}
MultiJobsIterator chains several JobIterators together, emptying them in the order provided.
func NewMultiJobsIterator ¶ added in v0.3.47
func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator
func (*MultiJobsIterator) Next ¶ added in v0.3.47
func (it *MultiJobsIterator) Next() (LegacySchedulerJob, error)
type NodeAvailableResourceIndex ¶ added in v0.3.50
type NodeAvailableResourceIndex struct {
    // Resource name, e.g., "cpu", "gpu", or "memory".
    Resource string
    // Job priority.
    Priority int32
}
func (*NodeAvailableResourceIndex) FromArgs ¶ added in v0.3.50
func (index *NodeAvailableResourceIndex) FromArgs(args ...interface{}) ([]byte, error)
FromArgs computes the index key from a set of arguments. Takes a single argument resourceAmount of type resource.Quantity.
func (*NodeAvailableResourceIndex) FromObject ¶ added in v0.3.50
func (index *NodeAvailableResourceIndex) FromObject(raw interface{}) (bool, []byte, error)
FromObject extracts the index key from a *schedulerobjects.Node.
type NodeDb ¶
type NodeDb struct {
// contains filtered or unexported fields
}
NodeDb is the scheduler-internal system for storing node information. It's used to efficiently find nodes on which a pod can be scheduled.
func NewNodeDb ¶
func NewNodeDb( priorityClasses map[string]configuration.PriorityClass, maxExtraNodesToConsider uint, indexedResources, indexedTaints, indexedNodeLabels []string, ) (*NodeDb, error)
func (*NodeDb) ClearAllocated ¶
func (nodeDb *NodeDb) ClearAllocated() error
ClearAllocated zeroes out allocated resources on all nodes in the NodeDb.
func (*NodeDb) GetNode ¶ added in v0.3.47
func (nodeDb *NodeDb) GetNode(id string) (*schedulerobjects.Node, error)
GetNode returns a node in the db with given id.
func (*NodeDb) GetNodeWithTxn ¶ added in v0.3.47
func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*schedulerobjects.Node, error)
GetNodeWithTxn returns a node in the db with given id, within the provided transaction.
func (*NodeDb) NodeTypesMatchingPod ¶
func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)
NodeTypesMatchingPod returns a slice with all node types a pod could be scheduled on. It also returns the number of nodes excluded, grouped by reason for exclusion.
func (*NodeDb) ScheduleMany ¶
func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingContext, bool, error)
ScheduleMany assigns a set of pods to nodes. The assignment is atomic, i.e., either all pods are successfully assigned to nodes or none are. The returned bool indicates whether assignment succeeded or not. TODO: Pass through contexts to support timeouts.
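A hedged sketch of the all-or-nothing contract described above: if the returned bool is false, none of the pods were bound, so the caller can safely retry or fall back. Error handling is illustrative. Requires "fmt".

    // Sketch: scheduling a gang of pods atomically against the NodeDb.
    func scheduleGang(nodeDb *NodeDb, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingContext, error) {
        pctxs, ok, err := nodeDb.ScheduleMany(reqs)
        if err != nil {
            return nil, err
        }
        if !ok {
            // Per the atomicity guarantee, no pods were bound in this case.
            return nil, fmt.Errorf("gang of %d pods could not be scheduled", len(reqs))
        }
        for _, pctx := range pctxs {
            if pctx.Node == nil {
                // Not expected when ok is true; included only for illustration.
                return nil, fmt.Errorf("missing node assignment")
            }
        }
        return pctxs, nil
    }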
func (*NodeDb) ScheduleManyWithTxn ¶
func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingContext, bool, error)
func (*NodeDb) SelectAndBindNodeToPod ¶
func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
func (*NodeDb) SelectAndBindNodeToPodWithTxn ¶
func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
func (*NodeDb) SelectNodeForPod ¶
func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
func (*NodeDb) SelectNodeForPodWithTxn ¶
func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingContext, error)
SelectNodeForPodWithTxn selects a node on which the pod can be scheduled.
func (*NodeDb) UpsertMany ¶ added in v0.3.50
func (nodeDb *NodeDb) UpsertMany(nodes []*schedulerobjects.Node) error
func (*NodeDb) UpsertManyWithTxn ¶ added in v0.3.50
func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*schedulerobjects.Node) error
func (*NodeDb) UpsertWithTxn ¶ added in v0.3.50
func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *schedulerobjects.Node) error
type NodeDominantQueueIndex ¶ added in v0.3.50
type NodeDominantQueueIndex struct{}
func (*NodeDominantQueueIndex) FromArgs ¶ added in v0.3.50
func (index *NodeDominantQueueIndex) FromArgs(args ...interface{}) ([]byte, error)
FromArgs computes the index value from a set of arguments. Takes a single argument of type string.
func (*NodeDominantQueueIndex) FromObject ¶ added in v0.3.50
func (index *NodeDominantQueueIndex) FromObject(raw interface{}) (bool, []byte, error)
FromObject extracts the index value from a *schedulerobjects.Node object.
type NodeIterator ¶ added in v0.3.50
type NodeIterator interface {
NextNode() *schedulerobjects.Node
}
type NodePairIterator ¶ added in v0.3.50
type NodePairIterator struct {
// contains filtered or unexported fields
}
func NewNodePairIterator ¶ added in v0.3.50
func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error)
func (*NodePairIterator) Next ¶ added in v0.3.50
func (it *NodePairIterator) Next() interface{}
func (*NodePairIterator) NextItem ¶ added in v0.3.50
func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem)
func (*NodePairIterator) WatchCh ¶ added in v0.3.50
func (it *NodePairIterator) WatchCh() <-chan struct{}
type NodePairIteratorItem ¶ added in v0.3.50
type NodePairIteratorItem struct {
    NodeA *schedulerobjects.Node
    NodeB *schedulerobjects.Node
}
type NodeTypeResourceIterator ¶
type NodeTypeResourceIterator struct {
// contains filtered or unexported fields
}
NodeTypeResourceIterator is an iterator over all nodes of a given nodeType, for which there's at least some specified amount of a given resource available. For example, all nodes of type "foo" for which there's at least 1Gi of memory available.
Available resources are the sum of unused resources and resources assigned to lower-priority jobs. Nodes are returned in sorted order, from least to most of the specified resource available.
If dominantQueue is NodeDominantQueueWildcard, all nodes of the given node type are considered. Otherwise, only nodes for which the given queue has the largest request are considered. If maxActiveQueues > 0, only nodes with less than or equal to this number of active queues are returned.
func NewNodeTypeResourceIterator ¶
func NewNodeTypeResourceIterator(txn *memdb.Txn, dominantQueue string, maxActiveQueues int, resource string, priority int32, nodeType *schedulerobjects.NodeType, resourceAmount resource.Quantity) (*NodeTypeResourceIterator, error)
func (*NodeTypeResourceIterator) Next ¶
func (it *NodeTypeResourceIterator) Next() interface{}
func (*NodeTypeResourceIterator) NextNodeItem ¶
func (it *NodeTypeResourceIterator) NextNodeItem() *schedulerobjects.Node
func (*NodeTypeResourceIterator) WatchCh ¶
func (it *NodeTypeResourceIterator) WatchCh() <-chan struct{}
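A hedged sketch of walking this iterator for a single node type. The "memory" resource name, priority 0, and the 1Gi threshold are illustrative, and NextNodeItem is assumed to return nil once exhausted, following the convention of the other iterators on this page. Requires "github.com/hashicorp/go-memdb" and "k8s.io/apimachinery/pkg/api/resource".

    // Sketch: collect nodes of one node type with at least 1Gi of memory
    // available at priority 0, in ascending order of available memory.
    func nodesWithMemory(txn *memdb.Txn, nodeType *schedulerobjects.NodeType) ([]*schedulerobjects.Node, error) {
        it, err := NewNodeTypeResourceIterator(
            txn, NodeDominantQueueWildcard, 0, "memory", 0, nodeType, resource.MustParse("1Gi"),
        )
        if err != nil {
            return nil, err
        }
        var nodes []*schedulerobjects.Node
        for node := it.NextNodeItem(); node != nil; node = it.NextNodeItem() {
            nodes = append(nodes, node)
        }
        return nodes, nil
    }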
type NodeTypesResourceIterator ¶
type NodeTypesResourceIterator struct {
// contains filtered or unexported fields
}
NodeTypesResourceIterator extends NodeTypeResourceIterator to iterate over nodes of several node types. Nodes are returned in sorted order, going from least to most of the specified resource available.
If dominantQueue is NodeDominantQueueWildcard, all nodes of the given node types are considered. Otherwise, only nodes exclusive to that queue are considered.
func NewNodeTypesResourceIterator ¶
func NewNodeTypesResourceIterator(txn *memdb.Txn, dominantQueue string, maxActiveQueues int, resource string, priority int32, nodeTypes []*schedulerobjects.NodeType, resourceQuantity resource.Quantity) (*NodeTypesResourceIterator, error)
func (*NodeTypesResourceIterator) Next ¶
func (it *NodeTypesResourceIterator) Next() interface{}
func (*NodeTypesResourceIterator) NextNodeItem ¶
func (it *NodeTypesResourceIterator) NextNodeItem() *schedulerobjects.Node
func (*NodeTypesResourceIterator) WatchCh ¶
func (it *NodeTypesResourceIterator) WatchCh() <-chan struct{}
type NodeTypesResourceIteratorItem ¶
type NodeTypesResourceIteratorItem struct {
// contains filtered or unexported fields
}
type NodeTypesResourceIteratorPQ ¶
type NodeTypesResourceIteratorPQ []*NodeTypesResourceIteratorItem
NodeTypesResourceIteratorPQ is a priority queue used by NodeTypesResourceIterator to return results from across several sub-iterators in order.
func (NodeTypesResourceIteratorPQ) Len ¶
func (pq NodeTypesResourceIteratorPQ) Len() int
func (NodeTypesResourceIteratorPQ) Less ¶
func (pq NodeTypesResourceIteratorPQ) Less(i, j int) bool
func (*NodeTypesResourceIteratorPQ) Pop ¶
func (pq *NodeTypesResourceIteratorPQ) Pop() any
func (*NodeTypesResourceIteratorPQ) Push ¶
func (pq *NodeTypesResourceIteratorPQ) Push(x any)
func (NodeTypesResourceIteratorPQ) Swap ¶
func (pq NodeTypesResourceIteratorPQ) Swap(i, j int)
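The five methods above are exactly the heap.Interface contract from container/heap, so the priority queue is driven via that package rather than by calling Push and Pop directly. A hedged in-package sketch, assuming Pop yields *NodeTypesResourceIteratorItem as the slice element type suggests; requires "container/heap".

    // Compile-time check: the PQ satisfies container/heap's heap.Interface.
    var _ heap.Interface = (*NodeTypesResourceIteratorPQ)(nil)

    // popAll drains the priority queue in order, least first.
    func popAll(pq *NodeTypesResourceIteratorPQ) []*NodeTypesResourceIteratorItem {
        heap.Init(pq) // establish heap invariants before use
        items := make([]*NodeTypesResourceIteratorItem, 0, pq.Len())
        for pq.Len() > 0 {
            items = append(items, heap.Pop(pq).(*NodeTypesResourceIteratorItem))
        }
        return items
    }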
type NodesIterator ¶
type NodesIterator struct {
// contains filtered or unexported fields
}
NodesIterator is an iterator over all nodes in the db.
func NewNodesIterator ¶
func NewNodesIterator(txn *memdb.Txn) (*NodesIterator, error)
func (*NodesIterator) Next ¶
func (it *NodesIterator) Next() interface{}
func (*NodesIterator) NextNode ¶
func (it *NodesIterator) NextNode() *schedulerobjects.Node
func (*NodesIterator) WatchCh ¶
func (it *NodesIterator) WatchCh() <-chan struct{}
type PodSchedulingContext ¶ added in v0.3.62
type PodSchedulingContext struct {
    // Time at which this context was created.
    Created time.Time
    // Pod scheduling requirements.
    Req *schedulerobjects.PodRequirements
    // Resource type determined by the scheduler to be the hardest to satisfy
    // the scheduling requirements for.
    DominantResourceType string
    // Node the pod was assigned to.
    // If nil, the pod could not be assigned to any Node.
    Node *schedulerobjects.Node
    // Score indicates how well the pod fits on the selected Node.
    Score int
    // Node types on which this pod could be scheduled.
    MatchingNodeTypes []*schedulerobjects.NodeType
    // Total number of nodes in the cluster when trying to schedule.
    NumNodes int
    // Number of nodes excluded by reason.
    NumExcludedNodesByReason map[string]int
    // Set if an error occurred while attempting to schedule this pod.
    Err error
}
PodSchedulingContext is returned by SelectAndBindNodeToPod and contains detailed information on the scheduling decision made for this pod.
func (*PodSchedulingContext) String ¶ added in v0.3.62
func (pctx *PodSchedulingContext) String() string
type PoolAssigner ¶ added in v0.3.54
type PoolAssigner interface {
    Refresh(ctx context.Context) error
    AssignPool(j *jobdb.Job) (string, error)
}
PoolAssigner allows jobs to be assigned to a pool. Note that this is intended only for use with metrics calculation.
type Publisher ¶
type Publisher interface {
    // PublishMessages will publish the supplied messages. A LeaderToken is provided and the
    // implementor may decide whether to publish based on the status of this token.
    PublishMessages(ctx context.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error
    // PublishMarkers publishes a single marker message for each Pulsar partition. Each marker
    // message contains the supplied group id, which allows all marker messages for a given call
    // to be identified. The uint32 returned is the number of messages published.
    PublishMarkers(ctx context.Context, groupId uuid.UUID) (uint32, error)
}
Publisher is an interface to be implemented by structs that handle publishing messages to Pulsar.
type PulsarPublisher ¶
type PulsarPublisher struct {
// contains filtered or unexported fields
}
PulsarPublisher is the default implementation of Publisher.
func NewPulsarPublisher ¶
func NewPulsarPublisher(
    pulsarClient pulsar.Client,
    producerOptions pulsar.ProducerOptions,
    pulsarSendTimeout time.Duration,
) (*PulsarPublisher, error)
func (*PulsarPublisher) PublishMarkers ¶
func (p *PulsarPublisher) PublishMarkers(ctx context.Context, groupId uuid.UUID) (uint32, error)
PublishMarkers sends one Pulsar message (containing an armadaevents.PartitionMarker) to each partition of the producer's Pulsar topic.
func (*PulsarPublisher) PublishMessages ¶
func (p *PulsarPublisher) PublishMessages(ctx context.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error
PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into single event sequences up to maxMessageBatchSize.
type QueueCandidateGangIterator ¶
type QueueCandidateGangIterator struct {
    SchedulingConstraints
    QueueSchedulingContexts *QueueSchedulingContext
    // contains filtered or unexported fields
}
QueueCandidateGangIterator is an iterator over gangs in a queue that could be scheduled without exceeding per-queue limits.
func (*QueueCandidateGangIterator) Clear ¶ added in v0.3.47
func (it *QueueCandidateGangIterator) Clear() error
func (*QueueCandidateGangIterator) Next ¶
func (it *QueueCandidateGangIterator) Next() ([]*JobSchedulingContext, error)
func (*QueueCandidateGangIterator) Peek ¶ added in v0.3.47
func (it *QueueCandidateGangIterator) Peek() ([]*JobSchedulingContext, error)
type QueueCandidateGangIteratorItem ¶ added in v0.3.47
type QueueCandidateGangIteratorItem struct {
// contains filtered or unexported fields
}
type QueueCandidateGangIteratorPQ ¶ added in v0.3.47
type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem
QueueCandidateGangIteratorPQ is a priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.
func (QueueCandidateGangIteratorPQ) Len ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Len() int
func (QueueCandidateGangIteratorPQ) Less ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Less(i, j int) bool
func (*QueueCandidateGangIteratorPQ) Pop ¶ added in v0.3.47
func (pq *QueueCandidateGangIteratorPQ) Pop() any
func (*QueueCandidateGangIteratorPQ) Push ¶ added in v0.3.47
func (pq *QueueCandidateGangIteratorPQ) Push(x any)
func (QueueCandidateGangIteratorPQ) Swap ¶ added in v0.3.47
func (pq QueueCandidateGangIteratorPQ) Swap(i, j int)
type QueueSchedulingContext ¶ added in v0.3.62
type QueueSchedulingContext struct {
    // Time at which this context was created.
    Created time.Time
    // Executor this job was attempted to be assigned to.
    ExecutorId string
    // Queue name.
    Queue string
    // These factors influence the fraction of resources assigned to each queue.
    PriorityFactor float64
    // Total resources assigned to the queue across all clusters.
    // Including jobs scheduled during this invocation of the scheduler.
    ResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
    // Resources assigned to this queue during this scheduling cycle.
    ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
    // Job scheduling contexts associated with successful scheduling attempts.
    SuccessfulJobSchedulingContexts map[string]*JobSchedulingContext
    // Job scheduling contexts associated with unsuccessful scheduling attempts.
    UnsuccessfulJobSchedulingContexts map[string]*JobSchedulingContext
}
QueueSchedulingContext captures the decisions made by the scheduler during one invocation for a particular queue.
func NewQueueSchedulingContext ¶ added in v0.3.62
func NewQueueSchedulingContext(queue, executorId string, priorityFactor float64, initialResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType) *QueueSchedulingContext
func (*QueueSchedulingContext) AddJobSchedulingContext ¶ added in v0.3.62
func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext, isEvictedJob bool)
AddJobSchedulingContext adds a job scheduling context. Automatically updates scheduled resources.
func (*QueueSchedulingContext) ClearJobSpecs ¶ added in v0.3.62
func (qctx *QueueSchedulingContext) ClearJobSpecs()
ClearJobSpecs zeroes out job specs to reduce memory usage.
func (*QueueSchedulingContext) String ¶ added in v0.3.62
func (qctx *QueueSchedulingContext) String() string
type QueueSchedulingContextByExecutor ¶ added in v0.3.62
type QueueSchedulingContextByExecutor map[string]*QueueSchedulingContext
func (QueueSchedulingContextByExecutor) String ¶ added in v0.3.62
func (m QueueSchedulingContextByExecutor) String() string
type QueuedGangIterator ¶
type QueuedGangIterator struct {
// contains filtered or unexported fields
}
QueuedGangIterator is an iterator over all gangs in a queue, where a gang is a set of jobs for which the gangIdAnnotation has equal value. A gang is yielded once the final member of the gang has been received. Jobs without gangIdAnnotation are considered to be gangs of cardinality 1.
func NewQueuedGangIterator ¶
func NewQueuedGangIterator(ctx context.Context, it JobIterator, maxLookback uint) *QueuedGangIterator
func (*QueuedGangIterator) Clear ¶ added in v0.3.47
func (it *QueuedGangIterator) Clear() error
func (*QueuedGangIterator) Next ¶
func (it *QueuedGangIterator) Next() ([]LegacySchedulerJob, error)
func (*QueuedGangIterator) Peek ¶ added in v0.3.47
func (it *QueuedGangIterator) Peek() ([]LegacySchedulerJob, error)
type QueuedJobsIterator ¶
type QueuedJobsIterator struct {
// contains filtered or unexported fields
}
QueuedJobsIterator is an iterator over all jobs in a queue. It lazily loads jobs in batches from Redis asynchronously.
func NewQueuedJobsIterator ¶
func NewQueuedJobsIterator(ctx context.Context, queue string, repo JobRepository) (*QueuedJobsIterator, error)
func (*QueuedJobsIterator) Next ¶
func (it *QueuedJobsIterator) Next() (LegacySchedulerJob, error)
type Rescheduler ¶ added in v0.3.54
type Rescheduler struct {
// contains filtered or unexported fields
}
Rescheduler is a scheduler that makes unified decisions about which jobs to preempt and which to schedule. Uses LegacyScheduler as a building block.
func NewRescheduler ¶ added in v0.3.54
func NewRescheduler(
    constraints SchedulingConstraints,
    config configuration.SchedulingConfig,
    jobRepo JobRepository,
    nodeDb *NodeDb,
    priorityFactorByQueue map[string]float64,
    initialAllocationByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
    initialNodeIdByJobId map[string]string,
    initialJobIdsByGangId map[string]map[string]bool,
    initialGangIdByJobId map[string]string,
) *Rescheduler
func (*Rescheduler) EnableAssertions ¶ added in v0.3.56
func (sch *Rescheduler) EnableAssertions()
func (*Rescheduler) Schedule ¶ added in v0.3.54
func (sch *Rescheduler) Schedule(ctx context.Context) (*SchedulerResult, error)
Schedule
- preempts jobs belonging to queues with total allocation above their fair share, and
- schedules new jobs belonging to queues with total allocation less than their fair share.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the main Armada scheduler. It periodically performs the following cycle:

1. Update state from postgres (via the jobRepository).
2. Determine if leader and exit if not.
3. Generate any necessary events resulting from the state update.
4. Expire any jobs assigned to clusters that have timed out.
5. Schedule jobs.
6. Publish any Armada events resulting from the scheduling cycle.
func NewScheduler ¶
func NewScheduler(
    jobRepository database.JobRepository,
    executorRepository database.ExecutorRepository,
    schedulingAlgo SchedulingAlgo,
    leaderController LeaderController,
    publisher Publisher,
    stringInterner *stringinterner.StringInterner,
    cyclePeriod time.Duration,
    executorTimeout time.Duration,
    maxLeaseReturns uint,
) (*Scheduler, error)
type SchedulerResult ¶ added in v0.3.54
type SchedulerResult struct {
    // Running jobs that should be preempted.
    PreemptedJobs []LegacySchedulerJob
    // Queued jobs that should be scheduled.
    ScheduledJobs []LegacySchedulerJob
    // For each preempted job, maps the job id to the id of the node on which the job was running.
    // For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
    NodeIdByJobId map[string]string
    // Resource usage by queue, accounting for preempted and scheduled jobs.
    AllocatedByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType
    // Scheduling context created for this scheduling round.
    SchedulingContext *SchedulingContext
}
SchedulerResult is returned by Rescheduler.Schedule().
func NewSchedulerResult ¶ added in v0.3.54
func NewSchedulerResult[S ~[]T, T LegacySchedulerJob](
    preemptedJobs S,
    scheduledJobs S,
    nodeIdByJobId map[string]string,
    allocatedByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
) *SchedulerResult
type SchedulingAlgo ¶
type SchedulingAlgo interface {
    // Schedule should assign jobs to nodes
    // Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided
    // It should return a slice containing all scheduled jobs.
    Schedule(ctx context.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error)
}
SchedulingAlgo is the interface between the Pulsar-backed scheduler and the algorithm deciding which jobs to schedule and preempt.
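A hedged sketch of a trivial SchedulingAlgo implementation, useful as a test double when wiring up a Scheduler; it schedules and preempts nothing. The type name is illustrative and the result is built directly from SchedulerResult's exported fields.

    // Sketch: a no-op SchedulingAlgo.
    type noopSchedulingAlgo struct{}

    func (noopSchedulingAlgo) Schedule(ctx context.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) (*SchedulerResult, error) {
        // Nothing scheduled, nothing preempted, no node assignments, no usage changes.
        return &SchedulerResult{
            NodeIdByJobId:               map[string]string{},
            AllocatedByQueueAndPriority: map[string]schedulerobjects.QuantityByPriorityAndResourceType{},
        }, nil
    }

    // Compile-time check that the stub satisfies the interface.
    var _ SchedulingAlgo = noopSchedulingAlgo{}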
type SchedulingConstraints ¶
type SchedulingConstraints struct {
    PriorityClasses map[string]configuration.PriorityClass
    // Executor for which we're currently scheduling jobs.
    ExecutorId string
    // Resource pool of this executor.
    Pool string
    // Weights used when computing total resource usage.
    ResourceScarcity map[string]float64
    // Max number of jobs to schedule per lease jobs call.
    MaximumJobsToSchedule uint
    // Max number of jobs to consider for a queue before giving up.
    MaxLookbackPerQueue uint
    // Jobs leased to this executor must be at least this large.
    // Used, e.g., to avoid scheduling CPU-only jobs onto clusters with GPUs.
    MinimumJobSize schedulerobjects.ResourceList
    // Per-queue resource limits.
    // Map from resource type to the limit for that resource.
    MaximalResourceFractionPerQueue map[string]float64
    // Limit, as a fraction of total resources across worker clusters, of resource types at each priority.
    // The limits are cumulative, i.e., the limit at priority p includes all higher levels.
    MaximalCumulativeResourceFractionPerQueueAndPriority map[int32]map[string]float64
    // Max resources to schedule per queue at a time.
    MaximalResourceFractionToSchedulePerQueue map[string]float64
    // Max resources to schedule at a time.
    MaximalResourceFractionToSchedule map[string]float64
    // Total resources across all worker clusters.
    // Used when computing resource limits.
    TotalResources schedulerobjects.ResourceList
}
SchedulingConstraints collects scheduling constraints, e.g., per-queue resource limits.
func SchedulingConstraintsFromSchedulingConfig ¶
func SchedulingConstraintsFromSchedulingConfig(
    executorId, pool string,
    minimumJobSize schedulerobjects.ResourceList,
    config configuration.SchedulingConfig,
    totalResources schedulerobjects.ResourceList,
) *SchedulingConstraints
type SchedulingContext ¶ added in v0.3.62
type SchedulingContext struct {
    // Time at which the scheduling cycle started.
    Started time.Time
    // Time at which the scheduling cycle finished.
    Finished time.Time
    // ExecutorId for which the scheduler was invoked.
    ExecutorId string
    // Per-queue scheduling contexts.
    QueueSchedulingContexts map[string]*QueueSchedulingContext
    // Total resources across all clusters available at the start of the scheduling cycle.
    TotalResources schedulerobjects.ResourceList
    // Resources assigned across all queues during this scheduling cycle.
    ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
    // Total number of jobs successfully scheduled in this round.
    NumScheduledJobs int
    // Reason for why the scheduling round finished.
    TerminationReason string
}
SchedulingContext captures the decisions made by the scheduler during one invocation.
func NewSchedulingContext ¶ added in v0.3.62
func NewSchedulingContext(
    executorId string,
    totalResources schedulerobjects.ResourceList,
    priorityFactorByQueue map[string]float64,
    initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
) *SchedulingContext
func (*SchedulingContext) AddJobSchedulingContext ¶ added in v0.3.62
func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext, isEvictedJob bool)
AddJobSchedulingContext adds a job scheduling context. Automatically updates scheduled resources
func (*SchedulingContext) ClearJobSpecs ¶ added in v0.3.62
func (sctx *SchedulingContext) ClearJobSpecs()
ClearJobSpecs zeroes out job specs to reduce memory usage.
func (*SchedulingContext) String ¶ added in v0.3.62
func (sctx *SchedulingContext) String() string
func (*SchedulingContext) SuccessfulJobSchedulingContexts ¶ added in v0.3.62
func (sctx *SchedulingContext) SuccessfulJobSchedulingContexts() []*JobSchedulingContext
type SchedulingContextByExecutor ¶ added in v0.3.62
type SchedulingContextByExecutor map[string]*SchedulingContext
func (SchedulingContextByExecutor) String ¶ added in v0.3.62
func (m SchedulingContextByExecutor) String() string
type SchedulingContextRepository ¶ added in v0.3.62
type SchedulingContextRepository struct {
// contains filtered or unexported fields
}
SchedulingContextRepository stores scheduling contexts associated with recent scheduling attempts. On adding a context, a map is cloned, then mutated, and then swapped for the previous map using atomic pointers. Hence, reads concurrent with writes are safe and don't need locking. A mutex protects against concurrent writes.
func NewSchedulingContextRepository ¶ added in v0.3.62
func NewSchedulingContextRepository(maxJobSchedulingContextsPerExecutor uint) (*SchedulingContextRepository, error)
func (*SchedulingContextRepository) AddSchedulingContext ¶ added in v0.3.62
func (repo *SchedulingContextRepository) AddSchedulingContext(sctx *SchedulingContext) error
AddSchedulingContext adds a scheduling context to the repo. It also extracts the queue and job scheduling contexts it contains and stores those separately.
It's safe to call this method concurrently with itself and with methods getting contexts from the repo. It's not safe to mutate contexts once they've been provided to this method.
Job contexts are stored first, then queue contexts, and finally the scheduling context itself. This avoids having a stored scheduling (queue) context referring to a queue (job) context that isn't stored yet.
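The copy-on-write scheme described above is a general Go pattern. The sketch below illustrates it in isolation with a hypothetical string-keyed map guarded by an atomic.Pointer and a writer mutex; it is not the repository's actual implementation. Requires "sync" and "sync/atomic" (Go 1.19+).

    // Sketch: copy-on-write map. Readers load the current snapshot without
    // locking; writers clone, mutate, and swap under a mutex.
    type cowMap struct {
        mu sync.Mutex                                    // serialises writers
        m  atomic.Pointer[map[string]*SchedulingContext] // current snapshot
    }

    func (c *cowMap) Get(key string) (*SchedulingContext, bool) {
        if p := c.m.Load(); p != nil {
            v, ok := (*p)[key]
            return v, ok
        }
        return nil, false
    }

    func (c *cowMap) Put(key string, sctx *SchedulingContext) {
        c.mu.Lock()
        defer c.mu.Unlock()
        clone := make(map[string]*SchedulingContext)
        if p := c.m.Load(); p != nil {
            for k, v := range *p {
                clone[k] = v
            }
        }
        clone[key] = sctx
        c.m.Store(&clone)
    }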
func (*SchedulingContextRepository) GetJobReport ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetJobReport(_ context.Context, jobId *schedulerobjects.JobId) (*schedulerobjects.JobReport, error)
GetJobReport is a gRPC endpoint for querying job reports. TODO: Further separate this from internal contexts.
func (*SchedulingContextRepository) GetMostRecentJobSchedulingContextByExecutor ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetMostRecentJobSchedulingContextByExecutor(jobId string) (JobSchedulingContextByExecutor, bool)
func (*SchedulingContextRepository) GetMostRecentQueueSchedulingContextByExecutor ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetMostRecentQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool)
func (*SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetMostRecentSchedulingContextByExecutor() SchedulingContextByExecutor
func (*SchedulingContextRepository) GetMostRecentSuccessfulQueueSchedulingContextByExecutor ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetMostRecentSuccessfulQueueSchedulingContextByExecutor(queue string) (QueueSchedulingContextByExecutor, bool)
func (*SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutor ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetMostRecentSuccessfulSchedulingContextByExecutor() SchedulingContextByExecutor
func (*SchedulingContextRepository) GetQueueReport ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetQueueReport(_ context.Context, queue *schedulerobjects.Queue) (*schedulerobjects.QueueReport, error)
GetQueueReport is a gRPC endpoint for querying queue reports. TODO: Further separate this from internal contexts.
func (*SchedulingContextRepository) GetSchedulingReport ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetSchedulingReport(_ context.Context, _ *types.Empty) (*schedulerobjects.SchedulingReport, error)
GetSchedulingReport is a gRPC endpoint for querying scheduler reports. TODO: Further separate this from internal contexts.
func (*SchedulingContextRepository) GetSortedExecutorIds ¶ added in v0.3.62
func (repo *SchedulingContextRepository) GetSortedExecutorIds() []string
type StandaloneLeaderController ¶
type StandaloneLeaderController struct {
// contains filtered or unexported fields
}
StandaloneLeaderController returns a token that always indicates you are leader. This can be used when only a single instance of the scheduler is needed.
func NewStandaloneLeaderController ¶
func NewStandaloneLeaderController() *StandaloneLeaderController
func (*StandaloneLeaderController) GetToken ¶
func (lc *StandaloneLeaderController) GetToken() LeaderToken
func (*StandaloneLeaderController) Run ¶ added in v0.3.47
func (lc *StandaloneLeaderController) Run(ctx context.Context) error
func (*StandaloneLeaderController) ValidateToken ¶
func (lc *StandaloneLeaderController) ValidateToken(tok LeaderToken) bool
type SubmitChecker ¶
type SubmitChecker struct {
// contains filtered or unexported fields
}
func NewSubmitChecker ¶
func NewSubmitChecker(
    executorTimeout time.Duration,
    schedulingConfig configuration.SchedulingConfig,
    executorRepository database.ExecutorRepository,
) *SubmitChecker
func (*SubmitChecker) CheckApiJobs ¶
func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string)