Documentation ¶
Overview ¶
Package sched implements task scheduling for Reflow.
A unit of work is encapsulated by a Task, which is submitted to the scheduler. Multiple tasks may be submitted at once, which may lead to better packing. Tasks are packed into a set of Reflow allocs; these are dynamically sourced from the provided cluster as needed. The scheduler attempts to pack tasks into as few allocs as possible so that unused allocs may be reclaimed when they become idle.
The scheduler is given a Repository from which dependent objects are downloaded and to which results are uploaded. This repository is typically also the Reflow cache. A task fails if the necessary objects cannot be fetched, or if uploading fails.
If an alloc's keepalive fails, its running tasks are marked as lost and rescheduled.
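The packing goal described above can be illustrated with a first-fit strategy. This is a minimal sketch and not the scheduler's actual algorithm: the `resources` type, the `pack` function, and the fixed alloc size are all hypothetical stand-ins, and the sketch assumes every task fits in an empty alloc.

```go
package main

import "fmt"

// resources is a hypothetical, simplified stand-in for reflow.Resources.
type resources struct {
	CPU float64
	Mem float64 // GiB
}

// fits reports whether need can be satisfied from avail.
func (avail resources) fits(need resources) bool {
	return need.CPU <= avail.CPU && need.Mem <= avail.Mem
}

// pack assigns each task to the first alloc with enough free resources and
// opens a new alloc of size allocSize only when none fits. It returns the
// alloc index chosen for each task and the number of allocs used.
// Assumption: every task fits in an empty alloc of size allocSize.
func pack(tasks []resources, allocSize resources) ([]int, int) {
	var free []resources // remaining capacity per alloc
	assigned := make([]int, len(tasks))
	for i, t := range tasks {
		placed := -1
		for j := range free {
			if free[j].fits(t) {
				placed = j
				break
			}
		}
		if placed < 0 {
			free = append(free, allocSize)
			placed = len(free) - 1
		}
		free[placed].CPU -= t.CPU
		free[placed].Mem -= t.Mem
		assigned[i] = placed
	}
	return assigned, len(free)
}

func main() {
	tasks := []resources{{2, 4}, {1, 2}, {4, 8}, {1, 1}}
	assigned, n := pack(tasks, resources{CPU: 4, Mem: 16})
	fmt.Println(assigned, n)
}
```

Reusing existing allocs before opening new ones keeps the alloc count low, which is what lets idle allocs be reclaimed sooner.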
Index ¶
- Constants
- func GetTaskStatsId(task *Task) string
- type AllocStats
- type AllocStatsData
- type Cluster
- type OverallStats
- type Scheduler
- type Stats
- func (s *Stats) AddAlloc(alloc *alloc)
- func (s *Stats) AddTasks(tasks []*Task)
- func (s *Stats) AssignTask(task *Task, alloc *alloc)
- func (s *Stats) GetStats() StatsData
- func (s *Stats) MarkAllocDead(alloc *alloc)
- func (s *Stats) Publish()
- func (s *Stats) PublishPrefix(prefix string)
- func (s *Stats) ReturnTask(task *Task, alloc *alloc)
- type StatsData
- type Task
- type TaskSet
- type TaskState
- type TaskStats
- type TaskStatsData
Constants ¶
const ExpVarScheduler = "scheduler"
ExpVarScheduler is the prefix of the scheduler stats exported name.
Variables ¶
This section is empty.
Functions ¶
func GetTaskStatsId ¶
func GetTaskStatsId(task *Task) string
Types ¶
type AllocStats ¶
type AllocStats struct {
    sync.Mutex `json:"-"`
    AllocStatsData
}
AllocStats is the per alloc stats used to update stats.
func (*AllocStats) AssignTask ¶
func (a *AllocStats) AssignTask(task *Task)
AssignTask makes an alloc<->task association.
func (*AllocStats) Copy ¶
func (a *AllocStats) Copy() AllocStatsData
Copy returns an immutable snapshot of AllocStats.
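The snapshot pattern can be sketched as follows. The `allocStats` and `allocData` types are hypothetical stand-ins for AllocStats and AllocStatsData, and the deep copy of the map is an assumption about what a safe snapshot requires, not a statement about the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// allocData is a hypothetical stand-in for AllocStatsData.
type allocData struct {
	Dead    bool
	TaskIDs map[string]int
}

// allocStats pairs the data with a mutex, mirroring the shape of AllocStats.
type allocStats struct {
	sync.Mutex
	allocData
}

// Copy returns a snapshot that shares no mutable state with a. The map is
// copied entry by entry so that later updates to a do not leak into it.
func (a *allocStats) Copy() allocData {
	a.Lock()
	defer a.Unlock()
	ids := make(map[string]int, len(a.TaskIDs))
	for id, n := range a.TaskIDs {
		ids[id] = n
	}
	return allocData{Dead: a.Dead, TaskIDs: ids}
}

func main() {
	a := &allocStats{allocData: allocData{TaskIDs: map[string]int{"t1": 1}}}
	snap := a.Copy()

	a.Lock()
	a.TaskIDs["t2"] = 1 // a later update does not affect the snapshot
	a.Unlock()

	fmt.Println(len(snap.TaskIDs), len(a.TaskIDs))
}
```

Returning the data struct by value, without the mutex, lets callers read the snapshot without any locking.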
func (*AllocStats) RemoveTask ¶
func (a *AllocStats) RemoveTask(task *Task)
RemoveTask removes the alloc<->task association.
type AllocStatsData ¶
type AllocStatsData struct {
    // Resources is the currently available resources.
    reflow.Resources
    // Dead indicates if this alloc is dead.
    Dead bool
    // TaskIDs is the list of tasks running in this alloc.
    TaskIDs map[string]int
}
AllocStatsData is the per alloc stats snapshot.
type Cluster ¶
type Cluster interface {
    // Allocate returns an alloc with at least req.Min resources, or an
    // error. The requirement's width is used as a hint to size allocs
    // efficiently.
    Allocate(ctx context.Context, req reflow.Requirements, labels pool.Labels) (pool.Alloc, error)
    // CanAllocate returns whether this cluster can allocate the given amount of resources.
    CanAllocate(reflow.Resources) (bool, error)
}
Cluster is the scheduler's cluster interface.
type OverallStats ¶
type OverallStats struct {
    // TotalAllocs is the total number of allocs in the system (live or dead).
    TotalAllocs int64
    // TotalTasks is the total number of tasks (pending, running or completed).
    TotalTasks int64
}
OverallStats is the overall scheduler stats.
type Scheduler ¶
type Scheduler struct {
    // Transferer is used to manage data movement between
    // allocs and the scheduler's repository.
    Transferer reflow.Transferer
    // Mux is used to manage direct data transfers between blob stores (if supported).
    Mux blob.Mux
    // Cluster provides dynamic allocation of allocs.
    Cluster Cluster
    // Log logs scheduler actions.
    Log *log.Logger
    // TaskDB is the task reporting db.
    TaskDB taskdb.TaskDB
    // MaxPendingAllocs is the maximum number of outstanding
    // alloc requests.
    MaxPendingAllocs int
    // MaxAllocIdleTime is the time after which an idle alloc is
    // collected.
    MaxAllocIdleTime time.Duration
    // DrainTimeout is the duration to wait to see if more tasks have been submitted
    // so that we can combine the requirements of those tasks together to make larger allocs.
    DrainTimeout time.Duration
    // MinAlloc is the smallest resource allocation that is made by
    // the scheduler.
    MinAlloc reflow.Resources
    // Labels is the set of labels applied to newly created allocs.
    Labels pool.Labels
    // Stats is the scheduler stats.
    Stats *Stats
    // contains filtered or unexported fields
}
A Scheduler is responsible for managing a set of tasks and allocs, assigning (and reassigning) tasks to appropriate allocs. Scheduler can manage large numbers of tasks and allocs efficiently.
func New ¶
func New() *Scheduler
New returns a new Scheduler instance. The caller may customize its parameters before invoking Scheduler.Do, which starts scheduling.
func (*Scheduler) Do ¶
Do commences scheduling. The scheduler runs until the provided context is canceled, after which the context error is returned.
func (*Scheduler) ExportStats ¶
func (s *Scheduler) ExportStats()
ExportStats exports scheduler stats as expvars.
type Stats ¶
type Stats struct {
    // Mutex protects all the data members.
    sync.Mutex `json:"-"`
    // OverallStats has the overall scheduler stats.
    OverallStats
    // Allocs has all the alloc stats, including dead ones.
    Allocs map[string]*AllocStats
    // Tasks has all the task state and stats, including completed/error tasks.
    Tasks map[string]*TaskStats
}
Stats has all the scheduler stats, including alloc/task states and stats. It is thread safe and can be used to update stats.
func (*Stats) AddAlloc ¶
func (s *Stats) AddAlloc(alloc *alloc)
AddAlloc adds an alloc to the stats.
func (*Stats) AssignTask ¶
func (s *Stats) AssignTask(task *Task, alloc *alloc)
AssignTask assigns a task to an alloc.
func (*Stats) MarkAllocDead ¶
func (s *Stats) MarkAllocDead(alloc *alloc)
MarkAllocDead marks an alloc dead.
func (*Stats) PublishPrefix ¶
func (s *Stats) PublishPrefix(prefix string)
PublishPrefix publishes the stats as a Go expvar with the given prefix.
func (*Stats) ReturnTask ¶
func (s *Stats) ReturnTask(task *Task, alloc *alloc)
ReturnTask removes a task from the stats before returning it.
type StatsData ¶
type StatsData struct {
    // OverallStats has the overall scheduler stats.
    OverallStats
    // Allocs has all the alloc stats, including dead ones.
    Allocs map[string]AllocStatsData
    // Tasks has all the task state and stats, including completed/error tasks.
    Tasks map[string]TaskStatsData
}
StatsData is an immutable snapshot of Stats, usually obtained by calling Stats.GetStats().
type Task ¶
type Task struct {
    // Config is the task's exec config, which is passed on to the
    // alloc after scheduling.
    Config reflow.ExecConfig
    // Repository to use for this task.
    // Repository is the repository from which dependent objects are
    // downloaded and to which result objects are uploaded.
    Repository reflow.Repository
    // Log receives any status log messages during task scheduling
    // and execution.
    Log *log.Logger
    // Err stores any task scheduling error. If Err != nil while the
    // task is TaskDone, then the task failed to run.
    Err error
    // Result stores the Reflow result returned by a successful
    // execution.
    Result reflow.Result
    // RunInfo stores log/inspect information from the exec.
    RunInfo reflow.ExecRunInfo
    // Exec is the exec which is running (or ran) the task. Exec is
    // set by the scheduler before the task enters TaskRunning state.
    Exec reflow.Exec
    // Priority is the task priority. Lower numbers indicate higher priority.
    // Higher priority tasks will get scheduled before any lower priority tasks.
    Priority int
    // PostUseChecksum indicates whether input filesets are checksummed after use.
    PostUseChecksum bool
    // ExpectedDuration is the duration the task is expected to take. It is used
    // only as a hint by the scheduler for better scheduling.
    ExpectedDuration time.Duration
    // RunID that created this task.
    RunID taskdb.RunID
    // FlowID is the digest (flow.Digest) of the flow for which this task was created.
    FlowID digest.Digest
    // TaskDB is where the task row for this task is recorded. It is set by the
    // scheduler only after the task was attempted.
    TaskDB taskdb.TaskDB
    // contains filtered or unexported fields
}
Task represents a schedulable unit of work. Tasks are submitted to a scheduler which in turn schedules them onto allocs. After submission, all coordination is performed through the task struct.
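The Priority rule (lower numbers are scheduled first) can be modeled with a min-heap from the standard library. This is a sketch under assumptions: the `task` and `queue` types and the `order` helper are hypothetical, and the documentation does not say which data structure the scheduler uses or how it orders tasks of equal priority.

```go
package main

import (
	"container/heap"
	"fmt"
)

// task is a hypothetical stand-in carrying only the fields this sketch needs.
type task struct {
	Name     string
	Priority int // lower number means higher priority
}

// queue implements heap.Interface, ordered by ascending Priority.
type queue []*task

func (q queue) Len() int            { return len(q) }
func (q queue) Less(i, j int) bool  { return q[i].Priority < q[j].Priority }
func (q queue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x interface{}) { *q = append(*q, x.(*task)) }
func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	t := old[n-1]
	*q = old[:n-1]
	return t
}

// order pushes all tasks onto the heap and returns their names in the
// order they would be popped, that is, highest priority first.
func order(tasks []*task) []string {
	q := make(queue, 0, len(tasks))
	for _, t := range tasks {
		heap.Push(&q, t)
	}
	var names []string
	for q.Len() > 0 {
		names = append(names, heap.Pop(&q).(*task).Name)
	}
	return names
}

func main() {
	fmt.Println(order([]*task{{"batch", 10}, {"urgent", 0}, {"normal", 5}}))
}
```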
func NewTask ¶
func NewTask() *Task
NewTask returns a new, initialized task. The Task may be populated and then submitted to the scheduler.
func (*Task) ID ¶
ID is the identifier for this task. ID is only set when the scheduler attempts the task. Attempts to read the ID before it is set will panic.
func (*Task) Reset ¶
func (t *Task) Reset()
Reset resets the task. That is, it:
- resets the task's state to `TaskInit`,
- assigns a new id for the task, and
- increases its attempt count.
type TaskSet ¶
TaskSet is a set of tasks.
type TaskState ¶
type TaskState int
TaskState enumerates the possible states of a task.
const (
    // TaskInit is the initial state of a Task. No work has yet been done.
    TaskInit TaskState = iota
    // TaskStaging indicates that the task is currently staging input
    // data.
    TaskStaging
    // TaskRunning indicates the task is currently executing.
    TaskRunning
    // TaskLost indicates that the task has transiently failed and will be
    // retried by the scheduler.
    TaskLost
    // TaskDone indicates the task has completed.
    TaskDone
)
type TaskStats ¶
type TaskStats struct {
    // Mutex protects TaskStatsData.
    sync.Mutex `json:"-"`
    // TaskStatsData are the task stats.
    TaskStatsData
}
TaskStats is the per task info and stats used to update stats.
func (*TaskStats) Copy ¶
func (t *TaskStats) Copy() TaskStatsData
Copy returns an immutable snapshot of TaskStats.
type TaskStatsData ¶
type TaskStatsData struct {
    // Ident is the exec identifier of this task.
    Ident string
    // Type is the type of exec.
    Type string
    // State is the current state of the task.
    State int
    // Error if not nil, is the task error.
    Error error
    // RunID is the run the task belongs to.
    RunID string
    // FlowID is the flow corresponding to this task.
    FlowID string
}
TaskStatsData is a snapshot of the task stats.