task

package
v0.0.0-...-ddc1a4a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2022 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// ExponentialBackOffPolicy is Backoff Policy Name
	ExponentialBackOffPolicy = "exponential-policy"
)

Variables

This section is empty.

Functions

func InitPolicyFactory

func InitPolicyFactory() error

InitPolicyFactory method registers all the policies in the factory If registration fails it returns the error , which should be handeled at the caller end.

func InitScheduler

func InitScheduler(
	parent tally.Scope,
	tree respool.Tree,
	taskSchedulingPeriod time.Duration,
	rmTaskTracker Tracker)

InitScheduler initializes a Task Scheduler

func InitTaskTracker

func InitTaskTracker(
	parent tally.Scope,
	config *Config)

InitTaskTracker initialize the task tracker

Types

type Config

type Config struct {
	// Timeout for rm task in statemachine from launching to ready state
	LaunchingTimeout time.Duration `yaml:"launching_timeout"`
	// Timeout for rm task in statemachine from placing to ready state
	PlacingTimeout time.Duration `yaml:"placing_timeout"`
	// Timeout for rm task in statemachine from preempting to running state
	PreemptingTimeout time.Duration `yaml:"preempting_timeout"`
	// Timeout for rm task in statemachine from reserved to pending state
	ReservingTimeout time.Duration `yaml:"reserving_timeout"`
	// This is the backoff period how much it will backoff
	// in each retry attempt.
	PlacementRetryBackoff time.Duration `yaml:"placement_retry_backoff"`
	// This is number of retry attempts in each placement retry cycle.
	PlacementAttemptsPerCycle float64 `yaml:"placement_attempts_percycle"`
	// This is number of cycles which placement is going to repeat and
	// unplaced tasks after that are qualified for host reservation.
	PlacementRetryCycle float64 `yaml:"placement_retry_cycle"`
	// This is the policy name for the backoff
	// which is going to dictate the backoff
	PolicyName string `yaml:"backoff_policy_name"`
	// This flag will enable/disable the placement backoff policies
	EnablePlacementBackoff bool `yaml:"enable_placement_backoff"`
	// This flag will enable/disable SLA tracking of tasks
	EnableSLATracking bool `yaml:"enable_sla_tracking"`
	// This flag will enable/disable host reservation of tasks
	EnableHostReservation bool `yaml:"enable_host_reservation"`
}

Config is Resource Manager Task specific configuration

type Metrics

type Metrics struct {
	ReadyQueueLen tally.Gauge

	TasksCountInTracker tally.Gauge

	TaskStatesGauge map[task.TaskState]tally.Gauge

	ResourcesHeldByTaskState map[task.TaskState]scalar.GaugeMaps
	LeakedResources          scalar.GaugeMaps

	ReconciliationSuccess tally.Counter
	ReconciliationFail    tally.Counter

	OrphanTasks tally.Gauge
}

Metrics is a placeholder for all metrics in task.

func NewMetrics

func NewMetrics(scope tally.Scope) *Metrics

NewMetrics returns a new instance of task.Metrics.

type Policy

type Policy interface {
	// GetNextBackoffDuration returns the next backoff duration
	// based on policy implementation
	GetNextBackoffDuration(task *resmgr.Task, config *Config) float64
	// IsCycleCompleted returns true when one placement cycle has been completed
	IsCycleCompleted(task *resmgr.Task, config *Config) bool
	// contains filtered or unexported methods
}

Policy is the interface for calculating the next duration for the back off time based on its implementation.

type PolicyFactory

type PolicyFactory struct {
	// contains filtered or unexported fields
}

PolicyFactory is the factory for different backoff policies Any backoff policy can be implemented through this way. Policy object can be obtained via CreateBackOffPolicyeate call

var Factory *PolicyFactory

Factory is global singelton variable for the factory

func GetFactory

func GetFactory() *PolicyFactory

GetFactory returns the factory object. Expectation is InitFactory should have been called before this function call otherwise , Factory will be nil

func (*PolicyFactory) CreateBackOffPolicy

func (pf *PolicyFactory) CreateBackOffPolicy(conf *Config) (Policy, error)

CreateBackOffPolicy checks all the registered policies and return the policy object

func (*PolicyFactory) NewExponentialPolicy

func (pf *PolicyFactory) NewExponentialPolicy(config *Config) (Policy, error)

NewExponentialPolicy is the create object for exponentialPolicy

type PolicyFunc

type PolicyFunc func(config *Config) (Policy, error)

PolicyFunc is the factory for different backoff policies

type RMTask

type RMTask struct {
	// contains filtered or unexported fields
}

RMTask is the wrapper around resmgr.task for state machine

func CreateRMTask

func CreateRMTask(
	scope tally.Scope,
	t *resmgr.Task,
	handler *eventstream.Handler,
	respool respool.ResPool,
	taskConfig *Config) (*RMTask, error)

CreateRMTask creates the RM task from resmgr.task

func (*RMTask) AddBackoff

func (rmTask *RMTask) AddBackoff() error

AddBackoff adds the backoff to the RMtask based on backoff policy

func (*RMTask) GetCurrentState

func (rmTask *RMTask) GetCurrentState() RMTaskState

GetCurrentState returns the current state

func (*RMTask) RequeueUnPlaced

func (rmTask *RMTask) RequeueUnPlaced(reason string) error

RequeueUnPlaced Requeues the task which couldn't be placed.

func (*RMTask) Respool

func (rmTask *RMTask) Respool() respool.ResPool

Respool returns the respool of the RMTask.

func (*RMTask) RunTimeStats

func (rmTask *RMTask) RunTimeStats() *RunTimeStats

RunTimeStats returns the runtime stats of the RMTask

func (*RMTask) Task

func (rmTask *RMTask) Task() *resmgr.Task

Task returns the task of the RMTask.

func (*RMTask) Terminate

func (rmTask *RMTask) Terminate()

Terminate the rm task

func (*RMTask) TransitFromTo

func (rmTask *RMTask) TransitFromTo(
	stateFrom string,
	stateTo string,
	options ...state.Option) error

TransitFromTo transits a task from a source to target state. If the from state doesn't match the current state and error is returned.

func (*RMTask) TransitTo

func (rmTask *RMTask) TransitTo(stateTo string, options ...state.Option) error

TransitTo transitions to the target state

func (*RMTask) UpdateStartTime

func (rmTask *RMTask) UpdateStartTime(startTime time.Time)

UpdateStartTime updates the start time of the RMTask

type RMTaskState

type RMTaskState struct {
	// The state the task is in
	State task.TaskState
	// The reason for being in the state
	Reason string
	// Last time the state was updated
	LastUpdateTime time.Time
}

RMTaskState represents the state of the rm task

type Reconciler

type Reconciler struct {
	// contains filtered or unexported fields
}

Reconciler reconciles the tasks between the tracker and the store

func NewReconciler

func NewReconciler(
	tracker activeTasksTracker,
	taskStore storage.TaskStore,
	parent tally.Scope,
	reconciliationPeriod time.Duration,
) *Reconciler

NewReconciler returns a new reconciler

func (*Reconciler) Start

func (t *Reconciler) Start() error

Start starts the reconciler

func (*Reconciler) Stop

func (t *Reconciler) Stop() error

Stop stops the reconciler

type RunTimeStats

type RunTimeStats struct {
	StartTime time.Time
}

RunTimeStats is the container for run time stats of the resmgr task

type Scheduler

type Scheduler interface {
	// Start starts the task scheduler goroutines
	Start() error
	// Stop stops the task scheduler goroutines
	Stop() error
	// Enqueues gang (task list) into resource pool ready queue
	EnqueueGang(gang *resmgrsvc.Gang) error
	// Dequeues gang (task list) from the resource pool ready queue
	DequeueGang(maxWaitTime time.Duration, taskType resmgr.TaskType) (*resmgrsvc.Gang, error)
	// Adds an invalid task so that it can be removed from the ready queue later.
	AddInvalidTask(task *peloton.TaskID)
}

Scheduler defines the interface of task scheduler which schedules tasks from the pending queues of resource pools to a ready queue using different scheduling policies.

func GetScheduler

func GetScheduler() Scheduler

GetScheduler returns the task scheduler instance

type Tracker

type Tracker interface {

	// AddTask adds the task to state machine
	AddTask(
		t *resmgr.Task,
		handler *eventstream.Handler,
		respool respool.ResPool,
		config *Config) error

	// GetTask gets the RM task for taskID
	GetTask(t *peloton.TaskID) *RMTask

	// SetPlacementHost Sets the placement for the tasks.
	SetPlacement(placement *resmgr.Placement)

	// DeleteTask deletes the task from the map
	DeleteTask(t *peloton.TaskID)

	// MarkItDone marks the task done and add back those
	// resources to respool
	MarkItDone(mesosTaskID string) error

	// MarkItInvalid marks the task done and invalidate them
	// in to respool by that they can be removed from the queue
	MarkItInvalid(mesosTaskID string) error

	// TasksByHosts returns all tasks of the given type running on the given hosts.
	TasksByHosts(hosts []string, taskType resmgr.TaskType) map[string][]*RMTask

	// AddResources adds the task resources to respool
	AddResources(taskID *peloton.TaskID) error

	// GetSize returns the number of the tasks in tracker
	GetSize() int64

	// Clear cleans the tracker with all the tasks
	Clear()

	// GetActiveTasks returns task states map
	GetActiveTasks(jobID string, respoolID string, states []string) map[string][]*RMTask

	// UpdateMetrics updates the task metrics
	UpdateMetrics(from task.TaskState, to task.TaskState, taskResources *scalar.Resources)

	// GetOrphanTask gets the orphan RMTask for the given mesos-task-id
	GetOrphanTask(mesosTaskID string) *RMTask

	// GetOrphanTasks returns orphan tasks
	GetOrphanTasks(respoolID string) []*RMTask
}

Tracker is the interface for resource manager to track all the tasks TODO: Get rid of peloton-task-id from tracker

func GetTracker

func GetTracker() Tracker

GetTracker gets the singleton object of the tracker

type TransObs

type TransObs struct {
	// contains filtered or unexported fields
}

TransObs implements TransitionObserver

func (*TransObs) Observe

func (obs *TransObs) Observe(mesosTaskID string, currentState task.TaskState)

Observe implements TransitionObserver

type TransitionObserver

type TransitionObserver interface {
	Observe(taskID string, transitedTo task.TaskState)
}

TransitionObserver is the interface for observing a state transition

func NewTransitionObserver

func NewTransitionObserver(
	enabled bool,
	scope tally.Scope,
	respoolPath string,
) TransitionObserver

NewTransitionObserver returns the a new observer for the respool and the task tagged with the relevant tags

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL