Documentation
Index
- Constants
- func InitPolicyFactory() error
- func InitScheduler(parent tally.Scope, tree respool.Tree, taskSchedulingPeriod time.Duration, ...)
- func InitTaskTracker(parent tally.Scope, config *Config)
- type Config
- type Metrics
- type Policy
- type PolicyFactory
- type PolicyFunc
- type RMTask
- func (rmTask *RMTask) AddBackoff() error
- func (rmTask *RMTask) GetCurrentState() RMTaskState
- func (rmTask *RMTask) RequeueUnPlaced(reason string) error
- func (rmTask *RMTask) Respool() respool.ResPool
- func (rmTask *RMTask) RunTimeStats() *RunTimeStats
- func (rmTask *RMTask) Task() *resmgr.Task
- func (rmTask *RMTask) Terminate()
- func (rmTask *RMTask) TransitFromTo(stateFrom string, stateTo string, options ...state.Option) error
- func (rmTask *RMTask) TransitTo(stateTo string, options ...state.Option) error
- func (rmTask *RMTask) UpdateStartTime(startTime time.Time)
- type RMTaskState
- type Reconciler
- type RunTimeStats
- type Scheduler
- type Tracker
- type TransObs
- type TransitionObserver
Constants
const (
	// ExponentialBackOffPolicy is the name of the exponential backoff policy.
	ExponentialBackOffPolicy = "exponential-policy"
)
Variables
This section is empty.
Functions
func InitPolicyFactory
func InitPolicyFactory() error
InitPolicyFactory registers all the policies in the factory. If registration fails, it returns the error, which should be handled by the caller.
func InitScheduler
func InitScheduler(
	parent tally.Scope,
	tree respool.Tree,
	taskSchedulingPeriod time.Duration,
	rmTaskTracker Tracker)
InitScheduler initializes a task scheduler.
func InitTaskTracker
func InitTaskTracker(parent tally.Scope, config *Config)
InitTaskTracker initializes the task tracker.
Types
type Config
type Config struct {
	// Timeout for an RM task in the state machine from launching to ready state
	LaunchingTimeout time.Duration `yaml:"launching_timeout"`
	// Timeout for an RM task in the state machine from placing to ready state
	PlacingTimeout time.Duration `yaml:"placing_timeout"`
	// Timeout for an RM task in the state machine from preempting to running state
	PreemptingTimeout time.Duration `yaml:"preempting_timeout"`
	// Timeout for an RM task in the state machine from reserved to pending state
	ReservingTimeout time.Duration `yaml:"reserving_timeout"`
	// PlacementRetryBackoff is the backoff period applied
	// in each retry attempt.
	PlacementRetryBackoff time.Duration `yaml:"placement_retry_backoff"`
	// PlacementAttemptsPerCycle is the number of retry attempts in each placement retry cycle.
	PlacementAttemptsPerCycle float64 `yaml:"placement_attempts_percycle"`
	// PlacementRetryCycle is the number of cycles for which placement is repeated;
	// tasks still unplaced after that qualify for host reservation.
	PlacementRetryCycle float64 `yaml:"placement_retry_cycle"`
	// PolicyName is the name of the backoff policy
	// that dictates the backoff.
	PolicyName string `yaml:"backoff_policy_name"`
	// EnablePlacementBackoff enables/disables the placement backoff policies.
	EnablePlacementBackoff bool `yaml:"enable_placement_backoff"`
	// EnableSLATracking enables/disables SLA tracking of tasks.
	EnableSLATracking bool `yaml:"enable_sla_tracking"`
	// EnableHostReservation enables/disables host reservation of tasks.
	EnableHostReservation bool `yaml:"enable_host_reservation"`
}
Config is the Resource Manager task-specific configuration.
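The comments on PlacementAttemptsPerCycle and PlacementRetryCycle suggest that a task gets a bounded number of placement attempts before it becomes eligible for host reservation. The sketch below assumes that bound is simply the product of the two fields; the `placementConfig` type and the `attemptsBeforeReservation` helper are hypothetical and not part of this package, and the values shown are illustrative, not defaults.

```go
package main

import (
	"fmt"
	"time"
)

// placementConfig is a HYPOTHETICAL local mirror of the placement-related
// Config fields; it is not the package's real type.
type placementConfig struct {
	PlacementRetryBackoff     time.Duration
	PlacementAttemptsPerCycle float64
	PlacementRetryCycle       float64
}

// attemptsBeforeReservation is a HYPOTHETICAL helper. It ASSUMES each of the
// PlacementRetryCycle cycles makes PlacementAttemptsPerCycle attempts, so the
// product is the number of attempts before a still-unplaced task would
// qualify for host reservation.
func attemptsBeforeReservation(c placementConfig) int {
	return int(c.PlacementAttemptsPerCycle * c.PlacementRetryCycle)
}

func main() {
	// Illustrative values only, not the package defaults.
	cfg := placementConfig{
		PlacementRetryBackoff:     30 * time.Second,
		PlacementAttemptsPerCycle: 3,
		PlacementRetryCycle:       3,
	}
	fmt.Println(attemptsBeforeReservation(cfg)) // 9
}
```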
type Metrics
type Metrics struct {
	ReadyQueueLen            tally.Gauge
	TasksCountInTracker      tally.Gauge
	TaskStatesGauge          map[task.TaskState]tally.Gauge
	ResourcesHeldByTaskState map[task.TaskState]scalar.GaugeMaps
	LeakedResources          scalar.GaugeMaps
	ReconciliationSuccess    tally.Counter
	ReconciliationFail       tally.Counter
	OrphanTasks              tally.Gauge
}
Metrics is a container for all the metrics in the task package.
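`TaskStatesGauge` and `ResourcesHeldByTaskState` are keyed by task state, and the Tracker's `UpdateMetrics(from, to, taskResources)` method suggests they are adjusted on each state transition. A minimal sketch of that bookkeeping, assuming plain maps in place of tally gauges and a single CPU value in place of `scalar.Resources`; `stateMetrics` and `updateMetrics` are hypothetical names.

```go
package main

import "fmt"

// stateMetrics is a HYPOTHETICAL stand-in for TaskStatesGauge and
// ResourcesHeldByTaskState: plain maps instead of tally gauges, and a single
// CPU figure instead of scalar.Resources.
type stateMetrics struct {
	count map[string]int     // number of tasks per state
	cpus  map[string]float64 // CPUs held by tasks in each state
}

// updateMetrics moves one task, and the CPUs it holds, from the `from`
// state bucket to the `to` bucket.
func (m *stateMetrics) updateMetrics(from, to string, cpus float64) {
	m.count[from]--
	m.cpus[from] -= cpus
	m.count[to]++
	m.cpus[to] += cpus
}

func main() {
	m := &stateMetrics{
		count: map[string]int{"PENDING": 2},
		cpus:  map[string]float64{"PENDING": 4},
	}
	m.updateMetrics("PENDING", "READY", 2)
	fmt.Println(m.count["PENDING"], m.count["READY"]) // 1 1
	fmt.Println(m.cpus["PENDING"], m.cpus["READY"])   // 2 2
}
```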
func NewMetrics
NewMetrics returns a new instance of task.Metrics.
type Policy
type Policy interface {
	// GetNextBackoffDuration returns the next backoff duration
	// based on the policy implementation
	GetNextBackoffDuration(task *resmgr.Task, config *Config) float64
	// IsCycleCompleted returns true when one placement cycle has been completed
	IsCycleCompleted(task *resmgr.Task, config *Config) bool
	// contains filtered or unexported methods
}
Policy is the interface for calculating the next backoff duration; how the duration is computed depends on the implementation.
type PolicyFactory
type PolicyFactory struct {
// contains filtered or unexported fields
}
PolicyFactory is the factory for different backoff policies. Any backoff policy can be implemented this way. A Policy object can be obtained via the CreateBackOffPolicy call.
var Factory *PolicyFactory
Factory is the global singleton variable for the factory.
func GetFactory
func GetFactory() *PolicyFactory
GetFactory returns the factory object. InitPolicyFactory is expected to have been called before this function; otherwise, Factory will be nil.
func (*PolicyFactory) CreateBackOffPolicy
func (pf *PolicyFactory) CreateBackOffPolicy(conf *Config) (Policy, error)
CreateBackOffPolicy checks all the registered policies and returns the policy object.
func (*PolicyFactory) NewExponentialPolicy
func (pf *PolicyFactory) NewExponentialPolicy(config *Config) (Policy, error)
NewExponentialPolicy creates an exponentialPolicy object.
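The PolicyFactory follows a registry pattern: policies are registered by name once at startup, then looked up by name when a policy object is needed. The sketch below illustrates that pattern under stated assumptions: `policyFunc`, `policyFactory`, `register`, and `createBackOffPolicy` are hypothetical simplifications (the real `PolicyFunc` signature is not shown in this documentation), and the lookup key is the `ExponentialBackOffPolicy` value `"exponential-policy"`.

```go
package main

import "fmt"

// backoffPolicy is a trimmed, HYPOTHETICAL stand-in for the Policy interface.
type backoffPolicy interface {
	Name() string
}

type exponential struct{}

func (exponential) Name() string { return "exponential-policy" }

// policyFunc is a HYPOTHETICAL constructor type.
type policyFunc func() (backoffPolicy, error)

// policyFactory maps policy names to their constructors.
type policyFactory struct {
	registry map[string]policyFunc
}

// factory plays the role of the package-level singleton.
var factory *policyFactory

// initPolicyFactory registers every known policy and surfaces any error.
func initPolicyFactory() error {
	factory = &policyFactory{registry: map[string]policyFunc{}}
	return factory.register("exponential-policy", func() (backoffPolicy, error) {
		return exponential{}, nil
	})
}

// register rejects duplicate names so a policy cannot be silently replaced.
func (pf *policyFactory) register(name string, fn policyFunc) error {
	if _, ok := pf.registry[name]; ok {
		return fmt.Errorf("policy %q already registered", name)
	}
	pf.registry[name] = fn
	return nil
}

// createBackOffPolicy builds the policy registered under name.
func (pf *policyFactory) createBackOffPolicy(name string) (backoffPolicy, error) {
	fn, ok := pf.registry[name]
	if !ok {
		return nil, fmt.Errorf("unknown backoff policy %q", name)
	}
	return fn()
}

func main() {
	if err := initPolicyFactory(); err != nil {
		panic(err)
	}
	p, err := factory.createBackOffPolicy("exponential-policy")
	fmt.Println(p.Name(), err) // exponential-policy <nil>
}
```

Keeping construction behind a name lookup lets the configured `PolicyName` select a policy without the caller depending on concrete policy types.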
type PolicyFunc
PolicyFunc is the factory for different backoff policies
type RMTask
type RMTask struct {
// contains filtered or unexported fields
}
RMTask is the wrapper around resmgr.Task for the state machine.
func CreateRMTask
func CreateRMTask(
	scope tally.Scope,
	t *resmgr.Task,
	handler *eventstream.Handler,
	respool respool.ResPool,
	taskConfig *Config) (*RMTask, error)
CreateRMTask creates the RM task from resmgr.Task.
func (*RMTask) AddBackoff
func (rmTask *RMTask) AddBackoff() error
AddBackoff adds the backoff to the RMTask based on the backoff policy.
func (*RMTask) GetCurrentState
func (rmTask *RMTask) GetCurrentState() RMTaskState
GetCurrentState returns the current state
func (*RMTask) RequeueUnPlaced
func (rmTask *RMTask) RequeueUnPlaced(reason string) error
RequeueUnPlaced requeues the task that couldn't be placed.
func (*RMTask) RunTimeStats
func (rmTask *RMTask) RunTimeStats() *RunTimeStats
RunTimeStats returns the runtime stats of the RMTask
func (*RMTask) TransitFromTo
func (rmTask *RMTask) TransitFromTo(
	stateFrom string,
	stateTo string,
	options ...state.Option) error
TransitFromTo transits a task from a source state to a target state. If the source state doesn't match the current state, an error is returned.
func (*RMTask) UpdateStartTime
func (rmTask *RMTask) UpdateStartTime(startTime time.Time)
UpdateStartTime updates the start time of the RMTask.
type RMTaskState
type RMTaskState struct {
	// The state the task is in
	State task.TaskState
	// The reason for being in the state
	Reason string
	// Last time the state was updated
	LastUpdateTime time.Time
}
RMTaskState represents the state of the rm task
type Reconciler
type Reconciler struct {
// contains filtered or unexported fields
}
Reconciler reconciles the tasks between the tracker and the store
func NewReconciler
func NewReconciler(
	tracker activeTasksTracker,
	taskStore storage.TaskStore,
	parent tally.Scope,
	reconciliationPeriod time.Duration,
) *Reconciler
NewReconciler returns a new reconciler
type RunTimeStats
RunTimeStats is the container for run time stats of the resmgr task
type Scheduler
type Scheduler interface {
	// Start starts the task scheduler goroutines
	Start() error
	// Stop stops the task scheduler goroutines
	Stop() error
	// EnqueueGang enqueues a gang (task list) into the resource pool ready queue
	EnqueueGang(gang *resmgrsvc.Gang) error
	// DequeueGang dequeues a gang (task list) from the resource pool ready queue
	DequeueGang(maxWaitTime time.Duration, taskType resmgr.TaskType) (*resmgrsvc.Gang, error)
	// AddInvalidTask adds an invalid task so that it can be removed from the ready queue later
	AddInvalidTask(task *peloton.TaskID)
}
Scheduler defines the interface of task scheduler which schedules tasks from the pending queues of resource pools to a ready queue using different scheduling policies.
type Tracker
type Tracker interface {
	// AddTask adds the task to the state machine
	AddTask(
		t *resmgr.Task,
		handler *eventstream.Handler,
		respool respool.ResPool,
		config *Config) error
	// GetTask gets the RM task for taskID
	GetTask(t *peloton.TaskID) *RMTask
	// SetPlacement sets the placement for the tasks
	SetPlacement(placement *resmgr.Placement)
	// DeleteTask deletes the task from the map
	DeleteTask(t *peloton.TaskID)
	// MarkItDone marks the task done and adds its
	// resources back to the respool
	MarkItDone(mesosTaskID string) error
	// MarkItInvalid marks the task done and invalidates it
	// in the respool so that it can be removed from the queue
	MarkItInvalid(mesosTaskID string) error
	// TasksByHosts returns all tasks of the given type running on the given hosts.
	TasksByHosts(hosts []string, taskType resmgr.TaskType) map[string][]*RMTask
	// AddResources adds the task resources to the respool
	AddResources(taskID *peloton.TaskID) error
	// GetSize returns the number of tasks in the tracker
	GetSize() int64
	// Clear removes all the tasks from the tracker
	Clear()
	// GetActiveTasks returns the task states map
	GetActiveTasks(jobID string, respoolID string, states []string) map[string][]*RMTask
	// UpdateMetrics updates the task metrics
	UpdateMetrics(from task.TaskState, to task.TaskState, taskResources *scalar.Resources)
	// GetOrphanTask gets the orphan RMTask for the given mesos-task-id
	GetOrphanTask(mesosTaskID string) *RMTask
	// GetOrphanTasks returns orphan tasks
	GetOrphanTasks(respoolID string) []*RMTask
}
Tracker is the interface the resource manager uses to track all the tasks. TODO: Get rid of peloton-task-id from the tracker.
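A minimal sketch of the core bookkeeping behind `AddTask`, `GetTask`, `DeleteTask`, and `GetSize`, assuming a map keyed by task ID and guarded by a `sync.RWMutex` so that reads can proceed concurrently. `trackedTask` and `tracker` are hypothetical; rejecting duplicate IDs in `AddTask` is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// trackedTask is a HYPOTHETICAL stand-in for RMTask.
type trackedTask struct {
	ID    string
	State string
}

// tracker is a HYPOTHETICAL mutex-guarded map keyed by task ID.
type tracker struct {
	mu    sync.RWMutex
	tasks map[string]*trackedTask
}

func newTracker() *tracker { return &tracker{tasks: make(map[string]*trackedTask)} }

// AddTask stores the task; rejecting duplicates is an ASSUMPTION.
func (t *tracker) AddTask(task *trackedTask) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.tasks[task.ID]; ok {
		return fmt.Errorf("task %s already tracked", task.ID)
	}
	t.tasks[task.ID] = task
	return nil
}

// GetTask returns nil if the task is not tracked.
func (t *tracker) GetTask(id string) *trackedTask {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.tasks[id]
}

func (t *tracker) DeleteTask(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.tasks, id)
}

func (t *tracker) GetSize() int64 {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return int64(len(t.tasks))
}

func main() {
	tr := newTracker()
	_ = tr.AddTask(&trackedTask{ID: "job1-0", State: "PENDING"})
	fmt.Println(tr.GetSize()) // 1
	tr.DeleteTask("job1-0")
	fmt.Println(tr.GetSize(), tr.GetTask("job1-0") == nil) // 0 true
}
```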
type TransObs
type TransObs struct {
// contains filtered or unexported fields
}
TransObs implements TransitionObserver
type TransitionObserver
TransitionObserver is the interface for observing a state transition
func NewTransitionObserver
func NewTransitionObserver(
	enabled bool,
	scope tally.Scope,
	respoolPath string,
) TransitionObserver
NewTransitionObserver returns a new observer for the respool and the task, tagged with the relevant tags.