sproto

package
v0.750.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TerminateStoppedInstances represents the reason for terminating stopped instances.
	TerminateStoppedInstances = "stopped"
	// TerminateLongDisconnectedInstances represents the reason for terminating long
	// disconnected instances.
	TerminateLongDisconnectedInstances = "long disconnected"
	// TerminateLongIdleInstances represents the reason for terminating long idle instances.
	TerminateLongIdleInstances = "long idle"
	// InstanceNumberExceedsMaximum represents the reason for terminating instances because
	// the instance number exceeding the maximum.
	InstanceNumberExceedsMaximum = "instance number exceeding maximum"
)

Constant protocol for the reasons of terminating an instance.

View Source
const (
	// ResourcesTypeEnvVar is the name of the env var indicating the resource type to a task.
	ResourcesTypeEnvVar = "DET_RESOURCES_TYPE"
	// SlurmRendezvousIfaceEnvVar is the name of the env var for indicating the net iface on which
	// to rendezvous (horovodrun will use the IPs of the nodes on this interface to launch).
	SlurmRendezvousIfaceEnvVar = "DET_SLURM_RENDEZVOUS_IFACE"
	// SlurmProxyIfaceEnvVar is the env var for overriding the net iface used to proxy between
	// the master and agents.
	SlurmProxyIfaceEnvVar = "DET_SLURM_PROXY_IFACE"
	// ResourcesTypeK8sJob indicates the resources are a handle for a k8s pod.
	ResourcesTypeK8sJob ResourcesType = "k8s-job"
	// ResourcesTypeDockerContainer indicates the resources are a handle for a docker container.
	ResourcesTypeDockerContainer ResourcesType = "docker-container"
	// ResourcesTypeSlurmJob indicates the resources are a handle for a slurm job.
	ResourcesTypeSlurmJob ResourcesType = "slurm-job"
)
View Source
const DecimalExp = 1000

DecimalExp is a constant used by decimal.Decimal objects to denote its exponent.

View Source
const K8sExp = 30

K8sExp is a constant used by decimal.Decimal objects to denote the exponent for Kubernetes labels as k8s labels are limited to 63 characters.

View Source
const (
	// SuccessExitCode is the 0 zero value exit code.
	SuccessExitCode = 0
)

Variables

View Source
var (
	// HeadAnchor is an internal anchor for the head of the job queue.
	HeadAnchor = model.JobID("INTERNAL-head")
	// TailAnchor is an internal anchor for the tail of the job queue.
	TailAnchor = model.JobID("INTERNAL-tail")
)

ScheduledStates provides a list of ScheduledStates that are considered scheduled.

Functions

func ErrJobNotFound

func ErrJobNotFound(jobID model.JobID) error

ErrJobNotFound returns a standard job error.

func IsTransientSystemError

func IsTransientSystemError(err error) bool

IsTransientSystemError checks if the error is caused by the system and shouldn't count against `max_restarts`.

func IsUnrecoverableSystemError

func IsUnrecoverableSystemError(err error) bool

IsUnrecoverableSystemError checks if the error is absolutely unrecoverable.

func StringFromResourcePoolTypeProto

func StringFromResourcePoolTypeProto(t resourcepoolv1.ResourcePoolType) string

StringFromResourcePoolTypeProto returns a string from the protobuf resource pool type.

Types

type AQueue

type AQueue = map[model.JobID]*RMJobInfo

AQueue is a map of jobID to RMJobInfo.

type AgentSummary

type AgentSummary struct {
	Name   string
	IsIdle bool
}

AgentSummary contains information about an agent for external display.

type AllocateRequest

type AllocateRequest struct {
	// Identifying information.
	AllocationID      model.AllocationID
	TaskID            model.TaskID
	JobID             model.JobID
	RequestTime       time.Time
	JobSubmissionTime time.Time
	// IsUserVisible determines whether the AllocateRequest should
	// be considered in user-visible reports.
	IsUserVisible bool
	State         SchedulingState
	Name          string

	// Resource configuration.
	SlotsNeeded         int
	ResourcePool        string
	FittingRequirements FittingRequirements

	// Behavioral configuration.
	Preemptible bool
	IdleTimeout *IdleTimeoutConfig
	ProxyPorts  []*ProxyPortConfig
	Restore     bool
	ProxyTLS    bool

	// Logging context of the allocation actor.
	LogContext logger.Context

	BlockedNodes []string
}

AllocateRequest notifies resource managers to assign resources to a task.

type AllocationSummary

type AllocationSummary struct {
	TaskID         model.TaskID       `json:"task_id"`
	AllocationID   model.AllocationID `json:"allocation_id"`
	Name           string             `json:"name"`
	RegisteredTime time.Time          `json:"registered_time"`
	ResourcePool   string             `json:"resource_pool"`
	SlotsNeeded    int                `json:"slots_needed"`
	Resources      []ResourcesSummary `json:"resources"`
	SchedulerType  string             `json:"scheduler_type"`
	Priority       *int               `json:"priority"`
	ProxyPorts     []*ProxyPortConfig `json:"proxy_ports,omitempty"`
}

AllocationSummary contains information about a task for external display.

func (*AllocationSummary) Proto

Proto returns the proto representation of AllocationSummary.

type CapacityCheck

type CapacityCheck struct {
	Slots  int
	TaskID *model.TaskID
}

CapacityCheck checks the potential available slots in a resource pool.

type CapacityCheckResponse

type CapacityCheckResponse struct {
	SlotsAvailable   int
	CapacityExceeded bool
}

CapacityCheckResponse is the response to a CapacityCheck message.

type ChangeRP

type ChangeRP struct {
	ResourcePool string
}

ChangeRP notifies the task actor that to set itself for a new resource pool.

type ContainerLog

type ContainerLog struct {
	ContainerID cproto.ID
	Timestamp   time.Time

	// TODO(Brad): Pull message is totally pointless, does the same thing as aux message.
	PullMessage *string
	RunMessage  *aproto.RunMessage
	AuxMessage  *string

	// Level is typically unset, but set by parts of the system that know a log shouldn't
	// look as scary as is it. For example, it is set when an Allocation is killed intentionally
	// on the Killed logs.
	Level   *string
	Source  *string
	AgentID *string
}

ContainerLog notifies the task actor that a new log message is available for the container. It is used by the resource providers to communicate internally and with the task handlers.

func (ContainerLog) Message

func (c ContainerLog) Message() string

Message returns the textual content of this log message.

func (*ContainerLog) ResourcesEvent

func (*ContainerLog) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

func (ContainerLog) String

func (c ContainerLog) String() string

func (ContainerLog) ToTaskLog

func (c ContainerLog) ToTaskLog() *model.TaskLog

ToTaskLog converts a container log to a task log.

type DeleteJob

type DeleteJob struct {
	JobID model.JobID
}

DeleteJob instructs the RM to clean up all metadata associated with a job external to Determined.

type DeleteJobResponse

type DeleteJobResponse struct {
	Err <-chan error
}

DeleteJobResponse returns to the caller if the cleanup was successful or not.

func DeleteJobResponseOf

func DeleteJobResponseOf(input error) DeleteJobResponse

DeleteJobResponseOf returns a response containing the specified error.

func EmptyDeleteJobResponse

func EmptyDeleteJobResponse() DeleteJobResponse

EmptyDeleteJobResponse returns a response with an empty error chan.

type ExitCode

type ExitCode int

ExitCode is the process exit code of the container.

func FromContainerExitCode

func FromContainerExitCode(c *aproto.ExitCode) *ExitCode

FromContainerExitCode converts an aproto.ExitCode to an ExitCode. ExitCode's type is subject to change - it may become an enum instead where we interpret the type of exit for consumers.

type FailureType

type FailureType string

FailureType denotes the type of failure that resulted in the container stopping. Each FailureType must be handled by ./internal/task/allocation.go.

const (
	// ResourcesFailed denotes that the container ran but failed with a non-zero exit code.
	ResourcesFailed FailureType = "resources failed with non-zero exit code"

	// ResourcesAborted denotes the container was canceled before it was started.
	ResourcesAborted FailureType = "resources was aborted before it started"

	// ResourcesMissing denotes the resources were missing when the master asked about it.
	ResourcesMissing FailureType = "request for action on unknown resources"

	// TaskAborted denotes that the task was canceled before it was started.
	TaskAborted FailureType = "task was aborted before the task was started"

	// TaskError denotes that the task failed without an associated exit code.
	TaskError FailureType = "task failed without an associated exit code"

	// AgentFailed denotes that the agent failed while the container was running.
	AgentFailed FailureType = "agent failed while the container was running"

	// AgentError denotes that the agent failed to launch the container.
	AgentError FailureType = "agent failed to launch the container"

	// RestoreError denotes a failure to restore a running allocation on master blip.
	RestoreError FailureType = "RM failed to restore the allocation"

	// UnknownError denotes an internal error that did not map to a know failure type.
	UnknownError = "unknown agent failure: %s"
)

func FromContainerFailureType

func FromContainerFailureType(t aproto.FailureType) FailureType

FromContainerFailureType converts an aproto.FailureType to a FailureType. This mapping is not guaranteed to remain one to one; this conversion may do some level of interpretation.

func (FailureType) Proto

func (f FailureType) Proto() taskv1.FailureType

Proto returns the proto representation of the device type.

type FittingRequirements

type FittingRequirements struct {
	// SingleAgent specifies that the task must be located within a single agent.
	SingleAgent bool
}

FittingRequirements allow tasks to specify requirements for their placement.

type IdleTimeoutConfig

type IdleTimeoutConfig struct {
	ServiceID       string
	UseProxyState   bool
	UseRunnerState  bool
	TimeoutDuration time.Duration
	Debug           bool
}

IdleTimeoutConfig configures how idle timeouts should behave.

type InvalidResourcesRequestError

type InvalidResourcesRequestError struct {
	Cause error
}

InvalidResourcesRequestError is an unrecoverable validation error from the underlying RM.

func (InvalidResourcesRequestError) Error

func (*InvalidResourcesRequestError) ResourcesEvent

func (*InvalidResourcesRequestError) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

type KillTaskContainer

type KillTaskContainer struct {
	ContainerID cproto.ID

	LogContext logger.Context
}

KillTaskContainer notifies the agent to kill a task container.

type NotifyContainerRunning

type NotifyContainerRunning struct {
	AllocationID model.AllocationID
	Rank         int32
	NumPeers     int32
	NodeName     string
}

NotifyContainerRunning notifies the launcher (dispatcher) resource manager that the container is running.

type PendingPreemption

type PendingPreemption struct {
	AllocationID model.AllocationID
}

PendingPreemption notifies the task actor that it should release resources due to a pending system-triggered preemption.

type ProxyPortConfig

type ProxyPortConfig struct {
	ServiceID       string `json:"service_id"`
	Port            int    `json:"port"`
	ProxyTCP        bool   `json:"proxy_tcp"`
	Unauthenticated bool   `json:"unauthenticated"`
}

ProxyPortConfig configures a proxy the allocation should start.

func NewProxyPortConfig

func NewProxyPortConfig(input expconf.ProxyPortsConfig, taskID model.TaskID) []*ProxyPortConfig

NewProxyPortConfig converts expconf proxy configs into internal representation.

func (*ProxyPortConfig) Proto

Proto returns the proto representation of ProxyPortConfig.

type RMJobInfo

type RMJobInfo struct {
	JobsAhead      int
	State          SchedulingState
	RequestedSlots int
	AllocatedSlots int
}

RMJobInfo packs information available only to the RM that updates frequently.

type RecoverJobPosition

type RecoverJobPosition struct {
	JobID        model.JobID
	JobPosition  decimal.Decimal
	ResourcePool string
}

RecoverJobPosition gets sent from the experiment or command actor to the resource pool. Notifies the resource pool of the position of the job.

type ReleaseResources

type ReleaseResources struct {
	Reason string
	// If specified as true (default false), Requestor wants to force
	// a preemption attempt instead of an immediate kill.
	ForcePreemption bool
	ForceKill       bool
}

ReleaseResources notifies the task actor to release resources.

func (*ReleaseResources) ResourcesEvent

func (*ReleaseResources) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

type ResourceList

type ResourceList map[ResourcesID]Resources

ResourceList is a wrapper for a list of resources.

type Resources

type Resources interface {
	Summary() ResourcesSummary
	Start(logger.Context, tasks.TaskSpec, ResourcesRuntimeInfo) error
	Kill(logger.Context)
}

Resources is an interface that provides function for task actors to start tasks on assigned resources.

type ResourcesAllocated

type ResourcesAllocated struct {
	ID                model.AllocationID
	ResourcePool      string
	Resources         ResourceList
	JobSubmissionTime time.Time
	Recovered         bool
}

ResourcesAllocated notifies the task actor of assigned resources.

func (ResourcesAllocated) Clone

Clone clones ResourcesAllocated. Used to not pass mutable refs to other actors.

func (*ResourcesAllocated) ResourcesEvent

func (*ResourcesAllocated) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

type ResourcesEvent

type ResourcesEvent interface{ ResourcesEvent() }

ResourcesEvent describes a change in status or state of an allocation's resources.

type ResourcesFailedError

type ResourcesFailedError struct {
	FailureType FailureType
	ErrMsg      string
	ExitCode    *ExitCode
}

ResourcesFailedError contains information about restored resources' failure.

func NewResourcesFailure

func NewResourcesFailure(
	failureType FailureType, msg string, code *ExitCode,
) *ResourcesFailedError

NewResourcesFailure returns a resources failure message wrapping the type, msg and exit code.

func (ResourcesFailedError) Error

func (f ResourcesFailedError) Error() string

func (*ResourcesFailedError) Proto

Proto returns the proto representation of ResourcesFailure.

func (*ResourcesFailedError) ResourcesEvent

func (*ResourcesFailedError) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

type ResourcesID

type ResourcesID string

ResourcesID is the ID of some set of resources.

func FromContainerID

func FromContainerID(cID cproto.ID) ResourcesID

FromContainerID converts a cproto.ID to a ResourcesID.

type ResourcesReleased

type ResourcesReleased struct {
	AllocationID model.AllocationID
	ResourcesID  *ResourcesID
	ResourcePool string
}

ResourcesReleased notifies resource providers to return resources from a task.

type ResourcesReleasedEvent

type ResourcesReleasedEvent struct{}

ResourcesReleasedEvent notes when the RM has acknowledged resources are released.

func (ResourcesReleasedEvent) ResourcesEvent

func (ResourcesReleasedEvent) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

type ResourcesRuntimeInfo

type ResourcesRuntimeInfo struct {
	Token        string
	AgentRank    int
	IsMultiAgent bool
}

ResourcesRuntimeInfo is all the information provided at runtime to make a task spec.

type ResourcesStarted

type ResourcesStarted struct {
	Addresses []cproto.Address
	// NativeResourcesID is the native Docker hex container ID of the Determined container.
	NativeResourcesID string
}

ResourcesStarted contains the information needed by tasks from container started.

func FromContainerStarted

func FromContainerStarted(cs *aproto.ContainerStarted) *ResourcesStarted

FromContainerStarted converts an aproto.ContainerStarted message to ResourcesStarted.

func (*ResourcesStarted) Proto

Proto returns the proto representation of ResourcesStarted.

type ResourcesState

type ResourcesState string

ResourcesState is the state of some set of resources.

const (
	// Assigned state means that the resources have been assigned.
	Assigned ResourcesState = "ASSIGNED"
	// Pulling state means that the resources are pulling container images.
	Pulling ResourcesState = "PULLING"
	// Starting state means the service running on the resources is being started.
	Starting ResourcesState = "STARTING"
	// Running state means that the service on the resources is running.
	Running ResourcesState = "RUNNING"
	// Terminated state means that the resources have exited or has been aborted.
	Terminated ResourcesState = "TERMINATED"
	// Unknown state is a null value.
	Unknown ResourcesState = ""
)

func FromContainerState

func FromContainerState(state cproto.State) ResourcesState

FromContainerState converts a cproto.State to ResourcesState. This may shortly become much less granular (not a one to one mapping).

func (ResourcesState) BeforeOrEqual

func (s ResourcesState) BeforeOrEqual(other ResourcesState) bool

BeforeOrEqual returns if one state is chronologically before or the same as other.

func (ResourcesState) String

func (s ResourcesState) String() string

type ResourcesStateChanged

type ResourcesStateChanged struct {
	ResourcesID    ResourcesID
	ResourcesState ResourcesState

	ResourcesStarted *ResourcesStarted
	ResourcesStopped *ResourcesStopped
}

ResourcesStateChanged notifies that the task actor container state has been transitioned. It is used by the resource managers to communicate with the task handlers.

func FromContainerStateChanged

func FromContainerStateChanged(sc aproto.ContainerStateChanged) *ResourcesStateChanged

FromContainerStateChanged converts an aproto.ContainerStateChanged message to ResourcesStateChanged.

func (*ResourcesStateChanged) ResourcesEvent

func (*ResourcesStateChanged) ResourcesEvent()

ResourcesEvent implements ResourcesEvent.

func (ResourcesStateChanged) String

func (r ResourcesStateChanged) String() string

type ResourcesStopped

type ResourcesStopped struct {
	Failure *ResourcesFailedError
}

ResourcesStopped contains the information needed by tasks from container stopped.

func FromContainerStopped

func FromContainerStopped(cs *aproto.ContainerStopped) *ResourcesStopped

FromContainerStopped converts an aproto.ContainerStopped message to ResourcesStopped.

func ResourcesError

func ResourcesError(failureType FailureType, err error) ResourcesStopped

ResourcesError returns a resources stopped message wrapping the provided error. If the error is nil, a stack trace is provided instead.

func (*ResourcesStopped) Proto

Proto returns the proto representation of ResourcesStopped.

func (ResourcesStopped) String

func (r ResourcesStopped) String() string

type ResourcesSubscription

type ResourcesSubscription struct {
	// contains filtered or unexported fields
}

ResourcesSubscription is a subscription for streaming ResourcesEvents's. It must be closed when you are finished consuming events. Blocking on C forever can cause the publisher to backup and adversely affect the system.

func NewAllocationSubscription

func NewAllocationSubscription(
	inbox *queue.Queue[ResourcesEvent],
	cl ResourcesUnsubscribeFn,
) *ResourcesSubscription

NewAllocationSubscription create a new subcription.

func (*ResourcesSubscription) Close

func (a *ResourcesSubscription) Close()

Close unsubscribes us from further updates.

func (*ResourcesSubscription) Get

Get blocks until an event is published for our subscription's topic. When the subscription is closed, ResourcesReleasedEvent is returned.

func (*ResourcesSubscription) GetWithContext

func (a *ResourcesSubscription) GetWithContext(ctx context.Context) (ResourcesEvent, error)

GetWithContext blocks until an event is published for our subscription's topic or the context is canceled. When the subscription is closed, ResourcesReleasedEvent is returned.

func (*ResourcesSubscription) Len

func (a *ResourcesSubscription) Len() int

Len returns the count of pending events.

type ResourcesSummary

type ResourcesSummary struct {
	ResourcesID   ResourcesID                   `json:"resources_id"`
	ResourcesType ResourcesType                 `json:"resources_type"`
	AllocationID  model.AllocationID            `json:"allocation_id"`
	AgentDevices  map[aproto.ID][]device.Device `json:"agent_devices"`

	// Available if the RM can give information on the container level.
	ContainerID *cproto.ID `json:"container_id"`

	// Available if the RM knows the resource is already started / exited.
	Started *ResourcesStarted
	Exited  *ResourcesStopped
}

ResourcesSummary provides a summary of the resources comprising what we know at the time the allocation is granted, but for k8s it is granted before being scheduled so it isn't really much and `agent_devices` are missing for k8s.

func (*ResourcesSummary) Proto

Proto returns the proto representation of ResourcesSummary.

func (ResourcesSummary) Slots

func (s ResourcesSummary) Slots() int

Slots returns slot count for the resources.

type ResourcesType

type ResourcesType string

ResourcesType is the type of some set of resources. This should be purely informational.

type ResourcesUnsubscribeFn

type ResourcesUnsubscribeFn func()

ResourcesUnsubscribeFn closes a subscription.

type ScalingInfo

type ScalingInfo struct {
	DesiredNewInstances int
	Agents              map[string]AgentSummary
}

ScalingInfo describes the information that is needed for scaling.

func (*ScalingInfo) Update

func (s *ScalingInfo) Update(desiredNewInstanceNum int, agents map[string]AgentSummary) bool

Update updates its desired new instance number and the agent summaries.

type SchedulingState

type SchedulingState uint8 // CHECK perhaps could be defined in resource manager. cyclic import

SchedulingState denotes the scheduling state of a job and in order of its progression value.

const (
	// SchedulingStateQueued denotes a queued job waiting to be scheduled.
	SchedulingStateQueued SchedulingState = 0
	// SchedulingStateScheduledBackfilled denotes a job that is scheduled for execution as a backfill.
	SchedulingStateScheduledBackfilled SchedulingState = 1
	// SchedulingStateScheduled denotes a job that is scheduled for execution.
	SchedulingStateScheduled SchedulingState = 2
)

func SchedulingStateFromProto

func SchedulingStateFromProto(state jobv1.State) SchedulingState

SchedulingStateFromProto returns SchedulingState from proto representation.

func (SchedulingState) Proto

func (s SchedulingState) Proto() jobv1.State

Proto returns proto representation of SchedulingState.

type SetGroupMaxSlots

type SetGroupMaxSlots struct {
	MaxSlots     *int
	ResourcePool string
	JobID        model.JobID
}

SetGroupMaxSlots sets the maximum number of slots that a group can consume in the cluster.

type SetGroupPriority

type SetGroupPriority struct {
	Priority     int
	ResourcePool string
	JobID        model.JobID
}

SetGroupPriority sets the priority of the group in the priority scheduler.

type SetGroupWeight

type SetGroupWeight struct {
	Weight       float64
	ResourcePool string
	JobID        model.JobID
}

SetGroupWeight sets the weight of a group in the fair share scheduler.

type StartTaskContainer

type StartTaskContainer struct {
	AllocationID model.AllocationID
	aproto.StartContainer

	LogContext logger.Context
}

StartTaskContainer notifies the agent to start the task with the provided task spec.

type TerminateDecision

type TerminateDecision struct {
	InstanceIDs []string
	Reasons     map[string]string
}

TerminateDecision describes a terminating decision.

func (TerminateDecision) String

func (t TerminateDecision) String() string

String returns a representative string.

type ValidateResourcesRequest

type ValidateResourcesRequest struct {
	ResourcePool string
	Slots        int
	IsSingleNode bool
	TaskID       *model.TaskID
}

ValidateResourcesRequest is a message asking resource manager whether the given resource pool can (or, rather, if it's not impossible to) fulfill the request for the given amount of slots.

type ValidateResourcesResponse

type ValidateResourcesResponse struct {
	// Fulfillable values:
	// - false: impossible to fulfill
	// - true: ok or unknown
	Fulfillable bool
}

ValidateResourcesResponse is the response to ValidateResourcesRequest.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL