Documentation ¶
Index ¶
- Constants
- Variables
- func ErrJobNotFound(jobID model.JobID) error
- func IsTransientSystemError(err error) bool
- func IsUnrecoverableSystemError(err error) bool
- func StringFromResourcePoolTypeProto(t resourcepoolv1.ResourcePoolType) string
- type AQueue
- type AgentSummary
- type AllocateRequest
- type AllocationSummary
- type CapacityCheck
- type CapacityCheckResponse
- type ChangeRP
- type ContainerLog
- type DeleteJob
- type DeleteJobResponse
- type ExitCode
- type FailureType
- type FittingRequirements
- type IdleTimeoutConfig
- type InvalidResourcesRequestError
- type KillTaskContainer
- type NotifyContainerRunning
- type PendingPreemption
- type PreemptionConfig
- type ProxyPortConfig
- type RMJobInfo
- type RecoverJobPosition
- type ReleaseResources
- type ResourceList
- type Resources
- type ResourcesAllocated
- type ResourcesEvent
- type ResourcesFailedError
- type ResourcesID
- type ResourcesReleased
- type ResourcesReleasedEvent
- type ResourcesRuntimeInfo
- type ResourcesStarted
- type ResourcesState
- type ResourcesStateChanged
- type ResourcesStopped
- type ResourcesSubscription
- type ResourcesSummary
- type ResourcesType
- type ResourcesUnsubscribeFn
- type ScalingInfo
- type SchedulingState
- type SetGroupMaxSlots
- type SetGroupPriority
- type SetGroupWeight
- type StartTaskContainer
- type TerminateDecision
- type ValidateResourcesRequest
- type ValidateResourcesResponse
Constants ¶
const ( // TerminateStoppedInstances represents the reason for terminating stopped instances. TerminateStoppedInstances = "stopped" // TerminateLongDisconnectedInstances represents the reason for terminating long // disconnected instances. TerminateLongDisconnectedInstances = "long disconnected" // TerminateLongIdleInstances represents the reason for terminating long idle instances. TerminateLongIdleInstances = "long idle" // InstanceNumberExceedsMaximum represents the reason for terminating instances because // the instance number exceeding the maximum. InstanceNumberExceedsMaximum = "instance number exceeding maximum" )
Constants describing the reasons for terminating an instance.
const ( // ResourcesTypeEnvVar is the name of the env var indicating the resource type to a task. ResourcesTypeEnvVar = "DET_RESOURCES_TYPE" // SlurmRendezvousIfaceEnvVar is the name of the env var for indicating the net iface on which // to rendezvous (horovodrun will use the IPs of the nodes on this interface to launch). SlurmRendezvousIfaceEnvVar = "DET_SLURM_RENDEZVOUS_IFACE" // SlurmProxyIfaceEnvVar is the env var for overriding the net iface used to proxy between // the master and agents. SlurmProxyIfaceEnvVar = "DET_SLURM_PROXY_IFACE" // ResourcesTypeK8sJob indicates the resources are a handle for a k8s pod. ResourcesTypeK8sJob ResourcesType = "k8s-job" // ResourcesTypeDockerContainer indicates the resources are a handle for a docker container. ResourcesTypeDockerContainer ResourcesType = "docker-container" // ResourcesTypeSlurmJob indicates the resources are a handle for a slurm job. ResourcesTypeSlurmJob ResourcesType = "slurm-job" )
const DecimalExp = 1000
DecimalExp is a constant used by decimal.Decimal objects to denote their exponent.
const K8sExp = 30
K8sExp is a constant used by decimal.Decimal objects to denote the exponent for Kubernetes labels; it is needed because k8s labels are limited to 63 characters.
const (
// SuccessExitCode is the 0 zero value exit code.
SuccessExitCode = 0
)
Variables ¶
var ( // HeadAnchor is an internal anchor for the head of the job queue. HeadAnchor = model.JobID("INTERNAL-head") // TailAnchor is an internal anchor for the tail of the job queue. TailAnchor = model.JobID("INTERNAL-tail") )
var ScheduledStates = map[SchedulingState]bool{ SchedulingStateScheduled: true, SchedulingStateScheduledBackfilled: true, }
ScheduledStates is the set of SchedulingState values that are considered scheduled.
Functions ¶
func ErrJobNotFound ¶
ErrJobNotFound returns a standard job error.
func IsTransientSystemError ¶
IsTransientSystemError checks if the error is caused by the system and shouldn't count against `max_restarts`.
func IsUnrecoverableSystemError ¶
IsUnrecoverableSystemError checks if the error is absolutely unrecoverable.
func StringFromResourcePoolTypeProto ¶
func StringFromResourcePoolTypeProto(t resourcepoolv1.ResourcePoolType) string
StringFromResourcePoolTypeProto returns a string from the protobuf resource pool type.
Types ¶
type AgentSummary ¶
AgentSummary contains information about an agent for external display.
type AllocateRequest ¶
type AllocateRequest struct { // Identifying information. AllocationID model.AllocationID TaskID model.TaskID JobID model.JobID RequestTime time.Time JobSubmissionTime time.Time // IsUserVisible determines whether the AllocateRequest should // be considered in user-visible reports. IsUserVisible bool State SchedulingState Name string // Resource configuration. SlotsNeeded int ResourcePool string FittingRequirements FittingRequirements // Behavioral configuration. Preemption PreemptionConfig IdleTimeout *IdleTimeoutConfig ProxyPorts []*ProxyPortConfig Restore bool ProxyTLS bool // Logging context of the allocation actor. LogContext logger.Context BlockedNodes []string }
AllocateRequest notifies resource managers to assign resources to a task.
type AllocationSummary ¶
type AllocationSummary struct { TaskID model.TaskID `json:"task_id"` AllocationID model.AllocationID `json:"allocation_id"` Name string `json:"name"` RegisteredTime time.Time `json:"registered_time"` ResourcePool string `json:"resource_pool"` SlotsNeeded int `json:"slots_needed"` Resources []ResourcesSummary `json:"resources"` SchedulerType string `json:"scheduler_type"` Priority *int `json:"priority"` ProxyPorts []*ProxyPortConfig `json:"proxy_ports,omitempty"` }
AllocationSummary contains information about a task for external display.
func (*AllocationSummary) Proto ¶
func (a *AllocationSummary) Proto() *taskv1.AllocationSummary
Proto returns the proto representation of AllocationSummary.
type CapacityCheck ¶
CapacityCheck checks the potential available slots in a resource pool.
type CapacityCheckResponse ¶
CapacityCheckResponse is the response to a CapacityCheck message.
type ChangeRP ¶
type ChangeRP struct {
ResourcePool string
}
ChangeRP notifies the task actor to set itself up for a new resource pool.
type ContainerLog ¶
type ContainerLog struct { ContainerID cproto.ID Timestamp time.Time // TODO(Brad): Pull message is totally pointless, does the same thing as aux message. PullMessage *string RunMessage *aproto.RunMessage AuxMessage *string // Level is typically unset, but set by parts of the system that know a log shouldn't // look as scary as is it. For example, it is set when an Allocation is killed intentionally // on the Killed logs. Level *string Source *string AgentID *string }
ContainerLog notifies the task actor that a new log message is available for the container. It is used by the resource providers to communicate internally and with the task handlers.
func (ContainerLog) Message ¶
func (c ContainerLog) Message() string
Message returns the textual content of this log message.
func (*ContainerLog) ResourcesEvent ¶
func (*ContainerLog) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
func (ContainerLog) String ¶
func (c ContainerLog) String() string
func (ContainerLog) ToTaskLog ¶
func (c ContainerLog) ToTaskLog() *model.TaskLog
ToTaskLog converts a container log to a task log.
type DeleteJob ¶
DeleteJob instructs the RM to clean up all metadata associated with a job external to Determined.
type DeleteJobResponse ¶
type DeleteJobResponse struct {
Err <-chan error
}
DeleteJobResponse reports to the caller whether the cleanup was successful.
func DeleteJobResponseOf ¶
func DeleteJobResponseOf(input error) DeleteJobResponse
DeleteJobResponseOf returns a response containing the specified error.
func EmptyDeleteJobResponse ¶
func EmptyDeleteJobResponse() DeleteJobResponse
EmptyDeleteJobResponse returns a response with an empty error chan.
type ExitCode ¶
type ExitCode int
ExitCode is the process exit code of the container.
func FromContainerExitCode ¶
FromContainerExitCode converts an aproto.ExitCode to an ExitCode. ExitCode's type is subject to change: it may become an enum that interprets the type of exit for consumers.
type FailureType ¶
type FailureType string
FailureType denotes the type of failure that resulted in the container stopping. Each FailureType must be handled by ./internal/task/allocation.go.
const ( // ResourcesFailed denotes that the container ran but failed with a non-zero exit code. ResourcesFailed FailureType = "resources failed with non-zero exit code" // ResourcesAborted denotes the container was canceled before it was started. ResourcesAborted FailureType = "resources was aborted before it started" // ResourcesMissing denotes the resources were missing when the master asked about it. ResourcesMissing FailureType = "request for action on unknown resources" // TaskAborted denotes that the task was canceled before it was started. TaskAborted FailureType = "task was aborted before the task was started" // TaskError denotes that the task failed without an associated exit code. TaskError FailureType = "task failed without an associated exit code" // AgentFailed denotes that the agent failed while the container was running. AgentFailed FailureType = "agent failed while the container was running" // AgentError denotes that the agent failed to launch the container. AgentError FailureType = "agent failed to launch the container" // RestoreError denotes a failure to restore a running allocation on master blip. RestoreError FailureType = "RM failed to restore the allocation" // UnknownError denotes an internal error that did not map to a know failure type. UnknownError = "unknown agent failure: %s" )
func FromContainerFailureType ¶
func FromContainerFailureType(t aproto.FailureType) FailureType
FromContainerFailureType converts an aproto.FailureType to a FailureType. This mapping is not guaranteed to remain one-to-one; the conversion may do some level of interpretation.
func (FailureType) Proto ¶
func (f FailureType) Proto() taskv1.FailureType
Proto returns the proto representation of the failure type.
type FittingRequirements ¶
type FittingRequirements struct { // SingleAgent specifies that the task must be located within a single agent. SingleAgent bool }
FittingRequirements allow tasks to specify requirements for their placement.
type IdleTimeoutConfig ¶
type IdleTimeoutConfig struct { ServiceID string UseProxyState bool UseRunnerState bool TimeoutDuration time.Duration Debug bool }
IdleTimeoutConfig configures how idle timeouts should behave.
type InvalidResourcesRequestError ¶
type InvalidResourcesRequestError struct {
Cause error
}
InvalidResourcesRequestError is an unrecoverable validation error from the underlying RM.
func (InvalidResourcesRequestError) Error ¶
func (e InvalidResourcesRequestError) Error() string
func (*InvalidResourcesRequestError) ResourcesEvent ¶
func (*InvalidResourcesRequestError) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
type KillTaskContainer ¶
KillTaskContainer notifies the agent to kill a task container.
type NotifyContainerRunning ¶
type NotifyContainerRunning struct { AllocationID model.AllocationID Rank int32 NumPeers int32 NodeName string }
NotifyContainerRunning notifies the launcher (dispatcher) resource manager that the container is running.
type PendingPreemption ¶
type PendingPreemption struct {
AllocationID model.AllocationID
}
PendingPreemption notifies the task actor that it should release resources due to a pending system-triggered preemption.
type PreemptionConfig ¶
PreemptionConfig configures task preemption.
type ProxyPortConfig ¶
type ProxyPortConfig struct { ServiceID string `json:"service_id"` Port int `json:"port"` ProxyTCP bool `json:"proxy_tcp"` Unauthenticated bool `json:"unauthenticated"` }
ProxyPortConfig configures a proxy the allocation should start.
func NewProxyPortConfig ¶
func NewProxyPortConfig(input expconf.ProxyPortsConfig, taskID model.TaskID) []*ProxyPortConfig
NewProxyPortConfig converts expconf proxy configs into internal representation.
func (*ProxyPortConfig) Proto ¶
func (p *ProxyPortConfig) Proto() *taskv1.ProxyPortConfig
Proto returns the proto representation of ProxyPortConfig.
type RMJobInfo ¶
type RMJobInfo struct { JobsAhead int State SchedulingState RequestedSlots int AllocatedSlots int }
RMJobInfo packs information available only to the RM that updates frequently.
type RecoverJobPosition ¶
type RecoverJobPosition struct { JobID model.JobID JobPosition decimal.Decimal ResourcePool string }
RecoverJobPosition is sent from the experiment or command actor to the resource pool to notify the resource pool of the job's position.
type ReleaseResources ¶
type ReleaseResources struct { Reason string // If specified as true (default false), Requestor wants to force // a preemption attempt instead of an immediate kill. ForcePreemption bool ForceKill bool }
ReleaseResources notifies the task actor to release resources.
func (*ReleaseResources) ResourcesEvent ¶
func (*ReleaseResources) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
type ResourceList ¶
type ResourceList map[ResourcesID]Resources
ResourceList is a wrapper for a list of resources.
type Resources ¶
type Resources interface { Summary() ResourcesSummary Start(logger.Context, tasks.TaskSpec, ResourcesRuntimeInfo) error Kill(logger.Context) }
Resources is an interface that provides functions for task actors to start tasks on assigned resources.
type ResourcesAllocated ¶
type ResourcesAllocated struct { ID model.AllocationID ResourcePool string Resources ResourceList JobSubmissionTime time.Time Recovered bool }
ResourcesAllocated notifies the task actor of assigned resources.
func (ResourcesAllocated) Clone ¶
func (ra ResourcesAllocated) Clone() *ResourcesAllocated
Clone returns a copy of ResourcesAllocated so that mutable references are not passed to other actors.
func (*ResourcesAllocated) ResourcesEvent ¶
func (*ResourcesAllocated) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
type ResourcesEvent ¶
type ResourcesEvent interface{ ResourcesEvent() }
ResourcesEvent describes a change in status or state of an allocation's resources.
type ResourcesFailedError ¶
type ResourcesFailedError struct { FailureType FailureType ErrMsg string ExitCode *ExitCode }
ResourcesFailedError contains information about restored resources' failure.
func NewResourcesFailure ¶
func NewResourcesFailure( failureType FailureType, msg string, code *ExitCode, ) *ResourcesFailedError
NewResourcesFailure returns a resources failure message wrapping the type, msg and exit code.
func (ResourcesFailedError) Error ¶
func (f ResourcesFailedError) Error() string
func (*ResourcesFailedError) Proto ¶
func (f *ResourcesFailedError) Proto() *taskv1.ResourcesFailure
Proto returns the proto representation of ResourcesFailure.
func (*ResourcesFailedError) ResourcesEvent ¶
func (*ResourcesFailedError) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
type ResourcesID ¶
type ResourcesID string
ResourcesID is the ID of some set of resources.
func FromContainerID ¶
func FromContainerID(cID cproto.ID) ResourcesID
FromContainerID converts a cproto.ID to a ResourcesID.
type ResourcesReleased ¶
type ResourcesReleased struct { AllocationID model.AllocationID ResourcesID *ResourcesID ResourcePool string }
ResourcesReleased notifies resource providers to return resources from a task.
type ResourcesReleasedEvent ¶
type ResourcesReleasedEvent struct{}
ResourcesReleasedEvent notes when the RM has acknowledged resources are released.
func (ResourcesReleasedEvent) ResourcesEvent ¶
func (ResourcesReleasedEvent) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
type ResourcesRuntimeInfo ¶
ResourcesRuntimeInfo is all the information provided at runtime to make a task spec.
type ResourcesStarted ¶
type ResourcesStarted struct { Addresses []cproto.Address // NativeResourcesID is the native Docker hex container ID of the Determined container. NativeResourcesID string }
ResourcesStarted contains the information that tasks need from a started container.
func FromContainerStarted ¶
func FromContainerStarted(cs *aproto.ContainerStarted) *ResourcesStarted
FromContainerStarted converts an aproto.ContainerStarted message to ResourcesStarted.
func (*ResourcesStarted) Proto ¶
func (r *ResourcesStarted) Proto() *taskv1.ResourcesStarted
Proto returns the proto representation of ResourcesStarted.
type ResourcesState ¶
type ResourcesState string
ResourcesState is the state of some set of resources.
const ( // Assigned state means that the resources have been assigned. Assigned ResourcesState = "ASSIGNED" // Pulling state means that the resources are pulling container images. Pulling ResourcesState = "PULLING" // Starting state means the service running on the resources is being started. Starting ResourcesState = "STARTING" // Running state means that the service on the resources is running. Running ResourcesState = "RUNNING" // Terminated state means that the resources have exited or has been aborted. Terminated ResourcesState = "TERMINATED" // Unknown state is a null value. Unknown ResourcesState = "" )
func FromContainerState ¶
func FromContainerState(state cproto.State) ResourcesState
FromContainerState converts a cproto.State to a ResourcesState. This mapping may soon become much less granular (not one-to-one).
func (ResourcesState) BeforeOrEqual ¶
func (s ResourcesState) BeforeOrEqual(other ResourcesState) bool
BeforeOrEqual returns if one state is chronologically before or the same as other.
func (ResourcesState) String ¶
func (s ResourcesState) String() string
type ResourcesStateChanged ¶
type ResourcesStateChanged struct { ResourcesID ResourcesID ResourcesState ResourcesState ResourcesStarted *ResourcesStarted ResourcesStopped *ResourcesStopped }
ResourcesStateChanged notifies the task actor that the container state has transitioned. It is used by the resource managers to communicate with the task handlers.
func FromContainerStateChanged ¶
func FromContainerStateChanged(sc aproto.ContainerStateChanged) *ResourcesStateChanged
FromContainerStateChanged converts an aproto.ContainerStateChanged message to ResourcesStateChanged.
func (*ResourcesStateChanged) ResourcesEvent ¶
func (*ResourcesStateChanged) ResourcesEvent()
ResourcesEvent implements ResourcesEvent.
func (ResourcesStateChanged) String ¶
func (r ResourcesStateChanged) String() string
type ResourcesStopped ¶
type ResourcesStopped struct {
Failure *ResourcesFailedError
}
ResourcesStopped contains the information that tasks need from a stopped container.
func FromContainerStopped ¶
func FromContainerStopped(cs *aproto.ContainerStopped) *ResourcesStopped
FromContainerStopped converts an aproto.ContainerStopped message to ResourcesStopped.
func ResourcesError ¶
func ResourcesError(failureType FailureType, err error) ResourcesStopped
ResourcesError returns a resources stopped message wrapping the provided error. If the error is nil, a stack trace is provided instead.
func (*ResourcesStopped) Proto ¶
func (r *ResourcesStopped) Proto() *taskv1.ResourcesStopped
Proto returns the proto representation of ResourcesStopped.
func (ResourcesStopped) String ¶
func (r ResourcesStopped) String() string
type ResourcesSubscription ¶
type ResourcesSubscription struct {
// contains filtered or unexported fields
}
ResourcesSubscription is a subscription for streaming ResourcesEvents. It must be closed when you are finished consuming events. Blocking on C forever can cause the publisher to back up and adversely affect the system.
func NewAllocationSubscription ¶
func NewAllocationSubscription( inbox *queue.Queue[ResourcesEvent], cl ResourcesUnsubscribeFn, ) *ResourcesSubscription
NewAllocationSubscription creates a new subscription.
func (*ResourcesSubscription) Close ¶
func (a *ResourcesSubscription) Close()
Close unsubscribes us from further updates.
func (*ResourcesSubscription) Get ¶
func (a *ResourcesSubscription) Get() ResourcesEvent
Get blocks until an event is published for our subscription's topic. When the subscription is closed, ResourcesReleasedEvent is returned.
func (*ResourcesSubscription) GetWithContext ¶
func (a *ResourcesSubscription) GetWithContext(ctx context.Context) (ResourcesEvent, error)
GetWithContext blocks until an event is published for our subscription's topic or the context is canceled. When the subscription is closed, ResourcesReleasedEvent is returned.
func (*ResourcesSubscription) Len ¶
func (a *ResourcesSubscription) Len() int
Len returns the count of pending events.
type ResourcesSummary ¶
type ResourcesSummary struct { ResourcesID ResourcesID `json:"resources_id"` ResourcesType ResourcesType `json:"resources_type"` AllocationID model.AllocationID `json:"allocation_id"` AgentDevices map[aproto.ID][]device.Device `json:"agent_devices"` // Available if the RM can give information on the container level. ContainerID *cproto.ID `json:"container_id"` // Available if the RM knows the resource is already started / exited. Started *ResourcesStarted Exited *ResourcesStopped }
ResourcesSummary summarizes what is known about the resources at the time the allocation is granted. For k8s, the allocation is granted before the resources are scheduled, so the summary contains little information and `agent_devices` is missing.
func (*ResourcesSummary) Proto ¶
func (s *ResourcesSummary) Proto() *taskv1.ResourcesSummary
Proto returns the proto representation of ResourcesSummary.
func (ResourcesSummary) Slots ¶
func (s ResourcesSummary) Slots() int
Slots returns slot count for the resources.
type ResourcesType ¶
type ResourcesType string
ResourcesType is the type of some set of resources. This should be purely informational.
type ResourcesUnsubscribeFn ¶
type ResourcesUnsubscribeFn func()
ResourcesUnsubscribeFn closes a subscription.
type ScalingInfo ¶
type ScalingInfo struct { DesiredNewInstances int Agents map[string]AgentSummary }
ScalingInfo describes the information that is needed for scaling.
func (*ScalingInfo) Update ¶
func (s *ScalingInfo) Update(desiredNewInstanceNum int, agents map[string]AgentSummary) bool
Update updates its desired new instance number and the agent summaries.
type SchedulingState ¶
type SchedulingState uint8 // CHECK perhaps could be defined in resource manager. cyclic import
SchedulingState denotes the scheduling state of a job; its values are ordered by how far the job has progressed.
const ( // SchedulingStateQueued denotes a queued job waiting to be scheduled. SchedulingStateQueued SchedulingState = 0 // SchedulingStateScheduledBackfilled denotes a job that is scheduled for execution as a backfill. SchedulingStateScheduledBackfilled SchedulingState = 1 // SchedulingStateScheduled denotes a job that is scheduled for execution. SchedulingStateScheduled SchedulingState = 2 )
func SchedulingStateFromProto ¶
func SchedulingStateFromProto(state jobv1.State) SchedulingState
SchedulingStateFromProto returns SchedulingState from proto representation.
func (SchedulingState) Proto ¶
func (s SchedulingState) Proto() jobv1.State
Proto returns proto representation of SchedulingState.
type SetGroupMaxSlots ¶
SetGroupMaxSlots sets the maximum number of slots that a group can consume in the cluster.
type SetGroupPriority ¶
SetGroupPriority sets the priority of the group in the priority scheduler.
type SetGroupWeight ¶
SetGroupWeight sets the weight of a group in the fair share scheduler.
type StartTaskContainer ¶
type StartTaskContainer struct { AllocationID model.AllocationID aproto.StartContainer LogContext logger.Context }
StartTaskContainer notifies the agent to start the task with the provided task spec.
type TerminateDecision ¶
TerminateDecision describes a terminating decision.
func (TerminateDecision) String ¶
func (t TerminateDecision) String() string
String returns a representative string.
type ValidateResourcesRequest ¶
type ValidateResourcesRequest struct { ResourcePool string Slots int IsSingleNode bool TaskID *model.TaskID }
ValidateResourcesRequest is a message asking resource manager whether the given resource pool can (or, rather, if it's not impossible to) fulfill the request for the given amount of slots.
type ValidateResourcesResponse ¶
type ValidateResourcesResponse struct { // Fulfillable values: // - false: impossible to fulfill // - true: ok or unknown Fulfillable bool }
ValidateResourcesResponse is the response to ValidateResourcesRequest.