Documentation
¶
Overview ¶
Package reconciler provides a wrapper around a global state scheduler to be used by a per-worker pulling dispatcher.
The primary scheduler.Scheduler implementation intended to be used by reconciler is the quotascheduler algorithm as implemented in qslib/scheduler. The primary dispatcher client is intended to be swarming.
The reconciler tracks the queue of actions for workers that have pending actions (both those in the most recent pull call from client, and those not). For each worker, reconciler holds actions in the queue until they are acknowledged, and orchestrates task preemption.
Example ¶
package main import ( "context" "fmt" "time" "go.chromium.org/luci/common/data/stringset" "go.chromium.org/infra/qscheduler/qslib/reconciler" "go.chromium.org/infra/qscheduler/qslib/scheduler" ) func main() { ctx := context.Background() // Create a scheduler and reconciler. s := scheduler.New(time.Now()) r := reconciler.New() // Notify the reconciler of a newly enqueued task request. requestID := scheduler.RequestID("Request1") accountID := scheduler.AccountID("Account1") labels := stringset.NewFromSlice("label1") t := time.Now() waitRequest := &reconciler.TaskWaitingRequest{ AccountID: accountID, RequestID: requestID, ProvisionableLabels: labels, EnqueueTime: t, Time: t, } r.NotifyTaskWaiting(ctx, s, scheduler.NullEventSink, waitRequest) // Notify the reconciler of a new idle worker, and fetch an assignment // for it. This will fetch Request1 to run on it. workerID := scheduler.WorkerID("Worker1") idleWorker := &reconciler.IdleWorker{ID: workerID, Labels: labels} a := r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker) fmt.Printf("%s was assigned %s.\n", a[0].WorkerID, a[0].RequestID) // A subsequent call for this worker will return the same task, // because the previous assignment has not yet been acknowledged. a = r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker) fmt.Printf("%s was again assigned %s.\n", a[0].WorkerID, a[0].RequestID) // Acknowledge the that request is running on the worker. runningRequest := &reconciler.TaskRunningRequest{ RequestID: requestID, WorkerID: workerID, Time: time.Now(), } r.NotifyTaskRunning(ctx, s, scheduler.NullEventSink, runningRequest) // Now, a subsequent AssignTasks call for this worker will return // nothing, as there are no other tasks in the queue. a = r.AssignTasks(ctx, s, time.Now(), scheduler.NullEventSink, idleWorker) fmt.Printf("After ACK, there were %d new assignments.\n", len(a)) }
Output: Worker1 was assigned Request1. Worker1 was again assigned Request1. After ACK, there were 0 new assignments.
Index ¶
- Constants
- type Assignment
- type Cancellation
- type IdleWorker
- type State
- func (state *State) AddTaskError(requestID scheduler.RequestID, err error)
- func (state *State) AssignTasks(ctx context.Context, s *scheduler.Scheduler, t time.Time, ...) []Assignment
- func (state *State) Cancellations(ctx context.Context) []Cancellation
- func (state *State) NotifyTaskAbsent(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, ...)
- func (state *State) NotifyTaskRunning(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, ...)
- func (state *State) NotifyTaskWaiting(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, ...)
- func (state *State) ToProto() *protos.Reconciler
- type TaskAbsentRequest
- type TaskRunningRequest
- type TaskWaitingRequest
Examples ¶
Constants ¶
const WorkerQueueTimeout = time.Duration(10) * time.Minute
WorkerQueueTimeout is the time after which a task will return to the queue if it was assigned to a worker but the worker never picked it up.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type Assignment struct { // WorkerID is the ID the worker that is being assigned a task. WorkerID scheduler.WorkerID // RequestID is the ID of the task request that is being assigned. RequestID scheduler.RequestID // ProvisionRequired indicates whether the worker needs to be provisioned (in other // words, it is true if the worker does not possess the request's provisionable // labels.) ProvisionRequired bool }
Assignment represents a scheduler-initated operation to assign a task to a worker.
type Cancellation ¶
type Cancellation struct { // WorkerID is the id the worker where we should cancel a task. WorkerID string // RequestID is the id of the task that we should request. RequestID string // ErrorMessage is a description of the error that caused the task to be // cancelled, if it was cancelled due to error. ErrorMessage string }
Cancellation represents a scheduler-initated operation to cancel a task on a worker. The worker should be aborted if and only if it is currently running the given task.
TODO: Consider unifying this with Assignment, since it is in fact the same content.
type IdleWorker ¶
type IdleWorker struct { // ID is the ID of the idle worker. ID scheduler.WorkerID // Labels is the set of labels of the idle worker. Labels stringset.Set }
IdleWorker represents a worker that is idle and wants to have a task assigned.
type State ¶
type State struct {
// contains filtered or unexported fields
}
State is the state of a reconciler.
func NewFromProto ¶
func NewFromProto(proto *protos.Reconciler) *State
NewFromProto returns a new State instance from a proto representation.
func (*State) AddTaskError ¶
AddTaskError marks a given task as having failed due to an error, and in need of cancellation.
func (*State) AssignTasks ¶
func (state *State) AssignTasks(ctx context.Context, s *scheduler.Scheduler, t time.Time, events scheduler.EventSink, workers ...*IdleWorker) []Assignment
AssignTasks accepts one or more idle workers, and returns tasks to be assigned to those workers (if there are tasks available).
func (*State) Cancellations ¶
func (state *State) Cancellations(ctx context.Context) []Cancellation
Cancellations returns the set of workers and tasks that should be cancelled.
func (*State) NotifyTaskAbsent ¶
func (state *State) NotifyTaskAbsent(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskAbsentRequest)
NotifyTaskAbsent informs the quotascheduler about an absent task.
func (*State) NotifyTaskRunning ¶
func (state *State) NotifyTaskRunning(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskRunningRequest)
NotifyTaskRunning informs the quotascheduler about a running task.
func (*State) NotifyTaskWaiting ¶
func (state *State) NotifyTaskWaiting(ctx context.Context, s *scheduler.Scheduler, events scheduler.EventSink, update *TaskWaitingRequest)
NotifyTaskWaiting informs the quotascheduler about a waiting task.
func (*State) ToProto ¶
func (state *State) ToProto() *protos.Reconciler
ToProto converts a reconciler state to proto representation.
type TaskAbsentRequest ¶
type TaskAbsentRequest struct { // RequestID of the request that is running. RequestID scheduler.RequestID // Time at which the task was running. Time time.Time // WorkerID of the worker that is running the task. WorkerID scheduler.WorkerID }
TaskAbsentRequest encapsulates the arguments to NotifyTaskAbsent.
type TaskRunningRequest ¶
type TaskRunningRequest struct { // RequestID of the request that is running. RequestID scheduler.RequestID // Time at which the task was running. Time time.Time // WorkerID of the worker that is running the task. WorkerID scheduler.WorkerID }
TaskRunningRequest encapsulates the arguments to NotifyTaskRunning.
type TaskWaitingRequest ¶
type TaskWaitingRequest struct { // AccountID for the request. AccountID scheduler.AccountID // BaseLabels of the request that is waiting. BaseLabels stringset.Set // Time at which the task was first enqueued. EnqueueTime time.Time // ProvisionableLabels of the request that is waiting. ProvisionableLabels stringset.Set // RequestID of the request that is waiting. RequestID scheduler.RequestID // Tags is the set of tags for the request. Tags []string // Time at which the task was waiting. Time time.Time }
TaskWaitingRequest encapsulates the arguments to NotifyTaskWaiting.