Documentation ¶
Index ¶
- Constants
- type DispatchExited
- type DispatchStateChange
- type DispatcherResourceManager
- func (m *DispatcherResourceManager) Allocate(msg sproto.AllocateRequest) (*sproto.ResourcesSubscription, error)
- func (*DispatcherResourceManager) CreateNamespace(string, string, bool) error
- func (*DispatcherResourceManager) DefaultNamespace(string) (*string, error)
- func (m *DispatcherResourceManager) DeleteJob(msg sproto.DeleteJob) (sproto.DeleteJobResponse, error)
- func (*DispatcherResourceManager) DeleteNamespace(string) error
- func (m *DispatcherResourceManager) DisableAgent(msg *apiv1.DisableAgentRequest) (*apiv1.DisableAgentResponse, error)
- func (m *DispatcherResourceManager) DisableSlot(*apiv1.DisableSlotRequest) (resp *apiv1.DisableSlotResponse, err error)
- func (m *DispatcherResourceManager) DispatchExpLogMessage(msg dispatchExpLogMessage)
- func (m *DispatcherResourceManager) DispatchStateChange(msg DispatchStateChange)
- func (m *DispatcherResourceManager) EnableAgent(msg *apiv1.EnableAgentRequest) (*apiv1.EnableAgentResponse, error)
- func (m *DispatcherResourceManager) EnableSlot(*apiv1.EnableSlotRequest) (resp *apiv1.EnableSlotResponse, err error)
- func (m *DispatcherResourceManager) ExternalPreemptionPending(msg sproto.PendingPreemption) error
- func (m *DispatcherResourceManager) GetAgent(msg *apiv1.GetAgentRequest) (*apiv1.GetAgentResponse, error)
- func (m *DispatcherResourceManager) GetAgents() (*apiv1.GetAgentsResponse, error)
- func (m *DispatcherResourceManager) GetAllocationSummaries() (map[model.AllocationID]sproto.AllocationSummary, error)
- func (m *DispatcherResourceManager) GetDefaultAuxResourcePool() (rm.ResourcePoolName, error)
- func (m *DispatcherResourceManager) GetDefaultComputeResourcePool() (rm.ResourcePoolName, error)
- func (m *DispatcherResourceManager) GetExternalJobs(rpName rm.ResourcePoolName) ([]*jobv1.Job, error)
- func (m *DispatcherResourceManager) GetJobQ(rpName rm.ResourcePoolName) (map[model.JobID]*sproto.RMJobInfo, error)
- func (m *DispatcherResourceManager) GetJobQueueStatsRequest(msg *apiv1.GetJobQueueStatsRequest) (*apiv1.GetJobQueueStatsResponse, error)
- func (*DispatcherResourceManager) GetNamespaceResourceQuota(string, string) (*float64, error)
- func (m *DispatcherResourceManager) GetResourcePools() (*apiv1.GetResourcePoolsResponse, error)
- func (*DispatcherResourceManager) GetSlot(*apiv1.GetSlotRequest) (*apiv1.GetSlotResponse, error)
- func (*DispatcherResourceManager) GetSlots(*apiv1.GetSlotsRequest) (*apiv1.GetSlotsResponse, error)
- func (m *DispatcherResourceManager) HealthCheck() []model.ResourceManagerHealth
- func (m *DispatcherResourceManager) IsReattachEnabled() bool
- func (m *DispatcherResourceManager) IsReattachEnabledForRP(rpName string) bool
- func (m *DispatcherResourceManager) IsReattachableOnlyAfterStarted() bool
- func (m *DispatcherResourceManager) KillDispatcherResources(msg KillDispatcherResources)
- func (m *DispatcherResourceManager) NotifyContainerRunning(msg sproto.NotifyContainerRunning) error
- func (m *DispatcherResourceManager) RecoverJobPosition(sproto.RecoverJobPosition)
- func (m *DispatcherResourceManager) Release(msg sproto.ResourcesReleased)
- func (*DispatcherResourceManager) RemoveEmptyNamespace(string, string) error
- func (m *DispatcherResourceManager) ResolveResourcePool(name rm.ResourcePoolName, workspace, slots int) (rm.ResourcePoolName, error)
- func (m *DispatcherResourceManager) ResourceQueryPostActions(dispatchID string, owner string)
- func (m *DispatcherResourceManager) SchedulePendingTasks()
- func (m *DispatcherResourceManager) SetGroupMaxSlots(msg sproto.SetGroupMaxSlots)
- func (*DispatcherResourceManager) SetGroupPriority(sproto.SetGroupPriority) error
- func (*DispatcherResourceManager) SetGroupWeight(sproto.SetGroupWeight) error
- func (*DispatcherResourceManager) SetResourceQuota(int, string, string) error
- func (m *DispatcherResourceManager) StartDispatcherResources(msg StartDispatcherResources)
- func (m *DispatcherResourceManager) TaskContainerDefaults(resourcePoolName rm.ResourcePoolName, ...) (model.TaskContainerDefaultsConfig, error)
- func (m *DispatcherResourceManager) ValidateResourcePool(name rm.ResourcePoolName) error
- func (*DispatcherResourceManager) ValidateResources(req sproto.ValidateResourcesRequest) ([]command.LaunchWarning, error)
- func (*DispatcherResourceManager) VerifyNamespaceExists(string, string) error
- type DispatcherResources
- type KillDispatcherResources
- type StartDispatcherResources
Constants ¶
const SlurmPrologReasonCode = "Prolog"
SlurmPrologReasonCode is the Slurm Prolog Reason Code.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DispatchExited ¶
DispatchExited notifies the dispatcher that the given dispatch exited.
type DispatchStateChange ¶
type DispatchStateChange struct { DispatchID string State launcher.DispatchState IsPullingImage bool HPCJobID string }
DispatchStateChange notifies the dispatcher that the given dispatch has changed state.
type DispatcherResourceManager ¶
type DispatcherResourceManager struct {
// contains filtered or unexported fields
}
DispatcherResourceManager manages the lifecycle of dispatcher resources.
"jobCancelQueue" is a FIFO queue of job cancellation requests. Each request waits there until "jobCancelQueueWorker()" removes it from the queue and passes it to "stopLauncherJob()", which sends the job termination request to the launcher.
"inflightCancelations" is a list of allocation IDs for job cancelations that are in progress. That is, "stopLauncherJob()" is running for that allocation ID. The "stopLauncherJob()" function will add the allocation ID to the list upon entry and remove it from the list upon exit.
func New ¶
func New( db *db.PgDB, echo *echoV4.Echo, cfg *config.ResourceManagerWithPoolsConfig, opts *aproto.MasterSetAgentOptions, cert *tls.Certificate, ) (*DispatcherResourceManager, error)
New returns a new dispatcher resource manager.
func (*DispatcherResourceManager) Allocate ¶
func (m *DispatcherResourceManager) Allocate(msg sproto.AllocateRequest) (*sproto.ResourcesSubscription, error)
Allocate adds a task to the queue to be allocated.
func (*DispatcherResourceManager) CreateNamespace ¶
func (*DispatcherResourceManager) CreateNamespace(string, string, bool) error
CreateNamespace is unsupported.
func (*DispatcherResourceManager) DefaultNamespace ¶
func (*DispatcherResourceManager) DefaultNamespace(string) (*string, error)
DefaultNamespace is unsupported.
func (*DispatcherResourceManager) DeleteJob ¶
func (m *DispatcherResourceManager) DeleteJob( msg sproto.DeleteJob, ) (sproto.DeleteJobResponse, error)
DeleteJob deletes resources associated with a job from the launcher. Note to developers: this function doesn't acquire a lock and, ideally, we won't make it do so.
func (*DispatcherResourceManager) DeleteNamespace ¶
func (*DispatcherResourceManager) DeleteNamespace(string) error
DeleteNamespace is unsupported.
func (*DispatcherResourceManager) DisableAgent ¶
func (m *DispatcherResourceManager) DisableAgent(msg *apiv1.DisableAgentRequest, ) (*apiv1.DisableAgentResponse, error)
DisableAgent adds an agent to the exclude list when launching jobs. Note to developers: this function doesn't acquire a lock and, ideally, we won't make it do so.
func (*DispatcherResourceManager) DisableSlot ¶
func (m *DispatcherResourceManager) DisableSlot(*apiv1.DisableSlotRequest, ) (resp *apiv1.DisableSlotResponse, err error)
DisableSlot implements 'det slot disable...' functionality.
func (*DispatcherResourceManager) DispatchExpLogMessage ¶
func (m *DispatcherResourceManager) DispatchExpLogMessage(msg dispatchExpLogMessage)
DispatchExpLogMessage publishes a log for the dispatch-associated task. It is called by the launcher monitor event handler.
func (*DispatcherResourceManager) DispatchStateChange ¶
func (m *DispatcherResourceManager) DispatchStateChange(msg DispatchStateChange)
DispatchStateChange records state changes and propagates them to allocations. It is called by the launcher monitor event handler. Note to developers: this function locks so don't make API or DB calls without optimization.
func (*DispatcherResourceManager) EnableAgent ¶
func (m *DispatcherResourceManager) EnableAgent( msg *apiv1.EnableAgentRequest, ) (*apiv1.EnableAgentResponse, error)
EnableAgent removes an agent from the exclude list when launching jobs. Note to developers: this function doesn't acquire a lock and, ideally, we won't make it do so.
func (*DispatcherResourceManager) EnableSlot ¶
func (m *DispatcherResourceManager) EnableSlot(*apiv1.EnableSlotRequest, ) (resp *apiv1.EnableSlotResponse, err error)
EnableSlot implements 'det slot enable...' functionality.
func (*DispatcherResourceManager) ExternalPreemptionPending ¶ added in v0.750.0
func (m *DispatcherResourceManager) ExternalPreemptionPending(msg sproto.PendingPreemption) error
ExternalPreemptionPending notifies a task of a preemption from the underlying resource manager.
func (*DispatcherResourceManager) GetAgent ¶
func (m *DispatcherResourceManager) GetAgent( msg *apiv1.GetAgentRequest, ) (*apiv1.GetAgentResponse, error)
GetAgent implements rm.ResourceManager. Note to developers: this function must not acquire locks, since it is called to saturate UIs.
func (*DispatcherResourceManager) GetAgents ¶
func (m *DispatcherResourceManager) GetAgents() (*apiv1.GetAgentsResponse, error)
GetAgents implements rm.ResourceManager. Note to developers: this function must not acquire locks, since it is polled to saturate the UI.
func (*DispatcherResourceManager) GetAllocationSummaries ¶
func (m *DispatcherResourceManager) GetAllocationSummaries() ( map[model.AllocationID]sproto.AllocationSummary, error, )
GetAllocationSummaries implements rm.ResourceManager.
func (*DispatcherResourceManager) GetDefaultAuxResourcePool ¶
func (m *DispatcherResourceManager) GetDefaultAuxResourcePool() (rm.ResourcePoolName, error)
GetDefaultAuxResourcePool implements rm.ResourceManager.
func (*DispatcherResourceManager) GetDefaultComputeResourcePool ¶
func (m *DispatcherResourceManager) GetDefaultComputeResourcePool() (rm.ResourcePoolName, error)
GetDefaultComputeResourcePool implements rm.ResourceManager.
func (*DispatcherResourceManager) GetExternalJobs ¶
func (m *DispatcherResourceManager) GetExternalJobs(rpName rm.ResourcePoolName) ([]*jobv1.Job, error)
GetExternalJobs implements rm.ResourceManager.
func (*DispatcherResourceManager) GetJobQ ¶
func (m *DispatcherResourceManager) GetJobQ(rpName rm.ResourcePoolName) ( map[model.JobID]*sproto.RMJobInfo, error, )
GetJobQ implements rm.ResourceManager.
func (*DispatcherResourceManager) GetJobQueueStatsRequest ¶
func (m *DispatcherResourceManager) GetJobQueueStatsRequest( msg *apiv1.GetJobQueueStatsRequest, ) (*apiv1.GetJobQueueStatsResponse, error)
GetJobQueueStatsRequest implements rm.ResourceManager. This and other job queue saturation points should be refactored to not take locks.
func (*DispatcherResourceManager) GetNamespaceResourceQuota ¶
func (*DispatcherResourceManager) GetNamespaceResourceQuota(string, string) (*float64, error)
GetNamespaceResourceQuota is not supported.
func (*DispatcherResourceManager) GetResourcePools ¶
func (m *DispatcherResourceManager) GetResourcePools() ( *apiv1.GetResourcePoolsResponse, error, )
GetResourcePools retrieves details regarding HPC resources of the underlying system. Note to developers: this function must not acquire locks, since it is polled to saturate the UI.
func (*DispatcherResourceManager) GetSlot ¶
func (*DispatcherResourceManager) GetSlot(*apiv1.GetSlotRequest) (*apiv1.GetSlotResponse, error)
GetSlot is unsupported.
func (*DispatcherResourceManager) GetSlots ¶
func (*DispatcherResourceManager) GetSlots(*apiv1.GetSlotsRequest) (*apiv1.GetSlotsResponse, error)
GetSlots is unsupported.
func (*DispatcherResourceManager) HealthCheck ¶
func (m *DispatcherResourceManager) HealthCheck() []model.ResourceManagerHealth
HealthCheck tries to call launcher and check if it is reachable.
func (*DispatcherResourceManager) IsReattachEnabled ¶
func (m *DispatcherResourceManager) IsReattachEnabled() bool
IsReattachEnabled is always true for dispatcher-based job schedulers.
func (*DispatcherResourceManager) IsReattachEnabledForRP ¶
func (m *DispatcherResourceManager) IsReattachEnabledForRP(rpName string) bool
IsReattachEnabledForRP returns true for all resource pools.
func (*DispatcherResourceManager) IsReattachableOnlyAfterStarted ¶
func (m *DispatcherResourceManager) IsReattachableOnlyAfterStarted() bool
IsReattachableOnlyAfterStarted is always false for dispatcher-based job schedulers as the start_time is not set on our allocations.
func (*DispatcherResourceManager) KillDispatcherResources ¶
func (m *DispatcherResourceManager) KillDispatcherResources(msg KillDispatcherResources)
KillDispatcherResources puts a kill request on the queue. Note to developers: If it acquires locks, it must be fast (no DB or API calls).
func (*DispatcherResourceManager) NotifyContainerRunning ¶
func (m *DispatcherResourceManager) NotifyContainerRunning(msg sproto.NotifyContainerRunning) error
NotifyContainerRunning receives a notification from the container to let the master know that the container is running. Note to developers: this function doesn't need to acquire a lock. Let's keep it that way.
func (*DispatcherResourceManager) RecoverJobPosition ¶
func (m *DispatcherResourceManager) RecoverJobPosition(sproto.RecoverJobPosition)
RecoverJobPosition implements rm.ResourceManager.
func (*DispatcherResourceManager) Release ¶
func (m *DispatcherResourceManager) Release(msg sproto.ResourcesReleased)
Release implements rm.ResourceManager.
func (*DispatcherResourceManager) RemoveEmptyNamespace ¶
func (*DispatcherResourceManager) RemoveEmptyNamespace(string, string) error
RemoveEmptyNamespace is not supported.
func (*DispatcherResourceManager) ResolveResourcePool ¶
func (m *DispatcherResourceManager) ResolveResourcePool(name rm.ResourcePoolName, workspace, slots int, ) (rm.ResourcePoolName, error)
ResolveResourcePool returns the resolved Slurm partition, or an error if it doesn't exist or can't be resolved due to internal errors. Note to developers: this function doesn't acquire a lock and, ideally, we won't make it do so, since it is called a lot.
func (*DispatcherResourceManager) ResourceQueryPostActions ¶
func (m *DispatcherResourceManager) ResourceQueryPostActions( dispatchID string, owner string, )
ResourceQueryPostActions performs cleanup after any dispatch completes (either a Slurm resource query or a launched manifest allocation). When retrieving the details of HPC resources, the job is synchronous and is not monitored, so removeDispatchEnvironment is called to remove the slurm-resources-info file. We use dispatcher REST API calls to instruct the dispatcher to clean up. On success, the Dispatch is removed from the DB, if present. When querying Slurm resource information, the DispatchID is not registered in the DB, so we do not log an error if we fail to delete it. On any REST failure where we cannot confirm that the launcher has removed the dispatch, we skip any attempt to delete the Dispatch from the DB; it is left in the DB for a future cleanup attempt on startup. Called only from fetchHpcResourceDetails, and always run in a goroutine except once during startup, when it retrieves the initial cluster cache.
func (*DispatcherResourceManager) SchedulePendingTasks ¶
func (m *DispatcherResourceManager) SchedulePendingTasks()
SchedulePendingTasks is called periodically to respond to allocations with resources when we have capacity to launch. Note to developers: this function only locks over DB calls in the restore path. Let's keep it this way.
func (*DispatcherResourceManager) SetGroupMaxSlots ¶
func (m *DispatcherResourceManager) SetGroupMaxSlots( msg sproto.SetGroupMaxSlots, )
SetGroupMaxSlots implements rm.ResourceManager.
func (*DispatcherResourceManager) SetGroupPriority ¶
func (*DispatcherResourceManager) SetGroupPriority(sproto.SetGroupPriority) error
SetGroupPriority implements rm.ResourceManager.
func (*DispatcherResourceManager) SetGroupWeight ¶
func (*DispatcherResourceManager) SetGroupWeight(sproto.SetGroupWeight) error
SetGroupWeight implements rm.ResourceManager.
func (*DispatcherResourceManager) SetResourceQuota ¶
func (*DispatcherResourceManager) SetResourceQuota(int, string, string) error
SetResourceQuota is unsupported.
func (*DispatcherResourceManager) StartDispatcherResources ¶
func (m *DispatcherResourceManager) StartDispatcherResources(msg StartDispatcherResources)
StartDispatcherResources starts an async process to launch a task. Note to developers: If it acquires locks, it must be fast (no DB or API calls).
func (*DispatcherResourceManager) TaskContainerDefaults ¶
func (m *DispatcherResourceManager) TaskContainerDefaults( resourcePoolName rm.ResourcePoolName, defaultConfig model.TaskContainerDefaultsConfig, ) (model.TaskContainerDefaultsConfig, error)
TaskContainerDefaults returns TaskContainerDefaults for the specified pool. Note to developers: this function doesn't need to acquire a lock. Let's keep it that way.
func (*DispatcherResourceManager) ValidateResourcePool ¶
func (m *DispatcherResourceManager) ValidateResourcePool(name rm.ResourcePoolName) error
ValidateResourcePool validates that the given resource pool exists. Note to developers: this function doesn't acquire a lock and, ideally, we won't make it do so, since it is called a lot.
func (*DispatcherResourceManager) ValidateResources ¶
func (*DispatcherResourceManager) ValidateResources( req sproto.ValidateResourcesRequest, ) ([]command.LaunchWarning, error)
ValidateResources implements rm.ResourceManager.
func (*DispatcherResourceManager) VerifyNamespaceExists ¶
func (*DispatcherResourceManager) VerifyNamespaceExists(string, string) error
VerifyNamespaceExists is unsupported.
type DispatcherResources ¶
type DispatcherResources struct {
// contains filtered or unexported fields
}
DispatcherResources information.
func (DispatcherResources) Kill ¶
func (r DispatcherResources) Kill(_ logger.Context)
Kill notifies the pods actor that it should stop the pod.
func (DispatcherResources) Start ¶
func (r DispatcherResources) Start( _ logger.Context, spec tasks.TaskSpec, rri sproto.ResourcesRuntimeInfo, ) error
Start notifies the pods actor that it should launch a pod for the provided task spec.
func (DispatcherResources) Summary ¶
func (r DispatcherResources) Summary() sproto.ResourcesSummary
Summary summarizes a container allocation.
type KillDispatcherResources ¶
type KillDispatcherResources struct { ResourcesID sproto.ResourcesID AllocationID model.AllocationID }
KillDispatcherResources tells the dispatcher RM to clean up the resources with the given resources ID.
type StartDispatcherResources ¶
type StartDispatcherResources struct { AllocationID model.AllocationID ResourcesID sproto.ResourcesID Spec tasks.TaskSpec UserConfiguredPriority bool }
StartDispatcherResources tells the dispatcher RM to launch the task described by Spec for the given allocation and resources IDs.