scheduler

package
v0.3.53 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2023 License: Apache-2.0 Imports: 64 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TargetNodeIdAnnotation, if set on a pod, holds the id of a node;
	// only the node with that id will be considered for scheduling the pod.
	TargetNodeIdAnnotation = "armadaproject.io/targetNodeId"
	// IsEvictedAnnotation indicates that a pod was evicted in this round and is currently running.
	// Used by the scheduler to differentiate between pods from running and queued jobs.
	IsEvictedAnnotation = "armadaproject.io/isEvicted"
	// JobIdAnnotation, if set on a pod, indicates which job this pod is part of.
	JobIdAnnotation = "armadaproject.io/jobId"
	// QueueAnnotation, if set on a pod, indicates which queue this pod is part of.
	QueueAnnotation = "armadaproject.io/queue"
)
View Source
// NodeDominantQueueWildcard is the queue value that disables filtering by queue in the
// node-type resource iterators: when it is given, all nodes of the node type are considered.
const NodeDominantQueueWildcard = "*"

Variables

Functions

func BindPodToNode added in v0.3.50

BindPodToNode returns a copy of node with req bound to it.

func Evict added in v0.3.50

func Evict(
	it NodeIterator,
	jobRepo JobRepository,
	priorityClasses map[string]configuration.PriorityClass,
	nodeFilter func(*schedulerobjects.Node) bool,
	jobFilter func(LegacySchedulerJob) bool,
	postEvictFunc func(LegacySchedulerJob, *schedulerobjects.Node),
) (map[string]LegacySchedulerJob, map[string]*schedulerobjects.Node, error)

Evict removes jobs from nodes, returning all affected jobs and nodes. Any node for which nodeFilter returns false is skipped. Any job for which jobFilter returns true is evicted (if the node was not skipped). If a job was evicted from a node, postEvictFunc is called with the corresponding job and node.

func EvictOversubscribed added in v0.3.50

func EvictOversubscribed(
	ctx context.Context,
	it NodeIterator,
	jobRepo JobRepository,
	priorityClasses map[string]configuration.PriorityClass,
	evictionProbability float64,
) (map[string]LegacySchedulerJob, map[string]*schedulerobjects.Node, error)

EvictOversubscribed evicts from all nodes any jobs of a priority class for which at least one job could not be scheduled.

func EvictPreemptible added in v0.3.50

func EvictPreemptible(
	ctx context.Context,
	it NodeIterator,
	jobRepo JobRepository,
	priorityClasses map[string]configuration.PriorityClass,
	defaultPriorityClass string,
	evictionProbability float64,
) (map[string]LegacySchedulerJob, map[string]*schedulerobjects.Node, error)

EvictPreemptible evicts from all nodes any jobs of a priority class marked as preemptible.

func GangIdAndCardinalityFromAnnotations

func GangIdAndCardinalityFromAnnotations(annotations map[string]string, gangIdAnnotation, gangCardinalityAnnotation string) (string, int, bool, error)

func GangIdAndCardinalityFromLegacySchedulerJob added in v0.3.47

func GangIdAndCardinalityFromLegacySchedulerJob(job LegacySchedulerJob, gangIdAnnotation, gangCardinalityAnnotation string, priorityClasses map[string]configuration.PriorityClass) (string, int, bool, error)

func GroupJobsByAnnotation added in v0.3.49

func GroupJobsByAnnotation(annotation string, jobs []*api.Job) map[string][]*api.Job

func JavaStringHash

func JavaStringHash(s string) uint32

JavaStringHash is the default hashing algorithm used by Pulsar copied from https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/hash.go

func JobIdFromPodRequirements added in v0.3.50

func JobIdFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)

func JobsSummary added in v0.3.50

func JobsSummary(jobs []LegacySchedulerJob) string

func NodeJobDiff added in v0.3.50

func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*schedulerobjects.Node, map[string]*schedulerobjects.Node, error)

NodeJobDiff compares two snapshots of the NodeDb memdb and returns the changes that happened between them: (1) a map from the job ids of all preempted jobs to the node they used to be on, and (2) a map from the job ids of all scheduled jobs to the node they were scheduled on.

func NodeTypesMatchingPod

func NodeTypesMatchingPod(nodeTypes map[string]*schedulerobjects.NodeType, req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)

NodeTypesMatchingPod returns a slice composed of all node types a given pod could potentially be scheduled on.

func PodRequirementFromJobSchedulingInfo added in v0.3.47

func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements

func PodRequirementFromLegacySchedulerJob added in v0.3.50

func PodRequirementFromLegacySchedulerJob[E LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements

func PodRequirementsFromJobSchedulingInfos added in v0.3.47

func PodRequirementsFromJobSchedulingInfos(infos []*schedulerobjects.JobSchedulingInfo) []*schedulerobjects.PodRequirements

func PodRequirementsFromLegacySchedulerJobs added in v0.3.47

func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements

func QueueFromPodRequirements added in v0.3.50

func QueueFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error)

func Reschedule added in v0.3.50

func Reschedule(
	ctx context.Context,
	jobRepo JobRepository,
	constraints SchedulingConstraints,
	config configuration.SchedulingConfig,
	nodeDb *NodeDb,
	priorityFactorByQueue map[string]float64,
	initialUsageByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
	nodePreemptibleEvictionProbability float64,
	nodeOversubscribedEvictionProbability float64,
	schedulingReportsRepository *SchedulingReportsRepository,
) ([]LegacySchedulerJob, []LegacySchedulerJob, map[string]*schedulerobjects.Node, map[string]schedulerobjects.QuantityByPriorityAndResourceType, error)

Reschedule (1) preempts jobs belonging to queues with total allocation above their fair share and (2) schedules new jobs belonging to queues with total allocation less than their fair share. Returns: (1) a slice of jobs to preempt; (2) a slice of jobs to schedule; (3) a map from job id to the node the job was preempted on (or scheduled onto); (4) total resource usage per queue, accounting for preempted/scheduled jobs.

func ResourceListAsWeightedApproximateFloat64

func ResourceListAsWeightedApproximateFloat64(resourceScarcity map[string]float64, rl schedulerobjects.ResourceList) float64

func Run

func Run(config Configuration) error

Run sets up a Scheduler application and runs it until a SIGTERM is received

func UnbindPodFromNode added in v0.3.50

UnbindPodFromNode returns a copy of node with req unbound from it.

func UpdateUsage added in v0.3.50

Types

type AddOrSubtract added in v0.3.50

// AddOrSubtract is an integer enum with the two values Add and Subtract.
type AddOrSubtract int
const (
	// Add is the zero value of AddOrSubtract.
	Add AddOrSubtract = iota
	// Subtract is the value following Add.
	Subtract
)

type CandidateGangIterator

type CandidateGangIterator struct {
	SchedulingConstraints
	SchedulingRoundReport *SchedulingRoundReport
	// contains filtered or unexported fields
}

CandidateGangIterator multiplexes between queues. Responsible for maintaining fair share and enforcing cross-queue scheduling constraints.

func NewCandidateGangIterator added in v0.3.47

func NewCandidateGangIterator(
	schedulingConstraints SchedulingConstraints,
	schedulingRoundReport *SchedulingRoundReport,
	ctx context.Context,
	iteratorsByQueue map[string]*QueueCandidateGangIterator,
	priorityFactorByQueue map[string]float64,
) (*CandidateGangIterator, error)

func (*CandidateGangIterator) Clear added in v0.3.47

func (it *CandidateGangIterator) Clear() error

func (*CandidateGangIterator) Next

func (*CandidateGangIterator) Peek added in v0.3.47

type Configuration

type Configuration struct {
	// Database configuration
	Postgres configuration.PostgresConfig
	// Redis configuration
	Redis config.RedisConfig
	// General Pulsar configuration
	Pulsar configuration.PulsarConfig
	// Configuration controlling leader election
	Leader LeaderConfig
	// Scheduler configuration (this is shared with the old scheduler)
	Scheduling configuration.SchedulingConfig
	// NOTE(review): presumably authentication and gRPC server settings for the scheduler's API — confirm.
	Auth       authconfig.AuthConfig
	Grpc       grpcconfig.GrpcConfig
	// Maximum number of strings that should be cached at any one time
	InternedStringsCacheSize uint32 `validate:"required"`
	// How often the scheduling cycle should run
	CyclePeriod time.Duration `validate:"required"`
	// How long after a heartbeat an executor will be considered lost
	ExecutorTimeout time.Duration `validate:"required"`
	// Maximum number of rows to fetch in a given query
	DatabaseFetchSize int `validate:"required"`
	// Timeout to use when sending messages to pulsar
	PulsarSendTimeout time.Duration `validate:"required"`
}

type ExecutorApi

type ExecutorApi struct {
	// contains filtered or unexported fields
}

ExecutorApi is a gRPC service that exposes functionality required by the armada executors

func NewExecutorApi

func NewExecutorApi(producer pulsar.Producer,
	jobRepository database.JobRepository,
	executorRepository database.ExecutorRepository,
	legacyExecutorRepository database.ExecutorRepository,
	allowedPriorities []int32,
	maxJobsPerCall uint,
	nodeIdLabel string,
) (*ExecutorApi, error)

func (*ExecutorApi) LeaseJobRuns

func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error

LeaseJobRuns performs the following actions:

  • Stores the request in postgres so that the scheduler can use the job + capacity information in the next scheduling round
  • Determines if any of the job runs in the request are no longer active and should be cancelled
  • Determines if any new job runs should be leased to the executor

func (*ExecutorApi) ReportEvents

func (srv *ExecutorApi) ReportEvents(ctx context.Context, list *executorapi.EventList) (*types.Empty, error)

ReportEvents publishes all events to pulsar. The events are compacted for more efficient publishing

type InMemoryJobIterator added in v0.3.50

type InMemoryJobIterator struct {
	// contains filtered or unexported fields
}

func NewInMemoryJobIterator added in v0.3.50

func NewInMemoryJobIterator[S ~[]E, E LegacySchedulerJob](jobs S) *InMemoryJobIterator

func (*InMemoryJobIterator) Next added in v0.3.50

type InMemoryJobRepository added in v0.3.50

type InMemoryJobRepository struct {
	// contains filtered or unexported fields
}

func NewInMemoryJobRepository added in v0.3.50

func NewInMemoryJobRepository(priorityClasses map[string]configuration.PriorityClass) *InMemoryJobRepository

func (*InMemoryJobRepository) Enqueue added in v0.3.50

func (repo *InMemoryJobRepository) Enqueue(job LegacySchedulerJob)

func (*InMemoryJobRepository) EnqueueMany added in v0.3.50

func (repo *InMemoryJobRepository) EnqueueMany(jobs []LegacySchedulerJob)

func (*InMemoryJobRepository) GetExistingJobsByIds added in v0.3.50

func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]LegacySchedulerJob, error)

func (*InMemoryJobRepository) GetJobIterator added in v0.3.50

func (repo *InMemoryJobRepository) GetJobIterator(ctx context.Context, queue string) (JobIterator, error)

func (*InMemoryJobRepository) GetQueueJobIds added in v0.3.50

func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error)

type JobIterator

// JobIterator produces LegacySchedulerJob values one at a time.
type JobIterator interface {
	// Next returns the next job, or an error.
	// NOTE(review): how exhaustion is signalled (e.g. a nil job) is not visible here — confirm against implementations.
	Next() (LegacySchedulerJob, error)
}

type JobQueueIteratorAdapter added in v0.3.47

type JobQueueIteratorAdapter struct {
	// contains filtered or unexported fields
}

func (*JobQueueIteratorAdapter) Next added in v0.3.47

type JobRepository

// JobRepository provides lookup of job ids by queue and of jobs by id.
type JobRepository interface {
	// GetQueueJobIds returns job ids for the queue named queueName.
	// NOTE(review): presumably only jobs that are still queued — confirm against implementations.
	GetQueueJobIds(queueName string) ([]string, error)
	// GetExistingJobsByIds returns the jobs with the given ids.
	// NOTE(review): the name suggests ids without a matching job are omitted rather than treated as an error — confirm.
	GetExistingJobsByIds(ids []string) ([]LegacySchedulerJob, error)
}

type JobSchedulingReport

type JobSchedulingReport struct {
	// Time at which this report was created.
	Timestamp time.Time
	// Id of the job this report corresponds to.
	JobId uuid.UUID
	// Job spec.
	Job LegacySchedulerJob
	// Scheduling requirements of this job.
	// We currently require that each job contains exactly one pod spec.
	Req *schedulerobjects.PodRequirements
	// Executor this job was attempted to be assigned to.
	ExecutorId string
	// Reason for why the job could not be scheduled.
	// Empty if the job was scheduled successfully.
	UnschedulableReason string
	// Scheduling reports for the individual pods that make up the job.
	PodSchedulingReports []*PodSchedulingReport
}

JobSchedulingReport is created by the scheduler and contains information about the decision made by the scheduler for this job.

func (*JobSchedulingReport) String

func (report *JobSchedulingReport) String() string

type KubernetesLeaderController

type KubernetesLeaderController struct {
	// contains filtered or unexported fields
}

KubernetesLeaderController uses the Kubernetes leader election mechanism to determine who is leader. This allows multiple instances of the scheduler to be run for high availability.

TODO: Move into package in common.

func (*KubernetesLeaderController) GetToken

func (lc *KubernetesLeaderController) GetToken() LeaderToken

func (*KubernetesLeaderController) Run

Run starts the controller. This is a blocking call that returns when the provided context is cancelled.

func (*KubernetesLeaderController) ValidateToken

func (lc *KubernetesLeaderController) ValidateToken(tok LeaderToken) bool

type LeaderConfig

type LeaderConfig struct {
	// Valid modes are "standalone" or "cluster"
	Mode string `validate:"required"`
	// Name of the K8s Lock Object
	LeaseLockName string
	// Namespace of the K8s Lock Object
	LeaseLockNamespace string
	// The name of the pod
	PodName string
	// How long the lease is held for.
	// Non-leaders must wait this long before trying to acquire the lease.
	LeaseDuration time.Duration
	// RenewDeadline is the duration that the acting leader will retry refreshing leadership before giving up.
	RenewDeadline time.Duration
	// RetryPeriod is the duration the LeaderElector clients should wait between tries of actions.
	RetryPeriod time.Duration
}

type LeaderController

type LeaderController interface {
	// GetToken returns a LeaderToken which allows you to determine if you are leader or not.
	GetToken() LeaderToken
	// ValidateToken allows a caller to determine whether a previously obtained token is still valid.
	// Returns true if the token is a valid leader token and false otherwise.
	ValidateToken(tok LeaderToken) bool
	// Run starts the controller. This is a blocking call which will return when the provided context is cancelled.
	Run(ctx context.Context) error
}

LeaderController is an interface to be implemented by structs that control which scheduler is leader

type LeaderToken

type LeaderToken struct {
	// contains filtered or unexported fields
}

LeaderToken is a token handed out to schedulers which they can use to determine if they are leader

func InvalidLeaderToken

func InvalidLeaderToken() LeaderToken

InvalidLeaderToken returns a LeaderToken indicating this instance is not leader.

func NewLeaderToken

func NewLeaderToken() LeaderToken

NewLeaderToken returns a LeaderToken indicating this instance is the leader.

type LeaseListener

type LeaseListener interface {
	// contains filtered or unexported methods
}

LeaseListener allows clients to listen for lease events.

type LegacyScheduler

type LegacyScheduler struct {
	SchedulingConstraints
	SchedulingRoundReport *SchedulingRoundReport
	CandidateGangIterator *CandidateGangIterator
	// Contains all nodes to be considered for scheduling.
	// Used for matching pods with nodes.
	NodeDb *NodeDb
	// contains filtered or unexported fields
}

func NewLegacyScheduler

func NewLegacyScheduler(
	ctx context.Context,
	constraints SchedulingConstraints,
	config configuration.SchedulingConfig,
	nodeDb *NodeDb,
	queues []*Queue,
	initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
) (*LegacyScheduler, error)

func (*LegacyScheduler) Schedule

func (sched *LegacyScheduler) Schedule() ([]LegacySchedulerJob, error)

func (*LegacyScheduler) String

func (sched *LegacyScheduler) String() string

type LegacySchedulerJob

// LegacySchedulerJob is the job abstraction accepted and returned by the legacy scheduling functions in this package.
type LegacySchedulerJob interface {
	// GetId returns the job's id.
	GetId() string
	// GetQueue returns the queue of the job.
	GetQueue() string
	// GetJobSet returns the job set of the job.
	GetJobSet() string
	// GetAnnotations returns the job's annotations.
	GetAnnotations() map[string]string
	// GetRequirements returns the job's scheduling info, given a map of priority classes.
	// NOTE(review): presumably the map is keyed by priority class name and used to resolve the job's priority — confirm.
	GetRequirements(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo
}

type LegacySchedulingAlgo

type LegacySchedulingAlgo struct {
	// contains filtered or unexported fields
}

LegacySchedulingAlgo is a SchedulingAlgo that schedules jobs in the same way as the old lease call

func NewLegacySchedulingAlgo

func NewLegacySchedulingAlgo(
	config configuration.SchedulingConfig,
	executorRepository database.ExecutorRepository,
	queueRepository database.QueueRepository,
) *LegacySchedulingAlgo

func (*LegacySchedulingAlgo) Schedule

func (l *LegacySchedulingAlgo) Schedule(ctx context.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) ([]*jobdb.Job, error)

Schedule assigns jobs to nodes in the same way as the old lease call. It iterates over each executor in turn (using a random order) and assigns the jobs using a LegacyScheduler, before moving on to the next executor. Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.

type MultiJobsIterator added in v0.3.47

type MultiJobsIterator struct {
	// contains filtered or unexported fields
}

MultiJobsIterator chains several JobIterators together, emptying them in the order provided.

func NewMultiJobsIterator added in v0.3.47

func NewMultiJobsIterator(its ...JobIterator) *MultiJobsIterator

func (*MultiJobsIterator) Next added in v0.3.47

type NodeAvailableResourceIndex added in v0.3.50

// NodeAvailableResourceIndex is a node index parameterised by a resource name and a job priority.
// NOTE(review): presumably keys nodes by the amount of Resource available at Priority — confirm against FromObject.
type NodeAvailableResourceIndex struct {
	// Resource name, e.g., "cpu", "gpu", or "memory".
	Resource string
	// Job priority.
	Priority int32
}

func (*NodeAvailableResourceIndex) FromArgs added in v0.3.50

func (index *NodeAvailableResourceIndex) FromArgs(args ...interface{}) ([]byte, error)

FromArgs computes the index key from a set of arguments. Takes a single argument resourceAmount of type resource.Quantity.

func (*NodeAvailableResourceIndex) FromObject added in v0.3.50

func (index *NodeAvailableResourceIndex) FromObject(raw interface{}) (bool, []byte, error)

FromObject extracts the index key from a *schedulerobjects.Node.

type NodeDb

type NodeDb struct {
	// contains filtered or unexported fields
}

NodeDb is the scheduler-internal system for storing node information. It's used to efficiently find nodes on which a pod can be scheduled.

func NewNodeDb

func NewNodeDb(priorityClasses map[string]configuration.PriorityClass, maxExtraNodesToConsider uint, indexedResources, indexedTaints, indexedNodeLabels []string) (*NodeDb, error)

func (*NodeDb) ClearAllocated

func (nodeDb *NodeDb) ClearAllocated() error

ClearAllocated zeroes out allocated resources on all nodes in the NodeDb.

func (*NodeDb) GetNode added in v0.3.47

func (nodeDb *NodeDb) GetNode(id string) (*schedulerobjects.Node, error)

GetNode returns a node in the db with given id.

func (*NodeDb) GetNodeWithTxn added in v0.3.47

func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*schedulerobjects.Node, error)

GetNodeWithTxn returns a node in the db with given id, within the provided transaction.

func (*NodeDb) NodeTypesMatchingPod

func (nodeDb *NodeDb) NodeTypesMatchingPod(req *schedulerobjects.PodRequirements) ([]*schedulerobjects.NodeType, map[string]int, error)

NodeTypesMatchingPod returns a slice composed of all node types a given pod could potentially be scheduled on.

func (*NodeDb) ScheduleMany

func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)

ScheduleMany assigns a set of pods to nodes. The assignment is atomic, i.e., either all pods are successfully assigned to nodes or none are. The returned bool indicates whether assignment succeeded or not. TODO: Pass through contexts to support timeouts.

func (*NodeDb) ScheduleManyWithTxn

func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*PodSchedulingReport, bool, error)

func (*NodeDb) SelectAndBindNodeToPod

func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)

func (*NodeDb) SelectAndBindNodeToPodWithTxn

func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)

func (*NodeDb) SelectNodeForPod

func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)

func (*NodeDb) SelectNodeForPodWithTxn

func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*PodSchedulingReport, error)

SelectNodeForPodWithTxn selects a node on which the pod can be scheduled.

func (*NodeDb) String

func (nodeDb *NodeDb) String() string

func (*NodeDb) Txn

func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn

func (*NodeDb) Upsert

func (nodeDb *NodeDb) Upsert(node *schedulerobjects.Node) error

func (*NodeDb) UpsertMany added in v0.3.50

func (nodeDb *NodeDb) UpsertMany(nodes []*schedulerobjects.Node) error

func (*NodeDb) UpsertManyWithTxn added in v0.3.50

func (nodeDb *NodeDb) UpsertManyWithTxn(txn *memdb.Txn, nodes []*schedulerobjects.Node) error

func (*NodeDb) UpsertWithTxn added in v0.3.50

func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *schedulerobjects.Node) error

type NodeDominantQueueIndex added in v0.3.50

type NodeDominantQueueIndex struct{}

func (*NodeDominantQueueIndex) FromArgs added in v0.3.50

func (index *NodeDominantQueueIndex) FromArgs(args ...interface{}) ([]byte, error)

FromArgs computes the index value from a set of arguments. Takes a single argument of type string.

func (*NodeDominantQueueIndex) FromObject added in v0.3.50

func (index *NodeDominantQueueIndex) FromObject(raw interface{}) (bool, []byte, error)

FromObject extracts the index value from a *schedulerobjects.Node object.

type NodeIterator added in v0.3.50

// NodeIterator yields nodes one at a time. NodesIterator has a NextNode method with this signature.
type NodeIterator interface {
	// NextNode returns the next node.
	// NOTE(review): presumably nil once the iterator is exhausted — confirm against implementations.
	NextNode() *schedulerobjects.Node
}

type NodePairIterator added in v0.3.50

type NodePairIterator struct {
	// contains filtered or unexported fields
}

func NewNodePairIterator added in v0.3.50

func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error)

func (*NodePairIterator) Next added in v0.3.50

func (it *NodePairIterator) Next() interface{}

func (*NodePairIterator) NextItem added in v0.3.50

func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem)

func (*NodePairIterator) WatchCh added in v0.3.50

func (it *NodePairIterator) WatchCh() <-chan struct{}

type NodePairIteratorItem added in v0.3.50

// NodePairIteratorItem is the item type returned by NodePairIterator.NextItem.
type NodePairIteratorItem struct {
	// NOTE(review): presumably the node as seen in the txnA passed to NewNodePairIterator — confirm.
	NodeA *schedulerobjects.Node
	// NOTE(review): presumably the corresponding node as seen in txnB — confirm, including whether either side can be nil.
	NodeB *schedulerobjects.Node
}

type NodeTypeResourceIterator

type NodeTypeResourceIterator struct {
	// contains filtered or unexported fields
}

NodeTypeResourceIterator is an iterator over all nodes of a given nodeType, for which there's at least some specified amount of a given resource available. For example, all nodes of type "foo" for which there's at least 1Gi of memory available.

Available resources is the sum of unused resources and resources assigned to lower-priority jobs. Nodes are returned in sorted order, going from least to most of the specified resource available.

If dominantQueue is NodeDominantQueueWildcard, all nodes of the given node type are considered. Otherwise, only nodes for which the given queue has the largest request are returned. If maxActiveQueues > 0, only nodes with at most this number of active queues are returned.

func NewNodeTypeResourceIterator

func NewNodeTypeResourceIterator(txn *memdb.Txn, dominantQueue string, maxActiveQueues int, resource string, priority int32, nodeType *schedulerobjects.NodeType, resourceAmount resource.Quantity) (*NodeTypeResourceIterator, error)

func (*NodeTypeResourceIterator) Next

func (it *NodeTypeResourceIterator) Next() interface{}

func (*NodeTypeResourceIterator) NextNodeItem

func (it *NodeTypeResourceIterator) NextNodeItem() *schedulerobjects.Node

func (*NodeTypeResourceIterator) WatchCh

func (it *NodeTypeResourceIterator) WatchCh() <-chan struct{}

type NodeTypesResourceIterator

type NodeTypesResourceIterator struct {
	// contains filtered or unexported fields
}

NodeTypesResourceIterator extends NodeTypeResourceIterator to iterate over nodes of several node types. Nodes are returned in sorted order, going from least to most of the specified resource available.

If dominantQueue is NodeDominantQueueWildcard, all nodes of the given node types are considered. Otherwise, only nodes exclusive to that queue are considered.

func NewNodeTypesResourceIterator

func NewNodeTypesResourceIterator(txn *memdb.Txn, dominantQueue string, maxActiveQueues int, resource string, priority int32, nodeTypes []*schedulerobjects.NodeType, resourceQuantity resource.Quantity) (*NodeTypesResourceIterator, error)

func (*NodeTypesResourceIterator) Next

func (it *NodeTypesResourceIterator) Next() interface{}

func (*NodeTypesResourceIterator) NextNodeItem

func (it *NodeTypesResourceIterator) NextNodeItem() *schedulerobjects.Node

func (*NodeTypesResourceIterator) WatchCh

func (it *NodeTypesResourceIterator) WatchCh() <-chan struct{}

type NodeTypesResourceIteratorItem

type NodeTypesResourceIteratorItem struct {
	// contains filtered or unexported fields
}

type NodeTypesResourceIteratorPQ

type NodeTypesResourceIteratorPQ []*NodeTypesResourceIteratorItem

NodeTypesResourceIteratorPQ is a priority queue used by NodeTypesResourceIterator to return results from across several sub-iterators in order.

func (NodeTypesResourceIteratorPQ) Len

func (NodeTypesResourceIteratorPQ) Less

func (pq NodeTypesResourceIteratorPQ) Less(i, j int) bool

func (*NodeTypesResourceIteratorPQ) Pop

func (pq *NodeTypesResourceIteratorPQ) Pop() any

func (*NodeTypesResourceIteratorPQ) Push

func (pq *NodeTypesResourceIteratorPQ) Push(x any)

func (NodeTypesResourceIteratorPQ) Swap

func (pq NodeTypesResourceIteratorPQ) Swap(i, j int)

type NodesIterator

type NodesIterator struct {
	// contains filtered or unexported fields
}

NodesIterator is an iterator over all nodes in the db.

func NewNodesIterator

func NewNodesIterator(txn *memdb.Txn) (*NodesIterator, error)

func (*NodesIterator) Next

func (it *NodesIterator) Next() interface{}

func (*NodesIterator) NextNode

func (it *NodesIterator) NextNode() *schedulerobjects.Node

func (*NodesIterator) WatchCh

func (it *NodesIterator) WatchCh() <-chan struct{}

type PodSchedulingReport

type PodSchedulingReport struct {
	// Time at which this report was created.
	Timestamp time.Time
	// Pod scheduling requirements.
	Req *schedulerobjects.PodRequirements
	// Resource type determined by the scheduler to be the hardest to satisfy
	// the scheduling requirements for.
	DominantResourceType string
	// Node the pod was assigned to.
	// If nil, the pod could not be assigned to any Node.
	Node *schedulerobjects.Node
	// Score indicates how well the pod fits on the selected Node.
	Score int
	// Number of Node types that matched the pod.
	// NOTE(review): the original comment was truncated; presumably the node types
	// the pod could potentially be scheduled on — confirm.
	NumMatchedNodeTypes int
	// Number of Node types excluded by reason.
	NumExcludedNodeTypesByReason map[string]int
	// Number of nodes excluded by reason.
	NumExcludedNodesByReason map[string]int
	// Set if an error occurred while attempting to schedule this pod.
	Err error
}

PodSchedulingReport is returned by SelectAndBindNodeToPod and contains detailed information on the scheduling decision made for this pod.

func (*PodSchedulingReport) String

func (report *PodSchedulingReport) String() string

type Publisher

type Publisher interface {
	// PublishMessages will publish the supplied messages. A shouldPublish callback is provided and the
	// implementor may decide whether to publish based on the value it returns.
	// NOTE(review): presumably shouldPublish reports whether this instance is still leader — confirm against callers.
	PublishMessages(ctx context.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error

	// PublishMarkers publishes a single marker message for each Pulsar partition. Each marker
	// message contains the supplied group id, which allows all marker messages for a given call
	// to be identified. The uint32 returned is the number of messages published.
	PublishMarkers(ctx context.Context, groupId uuid.UUID) (uint32, error)
}

Publisher is an interface to be implemented by structs that handle publishing messages to pulsar

type PulsarPublisher

type PulsarPublisher struct {
	// contains filtered or unexported fields
}

PulsarPublisher is the default implementation of Publisher

func NewPulsarPublisher

func NewPulsarPublisher(
	pulsarClient pulsar.Client,
	producerOptions pulsar.ProducerOptions,
	pulsarSendTimeout time.Duration,
) (*PulsarPublisher, error)

func (*PulsarPublisher) PublishMarkers

func (p *PulsarPublisher) PublishMarkers(ctx context.Context, groupId uuid.UUID) (uint32, error)

PublishMarkers sends one pulsar message (containing an armadaevents.PartitionMarker) to each partition of the producer's Pulsar topic.

func (*PulsarPublisher) PublishMessages

func (p *PulsarPublisher) PublishMessages(ctx context.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error

PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into single event sequences up to maxMessageBatchSize.

type Queue added in v0.3.47

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue added in v0.3.47

func NewQueue(name string, priorityFactor float64, jobIterator JobIterator) (*Queue, error)

type QueueCandidateGangIterator

type QueueCandidateGangIterator struct {
	SchedulingConstraints
	QueueSchedulingRoundReport *QueueSchedulingRoundReport
	// contains filtered or unexported fields
}

QueueCandidateGangIterator is an iterator over gangs in a queue that could be scheduled without exceeding per-queue limits.

func (*QueueCandidateGangIterator) Clear added in v0.3.47

func (it *QueueCandidateGangIterator) Clear() error

func (*QueueCandidateGangIterator) Next

func (*QueueCandidateGangIterator) Peek added in v0.3.47

type QueueCandidateGangIteratorItem added in v0.3.47

type QueueCandidateGangIteratorItem struct {
	// contains filtered or unexported fields
}

type QueueCandidateGangIteratorPQ added in v0.3.47

type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem

QueueCandidateGangIteratorPQ is a priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.

func (QueueCandidateGangIteratorPQ) Len added in v0.3.47

func (QueueCandidateGangIteratorPQ) Less added in v0.3.47

func (pq QueueCandidateGangIteratorPQ) Less(i, j int) bool

func (*QueueCandidateGangIteratorPQ) Pop added in v0.3.47

func (*QueueCandidateGangIteratorPQ) Push added in v0.3.47

func (pq *QueueCandidateGangIteratorPQ) Push(x any)

func (QueueCandidateGangIteratorPQ) Swap added in v0.3.47

func (pq QueueCandidateGangIteratorPQ) Swap(i, j int)

type QueueSchedulingReport

type QueueSchedulingReport struct {
	// Queue name.
	Name                                      string
	// Job scheduling report for the most recent successful scheduling attempt for this queue.
	MostRecentSuccessfulJobSchedulingReport   *JobSchedulingReport
	// Job scheduling report for the most recent failed scheduling attempt for this queue.
	MostRecentUnsuccessfulJobSchedulingReport *JobSchedulingReport
}

QueueSchedulingReport contains job scheduling reports for the most recent successful and failed scheduling attempts for this queue.

func (*QueueSchedulingReport) String

func (report *QueueSchedulingReport) String() string

type QueueSchedulingRoundReport

type QueueSchedulingRoundReport struct {
	// These factors influence the fraction of resources assigned to each queue.
	PriorityFactor float64
	// Resources assigned to the queue across all clusters at the start of the scheduling cycle.
	ResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
	// Resources assigned to this queue during this scheduling cycle.
	ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
	// Reports for all successful job scheduling attempts.
	SuccessfulJobSchedulingReports map[uuid.UUID]*JobSchedulingReport
	// Reports for all unsuccessful job scheduling attempts.
	UnsuccessfulJobSchedulingReports map[uuid.UUID]*JobSchedulingReport
	// Total number of jobs successfully scheduled in this round for this queue.
	NumScheduledJobs int
	// contains filtered or unexported fields
}

QueueSchedulingRoundReport captures the decisions made by the scheduler during one invocation for a particular queue.

func NewQueueSchedulingRoundReport

func NewQueueSchedulingRoundReport(priorityFactor float64, initialResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType) *QueueSchedulingRoundReport

func (*QueueSchedulingRoundReport) AddJobSchedulingReport

func (report *QueueSchedulingRoundReport) AddJobSchedulingReport(r *JobSchedulingReport, isEvictedJob bool)

AddJobSchedulingReport adds a job scheduling report to the report for this invocation of the scheduler. Automatically updates scheduled resources by calling AddScheduledResources. Is thread-safe.

func (*QueueSchedulingRoundReport) ClearJobSpecs

func (report *QueueSchedulingRoundReport) ClearJobSpecs()

ClearJobSpecs zeroes out job specs to reduce memory usage.

type QueuedGangIterator

type QueuedGangIterator struct {
	// contains filtered or unexported fields
}

QueuedGangIterator is an iterator over all gangs in a queue, where a gang is a set of jobs for which the gangIdAnnotation has equal value. A gang is yielded once the final member of the gang has been received. Jobs without gangIdAnnotation are considered to be gangs of cardinality 1.

func NewQueuedGangIterator

func NewQueuedGangIterator(ctx context.Context, it JobIterator, maxLookback uint, gangIdAnnotation, gangCardinalityAnnotation string) *QueuedGangIterator

func (*QueuedGangIterator) Clear added in v0.3.47

func (it *QueuedGangIterator) Clear() error

func (*QueuedGangIterator) Next

func (*QueuedGangIterator) Peek added in v0.3.47

type QueuedJobsIterator

type QueuedJobsIterator struct {
	// contains filtered or unexported fields
}

QueuedJobsIterator is an iterator over all jobs in a queue. It lazily loads jobs in batches from Redis asynchronously.

func NewQueuedJobsIterator

func NewQueuedJobsIterator(ctx context.Context, queue string, repo JobRepository) (*QueuedJobsIterator, error)

func (*QueuedJobsIterator) Next

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is the main Armada scheduler. It periodically performs the following cycle: 1. Update state from postgres (via the jobRepository). 2. Determine if leader and exit if not. 3. Generate any necessary events resulting from the state update. 4. Expire any jobs assigned to clusters that have timed out. 5. Schedule jobs. 6. Publish any Armada events resulting from the scheduling cycle.

func NewScheduler

func NewScheduler(
	jobRepository database.JobRepository,
	executorRepository database.ExecutorRepository,
	schedulingAlgo SchedulingAlgo,
	leaderController LeaderController,
	publisher Publisher,
	stringInterner *stringinterner.StringInterner,
	cyclePeriod time.Duration,
	executorTimeout time.Duration,
	maxLeaseReturns uint,
) (*Scheduler, error)

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run enters the scheduling loop, which will continue until the ctx is cancelled.

type SchedulingAlgo

type SchedulingAlgo interface {
	// Schedule should assign jobs to nodes
	// Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided
	// It should return a slice containing all scheduled jobs.
	Schedule(ctx context.Context, txn *jobdb.Txn, jobDb *jobdb.JobDb) ([]*jobdb.Job, error)
}

SchedulingAlgo is an interface that should be implemented by structs capable of assigning Jobs to nodes.

type SchedulingConstraints

type SchedulingConstraints struct {
	PriorityClasses map[string]configuration.PriorityClass
	// Executor for which we're currently scheduling jobs.
	ExecutorId string
	// Resource pool of this executor.
	Pool string
	// Weights used when computing total resource usage.
	ResourceScarcity map[string]float64
	// Max number of jobs to schedule per lease jobs call.
	MaximumJobsToSchedule uint
	// Max number of jobs to consider for a queue before giving up.
	MaxLookbackPerQueue uint
	// Jobs leased to this executor must be at least this large.
	// Used, e.g., to avoid scheduling CPU-only jobs onto clusters with GPUs.
	MinimumJobSize schedulerobjects.ResourceList
	// Per-queue resource limits.
	// Map from resource type to the limit for that resource.
	MaximalResourceFractionPerQueue map[string]float64
	// Limit, as a fraction of total resources across worker clusters, of resource types at each priority.
	// The limits are cumulative, i.e., the limit at priority p includes all higher levels.
	MaximalCumulativeResourceFractionPerQueueAndPriority map[int32]map[string]float64
	// Max resources to schedule per queue at a time.
	MaximalResourceFractionToSchedulePerQueue map[string]float64
	// Max resources to schedule at a time.
	MaximalResourceFractionToSchedule map[string]float64
	// Total resources across all worker clusters.
	// Used when computing resource limits.
	TotalResources schedulerobjects.ResourceList
}

SchedulingConstraints collects scheduling constraints, e.g., per-queue resource limits.

func SchedulingConstraintsFromSchedulingConfig

func SchedulingConstraintsFromSchedulingConfig(
	executorId, pool string,
	minimumJobSize schedulerobjects.ResourceList,
	config configuration.SchedulingConfig,
	totalResources schedulerobjects.ResourceList,
) *SchedulingConstraints

type SchedulingReportsRepository

type SchedulingReportsRepository struct {
	// Scheduling reports for the jobs that were most recently attempted to be scheduled.
	MostRecentJobSchedulingReports *lru.Cache
	// Scheduling reports for the most recently seen queues.
	MostRecentQueueSchedulingReports *lru.Cache
}

SchedulingReportsRepository stores reports on the most recent scheduling attempts.

func NewSchedulingReportsRepository

func NewSchedulingReportsRepository(maxQueueSchedulingReports, maxJobSchedulingReports int) *SchedulingReportsRepository

func (*SchedulingReportsRepository) Add

func (repo *SchedulingReportsRepository) Add(queueName string, report *JobSchedulingReport)

func (*SchedulingReportsRepository) AddMany

func (repo *SchedulingReportsRepository) AddMany(queueName string, reports []*JobSchedulingReport)

func (*SchedulingReportsRepository) AddSchedulingRoundReport

func (repo *SchedulingReportsRepository) AddSchedulingRoundReport(report *SchedulingRoundReport)

func (*SchedulingReportsRepository) GetJobReport

func (*SchedulingReportsRepository) GetJobSchedulingReport

func (repo *SchedulingReportsRepository) GetJobSchedulingReport(jobId uuid.UUID) (*JobSchedulingReport, bool)

func (*SchedulingReportsRepository) GetQueueReport

func (*SchedulingReportsRepository) GetQueueSchedulingReport

func (repo *SchedulingReportsRepository) GetQueueSchedulingReport(queueName string) (*QueueSchedulingReport, bool)

type SchedulingRoundReport

type SchedulingRoundReport struct {
	// Time at which the scheduling cycle started.
	Started time.Time
	// Time at which the scheduling cycle finished.
	Finished time.Time
	// Executor for which the scheduler was invoked.
	Executor string
	// Per-queue scheduling reports.
	QueueSchedulingRoundReports map[string]*QueueSchedulingRoundReport
	// Total resources across all clusters available at the start of the scheduling cycle.
	TotalResources schedulerobjects.ResourceList
	// Resources assigned across all queues during this scheduling cycle.
	ScheduledResourcesByPriority schedulerobjects.QuantityByPriorityAndResourceType
	// Total number of jobs successfully scheduled in this round.
	NumScheduledJobs int
	// Reason for why the scheduling round finished.
	TerminationReason string
	// contains filtered or unexported fields
}

SchedulingRoundReport captures the decisions made by the scheduler during one invocation.

func NewSchedulingRoundReport

func NewSchedulingRoundReport(
	totalResources schedulerobjects.ResourceList,
	priorityFactorByQueue map[string]float64,
	initialResourcesByQueueAndPriority map[string]schedulerobjects.QuantityByPriorityAndResourceType,
) *SchedulingRoundReport

func (*SchedulingRoundReport) AddJobSchedulingReport

func (report *SchedulingRoundReport) AddJobSchedulingReport(r *JobSchedulingReport, isEvictedJob bool)

AddJobSchedulingReport adds a job scheduling report to the report for this invocation of the scheduler. If updateTotals is true, automatically updates scheduled resources

func (*SchedulingRoundReport) ClearJobSpecs

func (report *SchedulingRoundReport) ClearJobSpecs()

ClearJobSpecs zeroes out job specs to reduce memory usage.

func (*SchedulingRoundReport) String

func (report *SchedulingRoundReport) String() string

func (*SchedulingRoundReport) SuccessfulJobSchedulingReports

func (report *SchedulingRoundReport) SuccessfulJobSchedulingReports() []*JobSchedulingReport

type StandaloneLeaderController

type StandaloneLeaderController struct {
	// contains filtered or unexported fields
}

StandaloneLeaderController returns a token that always indicates you are leader. This can be used when only a single instance of the scheduler is needed.

func NewStandaloneLeaderController

func NewStandaloneLeaderController() *StandaloneLeaderController

func (*StandaloneLeaderController) GetToken

func (lc *StandaloneLeaderController) GetToken() LeaderToken

func (*StandaloneLeaderController) Run added in v0.3.47

func (*StandaloneLeaderController) ValidateToken

func (lc *StandaloneLeaderController) ValidateToken(tok LeaderToken) bool

type SubmitChecker

type SubmitChecker struct {
	// contains filtered or unexported fields
}

func NewSubmitChecker

func NewSubmitChecker(
	executorTimeout time.Duration,
	schedulingConfig configuration.SchedulingConfig,
	executorRepository database.ExecutorRepository,
) *SubmitChecker

func (*SubmitChecker) CheckApiJobs

func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string)

func (*SubmitChecker) Run added in v0.3.49

func (srv *SubmitChecker) Run(ctx context.Context) error

Directories

Path Synopsis
Package schedulermocks is a generated GoMock package.
Package schedulermocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL