Documentation ¶
Index ¶
- Constants
- func CreateCheckInfo(succeeded bool, name, description, message string) dao.HealthCheckInfo
- func GetSchedulerHealthStatus(metrics metrics.CoreSchedulerMetrics, schedulerContext *ClusterContext) dao.SchedulerHealthDAOInfo
- type ClusterContext
- func (cc *ClusterContext) GetApplication(appID, partitionName string) *objects.Application
- func (cc *ClusterContext) GetLastHealthCheckResult() *dao.SchedulerHealthDAOInfo
- func (cc *ClusterContext) GetNode(nodeID, partitionName string) *objects.Node
- func (cc *ClusterContext) GetPartition(partitionName string) *PartitionContext
- func (cc *ClusterContext) GetPartitionMapClone() map[string]*PartitionContext
- func (cc *ClusterContext) GetPartitionWithoutClusterID(partitionName string) *PartitionContext
- func (cc *ClusterContext) GetPolicyGroup() string
- func (cc *ClusterContext) GetQueue(queueName string, partitionName string) *objects.Queue
- func (cc *ClusterContext) GetRMInfoMapClone() map[string]*RMInformation
- func (cc *ClusterContext) GetReservations(partitionName string) map[string]int
- func (cc *ClusterContext) GetStartTime() time.Time
- func (cc *ClusterContext) NeedPreemption() bool
- func (cc *ClusterContext) SetLastHealthCheckResult(c *dao.SchedulerHealthDAOInfo)
- func (cc *ClusterContext) SetRMInfo(rmID string, rmBuildInformation map[string]string)
- func (cc *ClusterContext) UpdateRMSchedulerConfig(rmID string) error
- func (cc *ClusterContext) UpdateSchedulerConfig(conf *configs.SchedulerConfig) error
- type DRFPreemptionPolicy
- type HealthChecker
- type PartitionContext
- func (pc *PartitionContext) AddApplication(app *objects.Application) error
- func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error
- func (pc *PartitionContext) AddRejectedApplication(rejectedApplication *objects.Application, rejectedMessage string)
- func (pc *PartitionContext) GetAllocatedResource() *resources.Resource
- func (pc *PartitionContext) GetApplications() []*objects.Application
- func (pc *PartitionContext) GetAppsByState(state string) []*objects.Application
- func (pc *PartitionContext) GetAppsInTerminatedState() []*objects.Application
- func (pc *PartitionContext) GetCompletedApplications() []*objects.Application
- func (pc *PartitionContext) GetCurrentState() string
- func (pc *PartitionContext) GetNode(nodeID string) *objects.Node
- func (pc *PartitionContext) GetNodeIterator() objects.NodeIterator
- func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy
- func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64
- func (pc *PartitionContext) GetNodes() []*objects.Node
- func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo
- func (pc *PartitionContext) GetQueue(name string) *objects.Queue
- func (pc *PartitionContext) GetQueueInfo() dao.QueueDAOInfo
- func (pc *PartitionContext) GetRejectedApplications() []*objects.Application
- func (pc *PartitionContext) GetRejectedAppsByState(state string) []*objects.Application
- func (pc *PartitionContext) GetStateDumpFilePath() string
- func (pc *PartitionContext) GetStateTime() time.Time
- func (pc *PartitionContext) GetTotalAllocationCount() int
- func (pc *PartitionContext) GetTotalApplicationCount() int
- func (pc *PartitionContext) GetTotalCompletedApplicationCount() int
- func (pc *PartitionContext) GetTotalNodeCount() int
- func (pc *PartitionContext) GetTotalPartitionResource() *resources.Resource
- type PreemptionPolicy
- type RMInformation
- type Scheduler
Constants ¶
const (
	DefaultCleanRootInterval        = 10000 * time.Millisecond // sleep between queue removal checks
	DefaultCleanExpiredAppsInterval = 24 * time.Hour           // sleep between apps removal checks
)
const (
	LevelKey = "level"
	PhaseKey = "phase"
	NameKey  = "name"
	StateKey = "state"
	InfoKey  = "info"
)
Variables ¶
This section is empty.
Functions ¶
func CreateCheckInfo ¶
func CreateCheckInfo(succeeded bool, name, description, message string) dao.HealthCheckInfo
func GetSchedulerHealthStatus ¶
func GetSchedulerHealthStatus(metrics metrics.CoreSchedulerMetrics, schedulerContext *ClusterContext) dao.SchedulerHealthDAOInfo
Types ¶
type ClusterContext ¶
func NewClusterContext ¶
func NewClusterContext(rmID, policyGroup string) (*ClusterContext, error)
Creates a new cluster context to be used outside of the event system. Test only.
func (*ClusterContext) GetApplication ¶
func (cc *ClusterContext) GetApplication(appID, partitionName string) *objects.Application
Get the scheduling application based on the ID from the partition. Returns nil if the partition or app cannot be found. Visible for tests.
func (*ClusterContext) GetLastHealthCheckResult ¶
func (cc *ClusterContext) GetLastHealthCheckResult() *dao.SchedulerHealthDAOInfo
func (*ClusterContext) GetNode ¶
func (cc *ClusterContext) GetNode(nodeID, partitionName string) *objects.Node
Get a scheduling node based on its ID from the partition. Returns nil if the partition or node cannot be found. Visible for tests.
func (*ClusterContext) GetPartition ¶
func (cc *ClusterContext) GetPartition(partitionName string) *PartitionContext
func (*ClusterContext) GetPartitionMapClone ¶
func (cc *ClusterContext) GetPartitionMapClone() map[string]*PartitionContext
func (*ClusterContext) GetPartitionWithoutClusterID ¶
func (cc *ClusterContext) GetPartitionWithoutClusterID(partitionName string) *PartitionContext
func (*ClusterContext) GetPolicyGroup ¶
func (cc *ClusterContext) GetPolicyGroup() string
Get the config name.
func (*ClusterContext) GetQueue ¶
func (cc *ClusterContext) GetQueue(queueName string, partitionName string) *objects.Queue
Get the scheduling queue based on the queue path name from the partition. Returns nil if the partition or queue cannot be found. Visible for tests.
func (*ClusterContext) GetRMInfoMapClone ¶
func (cc *ClusterContext) GetRMInfoMapClone() map[string]*RMInformation
func (*ClusterContext) GetReservations ¶
func (cc *ClusterContext) GetReservations(partitionName string) map[string]int
Return the map of reservations for the partition. Returns nil if the partition cannot be found, or an empty map if there are no reservations. Visible for tests.
func (*ClusterContext) GetStartTime ¶
func (cc *ClusterContext) GetStartTime() time.Time
func (*ClusterContext) NeedPreemption ¶
func (cc *ClusterContext) NeedPreemption() bool
func (*ClusterContext) SetLastHealthCheckResult ¶
func (cc *ClusterContext) SetLastHealthCheckResult(c *dao.SchedulerHealthDAOInfo)
func (*ClusterContext) SetRMInfo ¶
func (cc *ClusterContext) SetRMInfo(rmID string, rmBuildInformation map[string]string)
func (*ClusterContext) UpdateRMSchedulerConfig ¶
func (cc *ClusterContext) UpdateRMSchedulerConfig(rmID string) error
Locked version of the configuration update, called outside of the event system. Updates the current config via the config loader. Used in tests only; normal updates use the internal call, and the webservice must use UpdateSchedulerConfig.
func (*ClusterContext) UpdateSchedulerConfig ¶
func (cc *ClusterContext) UpdateSchedulerConfig(conf *configs.SchedulerConfig) error
Locked version of the configuration update, called from the webservice. NOTE: this call assumes a single registered RM and uses that RM for the updates.
type DRFPreemptionPolicy ¶
type DRFPreemptionPolicy struct { }
Preemption policy based on DRF.
func (*DRFPreemptionPolicy) DoPreemption ¶
func (m *DRFPreemptionPolicy) DoPreemption(scheduler *Scheduler)
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
func NewHealthChecker ¶
func NewHealthChecker() *HealthChecker
func NewHealthCheckerWithParameters ¶
func NewHealthCheckerWithParameters(period time.Duration) *HealthChecker
type PartitionContext ¶
type PartitionContext struct {
	RmID string // the RM the partition belongs to
	Name string // name of the partition (logging mainly)

	// The partition write lock must not be held while manipulating an application.
	// Scheduling is running continuously as a lock free background task. Scheduling an application
	// acquires a write lock of the application object. While holding the write lock a list of nodes is
	// requested from the partition. This requires a read lock on the partition.
	// If the partition write lock is held while manipulating an application a deadlock could occur.
	// Since application objects handle their own locks there is no requirement to hold the partition lock
	// while manipulating the application.
	// Similarly adding, updating or removing a node or a queue should only hold the partition write lock
	// while manipulating the partition information not while manipulating the underlying objects.
	sync.RWMutex
	// contains filtered or unexported fields
}
func (*PartitionContext) AddApplication ¶
func (pc *PartitionContext) AddApplication(app *objects.Application) error
Add a new application to the partition. NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (*PartitionContext) AddNode ¶
func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error
Add the node to the partition and process the allocations that are reported by the node. NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (*PartitionContext) AddRejectedApplication ¶
func (pc *PartitionContext) AddRejectedApplication(rejectedApplication *objects.Application, rejectedMessage string)
func (*PartitionContext) GetAllocatedResource ¶
func (pc *PartitionContext) GetAllocatedResource() *resources.Resource
func (*PartitionContext) GetApplications ¶
func (pc *PartitionContext) GetApplications() []*objects.Application
func (*PartitionContext) GetAppsByState ¶
func (pc *PartitionContext) GetAppsByState(state string) []*objects.Application
func (*PartitionContext) GetAppsInTerminatedState ¶
func (pc *PartitionContext) GetAppsInTerminatedState() []*objects.Application
func (*PartitionContext) GetCompletedApplications ¶
func (pc *PartitionContext) GetCompletedApplications() []*objects.Application
func (*PartitionContext) GetCurrentState ¶
func (pc *PartitionContext) GetCurrentState() string
func (*PartitionContext) GetNode ¶
func (pc *PartitionContext) GetNode(nodeID string) *objects.Node
Get a node from the partition by nodeID.
func (*PartitionContext) GetNodeIterator ¶
func (pc *PartitionContext) GetNodeIterator() objects.NodeIterator
Create an ordered node iterator based on the node sort policy set for this partition. The iterator is nil if there are no unreserved nodes available.
func (*PartitionContext) GetNodeSortingPolicyType ¶
func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy
func (*PartitionContext) GetNodeSortingResourceWeights ¶
func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64
func (*PartitionContext) GetNodes ¶
func (pc *PartitionContext) GetNodes() []*objects.Node
func (*PartitionContext) GetPartitionQueues ¶
func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo
Get the queue info for the whole queue structure to pass to the webservice
func (*PartitionContext) GetQueue ¶
func (pc *PartitionContext) GetQueue(name string) *objects.Queue
Get the queue from the structure based on the fully qualified name. Wrapper around the unlocked version getQueueInternal(). Visible for tests.
func (*PartitionContext) GetQueueInfo ¶
func (pc *PartitionContext) GetQueueInfo() dao.QueueDAOInfo
Get the queue info for the whole queue structure to pass to the webservice
func (*PartitionContext) GetRejectedApplications ¶
func (pc *PartitionContext) GetRejectedApplications() []*objects.Application
func (*PartitionContext) GetRejectedAppsByState ¶
func (pc *PartitionContext) GetRejectedAppsByState(state string) []*objects.Application
Used to find expired apps in the rejected applications.
func (*PartitionContext) GetStateDumpFilePath ¶
func (pc *PartitionContext) GetStateDumpFilePath() string
Get the state dump output file path from the partition. NOTE: this call does not acquire the PartitionContext lock itself; it should only be called while holding that lock.
func (*PartitionContext) GetStateTime ¶
func (pc *PartitionContext) GetStateTime() time.Time
func (*PartitionContext) GetTotalAllocationCount ¶
func (pc *PartitionContext) GetTotalAllocationCount() int
func (*PartitionContext) GetTotalApplicationCount ¶
func (pc *PartitionContext) GetTotalApplicationCount() int
func (*PartitionContext) GetTotalCompletedApplicationCount ¶
func (pc *PartitionContext) GetTotalCompletedApplicationCount() int
func (*PartitionContext) GetTotalNodeCount ¶
func (pc *PartitionContext) GetTotalNodeCount() int
func (*PartitionContext) GetTotalPartitionResource ¶
func (pc *PartitionContext) GetTotalPartitionResource() *resources.Resource
type PreemptionPolicy ¶
type PreemptionPolicy interface {
DoPreemption(scheduler *Scheduler)
}
type RMInformation ¶
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Main Scheduler service that starts the needed sub-services.
func NewScheduler ¶
func NewScheduler() *Scheduler
func (*Scheduler) GetClusterContext ¶
func (s *Scheduler) GetClusterContext() *ClusterContext
Visible for tests.
func (*Scheduler) HandleEvent ¶
func (s *Scheduler) HandleEvent(ev interface{})
Implements handling of Scheduler events.
func (*Scheduler) MultiStepSchedule ¶
func (s *Scheduler) MultiStepSchedule(nAlloc int)
Test-only scheduler that runs the normal schedule routine nAlloc times. Visible for tests.
func (*Scheduler) SingleStepPreemption ¶
func (s *Scheduler) SingleStepPreemption()
Visible for tests.
func (*Scheduler) StartService ¶
func (s *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool)
Starts the service.