Documentation
¶
Index ¶
- func GangIdAndCardinalityFromAnnotations(annotations map[string]string, ...) (string, int, bool, error)
- func NodeTypesMatchingPod(nodeTypes map[string]*schedulerobjects.NodeType, ...) ([]*schedulerobjects.NodeType, map[string]int, error)
- func PodRequirementsFromJob(j LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (*schedulerobjects.PodRequirements, error)
- func PodRequirementsFromJobs[T LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) ([]*schedulerobjects.PodRequirements, error)
- func PriorityFromJob(job *api.Job, ...) (priority int32, ok bool)
- func ResourceListAsWeightedApproximateFloat64(resourceScarcity map[string]float64, rl schedulerobjects.ResourceList) float64
- func Run(_ *Configuration) error
- type CandidateGangIterator
- type Configuration
- type ExecutorApi
- func (srv *ExecutorApi) RenewLease(ctx context.Context, req *api.RenewLeaseRequest) (*api.IdList, error)
- func (srv *ExecutorApi) ReportDone(ctx context.Context, req *api.IdList) (*api.IdList, error)
- func (srv *ExecutorApi) ReportUsage(ctx context.Context, req *api.ClusterUsageReport) (*types.Empty, error)
- func (srv *ExecutorApi) ReturnLease(ctx context.Context, req *api.ReturnLeaseRequest) (*types.Empty, error)
- func (srv *ExecutorApi) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
- type JobDb
- func (jobDb *JobDb) BatchDelete(txn *memdb.Txn, ids []string) error
- func (jobDb *JobDb) GetAll(txn *memdb.Txn) ([]*SchedulerJob, error)
- func (jobDb *JobDb) GetById(txn *memdb.Txn, id string) (*SchedulerJob, error)
- func (jobDb *JobDb) ReadTxn() *memdb.Txn
- func (jobDb *JobDb) Upsert(txn *memdb.Txn, jobs []*SchedulerJob) error
- func (jobDb *JobDb) WriteTxn() *memdb.Txn
- type JobIterator
- type JobQueueIterator
- type JobRepository
- type JobRepositoryAdapter
- type JobRun
- type JobSchedulingReport
- type LeaderController
- type LeaderToken
- type LegacyScheduler
- type LegacySchedulerJob
- type NodeDb
- func (nodeDb *NodeDb) BindNodeToPod(txn *memdb.Txn, req *schedulerobjects.PodRequirements, ...) error
- func (nodeDb *NodeDb) ClearAllocated() error
- func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)
- func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)
- func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)
- func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
- func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
- func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
- func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
- func (nodeDb *NodeDb) String() string
- func (nodeDb *NodeDb) TimeOfMostRecentUpsert() time.Time
- func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn
- func (nodeDb *NodeDb) Upsert(nodes []*schedulerobjects.Node) error
- type NodeItemAvailableResourceIndex
- type NodeTypeResourceIterator
- type NodeTypesResourceIterator
- type NodeTypesResourceIteratorItem
- type NodeTypesResourceIteratorPQ
- type NodesIterator
- type PodSchedulingReport
- type Publisher
- type QueueCandidateGangIterator
- type QueueSchedulingReport
- type QueueSchedulingRoundReport
- type QueuedGangIterator
- type QueuedJobsIterator
- type Scheduler
- type SchedulerJob
- func (job *SchedulerJob) CurrentRun() *JobRun
- func (job *SchedulerJob) DeepCopy() *SchedulerJob
- func (job *SchedulerJob) GetAnnotations() map[string]string
- func (job *SchedulerJob) GetId() string
- func (job *SchedulerJob) GetQueue() string
- func (job *SchedulerJob) InTerminalState() bool
- func (job *SchedulerJob) NumReturned() uint
- func (job *SchedulerJob) RunById(id uuid.UUID) *JobRun
- type SchedulerJobRepository
- type SchedulingAlgo
- type SchedulingConstraints
- type SchedulingReportsRepository
- func (repo *SchedulingReportsRepository[T]) Add(queueName string, report *JobSchedulingReport[T])
- func (repo *SchedulingReportsRepository[T]) AddMany(queueName string, reports []*JobSchedulingReport[T])
- func (repo *SchedulingReportsRepository[T]) AddSchedulingRoundReport(report *SchedulingRoundReport[T])
- func (repo *SchedulingReportsRepository[T]) GetJobReport(ctx context.Context, jobId *schedulerobjects.JobId) (*schedulerobjects.JobReport, error)
- func (repo *SchedulingReportsRepository[T]) GetJobSchedulingReport(jobId uuid.UUID) (*JobSchedulingReport[T], bool)
- func (repo *SchedulingReportsRepository[T]) GetQueueReport(ctx context.Context, queue *schedulerobjects.Queue) (*schedulerobjects.QueueReport, error)
- func (repo *SchedulingReportsRepository[T]) GetQueueSchedulingReport(queueName string) (*QueueSchedulingReport[T], bool)
- type SchedulingRoundReport
- type StandaloneLeaderController
- type SubmitChecker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GangIdAndCardinalityFromAnnotations ¶ added in v0.3.40
func NodeTypesMatchingPod ¶ added in v0.3.35
func NodeTypesMatchingPod(nodeTypes map[string]*schedulerobjects.NodeType, req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)
NodeTypesMatchingPod returns a slice composed of all node types a given pod could potentially be scheduled on.
func PodRequirementsFromJob ¶ added in v0.3.40
func PodRequirementsFromJob(j LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) (*schedulerobjects.PodRequirements, error)
func PodRequirementsFromJobs ¶ added in v0.3.40
func PodRequirementsFromJobs[T LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) ([]*schedulerobjects.PodRequirements, error)
func PriorityFromJob ¶ added in v0.3.35
func PriorityFromJob(job *api.Job, priorityByPriorityClassName map[string]configuration.PriorityClass) (priority int32, ok bool)
func ResourceListAsWeightedApproximateFloat64 ¶ added in v0.3.35
func ResourceListAsWeightedApproximateFloat64(resourceScarcity map[string]float64, rl schedulerobjects.ResourceList) float64
func Run ¶
func Run(_ *Configuration) error
Types ¶
type CandidateGangIterator ¶ added in v0.3.40
type CandidateGangIterator[T LegacySchedulerJob] struct { SchedulingConstraints SchedulingRoundReport *SchedulingRoundReport[T] // contains filtered or unexported fields }
CandidateGangIterator multiplexes between queues. Responsible for maintaining fair share and enforcing cross-queue scheduling constraints.
func (*CandidateGangIterator[T]) Next ¶ added in v0.3.40
func (it *CandidateGangIterator[T]) Next() ([]*JobSchedulingReport[T], error)
type Configuration ¶ added in v0.3.43
type Configuration struct { // Database configuration Postgres configuration.PostgresConfig // Metrics configuration Metrics configuration.MetricsConfig // General Pulsar configuration Pulsar configuration.PulsarConfig // Pulsar subscription name SubscriptionName string // Maximum time since the last batch before a batch will be inserted into the database BatchDuration time.Duration // Time for which the pulsar consumer will wait for a new message before retrying PulsarReceiveTimeout time.Duration // Time for which the pulsar consumer will back off after receiving an error on trying to receive a message PulsarBackoffTime time.Duration }
type ExecutorApi ¶
type ExecutorApi struct { api.UnimplementedAggregatedQueueServer Producer pulsar.Producer Db *pgxpool.Pool MaxJobsPerCall int32 // contains filtered or unexported fields }
func (*ExecutorApi) RenewLease ¶
func (srv *ExecutorApi) RenewLease(ctx context.Context, req *api.RenewLeaseRequest) (*api.IdList, error)
func (*ExecutorApi) ReportDone ¶
func (*ExecutorApi) ReportUsage ¶
func (srv *ExecutorApi) ReportUsage(ctx context.Context, req *api.ClusterUsageReport) (*types.Empty, error)
TODO: Does nothing for now.
func (*ExecutorApi) ReturnLease ¶
func (srv *ExecutorApi) ReturnLease(ctx context.Context, req *api.ReturnLeaseRequest) (*types.Empty, error)
func (*ExecutorApi) StreamingLeaseJobs ¶
func (srv *ExecutorApi) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
type JobDb ¶ added in v0.3.39
type JobDb struct { // In-memory database. Stores *SchedulerJob. // Used to efficiently iterate over jobs in sorted order. Db *memdb.MemDB }
JobDb is the scheduler-internal system for storing job queues. It allows for efficiently iterating over jobs in a specified queue sorted first by in-queue priority value (smaller to greater, since smaller values indicate higher priority), and second by submission time. JobDb is implemented on top of https://github.com/hashicorp/go-memdb which is a simple in-memory database built on immutable radix trees.
func (*JobDb) BatchDelete ¶ added in v0.3.45
BatchDelete removes the jobs with the given ids from the database. Any ids that are not in the database will be ignored
func (*JobDb) GetAll ¶ added in v0.3.45
func (jobDb *JobDb) GetAll(txn *memdb.Txn) ([]*SchedulerJob, error)
GetAll returns all jobs in the database. The Jobs returned by this function *must not* be subsequently modified
func (*JobDb) GetById ¶ added in v0.3.45
func (jobDb *JobDb) GetById(txn *memdb.Txn, id string) (*SchedulerJob, error)
GetById returns the job with the given Id, or nil if no such job exists. The Job returned by this function *must not* be subsequently modified.
func (*JobDb) ReadTxn ¶ added in v0.3.45
func (jobDb *JobDb) ReadTxn() *memdb.Txn
ReadTxn returns a read-only transaction. Multiple read-only transactions can access the db concurrently
func (*JobDb) Upsert ¶ added in v0.3.39
func (jobDb *JobDb) Upsert(txn *memdb.Txn, jobs []*SchedulerJob) error
Upsert will insert the given jobs if they don't already exist, or update them if they do. Any jobs passed to this function *must not* be subsequently modified.
type JobIterator ¶ added in v0.3.45
type JobIterator[T LegacySchedulerJob] interface { Next() (T, error) }
type JobQueueIterator ¶ added in v0.3.39
type JobQueueIterator struct {
// contains filtered or unexported fields
}
JobQueueIterator is an iterator over all jobs in a given queue. Jobs are sorted first by per-queue priority, and secondly by submission time.
func NewJobQueueIterator ¶ added in v0.3.39
func NewJobQueueIterator(txn *memdb.Txn, queue string) (*JobQueueIterator, error)
func (*JobQueueIterator) Next ¶ added in v0.3.39
func (it *JobQueueIterator) Next() interface{}
Next is needed to implement the memdb.ResultIterator interface. External callers should use NextJobItem which provides a typesafe mechanism for getting the next SchedulerJob
func (*JobQueueIterator) NextJobItem ¶ added in v0.3.39
func (it *JobQueueIterator) NextJobItem() *SchedulerJob
NextJobItem returns the next SchedulerJob or nil if the end of the iterator has been reached
func (*JobQueueIterator) WatchCh ¶ added in v0.3.39
func (it *JobQueueIterator) WatchCh() <-chan struct{}
WatchCh is needed to implement the memdb.ResultIterator interface but is not needed for our use case
type JobRepository ¶ added in v0.3.45
type JobRepositoryAdapter ¶ added in v0.3.45
type JobRepositoryAdapter struct {
// contains filtered or unexported fields
}
func NewJobRepositoryAdapter ¶ added in v0.3.45
func NewJobRepositoryAdapter(jobRepo repository.JobRepository) *JobRepositoryAdapter
func (*JobRepositoryAdapter) GetJobIterator ¶ added in v0.3.45
func (j *JobRepositoryAdapter) GetJobIterator(ctx context.Context, queue string) (JobIterator[*api.Job], error)
func (*JobRepositoryAdapter) TryLeaseJobs ¶ added in v0.3.45
type JobRun ¶ added in v0.3.45
type JobRun struct { // Unique identifier for the run RunID uuid.UUID // The name of the executor this run has been leased to Executor string // True if the job has been reported as pending by the executor Pending bool // True if the job has been reported as running by the executor Running bool // True if the job has been reported as succeeded by the executor Succeeded bool // True if the job has been reported as failed by the executor Failed bool // True if the job has been reported as cancelled by the executor Cancelled bool // True if the job has been returned by the executor Returned bool // True if the job has been expired by the scheduler Expired bool }
JobRun is the scheduler-internal representation of a job run.
func (*JobRun) DeepCopy ¶ added in v0.3.45
DeepCopy deep copies the entire JobRun. This is needed because when runs are stored in the JobDb they cannot be modified in-place.
func (*JobRun) InTerminalState ¶ added in v0.3.45
InTerminalState returns true if the JobRun is in a terminal state
type JobSchedulingReport ¶ added in v0.3.35
type JobSchedulingReport[T LegacySchedulerJob] struct { // Time at which this report was created. Timestamp time.Time // Id of the job this pod corresponds to. JobId uuid.UUID // Job spec. Job T // Scheduling requirements of this job. // We currently require that each job contains exactly one pod spec. Req *schedulerobjects.PodRequirements // Executor this job was attempted to be assigned to. ExecutorId string // Reason for why the job could not be scheduled. // Empty if the job was scheduled successfully. UnschedulableReason string // Scheduling reports for the individual pods that make up the job. PodSchedulingReports []*PodSchedulingReport }
JobSchedulingReport is created by the scheduler and contains information about the decision made by the scheduler for this job.
func (*JobSchedulingReport[T]) String ¶ added in v0.3.35
func (report *JobSchedulingReport[T]) String() string
type LeaderController ¶ added in v0.3.45
type LeaderController interface { // GetToken returns a LeaderToken which allows you to determine if you are leader or not GetToken() LeaderToken // ValidateToken allows a caller to determine whether a previously obtained token is still valid. // Returns true if the token is a leader and false otherwise ValidateToken(tok LeaderToken) bool }
LeaderController is an interface to be implemented by structs that control which scheduler is leader
type LeaderToken ¶ added in v0.3.45
type LeaderToken struct {
// contains filtered or unexported fields
}
LeaderToken is a token handed out to schedulers which they can use to determine if they are leader
func InvalidLeaderToken ¶ added in v0.3.45
func InvalidLeaderToken() LeaderToken
InvalidLeaderToken returns a LeaderToken which indicates the scheduler is not leader
func NewLeaderToken ¶ added in v0.3.45
func NewLeaderToken() LeaderToken
NewLeaderToken returns a LeaderToken which indicates the scheduler is leader
type LegacyScheduler ¶ added in v0.3.35
type LegacyScheduler[T LegacySchedulerJob] struct { SchedulingConstraints SchedulingRoundReport *SchedulingRoundReport[T] CandidateGangIterator *CandidateGangIterator[T] // Contains all nodes to be considered for scheduling. // Used for matching pods with nodes. NodeDb *NodeDb // Used to request jobs from Redis and to mark jobs as leased. JobRepository SchedulerJobRepository[T] // Jobs are grouped into gangs by this annotation. GangIdAnnotation string // Jobs in a gang specify the number of jobs in the gang via this annotation. GangCardinalityAnnotation string // contains filtered or unexported fields }
func NewLegacyScheduler ¶ added in v0.3.35
func NewLegacyScheduler[T LegacySchedulerJob]( ctx context.Context, constraints SchedulingConstraints, config configuration.SchedulingConfig, nodeDb *NodeDb, jobRepository SchedulerJobRepository[T], priorityFactorByQueue map[string]float64, initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType, ) (*LegacyScheduler[T], error)
func (*LegacyScheduler[T]) Schedule ¶ added in v0.3.35
func (sched *LegacyScheduler[T]) Schedule() ([]T, error)
func (*LegacyScheduler[T]) String ¶ added in v0.3.35
func (sched *LegacyScheduler[T]) String() string
type LegacySchedulerJob ¶ added in v0.3.45
type NodeDb ¶ added in v0.3.35
type NodeDb struct {
// contains filtered or unexported fields
}
NodeDb is the scheduler-internal system for storing node information. It's used to efficiently find nodes on which a pod can be scheduled.
func (*NodeDb) BindNodeToPod ¶ added in v0.3.35
func (nodeDb *NodeDb) BindNodeToPod(txn *memdb.Txn, req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) error
func (*NodeDb) ClearAllocated ¶ added in v0.3.40
ClearAllocated zeroes out allocated resources on all nodes in the NodeDb.
func (*NodeDb) NodeTypesMatchingPod ¶ added in v0.3.35
func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)
NodeTypesMatchingPod returns a slice composed of all node types a given pod could potentially be scheduled on.
func (*NodeDb) ScheduleMany ¶ added in v0.3.40
func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)
ScheduleMany assigns a set of pods to nodes. The assignment is atomic, i.e., either all pods are successfully assigned to nodes or none are. The returned bool indicates whether assignment succeeded or not. TODO: Pass through contexts to support timeouts.
func (*NodeDb) ScheduleManyWithTxn ¶ added in v0.3.40
func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)
func (*NodeDb) SelectAndBindNodeToPod ¶ added in v0.3.35
func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
func (*NodeDb) SelectAndBindNodeToPodWithTxn ¶ added in v0.3.40
func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
func (*NodeDb) SelectNodeForPod ¶ added in v0.3.35
func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
func (*NodeDb) SelectNodeForPodWithTxn ¶ added in v0.3.40
func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)
SelectAndBindNodeToPod selects a node on which the pod can be scheduled, and updates the internal state of the db to indicate that this pod is bound to that node.
func (*NodeDb) TimeOfMostRecentUpsert ¶ added in v0.3.40
type NodeItemAvailableResourceIndex ¶ added in v0.3.35
type NodeItemAvailableResourceIndex struct { // Resource name, e.g., "cpu", "gpu", or "memory". Resource string // Job priority. Priority int32 }
func (*NodeItemAvailableResourceIndex) FromArgs ¶ added in v0.3.35
func (s *NodeItemAvailableResourceIndex) FromArgs(args ...interface{}) ([]byte, error)
FromArgs computes the index key from a set of arguments. Takes a single argument resourceAmount of type uint64.
func (*NodeItemAvailableResourceIndex) FromObject ¶ added in v0.3.35
func (s *NodeItemAvailableResourceIndex) FromObject(raw interface{}) (bool, []byte, error)
FromObject extracts the index key from a *NodeItem object.
type NodeTypeResourceIterator ¶ added in v0.3.35
type NodeTypeResourceIterator struct {
// contains filtered or unexported fields
}
NodeTypeResourceIterator is an iterator over all nodes of a given nodeType, for which there's at least some specified amount of a given resource available. For example, all nodes of type "foo" for which there's at least 1Gi of memory available.
Available resources is the sum of unused resources and resources assigned to lower-priority jobs. Nodes are returned in sorted order, going from least to most of the specified resource available.
func NewNodeTypeResourceIterator ¶ added in v0.3.35
func NewNodeTypeResourceIterator(txn *memdb.Txn, resource string, priority int32, nodeType *schedulerobjects.NodeType, resourceAmount resource.Quantity) (*NodeTypeResourceIterator, error)
func (*NodeTypeResourceIterator) Next ¶ added in v0.3.35
func (it *NodeTypeResourceIterator) Next() interface{}
func (*NodeTypeResourceIterator) NextNodeItem ¶ added in v0.3.35
func (it *NodeTypeResourceIterator) NextNodeItem() *schedulerobjects.Node
func (*NodeTypeResourceIterator) WatchCh ¶ added in v0.3.35
func (it *NodeTypeResourceIterator) WatchCh() <-chan struct{}
type NodeTypesResourceIterator ¶ added in v0.3.35
type NodeTypesResourceIterator struct {
// contains filtered or unexported fields
}
NodeTypesResourceIterator extends NodeTypeResourceIterator to iterate over nodes of several node types. Nodes are returned in sorted order, going from least to most of the specified resource available.
func NewNodeTypesResourceIterator ¶ added in v0.3.35
func NewNodeTypesResourceIterator(txn *memdb.Txn, resource string, priority int32, nodeTypes []*schedulerobjects.NodeType, resourceQuantity resource.Quantity) (*NodeTypesResourceIterator, error)
func (*NodeTypesResourceIterator) Next ¶ added in v0.3.35
func (it *NodeTypesResourceIterator) Next() interface{}
func (*NodeTypesResourceIterator) NextNodeItem ¶ added in v0.3.35
func (it *NodeTypesResourceIterator) NextNodeItem() *schedulerobjects.Node
func (*NodeTypesResourceIterator) WatchCh ¶ added in v0.3.35
func (it *NodeTypesResourceIterator) WatchCh() <-chan struct{}
type NodeTypesResourceIteratorItem ¶ added in v0.3.35
type NodeTypesResourceIteratorItem struct {
// contains filtered or unexported fields
}
type NodeTypesResourceIteratorPQ ¶ added in v0.3.35
type NodeTypesResourceIteratorPQ []*NodeTypesResourceIteratorItem
A priority queue used by NodeTypesResourceIterator to return results from across several sub-iterators in order.
func (NodeTypesResourceIteratorPQ) Len ¶ added in v0.3.35
func (pq NodeTypesResourceIteratorPQ) Len() int
func (NodeTypesResourceIteratorPQ) Less ¶ added in v0.3.35
func (pq NodeTypesResourceIteratorPQ) Less(i, j int) bool
func (*NodeTypesResourceIteratorPQ) Pop ¶ added in v0.3.35
func (pq *NodeTypesResourceIteratorPQ) Pop() any
func (*NodeTypesResourceIteratorPQ) Push ¶ added in v0.3.35
func (pq *NodeTypesResourceIteratorPQ) Push(x any)
func (NodeTypesResourceIteratorPQ) Swap ¶ added in v0.3.35
func (pq NodeTypesResourceIteratorPQ) Swap(i, j int)
type NodesIterator ¶ added in v0.3.40
type NodesIterator struct {
// contains filtered or unexported fields
}
NodesIterator is an iterator over all nodes in the db.
func NewNodesIterator ¶ added in v0.3.40
func NewNodesIterator(txn *memdb.Txn) (*NodesIterator, error)
func (*NodesIterator) Next ¶ added in v0.3.40
func (it *NodesIterator) Next() interface{}
func (*NodesIterator) NextNode ¶ added in v0.3.40
func (it *NodesIterator) NextNode() *schedulerobjects.Node
func (*NodesIterator) WatchCh ¶ added in v0.3.40
func (it *NodesIterator) WatchCh() <-chan struct{}
type PodSchedulingReport ¶ added in v0.3.35
type PodSchedulingReport struct { // Time at which this report was created. Timestamp time.Time // Pod scheduling requirements. Req *schedulerobjects.PodRequirements // Resource type determined by the scheduler to be the hardest to satisfy // the scheduling requirements for. DominantResourceType string // Node the pod was assigned to. // If nil, the pod could not be assigned to any Node. Node *schedulerobjects.Node // Score indicates how well the pod fits on the selected Node. Score int // Number of Node types that matched the pod. NumMatchedNodeTypes int // Number of Node types excluded by reason. NumExcludedNodeTypesByReason map[string]int // Number of nodes excluded by reason. NumExcludedNodesByReason map[string]int }
PodSchedulingReport is returned by SelectAndBindNodeToPod and contains detailed information on the scheduling decision made for this pod.
func (*PodSchedulingReport) String ¶ added in v0.3.35
func (report *PodSchedulingReport) String() string
type Publisher ¶ added in v0.3.45
type Publisher interface { // PublishMessages will publish the supplied messages. A LeaderToken is provided and the // implementor may decide whether or not to publish based on the status of this token PublishMessages(ctx context.Context, events []*armadaevents.EventSequence, token LeaderToken) error // PublishMarkers publishes a single marker message for each Pulsar partition. Each marker // message contains the supplied group id, which allows all marker messages for a given call // to be identified. The uint32 returned is the number of messages published PublishMarkers(ctx context.Context, groupId uuid.UUID) (uint32, error) }
Publisher is an interface to be implemented by structs that handle publishing messages to pulsar
type QueueCandidateGangIterator ¶ added in v0.3.40
type QueueCandidateGangIterator[T LegacySchedulerJob] struct { SchedulingConstraints QueueSchedulingRoundReport *QueueSchedulingRoundReport[T] // contains filtered or unexported fields }
QueueCandidateGangIterator is an iterator over gangs in a queue that could be scheduled without exceeding per-queue limits.
func (*QueueCandidateGangIterator[T]) Next ¶ added in v0.3.40
func (it *QueueCandidateGangIterator[T]) Next() ([]*JobSchedulingReport[T], error)
type QueueSchedulingReport ¶ added in v0.3.35
type QueueSchedulingReport[T LegacySchedulerJob] struct { // Queue name. Name string MostRecentSuccessfulJobSchedulingReport *JobSchedulingReport[T] MostRecentUnsuccessfulJobSchedulingReport *JobSchedulingReport[T] }
QueueSchedulingReport contains job scheduling reports for the most recent successful and failed scheduling attempts for this queue.
func (*QueueSchedulingReport[T]) String ¶ added in v0.3.35
func (report *QueueSchedulingReport[T]) String() string
type QueueSchedulingRoundReport ¶ added in v0.3.40
type QueueSchedulingRoundReport[T LegacySchedulerJob] struct { // These factors influence the fraction of resources assigned to each queue. PriorityFactor float64 // Resources assigned to the queue across all clusters at the start of the scheduling cycle. InitialResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType // Resources assigned to this queue during this scheduling cycle. ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType // Reports for all successful job scheduling attempts. SuccessfulJobSchedulingReports map[uuid.UUID]*JobSchedulingReport[T] // Reports for all unsuccessful job scheduling attempts. UnsuccessfulJobSchedulingReports map[uuid.UUID]*JobSchedulingReport[T] // Total number of jobs successfully scheduled in this round for this queue. NumScheduledJobs int // contains filtered or unexported fields }
QueueSchedulingRoundReport captures the decisions made by the scheduler during one invocation for a particular queue.
func NewQueueSchedulingRoundReport ¶ added in v0.3.40
func NewQueueSchedulingRoundReport[T LegacySchedulerJob](priorityFactor float64, initialResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType) *QueueSchedulingRoundReport[T]
func (*QueueSchedulingRoundReport[T]) AddJobSchedulingReport ¶ added in v0.3.40
func (report *QueueSchedulingRoundReport[T]) AddJobSchedulingReport(r *JobSchedulingReport[T])
Add a job scheduling report to the report for this invocation of the scheduler. Automatically updates scheduled resources by calling AddScheduledResources. Is thread-safe.
func (*QueueSchedulingRoundReport[T]) ClearJobSpecs ¶ added in v0.3.40
func (report *QueueSchedulingRoundReport[T]) ClearJobSpecs()
ClearJobSpecs zeroes out job specs to reduce memory usage.
type QueuedGangIterator ¶ added in v0.3.40
type QueuedGangIterator[T LegacySchedulerJob] struct { // contains filtered or unexported fields }
QueuedGangIterator is an iterator over all gangs in a queue, where a gang is a set of jobs for which the gangIdAnnotation has equal value. A gang is yielded once the final member of the gang has been received. Jobs without gangIdAnnotation are considered to be gangs of cardinality 1.
func NewQueuedGangIterator ¶ added in v0.3.40
func NewQueuedGangIterator[T LegacySchedulerJob](ctx context.Context, it JobIterator[T], gangIdAnnotation, gangCardinalityAnnotation string) *QueuedGangIterator[T]
func (*QueuedGangIterator[T]) Next ¶ added in v0.3.40
func (it *QueuedGangIterator[T]) Next() ([]T, error)
type QueuedJobsIterator ¶ added in v0.3.35
type QueuedJobsIterator struct {
// contains filtered or unexported fields
}
QueuedJobsIterator is an iterator over all jobs in a queue. It lazily loads jobs in batches from Redis asynchronously.
func NewQueuedJobsIterator ¶ added in v0.3.35
func NewQueuedJobsIterator(ctx context.Context, queue string, repo JobRepository) (*QueuedJobsIterator, error)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is the main armada Scheduler. It runs a periodic scheduling cycle during which the following actions are performed:
* Determine if we are leader
* Update internal state from postgres (via the jobRepository)
* If Leader:
- Generate any armada events resulting from the state update
- Expire any jobs that are running on stale clusters
- Attempt to schedule jobs from the queue
- Publish any armada events resulting from the cycle to Pulsar
func NewScheduler ¶
func NewScheduler( jobRepository database.JobRepository, executorRepository database.ExecutorRepository, schedulingAlgo SchedulingAlgo, leaderController LeaderController, publisher Publisher, cyclePeriod time.Duration, executorTimeout time.Duration, maxLeaseReturns uint, ) (*Scheduler, error)
type SchedulerJob ¶ added in v0.3.39
type SchedulerJob struct { // String representation of the job id JobId string // Name of the queue this job belongs to. Queue string // Jobset the job belongs to // We store this so we can send messages about the job Jobset string // Per-queue priority of this job. Priority uint32 // Logical timestamp indicating the order in which jobs are submitted. // Jobs with identical Queue and Priority // are sorted by timestamp. Timestamp int64 // Name of the executor to which this job has been assigned. // Empty if this job has not yet been assigned. Executor string // Name of the node to which this job has been assigned. // Empty if this job has not yet been assigned. Node string // True if the job is currently queued. // If this is set then the job will not be considered for scheduling Queued bool // True if the user has requested this job be cancelled CancelRequested bool // True if the scheduler has cancelled the job Cancelled bool // True if the scheduler has failed the job Failed bool // True if the scheduler has marked the job as succeeded Succeeded bool // Job Runs in the order they were received. // For now there can be only one active job run which will be the last element of the slice Runs []*JobRun // contains filtered or unexported fields }
SchedulerJob is the scheduler-internal representation of a job.
func (*SchedulerJob) CurrentRun ¶ added in v0.3.45
func (job *SchedulerJob) CurrentRun() *JobRun
CurrentRun returns the currently active job run or nil if there are no runs yet
func (*SchedulerJob) DeepCopy ¶ added in v0.3.45
func (job *SchedulerJob) DeepCopy() *SchedulerJob
DeepCopy deep copies the entire job including the runs. This is needed because when jobs are stored in the JobDb they cannot be modified in-place
func (*SchedulerJob) GetAnnotations ¶ added in v0.3.45
func (job *SchedulerJob) GetAnnotations() map[string]string
GetAnnotations returns the annotations on the job.
func (*SchedulerJob) GetId ¶ added in v0.3.45
func (job *SchedulerJob) GetId() string
GetId returns the id of the Job.
func (*SchedulerJob) GetQueue ¶ added in v0.3.45
func (job *SchedulerJob) GetQueue() string
GetQueue returns the queue this job belongs to.
func (*SchedulerJob) InTerminalState ¶ added in v0.3.45
func (job *SchedulerJob) InTerminalState() bool
InTerminalState returns true if the job is in a terminal state
func (*SchedulerJob) NumReturned ¶ added in v0.3.45
func (job *SchedulerJob) NumReturned() uint
NumReturned returns the number of times this job has been returned by executors. Note that this is O(N) on Runs, but this should be fine as the number of runs should be small.
type SchedulerJobRepository ¶ added in v0.3.35
type SchedulerJobRepository[T LegacySchedulerJob] interface { // GetJobIterator returns an iterator over queued jobs for a given queue. GetJobIterator(ctx context.Context, queue string) (JobIterator[T], error) // TryLeaseJobs tries to create job leases and returns the jobs that were successfully leased. // Leasing may fail, e.g., if the job was concurrently leased to another executor. TryLeaseJobs(clusterId string, queue string, jobs []T) ([]T, error) }
SchedulerJobRepository represents the underlying jobs database.
type SchedulingAlgo ¶ added in v0.3.45
type SchedulingAlgo interface { // Schedule should assign jobs to nodes // Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided // It should return a slice containing all scheduled jobs. Schedule(txn *memdb.Txn, jobDb *JobDb) ([]*SchedulerJob, error) }
SchedulingAlgo is an interface that should be implemented by structs capable of assigning Jobs to nodes.
type SchedulingConstraints ¶ added in v0.3.40
type SchedulingConstraints struct { Priorities []int32 PriorityClasses map[string]configuration.PriorityClass // Executor for which we're currently scheduling jobs. ExecutorId string // Resource pool of this executor. Pool string // Weights used when computing total resource usage. ResourceScarcity map[string]float64 // Max number of jobs to schedule per lease jobs call. MaximumJobsToSchedule uint // Max number of consecutive unschedulable jobs to consider for a queue before giving up. MaxConsecutiveUnschedulableJobs uint // Jobs leased to this executor must be at least this large. // Used, e.g., to avoid scheduling CPU-only jobs onto clusters with GPUs. MinimumJobSize schedulerobjects.ResourceList // Per-queue resource limits. // Map from resource type to the limit for that resource. MaximalResourceFractionPerQueue map[string]float64 // Limit, as a fraction of total resources across worker clusters, of resource types at each priority. // The limits are cumulative, i.e., the limit at priority p includes all higher levels. MaximalCumulativeResourceFractionPerQueueAndPriority map[int32]map[string]float64 // Max resources to schedule per queue at a time. MaximalResourceFractionToSchedulePerQueue map[string]float64 // Max resources to schedule at a time. MaximalResourceFractionToSchedule map[string]float64 // Total resources across all worker clusters. // Used when computing resource limits. TotalResources schedulerobjects.ResourceList }
SchedulingConstraints collects scheduling constraints, e.g., per-queue resource limits.
func SchedulingConstraintsFromSchedulingConfig ¶ added in v0.3.40
func SchedulingConstraintsFromSchedulingConfig( executorId, pool string, minimumJobSize schedulerobjects.ResourceList, config configuration.SchedulingConfig, totalResources schedulerobjects.ResourceList, ) *SchedulingConstraints
type SchedulingReportsRepository ¶ added in v0.3.35
type SchedulingReportsRepository[T LegacySchedulerJob] struct { // Scheduling reports for the jobs that were most recently attempted to be scheduled. MostRecentJobSchedulingReports *lru.Cache // Scheduling reports for the most recently seen queues. MostRecentQueueSchedulingReports *lru.Cache }
SchedulingReportsRepository stores reports on the most recent scheduling attempts.
func NewSchedulingReportsRepository ¶ added in v0.3.35
func NewSchedulingReportsRepository[T LegacySchedulerJob](maxQueueSchedulingReports, maxJobSchedulingReports int) *SchedulingReportsRepository[T]
func (*SchedulingReportsRepository[T]) Add ¶ added in v0.3.35
func (repo *SchedulingReportsRepository[T]) Add(queueName string, report *JobSchedulingReport[T])
func (*SchedulingReportsRepository[T]) AddMany ¶ added in v0.3.40
func (repo *SchedulingReportsRepository[T]) AddMany(queueName string, reports []*JobSchedulingReport[T])
func (*SchedulingReportsRepository[T]) AddSchedulingRoundReport ¶ added in v0.3.40
func (repo *SchedulingReportsRepository[T]) AddSchedulingRoundReport(report *SchedulingRoundReport[T])
func (*SchedulingReportsRepository[T]) GetJobReport ¶ added in v0.3.35
func (repo *SchedulingReportsRepository[T]) GetJobReport(ctx context.Context, jobId *schedulerobjects.JobId) (*schedulerobjects.JobReport, error)
func (*SchedulingReportsRepository[T]) GetJobSchedulingReport ¶ added in v0.3.35
func (repo *SchedulingReportsRepository[T]) GetJobSchedulingReport(jobId uuid.UUID) (*JobSchedulingReport[T], bool)
func (*SchedulingReportsRepository[T]) GetQueueReport ¶ added in v0.3.35
func (repo *SchedulingReportsRepository[T]) GetQueueReport(ctx context.Context, queue *schedulerobjects.Queue) (*schedulerobjects.QueueReport, error)
func (*SchedulingReportsRepository[T]) GetQueueSchedulingReport ¶ added in v0.3.35
func (repo *SchedulingReportsRepository[T]) GetQueueSchedulingReport(queueName string) (*QueueSchedulingReport[T], bool)
type SchedulingRoundReport ¶ added in v0.3.35
type SchedulingRoundReport[T LegacySchedulerJob] struct { // Time at which the scheduling cycle started. Started time.Time // Time at which the scheduling cycle finished. Finished time.Time // Executor for which the scheduler was invoked. Executor string // Per-queue scheduling reports. QueueSchedulingRoundReports map[string]*QueueSchedulingRoundReport[T] // Total resources across all clusters available at the start of the scheduling cycle. TotalResources schedulerobjects.ResourceList // Resources assigned across all queues during this scheduling cycle. ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType // Total number of jobs successfully scheduled in this round. NumScheduledJobs int // Reason for why the scheduling round finished. TerminationReason string // contains filtered or unexported fields }
SchedulingRoundReport captures the decisions made by the scheduler during one invocation.
func NewSchedulingRoundReport ¶ added in v0.3.35
func NewSchedulingRoundReport[T LegacySchedulerJob]( totalResources schedulerobjects.ResourceList, priorityFactorByQueue map[string]float64, initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType, ) *SchedulingRoundReport[T]
func (*SchedulingRoundReport[T]) AddJobSchedulingReport ¶ added in v0.3.35
func (report *SchedulingRoundReport[T]) AddJobSchedulingReport(r *JobSchedulingReport[T])
AddJobSchedulingReport adds a job scheduling report to the report for this invocation of the scheduler. Automatically updates scheduled resources by calling AddScheduledResources. Is thread-safe.
func (*SchedulingRoundReport[T]) ClearJobSpecs ¶ added in v0.3.35
func (report *SchedulingRoundReport[T]) ClearJobSpecs()
ClearJobSpecs zeroes out job specs to reduce memory usage.
func (*SchedulingRoundReport[T]) String ¶ added in v0.3.35
func (report *SchedulingRoundReport[T]) String() string
type StandaloneLeaderController ¶ added in v0.3.45
type StandaloneLeaderController struct {
// contains filtered or unexported fields
}
StandaloneLeaderController returns a token that always indicates you are the leader. This can be used when only a single instance of the scheduler is needed.
func NewStandaloneLeaderController ¶ added in v0.3.45
func NewStandaloneLeaderController() *StandaloneLeaderController
func (*StandaloneLeaderController) GetToken ¶ added in v0.3.45
func (lc *StandaloneLeaderController) GetToken() LeaderToken
func (*StandaloneLeaderController) ValidateToken ¶ added in v0.3.45
func (lc *StandaloneLeaderController) ValidateToken(tok LeaderToken) bool
type SubmitChecker ¶ added in v0.3.40
type SubmitChecker struct {
// contains filtered or unexported fields
}
func NewSubmitChecker ¶ added in v0.3.40
func NewSubmitChecker(executorTimeout time.Duration, priorityClasses map[string]configuration.PriorityClass, gangIdAnnotation string) *SubmitChecker
func (*SubmitChecker) Check ¶ added in v0.3.40
func (srv *SubmitChecker) Check(reqs []*schedulerobjects.PodRequirements) (bool, string)
Check if a set of pods can be scheduled onto some cluster.
func (*SubmitChecker) CheckApiJobs ¶ added in v0.3.40
func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string)
func (*SubmitChecker) RegisterNodeDb ¶ added in v0.3.40
func (srv *SubmitChecker) RegisterNodeDb(executor string, nodeDb *NodeDb)
RegisterNodeDb adds a NodeDb to use when checking if a pod can be scheduled. To only check static scheduling requirements, set NodeDb.CheckOnlyStaticRequirements = true before registering it.