tasklist

package
v0.0.0-...-3511abf Latest
Warning

This package is not in the latest version of its module.

Published: Nov 2, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var GroupPriorityChangeRegistry = NewRegistry[model.JobID, func(int) error]()

GroupPriorityChangeRegistry is a registry of callbacks available for when a group's priority changes.

Functions

func AssignmentIsScheduled

func AssignmentIsScheduled(allocatedResources *sproto.ResourcesAllocated) bool

AssignmentIsScheduled determines if a resource allocation assignment is considered equivalent to being scheduled.

func FindAnchor

func FindAnchor(
	jobID model.JobID,
	anchorID model.JobID,
	aheadOf bool,
	taskList *TaskList,
	groups map[model.JobID]*Group,
	queuePositions JobSortState,
	k8s bool,
) (bool, model.JobID, int)

FindAnchor finds a second anchor and its priority and determines if the moving job needs a priority change to move ahead or behind the anchor.

func GetJobSubmissionTime

func GetJobSubmissionTime(taskList *TaskList, jobID model.JobID) (time.Time, error)

GetJobSubmissionTime returns the submission time for the first task found in the list for a job. We might want RMs to have easier and faster access to this information than this lookup provides.

func InitializeQueuePosition

func InitializeQueuePosition(aTime time.Time, isK8s bool) decimal.Decimal

InitializeQueuePosition constructs a new queue position from time and RM type.

func JobStats

func JobStats(taskList *TaskList) *jobv1.QueueStats

JobStats returns quick job-related stats about the TaskList.

func JobStatsByPool

func JobStatsByPool(taskList *TaskList, resourcePool string) *jobv1.QueueStats

JobStatsByPool returns quick job-related stats about the TaskList, by resource pool.

func NeedMove

func NeedMove(
	jobPos decimal.Decimal,
	anchorPos decimal.Decimal,
	secondPos decimal.Decimal,
	aheadOf bool,
) bool

NeedMove returns true if the jobPos indicates a job needs a move to be ahead of or behind the anchorPos.
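
As a rough illustration of the check described above, the sketch below uses float64 in place of decimal.Decimal. It assumes that smaller positions are closer to the front of the queue and that no move is needed when the job already sits strictly between the anchor and the second position. The name needMoveSketch and that exact rule are assumptions for illustration, not the package's confirmed logic.

```go
package main

import "fmt"

// needMoveSketch is a hypothetical stand-in for NeedMove that uses float64
// instead of decimal.Decimal. Assumption: smaller positions are closer to
// the front of the queue.
func needMoveSketch(jobPos, anchorPos, secondPos float64, aheadOf bool) bool {
	if aheadOf {
		// Already ahead of the anchor and behind the second position: stay put.
		return !(jobPos < anchorPos && jobPos > secondPos)
	}
	// Already behind the anchor and ahead of the second position: stay put.
	return !(jobPos > anchorPos && jobPos < secondPos)
}

func main() {
	// The job at 1.5 already lies between the second position (1.0) and the anchor (2.0).
	fmt.Println(needMoveSketch(1.5, 2.0, 1.0, true))
	// The job at 3.0 is behind the anchor, so it has to move to get ahead of it.
	fmt.Println(needMoveSketch(3.0, 2.0, 1.0, true))
}
```

Under these assumptions the first call reports that no move is needed and the second reports that one is.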

func ReduceToJobQInfo

func ReduceToJobQInfo(reqs AllocReqs) map[model.JobID]*sproto.RMJobInfo

ReduceToJobQInfo takes a list of AllocateRequest and reduces it to a summary of the Job Queue.

func SortTasksWithPosition

func SortTasksWithPosition(
	taskList *TaskList,
	groups map[model.JobID]*Group,
	jobPositions JobSortState,
	k8s bool,
) []*sproto.AllocateRequest

SortTasksWithPosition returns a view of the sproto.AllocateRequests that make up the TaskList, sorted in priority order.
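
The following sketch shows one way a priority-ordered view could be built. The request type, the sortByPriorityThenPosition helper, and the rule that lower priority values and lower queue positions come first are all assumptions made for this example; the real function also takes groups and a k8s flag, which may change the ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical, trimmed-down stand-in for sproto.AllocateRequest.
type request struct {
	JobID    string
	Priority int
	Position float64 // stand-in for the job's decimal.Decimal queue position
}

// sortByPriorityThenPosition returns a sorted copy of reqs and leaves the input untouched.
// Assumption: lower priority values and lower positions come first.
func sortByPriorityThenPosition(reqs []request) []request {
	out := make([]request, len(reqs))
	copy(out, reqs)
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority < out[j].Priority
		}
		return out[i].Position < out[j].Position
	})
	return out
}

func main() {
	sorted := sortByPriorityThenPosition([]request{
		{JobID: "c", Priority: 50, Position: 1},
		{JobID: "a", Priority: 42, Position: 2},
		{JobID: "b", Priority: 42, Position: 3},
	})
	for _, r := range sorted {
		fmt.Println(r.JobID, r.Priority, r.Position)
	}
}
```

Returning a copy keeps the sketch consistent with the idea of a sorted "view": the underlying list keeps its own order.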

Types

type AllocReqs

type AllocReqs = []*sproto.AllocateRequest

AllocReqs is an alias for a list of Allocate Requests.

type Group

type Group struct {
	JobID    model.JobID
	MaxSlots *int
	Weight   float64
	Priority *int
}

Group manages the common state for a set of tasks that all share the same scheduling restrictions (e.g. max slots or fair share weight).

type JobSortState

type JobSortState map[model.JobID]decimal.Decimal

JobSortState models a job queue, and the positions of all jobs within it.

func InitializeJobSortState

func InitializeJobSortState(isK8s bool) JobSortState

InitializeJobSortState constructs a JobSortState based on the RM type.

func (JobSortState) RecoverJobPosition

func (j JobSortState) RecoverJobPosition(jobID model.JobID, position decimal.Decimal)

RecoverJobPosition explicitly sets the position of a job.

func (JobSortState) SetJobPosition

func (j JobSortState) SetJobPosition(
	jobID model.JobID,
	anchor1 model.JobID,
	anchor2 model.JobID,
	aheadOf bool,
	isK8s bool,
) (decimal.Decimal, error)

SetJobPosition sets the job position in the queue, relative to the anchors.
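
To make "relative to the anchors" concrete, here is a minimal sketch that places a job midway between two anchor positions. It uses *big.Rat from the standard library in place of decimal.Decimal, and the positions type, the set and setBetween methods, and the midpoint rule itself are assumptions for illustration rather than the package's documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// positions is a hypothetical stand-in for JobSortState, using *big.Rat
// in place of decimal.Decimal.
type positions map[string]*big.Rat

// set records an explicit position num/den for jobID.
func (p positions) set(jobID string, num, den int64) {
	p[jobID] = big.NewRat(num, den)
}

// setBetween places jobID midway between two anchors and returns the new position.
// The midpoint rule is an assumption made for this sketch.
func (p positions) setBetween(jobID, anchor1, anchor2 string) (*big.Rat, error) {
	a, ok1 := p[anchor1]
	b, ok2 := p[anchor2]
	if !ok1 || !ok2 {
		return nil, errors.New("anchor not found in queue")
	}
	if a.Cmp(b) == 0 {
		return nil, errors.New("no room between anchors")
	}
	mid := new(big.Rat).Add(a, b)
	mid.Quo(mid, big.NewRat(2, 1))
	p[jobID] = mid
	return mid, nil
}

func main() {
	q := positions{}
	q.set("job-a", 1, 1)
	q.set("job-b", 2, 1)
	pos, err := q.setBetween("job-c", "job-a", "job-b")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("job-c position:", pos.FloatString(2))
}
```

An exact numeric type matters here: repeated halving between neighbors loses precision quickly with floating point, which is presumably why the package stores positions as decimal.Decimal.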

type Registry

type Registry[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Registry is a thread-safe map of key value pairs that supports callbacks on delete.

func NewRegistry

func NewRegistry[K comparable, V any]() *Registry[K, V]

NewRegistry creates a new Registry.

func (*Registry[K, V]) Add

func (r *Registry[K, V]) Add(key K, value V) error

Add adds the given key value pair to the registry. If the key already exists, an error is returned.

func (*Registry[K, V]) Delete

func (r *Registry[K, V]) Delete(key K) error

Delete deletes the given key from the registry. If the key does not exist, an error is returned.

func (*Registry[K, V]) Load

func (r *Registry[K, V]) Load(key K) (V, bool)

Load returns the value stored for the given key and whether the key was found.

func (*Registry[K, V]) OnDelete

func (r *Registry[K, V]) OnDelete(key K, callback func())

OnDelete registers a callback to be called when the given key is deleted. If the key does not exist, the callback is called asynchronously.
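
The behavior documented for Registry can be approximated with a mutex-guarded map, as in the sketch below. This is a simplified re-creation written for illustration, not the package's implementation: the lowercase registry type, its fields, and the choice to run delete callbacks synchronously outside the lock are all assumptions. The usage in main mirrors the shape of GroupPriorityChangeRegistry, with string standing in for model.JobID.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, simplified re-creation of the documented behavior.
type registry[K comparable, V any] struct {
	mu       sync.Mutex
	values   map[K]V
	onDelete map[K][]func()
}

func newRegistry[K comparable, V any]() *registry[K, V] {
	return &registry[K, V]{values: map[K]V{}, onDelete: map[K][]func(){}}
}

// Add stores value under key and fails if the key is already present.
func (r *registry[K, V]) Add(key K, value V) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.values[key]; ok {
		return fmt.Errorf("key %v already exists", key)
	}
	r.values[key] = value
	return nil
}

// Load returns the stored value and whether the key was found.
func (r *registry[K, V]) Load(key K) (V, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	v, ok := r.values[key]
	return v, ok
}

// OnDelete queues callback for when key is deleted.
func (r *registry[K, V]) OnDelete(key K, callback func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.values[key]; !ok {
		go callback() // key is absent: fire asynchronously
		return
	}
	r.onDelete[key] = append(r.onDelete[key], callback)
}

// Delete removes key, then runs its callbacks; it fails if the key is absent.
func (r *registry[K, V]) Delete(key K) error {
	r.mu.Lock()
	if _, ok := r.values[key]; !ok {
		r.mu.Unlock()
		return fmt.Errorf("key %v does not exist", key)
	}
	callbacks := r.onDelete[key]
	delete(r.values, key)
	delete(r.onDelete, key)
	r.mu.Unlock()
	// Run callbacks outside the lock so they may safely touch the registry.
	for _, cb := range callbacks {
		cb()
	}
	return nil
}

func main() {
	reg := newRegistry[string, func(int) error]()
	_ = reg.Add("job-1", func(p int) error {
		fmt.Println("priority changed to", p)
		return nil
	})
	reg.OnDelete("job-1", func() { fmt.Println("job-1 removed") })
	if cb, ok := reg.Load("job-1"); ok {
		_ = cb(42)
	}
	fmt.Println("first delete:", reg.Delete("job-1"))
	fmt.Println("second delete:", reg.Delete("job-1"))
}
```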

type TaskIterator

type TaskIterator struct {
	// contains filtered or unexported fields
}

TaskIterator is an iterator over a set of AllocateRequests.

func (*TaskIterator) Next

func (i *TaskIterator) Next() bool

Next moves the iterator forward to the next AllocateRequest.

func (*TaskIterator) Value

func (i *TaskIterator) Value() *sproto.AllocateRequest

Value returns the AllocateRequest at the current position of the iterator.
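
Next and Value follow the common Go iterator shape, where Next is called before each Value. The sketch below shows that loop with a slice-backed iterator; the request and sliceIterator types and the collectNames helper are hypothetical stand-ins, and starting the cursor before the first element is an assumption about how such an iterator is typically built.

```go
package main

import "fmt"

// request is a hypothetical stand-in for sproto.AllocateRequest.
type request struct{ Name string }

// sliceIterator is a hypothetical slice-backed iterator with the same
// Next/Value shape as TaskIterator.
type sliceIterator struct {
	items []*request
	index int // index of the current item; -1 before the first call to Next
}

func newSliceIterator(items []*request) *sliceIterator {
	return &sliceIterator{items: items, index: -1}
}

// Next advances the iterator and reports whether a current item exists.
func (i *sliceIterator) Next() bool {
	if i.index+1 >= len(i.items) {
		return false
	}
	i.index++
	return true
}

// Value returns the item at the current position.
func (i *sliceIterator) Value() *request { return i.items[i.index] }

// collectNames drains the iterator with the usual for-Next loop.
func collectNames(it *sliceIterator) []string {
	var names []string
	for it.Next() {
		names = append(names, it.Value().Name)
	}
	return names
}

func main() {
	it := newSliceIterator([]*request{{Name: "trial-1"}, {Name: "trial-2"}})
	fmt.Println(collectNames(it))
}
```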

type TaskList

type TaskList struct {
	// contains filtered or unexported fields
}

TaskList maintains all tasks in time order, and stores their allocation actor, active allocations and allocate requests.
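
A list that keeps tasks in time order while still supporting lookup by ID can be sketched with a sorted slice plus a map. Everything in the block below (the request and timeOrderedList types, the newRequest helper, and the rule that adding a duplicate ID returns false) is an assumption made for illustration and says nothing definite about how TaskList is implemented.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// request is a hypothetical stand-in for sproto.AllocateRequest.
type request struct {
	ID          string
	RequestTime time.Time
}

// newRequest builds a request at a fixed base time plus an offset in seconds.
func newRequest(id string, offsetSeconds int) *request {
	base := time.Date(2023, 11, 2, 0, 0, 0, 0, time.UTC)
	return &request{ID: id, RequestTime: base.Add(time.Duration(offsetSeconds) * time.Second)}
}

// timeOrderedList keeps requests sorted by request time and indexed by ID.
type timeOrderedList struct {
	byTime []*request
	byID   map[string]*request
}

func newList() *timeOrderedList {
	return &timeOrderedList{byID: map[string]*request{}}
}

// add inserts req in time order; it returns false if the ID is already present.
func (l *timeOrderedList) add(req *request) bool {
	if _, ok := l.byID[req.ID]; ok {
		return false
	}
	// Find the first element strictly later than req so equal times keep insertion order.
	i := sort.Search(len(l.byTime), func(i int) bool {
		return l.byTime[i].RequestTime.After(req.RequestTime)
	})
	l.byTime = append(l.byTime, nil)
	copy(l.byTime[i+1:], l.byTime[i:])
	l.byTime[i] = req
	l.byID[req.ID] = req
	return true
}

// remove deletes the request with the given ID and returns it, or nil if absent.
func (l *timeOrderedList) remove(id string) *request {
	req, ok := l.byID[id]
	if !ok {
		return nil
	}
	delete(l.byID, id)
	for i, r := range l.byTime {
		if r == req {
			l.byTime = append(l.byTime[:i], l.byTime[i+1:]...)
			break
		}
	}
	return req
}

// ids lists request IDs in time order.
func (l *timeOrderedList) ids() []string {
	out := make([]string, 0, len(l.byTime))
	for _, r := range l.byTime {
		out = append(out, r.ID)
	}
	return out
}

func main() {
	l := newList()
	l.add(newRequest("late", 60))
	l.add(newRequest("early", 0))
	fmt.Println(l.ids())
	l.remove("early")
	fmt.Println(l.ids())
}
```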

func New

func New() *TaskList

New constructs a new TaskList.

func (*TaskList) AddAllocation

func (l *TaskList) AddAllocation(id model.AllocationID, assigned *sproto.ResourcesAllocated)

AddAllocation adds an allocation for the allocation actor and updates the sproto.AllocateRequest's sproto.SchedulingState.

func (*TaskList) AddAllocationRaw

func (l *TaskList) AddAllocationRaw(id model.AllocationID, assigned *sproto.ResourcesAllocated)

AddAllocationRaw adds an allocation for the allocation actor without modifying the sproto.AllocateRequest's sproto.SchedulingState.

func (*TaskList) AddTask

func (l *TaskList) AddTask(req *sproto.AllocateRequest) bool

AddTask adds a task to the TaskList.

func (*TaskList) Allocation

func (l *TaskList) Allocation(id model.AllocationID) *sproto.ResourcesAllocated

Allocation returns the allocation, or nil if there is none, for the allocation actor.

func (*TaskList) ForResourcePool

func (l *TaskList) ForResourcePool(name string) *TaskList

ForResourcePool returns a new TaskList filtered by resource pool.

func (*TaskList) IsScheduled

func (l *TaskList) IsScheduled(id model.AllocationID) bool

IsScheduled returns true if the allocation has resources.

func (*TaskList) Iterator

func (l *TaskList) Iterator() *TaskIterator

Iterator returns a TaskIterator that traverses the TaskList by request time.

func (*TaskList) Len

func (l *TaskList) Len() int

Len returns the number of tasks in the TaskList.

func (*TaskList) RemoveAllocation

func (l *TaskList) RemoveAllocation(id model.AllocationID)

RemoveAllocation deletes any allocations for the allocation actor from the TaskList.

func (*TaskList) RemoveTaskByID

func (l *TaskList) RemoveTaskByID(id model.AllocationID) *sproto.AllocateRequest

RemoveTaskByID deletes the task and its allocation, if any, from the TaskList.

func (*TaskList) TaskByID

func (l *TaskList) TaskByID(id model.AllocationID) (*sproto.AllocateRequest, bool)

TaskByID returns the sproto.AllocateRequest for a task.

func (*TaskList) TaskSummaries

func (l *TaskList) TaskSummaries(
	groups map[model.JobID]*Group,
	schedulerType string,
) map[model.AllocationID]sproto.AllocationSummary

TaskSummaries returns a summary of allocations for tasks in the TaskList.

func (*TaskList) TaskSummary

func (l *TaskList) TaskSummary(
	id model.AllocationID,
	groups map[model.JobID]*Group,
	schedulerType string,
) *sproto.AllocationSummary

TaskSummary returns a summary for an allocation in the TaskList.
