Documentation
Overview
Package scheduler provides Scheduler, which is an implementation of the quotascheduler algorithm. The algorithm prioritizes requests and matches them to workers, tracks account balances, and ensures consistency between the scheduler's estimate of Request and Worker states and the client-supplied authoritative state.
scheduler.Scheduler is an implementation of the reconciler.Scheduler interface.
See the provided example in this package's godoc or doc_test.go for usage.
Example
package main

import (
	"context"
	"fmt"
	"time"

	"go.chromium.org/luci/common/data/stringset"

	"go.chromium.org/infra/qscheduler/qslib/scheduler"
)

func HandleAssignments([]*scheduler.Assignment) {}

func IsOn(requestID scheduler.RequestID, workerID scheduler.WorkerID, s *scheduler.Scheduler) {
	fmt.Printf("%s is on %s? %v\n", requestID, workerID, s.IsAssigned(requestID, workerID))
}

func main() {
	ctx := context.Background()

	// Create a scheduler.
	s := scheduler.New(time.Now())

	// Create a quota account with no initial balance.
	accountConfig := scheduler.NewAccountConfig(0, nil, 1, []float32{1, 2, 3}, false, "")
	accountID := scheduler.AccountID("Account1")
	s.AddAccount(ctx, accountID, accountConfig, nil)

	// Update time, causing quota accounts to accumulate quota.
	s.UpdateTime(ctx, time.Now())

	// Create a task request, and add it to the scheduler queue.
	requestID := scheduler.RequestID("Request1")
	request := scheduler.NewTaskRequest(requestID, accountID, stringset.NewFromSlice("Label1"), nil, time.Now())
	s.AddRequest(ctx, request, time.Now(), []string{"tag1", "tag2"}, scheduler.NullEventSink)

	// Inform the scheduler of the existence of an idle worker.
	workerID := scheduler.WorkerID("Worker1")
	s.MarkIdle(ctx, workerID, stringset.NewFromSlice("Label2"), time.Now(), scheduler.NullEventSink)

	// False.
	IsOn(requestID, workerID, s)

	// Run a round of the scheduling algorithm, after updating time and accounts
	// again.
	t := time.Now()
	s.UpdateTime(ctx, t)

	// This will return a match between Request1 and Worker1.
	assignments := s.RunOnce(ctx, scheduler.NullEventSink)

	// True.
	IsOn(requestID, workerID, s)

	// Your code for handling these assignments goes here...
	HandleAssignments(assignments)

	// Update time, causing quota accounts to be charged for their running tasks.
	// In this case, Account1 will be charged for Request1 running on Worker1.
	s.UpdateTime(ctx, time.Now())

	// Notify the scheduler that the task has started running on that worker.
	// This is an acknowledgement of the above assignment.
	// Note: the account is already being charged for this task prior to the
	// notification. The notification ensures consistency of request and worker
	// state, but does not affect account state.
	s.NotifyTaskRunning(ctx, "Request1", "Worker1", time.Now(), scheduler.NullEventSink)

	// True.
	IsOn(requestID, workerID, s)

	// Update time, causing quota accounts to again accumulate quota or be charged
	// quota for their running tasks.
	s.UpdateTime(ctx, time.Now())

	// Notifications that contradict the scheduler's state estimate will cause
	// inconsistent records to be deleted from the state.
	//
	// Notify the scheduler that a different task is now running on Worker1,
	// causing records about that worker and previous request to be deleted.
	// Note that this deletion will not affect the current balance of Account1;
	// quota that was spent already on Request1 will not be refunded.
	s.NotifyTaskRunning(ctx, "Request2", "Worker1", time.Now(), scheduler.NullEventSink)

	// False.
	IsOn(requestID, workerID, s)
}
Output:
Request1 is on Worker1? false
Request1 is on Worker1? true
Request1 is on Worker1? true
Request1 is on Worker1? false
Index
- Constants
- type AccountConfig
- type AccountID
- type Assignment
- type AssignmentType
- type Balance
- type Config
- type EventSink
- type Priority
- type RequestID
- type Scheduler
- func (s *Scheduler) AddAccount(ctx context.Context, id AccountID, config *AccountConfig, ...)
- func (s *Scheduler) AddRequest(ctx context.Context, request *TaskRequest, t time.Time, tags []string, ...)
- func (s *Scheduler) Config() *Config
- func (s *Scheduler) DeleteAccount(aid AccountID)
- func (s *Scheduler) GetBalances() map[AccountID]Balance
- func (s *Scheduler) GetRequest(rid RequestID) (req *TaskRequest, ok bool)
- func (s *Scheduler) GetWaitingRequests() map[RequestID]*TaskRequest
- func (s *Scheduler) GetWorkers() map[WorkerID]*Worker
- func (s *Scheduler) IsAssigned(requestID RequestID, workerID WorkerID) bool
- func (s *Scheduler) MarkIdle(ctx context.Context, workerID WorkerID, labels stringset.Set, t time.Time, ...)
- func (s *Scheduler) NotifyTaskAbsent(ctx context.Context, requestID RequestID, t time.Time, e EventSink)
- func (s *Scheduler) NotifyTaskRunning(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, ...)
- func (s *Scheduler) ResetBalance(aid AccountID)
- func (s *Scheduler) RunOnce(ctx context.Context, e EventSink) []*Assignment
- func (s *Scheduler) RunningRequests() int
- func (s *Scheduler) ToProto() *protos.Scheduler
- func (s *Scheduler) ToSnapshot(poolID string) *Snapshot
- func (s *Scheduler) Unassign(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, ...) error
- func (s *Scheduler) UpdateTime(ctx context.Context, t time.Time)
- type Snapshot
- type TaskRequest
- type Worker
- type WorkerID
Examples
Constants
const (
	// FreeBucket is the free priority bucket, where jobs may run even if they have
	// no quota account or have an empty quota account.
	FreeBucket Priority = NumPriorities

	// PromoteThreshold is the account balance at which the scheduler will consider
	// promoting jobs.
	PromoteThreshold = 600.0

	// DemoteThreshold is the account balance at which the scheduler will consider
	// demoting jobs.
	DemoteThreshold = -600.0
)
const NumPriorities = 5
NumPriorities is the number of distinct priority buckets. For performance and code complexity reasons, this is a compile-time constant.
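The constants above define the priority layout: paid priorities occupy 0 through NumPriorities-1, and FreeBucket sits just past them. The standalone sketch below copies those values into local declarations so it runs without the package. The thresholdHint helper is hypothetical; it only shows which threshold a balance has crossed, and whether the real comparison is strict is an assumption.

```go
package main

import "fmt"

// Priority mirrors the package's Priority type for this standalone sketch.
type Priority int

// NumPriorities copies the documented value.
const NumPriorities = 5

const (
	FreeBucket       Priority = NumPriorities
	PromoteThreshold          = 600.0
	DemoteThreshold           = -600.0
)

// thresholdHint is hypothetical: it reports which documented threshold a
// per-priority balance has crossed (strict comparison is an assumption).
// It does not implement the scheduler's actual promotion or demotion logic.
func thresholdHint(balance float32) string {
	switch {
	case balance > PromoteThreshold:
		return "consider promoting"
	case balance < DemoteThreshold:
		return "consider demoting"
	default:
		return "no change"
	}
}

func main() {
	// Paid priorities are 0..NumPriorities-1; FreeBucket is NumPriorities.
	for p := Priority(0); p <= FreeBucket; p++ {
		fmt.Println(p, p == FreeBucket)
	}
	fmt.Println(thresholdHint(700), thresholdHint(-700), thresholdHint(0))
}
```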
Variables
This section is empty.
Functions
This section is empty.
Types
type AccountConfig
type AccountConfig struct {
	// ChargeRate is the rate (per second) at which each per-priority account grows.
	//
	// Conceptually this is the time-averaged number of workers that this account
	// may use, at each priority level.
	ChargeRate Balance

	// MaxChargeSeconds is the maximum amount of time over which this account can
	// accumulate quota before hitting its cap.
	//
	// Conceptually this sets the time window over which the time-averaged
	// utilization by this account is measured. Very bursty clients will need to
	// use a wider window, whereas very consistent clients will use a narrow one.
	MaxChargeSeconds float32

	// MaxFanout is the maximum number of concurrent paid jobs for each combination
	// of all task labels that this account will pay for (0 = no limit).
	//
	// Additional jobs beyond this may run if there is idle capacity, but they
	// will run in the FreeBucket priority level (except if DisableFreeTasks
	// is true, in which case they will not run).
	MaxFanout int32

	// PerLabelTaskLimits allows for extra limits to be enforced for any given
	// label, e.g. setting label-model:2 ensures that a maximum of two
	// concurrent jobs per model can run under this account.
	PerLabelTaskLimits map[string]int32

	// If DisableFreeTasks is true, then jobs for this account will not start
	// running if they would be run at FreeBucket priority.
	DisableFreeTasks bool

	// Human-readable description of the account's intended purpose.
	Description string
}
AccountConfig represents a single account's config. It is the native struct version of the AccountConfig proto.
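The MaxFanout and DisableFreeTasks comments describe a small decision: a job beyond the fanout limit drops to FreeBucket, and a FreeBucket job does not start at all when free tasks are disabled. The sketch below encodes that reading. The effectivePriority helper and the fanoutPolicy struct are hypothetical, and counting "paid jobs for this label combination" is left to the caller as an assumption.

```go
package main

import "fmt"

type Priority int

const NumPriorities = 5
const FreeBucket Priority = NumPriorities

// fanoutPolicy is a hypothetical subset of AccountConfig.
type fanoutPolicy struct {
	MaxFanout        int32 // 0 means no limit
	DisableFreeTasks bool
}

// effectivePriority is hypothetical. Given the best paid priority the account
// can afford and the number of paid jobs it already runs for the task's label
// combination, it returns the priority the new job would run at and whether
// the job may start at all.
func effectivePriority(best Priority, runningPaid int32, p fanoutPolicy) (Priority, bool) {
	if p.MaxFanout > 0 && runningPaid >= p.MaxFanout {
		// Over the fanout limit: the job can only run for free.
		best = FreeBucket
	}
	if best == FreeBucket && p.DisableFreeTasks {
		return FreeBucket, false
	}
	return best, true
}

func main() {
	policy := fanoutPolicy{MaxFanout: 2}
	fmt.Println(effectivePriority(1, 1, policy)) // 1 true
	fmt.Println(effectivePriority(1, 2, policy)) // 5 true
	policy.DisableFreeTasks = true
	fmt.Println(effectivePriority(1, 2, policy)) // 5 false
}
```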
func NewAccountConfig
func NewAccountConfig(fanout int, labelLimits map[string]int32, chargeSeconds float32, chargeRate []float32, disableFreeTasks bool, desc string) *AccountConfig
NewAccountConfig creates a new AccountConfig instance.
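NewAccountConfig takes its settings positionally, which makes calls like NewAccountConfig(0, nil, 1, []float32{1, 2, 3}, false, "") hard to read. The hypothetical newAccountConfigSketch below shows one plausible mapping from arguments to fields, inferred only from the parameter names. In particular, copying the chargeRate slice into the fixed-size Balance (zero-padding short slices, dropping extra entries) is an assumption, not confirmed package behavior.

```go
package main

import "fmt"

const NumPriorities = 5

type Balance [NumPriorities]float32

// AccountConfig mirrors the exported fields documented above.
type AccountConfig struct {
	ChargeRate         Balance
	MaxChargeSeconds   float32
	MaxFanout          int32
	PerLabelTaskLimits map[string]int32
	DisableFreeTasks   bool
	Description        string
}

// newAccountConfigSketch is hypothetical: one plausible mapping from
// NewAccountConfig's positional arguments onto the struct fields.
func newAccountConfigSketch(fanout int, labelLimits map[string]int32, chargeSeconds float32, chargeRate []float32, disableFreeTasks bool, desc string) *AccountConfig {
	var rate Balance
	// copy fills the leading priorities; missing entries stay 0 and entries
	// beyond NumPriorities are dropped (assumed behavior).
	copy(rate[:], chargeRate)
	return &AccountConfig{
		ChargeRate:         rate,
		MaxChargeSeconds:   chargeSeconds,
		MaxFanout:          int32(fanout),
		PerLabelTaskLimits: labelLimits,
		DisableFreeTasks:   disableFreeTasks,
		Description:        desc,
	}
}

func main() {
	// Same arguments as in the package example.
	c := newAccountConfigSketch(0, nil, 1, []float32{1, 2, 3}, false, "")
	fmt.Println(c.ChargeRate, c.MaxChargeSeconds, c.MaxFanout)
}
```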
func NewAccountConfigFromProto
func NewAccountConfigFromProto(c *protos.AccountConfig) *AccountConfig
NewAccountConfigFromProto creates a new AccountConfig instance from its proto representation.
type Assignment
type Assignment struct {
	// Type describes which kind of assignment this represents.
	Type AssignmentType

	// WorkerID of the worker to assign a new task to (and to preempt the previous
	// task of, if this is an AssignmentPreemptWorker assignment).
	WorkerID WorkerID

	// RequestID of the task to assign to that worker.
	RequestID RequestID

	// TaskToAbort is relevant only for the AssignmentPreemptWorker type.
	// It is the request ID of the task that should be preempted.
	TaskToAbort RequestID

	// Priority at which the task will run.
	Priority Priority

	// Time is the time at which this Assignment was determined.
	Time time.Time
}
An Assignment represents a scheduler decision to assign a task to a worker.
type AssignmentType
type AssignmentType int
AssignmentType is an enum of scheduler assignment types.
const (
	// AssignmentIdleWorker indicates assigning a task to a currently idle worker.
	AssignmentIdleWorker AssignmentType = iota

	// AssignmentPreemptWorker indicates preempting a running task on a worker with a new task.
	AssignmentPreemptWorker
)
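A caller that receives the assignments returned by RunOnce typically branches on Type, using TaskToAbort only for preemptions. The sketch below mirrors the Assignment fields locally so it runs standalone; the describe function is a hypothetical handler, and the actions it names (run, abort) stand in for whatever the caller actually does with a decision.

```go
package main

import (
	"fmt"
	"time"
)

type (
	AssignmentType int
	Priority       int
	RequestID      string
	WorkerID       string
)

const (
	AssignmentIdleWorker AssignmentType = iota
	AssignmentPreemptWorker
)

// Assignment mirrors the exported fields of the package's Assignment.
type Assignment struct {
	Type        AssignmentType
	WorkerID    WorkerID
	RequestID   RequestID
	TaskToAbort RequestID
	Priority    Priority
	Time        time.Time
}

// describe is a hypothetical handler: it branches on Type and reads
// TaskToAbort only for preemptions, where it is meaningful.
func describe(a *Assignment) string {
	switch a.Type {
	case AssignmentIdleWorker:
		return fmt.Sprintf("run %s on idle %s", a.RequestID, a.WorkerID)
	case AssignmentPreemptWorker:
		return fmt.Sprintf("abort %s on %s, then run %s", a.TaskToAbort, a.WorkerID, a.RequestID)
	default:
		return "unknown assignment type"
	}
}

func main() {
	now := time.Now()
	for _, a := range []*Assignment{
		{Type: AssignmentIdleWorker, WorkerID: "Worker1", RequestID: "Request1", Time: now},
		{Type: AssignmentPreemptWorker, WorkerID: "Worker2", RequestID: "Request3", TaskToAbort: "Request2", Time: now},
	} {
		fmt.Println(describe(a))
	}
}
```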
type Balance
type Balance [NumPriorities]float32
Balance is a vector, with one entry per priority level, that represents a cost or an account balance.
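Because Balance is a fixed-size array rather than a slice, it has value semantics in Go: assigning it, passing it to a function, or reading it out of a map yields an independent copy. The sketch below mirrors the type locally; the charge helper is hypothetical and not part of the package API.

```go
package main

import "fmt"

const NumPriorities = 5

// Balance mirrors the package type: one float32 per priority level.
type Balance [NumPriorities]float32

// charge is a hypothetical element-wise helper that subtracts a cost vector.
func charge(b, cost Balance) Balance {
	for i := range b {
		b[i] -= cost[i] // b is a copy, so the caller's value is untouched
	}
	return b
}

func main() {
	balance := Balance{10, 5}
	snapshot := balance // arrays are values: this copies all five entries
	balance = charge(balance, Balance{1, 1})
	fmt.Println(balance, snapshot) // [9 4 0 0 0] [10 5 0 0 0]
}
```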
type Config
type Config struct {
	// AccountConfigs is a map of per-account AccountConfig.
	AccountConfigs map[AccountID]*AccountConfig

	// DisablePreemption, if true, causes the scheduler to never preempt
	// any tasks.
	DisablePreemption bool

	// BotExpiration is the duration after which a bot will no longer be
	// considered idle, if the scheduler doesn't receive any assignment requests
	// for it.
	//
	// If 0 or unspecified, defaults to 300 seconds.
	BotExpiration time.Duration
}
Config represents configuration fields that affect the behavior of the quota scheduler pool.
func NewConfigFromProto
func NewConfigFromProto(p *protos.SchedulerConfig) *Config
NewConfigFromProto creates and returns a new Config instance from a proto representation.
func (*Config) ToProto
func (c *Config) ToProto() *protos.SchedulerConfig
ToProto converts a config to its proto representation.
type EventSink
type EventSink interface {
	// AddEvent emits a task event to this sink.
	AddEvent(*metrics.TaskEvent)

	// WithFields returns a child sink that will emit events with the given
	// field overrides.
	WithFields(isCallback bool) EventSink
}
EventSink defines the interface for a type that records scheduler events, for metrics or analytics purposes.
var NullEventSink EventSink = &nullEventSink{}
NullEventSink is a trivial EventSink that discards all events.
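Anything satisfying the two-method interface can replace NullEventSink. The sketch below is a hypothetical counting sink whose children, created by WithFields, share storage with the parent but tag events with their own isCallback override. The fields of metrics.TaskEvent are not documented here, so a placeholder TaskEvent type with a hypothetical Name field stands in for it.

```go
package main

import "fmt"

// TaskEvent is a placeholder for metrics.TaskEvent; its Name field is
// hypothetical.
type TaskEvent struct {
	Name string
}

// EventSink mirrors the interface documented above, using the placeholder.
type EventSink interface {
	AddEvent(*TaskEvent)
	WithFields(isCallback bool) EventSink
}

// countingSink is a hypothetical EventSink that counts events per
// isCallback value instead of discarding them.
type countingSink struct {
	isCallback bool
	counts     map[bool]int // shared between a sink and its children
}

func newCountingSink() *countingSink {
	return &countingSink{counts: map[bool]int{}}
}

// AddEvent records one event under this sink's isCallback value.
func (s *countingSink) AddEvent(*TaskEvent) {
	s.counts[s.isCallback]++
}

// WithFields returns a child that shares storage but overrides isCallback.
func (s *countingSink) WithFields(isCallback bool) EventSink {
	return &countingSink{isCallback: isCallback, counts: s.counts}
}

func main() {
	root := newCountingSink()
	var sink EventSink = root
	sink.AddEvent(&TaskEvent{Name: "enqueued"})
	sink.WithFields(true).AddEvent(&TaskEvent{Name: "assigned"})
	fmt.Println(root.counts[false], root.counts[true]) // 1 1
}
```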
type Priority
type Priority int
Priority is a qscheduler priority level.
func BestPriorityFor
BestPriorityFor determines the highest available priority for a quota account, given its balance.
If the account is out of quota, or if the supplied balance is a nil pointer, then this returns FreeBucket.
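The rule above can be read as: scan the buckets from highest to lowest priority and return the first one that still has quota, falling back to FreeBucket. The sketch below implements that reading on local mirror types. It assumes that priority 0 is the highest (FreeBucket, at NumPriorities, is the lowest) and that a bucket is available when its balance is strictly positive; bestPriorityFor is a stand-in, not the package function.

```go
package main

import "fmt"

type Priority int

const NumPriorities = 5
const FreeBucket Priority = NumPriorities

type Balance [NumPriorities]float32

// bestPriorityFor sketches the documented behavior, assuming priority 0 is
// the highest and a bucket is available when its balance is positive.
func bestPriorityFor(b *Balance) Priority {
	if b == nil {
		return FreeBucket
	}
	// Ranging over a pointer to an array visits each element in order.
	for p, v := range b {
		if v > 0 {
			return Priority(p)
		}
	}
	return FreeBucket
}

func main() {
	fmt.Println(bestPriorityFor(&Balance{0, -3, 7, 1})) // 2
	fmt.Println(bestPriorityFor(&Balance{}))            // 5 (FreeBucket)
	fmt.Println(bestPriorityFor(nil))                   // 5 (FreeBucket)
}
```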
type Scheduler
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler encapsulates the state and configuration of a running quotascheduler for a single pool, and its methods provide an implementation of the quotascheduler algorithm.
func NewFromProto
NewFromProto returns a new Scheduler from its proto representation.
func NewWithConfig
NewWithConfig returns a newly initialized Scheduler.
func (*Scheduler) AddAccount
func (s *Scheduler) AddAccount(ctx context.Context, id AccountID, config *AccountConfig, initialBalance []float32)
AddAccount creates a new account with the given id, config, and initialBalance (or zero balance if nil).
If an account with that id already exists, then it is overwritten.
func (*Scheduler) AddRequest
func (s *Scheduler) AddRequest(ctx context.Context, request *TaskRequest, t time.Time, tags []string, e EventSink)
AddRequest enqueues a new task request.
func (*Scheduler) DeleteAccount
func (s *Scheduler) DeleteAccount(aid AccountID)
DeleteAccount deletes the given account.
func (*Scheduler) GetBalances
func (s *Scheduler) GetBalances() map[AccountID]Balance
GetBalances returns the account balances.
func (*Scheduler) GetRequest
func (s *Scheduler) GetRequest(rid RequestID) (req *TaskRequest, ok bool)
GetRequest returns the (waiting or running) request for a given ID.
func (*Scheduler) GetWaitingRequests
func (s *Scheduler) GetWaitingRequests() map[RequestID]*TaskRequest
GetWaitingRequests returns the waiting requests.
func (*Scheduler) GetWorkers
func (s *Scheduler) GetWorkers() map[WorkerID]*Worker
GetWorkers returns the known workers.
func (*Scheduler) IsAssigned
func (s *Scheduler) IsAssigned(requestID RequestID, workerID WorkerID) bool
IsAssigned returns whether the given request is currently assigned to the given worker. It is provided for consistency checks.
func (*Scheduler) MarkIdle
func (s *Scheduler) MarkIdle(ctx context.Context, workerID WorkerID, labels stringset.Set, t time.Time, e EventSink)
MarkIdle marks the given worker as idle, with the given labels, as of the given time. If this call is contradicted by newer knowledge of state, then it does nothing.
Note: calls to MarkIdle originate from Swarming bot reap calls.
func (*Scheduler) NotifyTaskAbsent
func (s *Scheduler) NotifyTaskAbsent(ctx context.Context, requestID RequestID, t time.Time, e EventSink)
NotifyTaskAbsent informs the scheduler authoritatively that the given request is stopped (not running on a worker, and not in the queue) at the given time.
Supplied requestID must not be "".
func (*Scheduler) NotifyTaskRunning
func (s *Scheduler) NotifyTaskRunning(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, e EventSink)
NotifyTaskRunning informs the scheduler authoritatively that the given task was running on the given worker at the given time.
Supplied requestID and workerID must not be "".
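The package example shows the effect of an authoritative NotifyTaskRunning that contradicts the scheduler's estimate: reporting Request2 on Worker1 deletes the records tying Worker1 to Request1. The toy model below illustrates only that deletion step, using a hypothetical two-way index of request and worker assignments. It deliberately does not model what the package records for the newly reported pairing, and dropping a request's record on some other worker is an assumption.

```go
package main

import "fmt"

type (
	RequestID string
	WorkerID  string
)

// state is a hypothetical two-way index of which request runs on which worker.
type state struct {
	workerOf  map[RequestID]WorkerID
	requestOn map[WorkerID]RequestID
}

// dropContradictions removes every record that disagrees with the
// authoritative report "rid is running on wid".
func (s *state) dropContradictions(rid RequestID, wid WorkerID) {
	// The worker was believed to run a different request.
	if prev, ok := s.requestOn[wid]; ok && prev != rid {
		delete(s.workerOf, prev)
		delete(s.requestOn, wid)
	}
	// The request was believed to run on a different worker.
	if prev, ok := s.workerOf[rid]; ok && prev != wid {
		delete(s.requestOn, prev)
		delete(s.workerOf, rid)
	}
}

func (s *state) isAssigned(rid RequestID, wid WorkerID) bool {
	w, ok := s.workerOf[rid]
	return ok && w == wid
}

func main() {
	s := &state{
		workerOf:  map[RequestID]WorkerID{"Request1": "Worker1"},
		requestOn: map[WorkerID]RequestID{"Worker1": "Request1"},
	}
	// Mirrors the package example: Request2 is reported on Worker1.
	s.dropContradictions("Request2", "Worker1")
	fmt.Println(s.isAssigned("Request1", "Worker1")) // false
}
```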
func (*Scheduler) ResetBalance
func (s *Scheduler) ResetBalance(aid AccountID)
ResetBalance resets the given account's balance to 0, if it exists.
func (*Scheduler) RunOnce
func (s *Scheduler) RunOnce(ctx context.Context, e EventSink) []*Assignment
RunOnce performs a single round of the quota scheduler algorithm on the scheduler's current state and config, and returns the resulting assignments (state mutations).
func (*Scheduler) RunningRequests
func (s *Scheduler) RunningRequests() int
RunningRequests gets the number of running task requests.
func (*Scheduler) ToProto
func (s *Scheduler) ToProto() *protos.Scheduler
ToProto returns a proto representation of the state and configuration of Scheduler.
func (*Scheduler) ToSnapshot
func (s *Scheduler) ToSnapshot(poolID string) *Snapshot
ToSnapshot returns the snapshot of the pool's scheduler state.
func (*Scheduler) Unassign
func (s *Scheduler) Unassign(ctx context.Context, requestID RequestID, workerID WorkerID, t time.Time, e EventSink) error
Unassign moves a request that was previously assigned to a worker back to the queue.
This is intended for internal use by the reconciler, to heal scheduler state in cases where a worker was assigned a task but never successfully reaped it.
func (*Scheduler) UpdateTime
func (s *Scheduler) UpdateTime(ctx context.Context, t time.Time)
UpdateTime updates the current time for a quotascheduler, and updates quota account balances accordingly, based on running jobs, account policies, and the time elapsed since the last update.
If the provided time is earlier than that last update, this does nothing.
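One way to picture the per-account update is: charge each bucket for its running tasks, then let it earn quota at ChargeRate up to a cap of ChargeRate times MaxChargeSeconds, and skip the update entirely if time moved backwards. The nextBalance sketch below follows that model on local mirror types. Its specific rules (each running task costs one unit per second at its own priority, and the cap limits earning but does not claw back a balance already above it) are assumptions, not confirmed package behavior.

```go
package main

import "fmt"

const NumPriorities = 5

type Balance [NumPriorities]float32

func minF(a, b float32) float32 {
	if a < b {
		return a
	}
	return b
}

// nextBalance sketches one UpdateTime step for a single account under the
// assumptions stated above. running[p] is the number of tasks at priority p.
func nextBalance(b, chargeRate Balance, maxChargeSeconds, elapsedSecs float32, running [NumPriorities]int) Balance {
	if elapsedSecs < 0 {
		return b // time moved backwards: leave the balance unchanged
	}
	for p := range b {
		b[p] -= elapsedSecs * float32(running[p])
		limit := chargeRate[p] * maxChargeSeconds
		if b[p] < limit {
			b[p] = minF(b[p]+chargeRate[p]*elapsedSecs, limit)
		}
	}
	return b
}

func main() {
	rate := Balance{1, 2, 3}
	// One task at priority 0 for 10 seconds, with a 60 second cap.
	b := nextBalance(Balance{}, rate, 60, 10, [NumPriorities]int{1})
	fmt.Println(b) // [0 20 30 0 0]
}
```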
type Snapshot
type Snapshot struct {
	Accounts []*metrics.Account
	Tasks    []*metrics.Task
	Workers  []*metrics.Worker
}
Snapshot represents the scheduler state at a specified timestamp.
type TaskRequest
type TaskRequest struct {
	// ID is the ID of this request.
	ID RequestID

	// AccountID is the id of the account that this request charges to.
	AccountID AccountID

	// EnqueueTime is the time at which the request was enqueued.
	EnqueueTime time.Time

	// ProvisionableLabels is the set of Provisionable Labels for this task.
	ProvisionableLabels []string

	// BaseLabels is the set of base labels for this task.
	BaseLabels []string
	// contains filtered or unexported fields
}
TaskRequest represents a queued or running task request.
func NewTaskRequest
func NewTaskRequest(id RequestID, accountID AccountID, provisionableLabels stringset.Set, baseLabels stringset.Set, enqueueTime time.Time) *TaskRequest
NewTaskRequest creates a new TaskRequest.
func (*TaskRequest) ConfirmedTime
func (t *TaskRequest) ConfirmedTime() time.Time
ConfirmedTime returns the latest time at which the task request's state was confirmed by the source of truth (Swarming).
type Worker
type Worker struct {
	// ID is the ID of this worker.
	ID WorkerID

	// Labels represents the set of Labels that this worker possesses.
	Labels stringset.Set
	// contains filtered or unexported fields
}
Worker represents a running or idle Worker capable of running tasks.
func (*Worker) ConfirmedTime
ConfirmedTime returns the latest time at which the worker's state was confirmed by the source of truth (Swarming).
func (*Worker) RunningPriority
RunningPriority returns the priority of the currently running request.
func (*Worker) RunningRequest
func (w *Worker) RunningRequest() *TaskRequest
RunningRequest returns the currently running request for this worker.