goalstate

package
v0.0.0-...-c0686e8
Warning

This package is not in the latest version of its module.

Published: Dec 10, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeleteJobFromActiveJobs

func DeleteJobFromActiveJobs(
	ctx context.Context, entity goalstate.Entity) error

DeleteJobFromActiveJobs deletes a terminal batch job from the active jobs table.

func EnqueueJobUpdate

func EnqueueJobUpdate(
	ctx context.Context,
	entity goalstate.Entity,
) error

EnqueueJobUpdate enqueues the ongoing update, if any, for a stateless job. It is a no-op for batch jobs.

func EnqueueJobWithDefaultDelay

func EnqueueJobWithDefaultDelay(
	jobID *peloton.JobID,
	goalStateDriver Driver,
	cachedJob cached.Job)

EnqueueJobWithDefaultDelay is a helper function that enqueues a job into the goal state engine, delayed by the default interval at which the job runtime updater runs. Using this function ensures that the same job is not enqueued too many times when multiple task updates for the job arrive within a short period. TODO(zhixin): pass in job type instead of cachedJob, remove GetJobType from the interface

func JobCreateTasks

func JobCreateTasks(ctx context.Context, entity goalstate.Entity) error

JobCreateTasks creates or recovers all tasks in the job.

func JobDelete

func JobDelete(
	ctx context.Context,
	entity goalstate.Entity,
) error

JobDelete deletes a job from the cache and the DB.

func JobEvaluateMaxRunningInstancesSLA

func JobEvaluateMaxRunningInstancesSLA(ctx context.Context, entity goalstate.Entity) error

JobEvaluateMaxRunningInstancesSLA evaluates the job's maximum-running-instances SLA and determines which instances, if any, to start.
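The arithmetic behind a maximum-running-instances SLA can be sketched as a budget calculation. The function below is hypothetical: it assumes the caller already knows how many instances are running and which instance IDs are waiting to start, and it treats a limit of 0 as "no limit", which is an assumption rather than something stated here.

```go
package main

import (
	"fmt"
	"sort"
)

// instancesToStart is a hypothetical sketch of a max-running-instances SLA
// check: given the SLA limit, the number of instances currently running
// (or on their way to running), and the IDs of instances still waiting to
// be started, it returns the instances that may be started now.
// A maxRunning of 0 is treated here as "no limit" (an assumption).
func instancesToStart(maxRunning, running uint32, waiting []uint32) []uint32 {
	ids := append([]uint32(nil), waiting...)
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	if maxRunning == 0 {
		return ids
	}
	if running >= maxRunning {
		return nil
	}
	budget := int(maxRunning - running)
	if budget > len(ids) {
		budget = len(ids)
	}
	return ids[:budget]
}

func main() {
	// Limit 5, 3 running: at most 2 of the waiting instances may start.
	fmt.Println(instancesToStart(5, 3, []uint32{9, 4, 7, 8}))
}
```

Starting the lowest instance IDs first is an arbitrary but deterministic choice made for the sketch.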

func JobKill

func JobKill(ctx context.Context, entity goalstate.Entity) error

JobKill stops all tasks in the job.

func JobKillAndDelete

func JobKillAndDelete(
	ctx context.Context,
	entity goalstate.Entity,
) error

JobKillAndDelete terminates each task in the job and ensures that the tasks are not restarted. It deletes the job once all tasks are in a terminal state.

func JobKillAndUntrack

func JobKillAndUntrack(ctx context.Context, entity goalstate.Entity) error

JobKillAndUntrack kills all pods in the job, ensures they do not start again, and untracks the job if possible.

func JobRecover

func JobRecover(ctx context.Context, entity goalstate.Entity) error

JobRecover tries to recover a partially created job. If the job is not recoverable, it untracks the job.

func JobReloadRuntime

func JobReloadRuntime(
	ctx context.Context,
	entity goalstate.Entity,
) error

JobReloadRuntime reloads the job runtime into the cache

func JobRuntimeUpdater

func JobRuntimeUpdater(ctx context.Context, entity goalstate.Entity) error

JobRuntimeUpdater updates the job runtime. When the jobmgr leader fails over, the goal state driver runs syncFromDB, which enqueues all recovered jobs into the goal state engine; the engine then runs the job runtime updater for each of them, refreshing any out-of-date runtime information.

func JobStart

func JobStart(
	ctx context.Context,
	entity goalstate.Entity,
) error

JobStart starts all tasks of the job

func JobStateInvalid

func JobStateInvalid(ctx context.Context, entity goalstate.Entity) error

JobStateInvalid logs a Sentry error to indicate that the combination of the job's goal state and current state is not valid.

func JobUntrack

func JobUntrack(ctx context.Context, entity goalstate.Entity) error

JobUntrack deletes the job and tasks from the goal state engine and the cache.

func NewJobEntity

func NewJobEntity(id *peloton.JobID, driver *driver) goalstate.Entity

NewJobEntity returns an implementation of the goal state Entity interface for jobs.

func NewTaskEntity

func NewTaskEntity(jobID *peloton.JobID, instanceID uint32, driver *driver) goalstate.Entity

NewTaskEntity returns an implementation of the goal state Entity interface for tasks.

func NewUpdateEntity

func NewUpdateEntity(
	id *peloton.UpdateID,
	jobID *peloton.JobID,
	driver *driver) goalstate.Entity

NewUpdateEntity returns an implementation of the goal state Entity interface for job updates.

func TaskDelete

func TaskDelete(ctx context.Context, entity goalstate.Entity) error

TaskDelete deletes the task from the cache and removes its runtime from the DB. It is used to reduce the instance count of a job.

func TaskExecutorShutdown

func TaskExecutorShutdown(ctx context.Context, entity goalstate.Entity) error

TaskExecutorShutdown is called when killing a task times out; it shuts down the executor directly.

func TaskFailRetry

func TaskFailRetry(ctx context.Context, entity goalstate.Entity) error

TaskFailRetry retries a task after it fails.

func TaskInitialize

func TaskInitialize(ctx context.Context, entity goalstate.Entity) error

TaskInitialize does the following:
1. Sets the current state to TaskState_INITIALIZED.
2. Sets the goal state depending on the JobType.
3. Generates a new Mesos task ID.
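The three TaskInitialize steps can be illustrated on a toy runtime record. Everything here is an assumption made for illustration: the taskRuntime struct is hypothetical, the goal states chosen per job type (SUCCEEDED for batch, RUNNING for service) are assumed, and so is the "<jobID>-<instanceID>-<runID>" task ID format.

```go
package main

import "fmt"

// The types below are hypothetical simplifications of a task runtime;
// they are not the package's real protobuf types.
type jobType int

const (
	batchJob jobType = iota
	serviceJob
)

type taskRuntime struct {
	State     string
	GoalState string
	RunID     uint64
	MesosID   string
}

// initializeTask follows the three steps listed for TaskInitialize. The goal
// states chosen per job type and the "<jobID>-<instanceID>-<runID>" task ID
// format are assumptions made for illustration.
func initializeTask(rt *taskRuntime, jt jobType, jobID string, instanceID uint32) {
	// 1. Reset the current state.
	rt.State = "INITIALIZED"
	// 2. Pick the goal state from the job type.
	if jt == batchJob {
		rt.GoalState = "SUCCEEDED"
	} else {
		rt.GoalState = "RUNNING"
	}
	// 3. Bump the run ID and derive a fresh Mesos task ID from it.
	rt.RunID++
	rt.MesosID = fmt.Sprintf("%s-%d-%d", jobID, instanceID, rt.RunID)
}

func main() {
	rt := &taskRuntime{State: "FAILED", RunID: 2}
	initializeTask(rt, serviceJob, "job-1", 7)
	fmt.Println(rt.State, rt.GoalState, rt.MesosID)
}
```

Deriving the ID from an incrementing run counter keeps every relaunch of the same instance distinguishable.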

func TaskLaunchRetry

func TaskLaunchRetry(ctx context.Context, entity goalstate.Entity) error

TaskLaunchRetry retries the launch after a launch timeout; it also notifies the resource manager that the task has been launched.

func TaskReloadRuntime

func TaskReloadRuntime(ctx context.Context, entity goalstate.Entity) error

TaskReloadRuntime reloads task runtime into cache.

func TaskStart

func TaskStart(ctx context.Context, entity goalstate.Entity) error

TaskStart sends the task to the resource manager for placement and changes its state to PENDING.

func TaskStateInvalid

func TaskStateInvalid(_ context.Context, entity goalstate.Entity) error

TaskStateInvalid logs a Sentry error to indicate that the combination of the task's goal state and current state is not valid.

func TaskStop

func TaskStop(ctx context.Context, entity goalstate.Entity) error

TaskStop kills the task.

func TaskTerminatedRetry

func TaskTerminatedRetry(ctx context.Context, entity goalstate.Entity) error

TaskTerminatedRetry retries a task that has terminated.

func UpdateAbortIfNeeded

func UpdateAbortIfNeeded(ctx context.Context, entity goalstate.Entity) error

UpdateAbortIfNeeded checks whether the update identifier in the goal state engine matches the one in the job runtime updater (which tracks the current job update). If it does not, it aborts the update in the goal state engine and enqueues the current update.
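The comparison in UpdateAbortIfNeeded reduces to an identity check followed by two side effects. The sketch below uses plain string IDs and hypothetical abort/enqueue callbacks in place of the real engine operations; skipping the enqueue when there is no current update is an extra guard added for the sketch.

```go
package main

import "fmt"

// abortIfNeeded sketches the check described for UpdateAbortIfNeeded using
// plain string IDs. trackedUpdateID is the update this goal state entity was
// created for; currentUpdateID is the update recorded for the job.
// abort and enqueue are hypothetical callbacks standing in for the real
// engine operations. It reports whether the tracked update was aborted.
func abortIfNeeded(trackedUpdateID, currentUpdateID string,
	abort func(id string), enqueue func(id string)) bool {
	if trackedUpdateID == currentUpdateID {
		return false // still the current update: nothing to do
	}
	abort(trackedUpdateID)
	// Extra guard for the sketch: only enqueue if a current update exists.
	if currentUpdateID != "" {
		enqueue(currentUpdateID)
	}
	return true
}

func main() {
	var log []string
	aborted := abortIfNeeded("update-1", "update-2",
		func(id string) { log = append(log, "abort "+id) },
		func(id string) { log = append(log, "enqueue "+id) })
	fmt.Println(aborted, log)
}
```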

func UpdateComplete

func UpdateComplete(ctx context.Context, entity goalstate.Entity) error

UpdateComplete indicates that all instances have been updated, and the update state should be marked complete.

func UpdateReload

func UpdateReload(ctx context.Context, entity goalstate.Entity) error

UpdateReload reloads the update from the DB.

func UpdateRun

func UpdateRun(ctx context.Context, entity goalstate.Entity) error

UpdateRun checks which instances have been updated, starts the next set of instances to update, and updates the state of the job update in the cache and the DB.
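One step of a rolling update, as UpdateRun describes it, can be sketched as picking the next instances that fit in the remaining in-flight budget. The batchSize parameter and the set-based bookkeeping are assumptions for illustration, not the package's update model.

```go
package main

import "fmt"

// nextBatch is a hypothetical sketch of one UpdateRun step for a rolling
// update: instancesToUpdate is the full, ordered list of instances the update
// touches; done and inProgress hold instances already updated or currently
// being updated; batchSize caps how many instances may be in flight at once.
// It returns the instances to start next.
func nextBatch(instancesToUpdate []uint32, done, inProgress map[uint32]bool, batchSize int) []uint32 {
	slots := batchSize - len(inProgress)
	var next []uint32
	for _, id := range instancesToUpdate {
		if slots <= 0 {
			break
		}
		if done[id] || inProgress[id] {
			continue
		}
		next = append(next, id)
		slots--
	}
	return next
}

func main() {
	all := []uint32{0, 1, 2, 3, 4, 5}
	done := map[uint32]bool{0: true, 1: true}
	inProgress := map[uint32]bool{2: true}
	// One instance is in flight and the batch size is 3, so two more start.
	fmt.Println(nextBatch(all, done, inProgress, 3))
}
```

Each engine pass would record the newly started instances as in progress and move finished ones to done before calling the function again.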

func UpdateStart

func UpdateStart(ctx context.Context, entity goalstate.Entity) error

UpdateStart initializes the update. It moves the configuration version of the tasks that are not touched by this update to the new version. It then moves the update state to ROLLING_FORWARD and enqueues the update into the goal state engine to start the rolling update process.

func UpdateUntrack

func UpdateUntrack(ctx context.Context, entity goalstate.Entity) error

UpdateUntrack deletes the update from the cache and the goal state engine.

func UpdateWriteProgress

func UpdateWriteProgress(ctx context.Context, entity goalstate.Entity) error

UpdateWriteProgress writes the current progress of the update.

Types

type Config

type Config struct {
	// MaxRetryDelay is the absolute maximum duration between any retries,
	// capping any backoff to this amount.
	MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
	// FailureRetryDelay is the delay for retry, if an operation failed. Backoff
	// will be applied for up to MaxRetryDelay.
	FailureRetryDelay time.Duration `yaml:"failure_retry_delay"`

	// LaunchTimeout is the timeout value for the LAUNCHED state.
	// If no update is received from Mesos within this timeout value,
	// the task will be re-queued to the resource manager for placement
	// with a new mesos task id.
	LaunchTimeout time.Duration `yaml:"launch_timeout"`
	// StartTimeout is the timeout value for the STARTING state.
	// If no update is received from Mesos within this timeout value,
	// the task will be re-queued to the resource manager for placement
	// with a new mesos task id.
	StartTimeout time.Duration `yaml:"start_timeout"`

	// JobBatchRuntimeUpdateInterval is the interval at which the runtime updater runs for batch jobs.
	JobBatchRuntimeUpdateInterval time.Duration `yaml:"job_batch_runtime_update_interval"`
	// JobServiceRuntimeUpdateInterval is the interval at which the runtime updater runs for service jobs.
	JobServiceRuntimeUpdateInterval time.Duration `yaml:"job_service_runtime_update_interval"`

	// NumWorkerJobThreads is the number of worker threads in the pool
	// serving the job goal state engine. This number indicates the maximum
	// number of jobs which can be processed in parallel by the goal state engine.
	NumWorkerJobThreads int `yaml:"job_worker_thread_count"`
	// NumWorkerTaskThreads is the number of worker threads in the pool
	// serving the task goal state engine. This number indicates the maximum
	// number of tasks which can be processed in parallel by the goal state engine.
	NumWorkerTaskThreads int `yaml:"task_worker_thread_count"`
	// NumWorkerUpdateThreads is the number of worker threads in the pool
	// serving the job update goal state engine. This number indicates
	// the maximum number of job updates which can be processed in parallel
	// by the goal state engine.
	NumWorkerUpdateThreads int `yaml:"update_worker_thread_count"`

	// InitialTaskBackoff defines the initial back-off delay to recreate
	// failed tasks. Back-off is calculated as
	// min(InitialTaskBackoff * 2 ^ (failureCount - 1), MaxTaskBackoff).
	// Defaults to 30s.
	InitialTaskBackoff time.Duration `yaml:"initial_task_backoff"`

	// MaxTaskBackoff defines the maximum back-off delay to recreate
	// failed tasks. Back-off is calculated as
	// min(InitialTaskBackoff * 2 ^ (failureCount - 1), MaxTaskBackoff).
	// Defaults to 1h.
	MaxTaskBackoff time.Duration `yaml:"max_task_backoff"`

	// RateLimiterConfig defines rate limiter config
	RateLimiterConfig RateLimiterConfig `yaml:"rate_limit"`
}

Config is the configuration for the goal state engine.

type Driver

type Driver interface {
	// EnqueueJob is used to enqueue a job into the goal state. It takes the job identifier
	// and the time at which the job should be evaluated by the goal state engine as inputs.
	EnqueueJob(jobID *peloton.JobID, deadline time.Time)
	// EnqueueTask is used to enqueue a task into the goal state. It takes the job identifier,
	// the instance identifier and the time at which the task should be evaluated by the
	// goal state engine as inputs.
	EnqueueTask(jobID *peloton.JobID, instanceID uint32, deadline time.Time)
	// EnqueueUpdate is used to enqueue a job update into the goal state. As
	// its input, it takes the job identifier, update identifier, and the
	// time at which the job update should be evaluated by the
	// goal state engine.
	EnqueueUpdate(
		jobID *peloton.JobID,
		updateID *peloton.UpdateID,
		deadline time.Time,
	)
	// DeleteJob deletes the job state from the goal state engine.
	DeleteJob(jobID *peloton.JobID)
	// DeleteTask deletes the task state from the goal state engine.
	DeleteTask(jobID *peloton.JobID, instanceID uint32)
	// DeleteUpdate deletes the job update state from the goal state engine.
	DeleteUpdate(jobID *peloton.JobID, updateID *peloton.UpdateID)
	// IsScheduledTask is a helper function to check if a given task is scheduled
	// for evaluation in the goal state engine.
	IsScheduledTask(jobID *peloton.JobID, instanceID uint32) bool
	// JobRuntimeDuration returns the minimum inter-run duration between job
	// runtime updates. This duration is different for batch and service jobs.
	JobRuntimeDuration(jobType job.JobType) time.Duration
	// Start is used to start processing items in the goal state engine.
	Start()
	// Stop is used to stop the goal state engine.
	// If cleanUpCache is true, the entire cache is removed and is
	// recovered on the next start.
	// If cleanUpCache is false, the cache is kept in memory, and the next
	// start skips cache recovery.
	Stop(cleanUpCache bool)
	// Started returns true if the goal state engine has finished starting.
	Started() bool
	// GetLockable returns an interface which controls lock/unlock operations in goal state engine
	GetLockable() lifecyclemgr.Lockable
}

Driver is the interface to enqueue jobs and tasks into the goal state engine for evaluation and then run the corresponding actions. The caller is also responsible for deleting from the goal state engine once the job/task is untracked from the cache.

func NewDriver

func NewDriver(
	d *yarpc.Dispatcher,
	jobStore storage.JobStore,
	taskStore storage.TaskStore,
	volumeStore storage.PersistentVolumeStore,
	updateStore storage.UpdateStore,
	ormStore *ormobjects.Store,
	jobFactory cached.JobFactory,
	jobType job.JobType,
	parentScope tally.Scope,
	cfg Config,
	hmVersion api.Version,
) Driver

NewDriver returns a new goal state driver object.

type JobAction

type JobAction string

JobAction is a string for job actions.

const (
	// NoJobAction implies do not take any action
	NoJobAction JobAction = "noop"
	// CreateTasksAction creates/recovers tasks in a job
	CreateTasksAction JobAction = "create_tasks"
	// KillAction kills all tasks in the job
	KillAction JobAction = "job_kill"
	// UntrackAction deletes the job and all its tasks
	UntrackAction JobAction = "untrack"
	// KillAndUntrackAction kills all tasks and untracks the job when possible
	KillAndUntrackAction JobAction = "kill_untrack"
	// JobStateInvalidAction is executed for an unexpected/invalid combination
	// of job goal state and state, and it logs a Sentry error
	JobStateInvalidAction JobAction = "state_invalid"
	// RuntimeUpdateAction updates the job runtime
	RuntimeUpdateAction JobAction = "runtime_update"
	// EvaluateSLAAction evaluates job SLA
	EvaluateSLAAction JobAction = "evaluate_sla"
	// RecoverAction attempts to recover a partially created job
	RecoverAction JobAction = "recover"
	// DeleteFromActiveJobsAction deletes a jobID from active jobs list if
	// the job is a terminal BATCH job
	DeleteFromActiveJobsAction JobAction = "delete_from_active_jobs"
	// StartTasksAction starts all tasks of a job
	StartTasksAction JobAction = "job_start"
	// DeleteJobAction deletes a job from cache and DB
	DeleteJobAction JobAction = "delete"
	// ReloadRuntimeAction reloads the job runtime into the cache
	ReloadRuntimeAction JobAction = "reload"
	// KillAndDeleteJobAction kills a job and deletes it if possible
	KillAndDeleteJobAction JobAction = "kill_and_delete"
)
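One common way to map a job's (state, goal state) pair to a JobAction is a lookup table with JobStateInvalidAction as the fallback. The sketch reproduces a few of the constants above; the table entries are illustrative assumptions, not the package's actual rules.

```go
package main

import "fmt"

// JobAction mirrors the string type documented above; only a few of its
// values are reproduced here.
type JobAction string

const (
	NoJobAction           JobAction = "noop"
	CreateTasksAction     JobAction = "create_tasks"
	KillAction            JobAction = "job_kill"
	UntrackAction         JobAction = "untrack"
	JobStateInvalidAction JobAction = "state_invalid"
)

// stateKey pairs a job's current state with its goal state.
type stateKey struct{ state, goal string }

// actionTable is a hypothetical rule table: the (state, goal) pairs and the
// actions assigned to them are illustrative only.
var actionTable = map[stateKey]JobAction{
	{"INITIALIZED", "RUNNING"}: CreateTasksAction,
	{"RUNNING", "KILLED"}:      KillAction,
	{"SUCCEEDED", "SUCCEEDED"}: UntrackAction,
	{"RUNNING", "RUNNING"}:     NoJobAction,
}

// nextAction looks up the action for a (state, goal state) combination and
// falls back to JobStateInvalidAction for combinations the table does not cover.
func nextAction(state, goal string) JobAction {
	if a, ok := actionTable[stateKey{state, goal}]; ok {
		return a
	}
	return JobStateInvalidAction
}

func main() {
	fmt.Println(nextAction("RUNNING", "KILLED"), nextAction("KILLED", "RUNNING"))
}
```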

type JobMetrics

type JobMetrics struct {
	JobCreate           tally.Counter
	JobCreateFailed     tally.Counter
	JobRecoveryDuration tally.Gauge

	JobSucceeded    tally.Counter
	JobKilled       tally.Counter
	JobFailed       tally.Counter
	JobDeleted      tally.Counter
	JobInvalidState tally.Counter

	JobRuntimeUpdated               tally.Counter
	JobRuntimeUpdateFailed          tally.Counter
	JobMaxRunningInstancesExceeding tally.Counter

	JobRecalculateFromCache tally.Counter
}

JobMetrics contains all counters to track job metrics in goal state engine.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics is the struct containing all the counters that track job and task metrics in goal state.

func NewMetrics

func NewMetrics(scope tally.Scope) *Metrics

NewMetrics returns a new Metrics struct, with all metrics initialized and rooted at the given tally.Scope

type RateLimiterConfig

type RateLimiterConfig struct {
	// ExecutorShutdown is the rate limit config for executor shutdown calls to hostmgr
	ExecutorShutdown TokenBucketConfig `yaml:"executor_shutdown"`
	// TaskKill is the rate limit config for task stop calls to hostmgr
	TaskKill TokenBucketConfig `yaml:"task_kill"`
}

type TaskAction

type TaskAction string

TaskAction is a string for task actions.

const (
	// NoTaskAction implies do not take any action
	NoTaskAction TaskAction = "noop"
	// StartAction starts a task by sending it to resource manager
	StartAction TaskAction = "start_task"
	// StopAction kills the task
	StopAction TaskAction = "stop_task"
	// ExecutorShutdownAction shuts down executor directly after StopAction timeout
	ExecutorShutdownAction TaskAction = "executor_shutdown"
	// InitializeAction re-initializes the task and regenerates the mesos task id
	InitializeAction TaskAction = "initialize_task"
	// ReloadTaskRuntime reloads the task runtime into cache
	ReloadTaskRuntime TaskAction = "reload_runtime"
	// LaunchRetryAction is run after task launch to either notify the resource manager
	// that the task has been successfully launched or re-initialize the task after launch timeout
	LaunchRetryAction TaskAction = "launch_retry"
	// FailRetryAction retries a failed task
	FailRetryAction TaskAction = "fail_retry"
	// TerminatedRetryAction helps restart terminated tasks with throttling as well as
	// fail the task update if the task does not come up for max instance retries.
	TerminatedRetryAction TaskAction = "terminated_retry"
	// DeleteAction deletes the task from cache and its runtime from the DB
	DeleteAction TaskAction = "delete_task"
	// TaskStateInvalidAction is executed when a task enters
	// invalid current state and goal state combination, and it logs a sentry error
	TaskStateInvalidAction TaskAction = "state_invalid"
)

type TaskMetrics

type TaskMetrics struct {
	TaskCreate             tally.Counter
	TaskCreateFail         tally.Counter
	TaskRecovered          tally.Counter
	ExecutorShutdown       tally.Counter
	TaskLaunchTimeout      tally.Counter
	TaskInvalidState       tally.Counter
	TaskStartTimeout       tally.Counter
	RetryFailedLaunchTotal tally.Counter
	RetryFailedTasksTotal  tally.Counter
	RetryLostTasksTotal    tally.Counter
}

TaskMetrics contains all counters to track task metrics in goal state.

type TokenBucketConfig

type TokenBucketConfig struct {
	// Rate for the token bucket rate limit algorithm.
	// If Rate <= 0, there is no rate limit.
	Rate rate.Limit
	// Burst for the token bucket rate limit algorithm.
	// If Burst <= 0, there is no rate limit.
	Burst int
}

TokenBucketConfig is the config for rate limiting

type UpdateAction

type UpdateAction string

UpdateAction is a string for job update actions.

const (
	// NoUpdateAction implies do not take any action
	NoUpdateAction UpdateAction = "noop"
	// ReloadUpdateAction will reload the update from DB
	ReloadUpdateAction UpdateAction = "reload"
	// StartUpdateAction will start the update
	StartUpdateAction UpdateAction = "start"
	// RunUpdateAction will continue running the rolling update
	RunUpdateAction UpdateAction = "run"
	// CompleteUpdateAction will complete the update
	CompleteUpdateAction UpdateAction = "complete"
	// ClearUpdateAction clears the update
	ClearUpdateAction UpdateAction = "update_clear"
	// CheckForAbortAction checks if the update needs to be aborted
	CheckForAbortAction UpdateAction = "check_for_abort"
	// WriteProgressUpdateAction writes the latest update progress
	WriteProgressUpdateAction UpdateAction = "write_progress"
)

type UpdateMetrics

type UpdateMetrics struct {
	UpdateReload            tally.Counter
	UpdateComplete          tally.Counter
	UpdateCompleteFail      tally.Counter
	UpdateUntrack           tally.Counter
	UpdateStart             tally.Counter
	UpdateStartFail         tally.Counter
	UpdateRun               tally.Counter
	UpdateRunFail           tally.Counter
	UpdateWriteProgress     tally.Counter
	UpdateWriteProgressFail tally.Counter
}

UpdateMetrics contains all counters to track update metrics in the goal state.
