cached

package
v0.0.0-...-d2b1cdf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2019 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// InstanceIDExceedsInstanceCountError is a package-level error value built
// with yarpcerrors.InvalidArgumentErrorf and a fixed message stating that the
// instance-id is beyond the instance count.
var InstanceIDExceedsInstanceCountError = yarpcerrors.InvalidArgumentErrorf(
	"instance-id is beyond the instance count")

InstanceIDExceedsInstanceCountError is the error returned when an operation is requested for an instance whose instanceID is larger than the job instance count.

Functions

func GetInstancesToProcessForUpdate

func GetInstancesToProcessForUpdate(
	ctx context.Context,
	jobID *peloton.JobID,
	prevJobConfig *pbjob.JobConfig,
	newJobConfig *pbjob.JobConfig,
	taskStore storage.TaskStore,
	taskConfigV2Ops objects.TaskConfigV2Ops,
) (
	instancesAdded []uint32,
	instancesUpdated []uint32,
	instancesRemoved []uint32,
	instancesUnchanged []uint32,
	err error,
)

GetInstancesToProcessForUpdate determines how the instances of a job are affected by a given job update. Both the old and the new job configurations are provided as inputs, and it returns the instances which have been added, the existing instances which have been updated, the instances which have been removed, and the instances which are unchanged.

func GetResourceManagerProcessingStates

func GetResourceManagerProcessingStates() []string

GetResourceManagerProcessingStates returns the active task states in Resource Manager

func GetUpdateProgress

func GetUpdateProgress(
	ctx context.Context,
	jobID *peloton.JobID,
	cachedUpdate Update,
	desiredConfigVersion uint64,
	instancesToCheck []uint32,
	taskStore storage.TaskStore,
) (instancesCurrent []uint32, instancesDone []uint32, instancesFailed []uint32, err error)

GetUpdateProgress iterates through instancesToCheck and checks whether they are running and whether their current config version is the same as the desired config version. TODO: find the right place to put the func

func HasControllerTask

func HasControllerTask(config jobmgrcommon.JobConfig) bool

HasControllerTask returns whether a job has a controller task in it. It accepts both a cachedConfig and a full JobConfig.

func IsMesosOwnedState

func IsMesosOwnedState(state pbtask.TaskState) bool

IsMesosOwnedState returns true if the task state indicates that the task is present in mesos.

func IsResMgrOwnedState

func IsResMgrOwnedState(state pbtask.TaskState) bool

IsResMgrOwnedState returns true if the task state indicates that the task is either waiting for admission or being placed or being preempted.

func IsUpdateStateActive

func IsUpdateStateActive(state pbupdate.State) bool

IsUpdateStateActive returns true if the update is in active state

func IsUpdateStateTerminal

func IsUpdateStateTerminal(state pbupdate.State) bool

IsUpdateStateTerminal returns true if the update has reached a terminal state

Types

type Job

// Job is a job held in the cache. It embeds WorkflowOps and adds operations
// on the job's configuration, runtime and tasks.
type Job interface {
	// WorkflowOps is embedded, so every Job also exposes the workflow
	// operations declared on that interface.
	WorkflowOps

	// Identifier of the job.
	ID() *peloton.JobID

	// CreateTaskConfigs creates task configurations in the DB
	CreateTaskConfigs(
		ctx context.Context,
		jobID *peloton.JobID,
		jobConfig *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) error

	// CreateTaskRuntimes creates the task runtimes in cache and DB.
	// Create and Update need to be different functions as the backing
	// storage calls are different.
	CreateTaskRuntimes(ctx context.Context, runtimes map[uint32]*pbtask.RuntimeInfo, owner string) error

	// PatchTasks patches runtime diff to the existing task cache. runtimeDiffs
	// is a kv map with key as the instance_id of the task to be updated.
	// Value of runtimeDiffs is RuntimeDiff, of which key is the field name
	// to be updated, and value is the new value of the field. PatchTasks
	// would save the change in both cache and DB. If persisting to DB fails,
	// cache would be invalidated as well. The `force` flag affects only stateless
	// jobs. By default (with force flag unset), stateless jobs are patched in
	// a SLA aware manner i.e. only the tasks in the runtimeDiff which do not
	// violate the job SLA will be patched. If `force` flag is set, the diff
	// will be patched even if it violates job SLA. PatchTasks returns 2 lists
	// 1. list of instance_ids which were successfully patched and
	// 2. a list of instance_ids that should be retried.
	PatchTasks(
		ctx context.Context,
		runtimeDiffs map[uint32]jobmgrcommon.RuntimeDiff,
		force bool,
	) (instancesSucceeded []uint32, instancesToBeRetried []uint32, err error)

	// ReplaceTasks replaces task runtime and config in cache.
	// If forceReplace is false, it would check Revision version
	// and decide whether to replace the runtime and config.
	// If forceReplace is true, the func would always replace the runtime and config.
	ReplaceTasks(taskInfos map[uint32]*pbtask.TaskInfo, forceReplace bool) error

	// AddTask adds a new task to the job, and if already present,
	// just returns it. In addition if the task is not present, then
	// the runtime is recovered from DB as well. And
	// if the recovery does not succeed, the task is not
	// added to the cache either.
	AddTask(ctx context.Context, id uint32) (Task, error)

	// GetTask returns the task with the given task id.
	GetTask(id uint32) Task

	// RemoveTask clears the task out of the cache.
	RemoveTask(id uint32)

	// GetAllTasks returns all tasks for the job
	GetAllTasks() map[uint32]Task

	// Create will be used to create the job configuration and runtime in DB.
	// Create and Update need to be different functions as the backing
	// storage calls are different.
	Create(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) error

	// RollingCreate is used to create the job configuration and runtime in DB.
	// It would create a workflow to manage the job creation, therefore the creation
	// process can be paused/resumed/aborted.
	RollingCreate(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
		updateConfig *pbupdate.UpdateConfig,
		opaqueData *peloton.OpaqueData,
	) error

	// Update updates job with the new runtime and config. If the request is to update
	// both DB and cache, it first attempts to persist the request in storage,
	// If that fails, it just returns back the error for now.
	// If successful, the cache is updated as well.
	// TODO: no config update should go through this API, divide this API into
	// config and runtime part
	Update(
		ctx context.Context,
		jobInfo *pbjob.JobInfo,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
		req UpdateRequest,
	) error

	// CompareAndSetRuntime replaces the existing job runtime in cache and DB with
	// the job runtime supplied. CompareAndSetRuntime would use
	// RuntimeInfo.Revision.Version for concurrency control, and it would
	// update RuntimeInfo.Revision.Version automatically upon success. Caller
	// should not manually modify the value of RuntimeInfo.Revision.Version.
	// It returns the resultant jobRuntime with version updated.
	CompareAndSetRuntime(ctx context.Context, jobRuntime *pbjob.RuntimeInfo) (*pbjob.RuntimeInfo, error)

	// CompareAndSetConfig compares the version of config supplied and the
	// version of config in cache. If the version matches, it would update
	// the config in cache and DB with the config supplied (Notice: it does
	// NOT mean job would use the new job config, job would still use the
	// config which its runtime.ConfigurationVersion points to).
	// CompareAndSetConfig would update JobConfig.ChangeLog.Version
	// automatically upon success. Caller should not manually modify
	// the value of JobConfig.ChangeLog.Version.
	// It returns the resultant jobConfig with version updated.
	// JobSpec is also passed along so that it can be written as is to the DB
	CompareAndSetConfig(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) (jobmgrcommon.JobConfig, error)

	// CompareAndSetTask replaces the existing task runtime in DB and cache.
	// It uses RuntimeInfo.Revision.Version for concurrency control, and it would
	// update RuntimeInfo.Revision.Version automatically upon success.
	// Caller should not manually modify the value of RuntimeInfo.Revision.Version.
	// The `force` flag affects only stateless jobs. By default (with force flag
	// not set), for stateless job, if the task is becoming unavailable due to
	// host maintenance and update, then runtime is set only if it does not
	// violate the job SLA. If `force` flag is set, the task runtime will
	// be set even if it violates job SLA.
	CompareAndSetTask(
		ctx context.Context,
		id uint32,
		runtime *pbtask.RuntimeInfo,
		force bool,
	) (*pbtask.RuntimeInfo, error)

	// IsPartiallyCreated returns if job has not been fully created yet
	IsPartiallyCreated(config jobmgrcommon.JobConfig) bool

	// ValidateEntityVersion validates the entity version of the job is the
	// same as provided in the input, and if not, then returns an error.
	ValidateEntityVersion(ctx context.Context, version *v1alphapeloton.EntityVersion) error

	// GetRuntime returns the runtime of the job
	GetRuntime(ctx context.Context) (*pbjob.RuntimeInfo, error)

	// GetConfig returns the current config of the job
	GetConfig(ctx context.Context) (jobmgrcommon.JobConfig, error)

	// GetCachedConfig returns the job config if
	// present in the cache. Returns nil otherwise.
	GetCachedConfig() jobmgrcommon.JobConfig

	// GetJobType returns the job type in the job config stored in the cache
	// The type can be nil when we read it. It should be only used for
	// non-critical purpose (e.g calculate delay).
	// Logically this should be part of JobConfig
	// TODO(zhixin): remove GetJobType from the interface after
	// EnqueueJobWithDefaultDelay does not take cached job
	GetJobType() pbjob.JobType

	// SetTaskUpdateTime updates the task update times in the job cache
	SetTaskUpdateTime(t *float64)

	// GetFirstTaskUpdateTime gets the first task update time
	GetFirstTaskUpdateTime() float64

	// GetLastTaskUpdateTime gets the last task update time
	GetLastTaskUpdateTime() float64

	// UpdateResourceUsage adds the task resource usage from a terminal task
	// to the resource usage map for this job
	UpdateResourceUsage(taskResourceUsage map[string]float64)

	// GetResourceUsage gets the resource usage map for this job
	GetResourceUsage() map[string]float64

	// RecalculateResourceUsage recalculates the resource usage of a job
	// by adding together resource usage of all terminal tasks of this job.
	RecalculateResourceUsage(ctx context.Context)

	// CurrentState of the job.
	CurrentState() JobStateVector

	// GoalState of the job.
	GoalState() JobStateVector

	// Delete deletes the job from DB and clears the cache
	Delete(ctx context.Context) error

	// GetTaskStateCount returns, in order: the state/goal state count of all
	// tasks in a job, the total number of throttled tasks in
	// stateless jobs, and the spread counts of a job.
	GetTaskStateCount() (
		map[TaskStateSummary]int,
		int,
		JobSpreadCounts)

	// GetWorkflowStateCount returns the state count of all workflows in the cache
	GetWorkflowStateCount() map[pbupdate.State]int

	// RepopulateInstanceAvailabilityInfo repopulates the SLA information in the job cache
	RepopulateInstanceAvailabilityInfo(ctx context.Context) error

	// GetInstanceAvailabilityType returns the instance availability type per instance
	// for the specified instances. If `instances` is empty then the instance
	// availability type for all instances of the job is returned
	GetInstanceAvailabilityType(
		ctx context.Context,
		instances ...uint32,
	) map[uint32]jobmgrcommon.InstanceAvailability_Type
}

Job in the cache. TODO: there are a lot of methods in this interface; determine whether it can be broken up into smaller pieces.

type JobConfigCache

// JobConfigCache combines the jobmgrcommon.JobConfig interface with one
// additional helper method, HasControllerTask.
type JobConfigCache interface {
	// The embedded interface supplies every JobConfig method.
	jobmgrcommon.JobConfig
	// HasControllerTask presumably reports whether the job has a controller
	// task — TODO confirm against the implementation.
	HasControllerTask() bool
}

JobConfigCache is a union of JobConfig and helper methods only available for cached config

type JobFactory

// JobFactory is the entry point into the cache of jobs. It creates, looks up
// and clears cached Job objects, and controls metrics emission through
// Start and Stop.
type JobFactory interface {
	// AddJob will create a Job if not present in cache,
	// else returns the current cached Job.
	AddJob(id *peloton.JobID) Job

	// ClearJob cleans up the job from the cache.
	ClearJob(jobID *peloton.JobID)

	// GetJob will return the current cached Job,
	// and nil if currently not in cache.
	GetJob(id *peloton.JobID) Job

	// GetAllJobs returns all jobs in cache as a map keyed by string
	// (presumably the job ID value — TODO confirm).
	GetAllJobs() map[string]Job

	// Start emitting metrics.
	Start()

	// Stop clears the current jobs and tasks in cache, stops metrics.
	Stop()
}

JobFactory is the entrypoint object into the cache which stores job and tasks. This only runs in the job manager leader.

func InitJobFactory

func InitJobFactory(
	jobStore storage.JobStore,
	taskStore storage.TaskStore,
	updateStore storage.UpdateStore,
	volumeStore storage.PersistentVolumeStore,
	ormStore *ormobjects.Store,
	parentScope tally.Scope,
	listeners []JobTaskListener,
) JobFactory

InitJobFactory initializes the job factory object.

type JobSpreadCounts

type JobSpreadCounts struct {
	// contains filtered or unexported fields
}

JobSpreadCounts contains task and host counts for jobs that use "spread" placement strategy. Counts are set to zero for invalid/inapplicable cases.

type JobStateVector

// JobStateVector pairs a job state with a state version. It is the return
// type of both Job.CurrentState and Job.GoalState.
type JobStateVector struct {
	// State is the job state.
	State        pbjob.JobState
	// StateVersion is the version associated with State
	// (exact versioning semantics are not visible here — TODO confirm).
	StateVersion uint64
}

JobStateVector defines the state of a job. This encapsulates both the actual state and the goal state.

type JobTaskListener

// JobTaskListener is implemented by listeners that want to be notified when
// a job runtime or a task runtime changes.
type JobTaskListener interface {
	// Name returns a user-friendly name for the listener
	Name() string

	// JobRuntimeChanged is invoked when the runtime for a job is updated
	// in cache and persistent store.
	JobRuntimeChanged(
		jobID *peloton.JobID,
		jobType pbjob.JobType,
		runtime *pbjob.RuntimeInfo,
	)

	// TaskRuntimeChanged is invoked when the runtime for a task is updated
	// in cache and persistent store.
	TaskRuntimeChanged(
		jobID *peloton.JobID,
		instanceID uint32,
		jobType pbjob.JobType,
		runtime *pbtask.RuntimeInfo,
		labels []*peloton.Label,
	)
}

JobTaskListener defines an interface that must be implemented by a listener interested in job and task changes. The callbacks are invoked after updates to the cache are written through to the persistent store. Note that callbacks may not get invoked in the same order as the changes to objects in cache; the version field of the changed object (e.g. Changelog) is a better indicator of order. To keep things simple, the callbacks are invoked synchronously when the cached object is changed. Thus slow listeners can make operations on the cache slower, which must be avoided. Implementations must not

  • modify the provided objects in any way
  • do processing that can take a long time, such as blocking on locks or making remote calls. Such activities must be done in separate goroutines that are managed by the listener.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics is the struct containing all the counters that track internal state of the cache.

func NewMetrics

func NewMetrics(scope tally.Scope) *Metrics

NewMetrics returns a new Metrics struct, with all metrics initialized and rooted at the given tally.Scope

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option to create a workflow

func WithConfig

func WithConfig(
	jobConfig *pbjob.JobConfig,
	prevJobConfig *pbjob.JobConfig,
	configAddOn *models.ConfigAddOn,
	jobSpec *stateless.JobSpec,
) Option

WithConfig defines the original config and target config for the workflow. Workflow could use the configs to calculate the instances it would need to work on as well as verify if the update is a noop. It also includes the target job spec which would be stored to the DB as part of workflow creation.

func WithInstanceToProcess

func WithInstanceToProcess(
	instancesAdded []uint32,
	instancesUpdated []uint32,
	instancesRemoved []uint32,
) Option

WithInstanceToProcess defines the instances the workflow would work on. When it is provided, workflow would not calculate instances to process based on config.

func WithOpaqueData

func WithOpaqueData(opaqueData *peloton.OpaqueData) Option

WithOpaqueData defines the opaque data provided by the user to be stored with the update

type Task

// Task is a task held in the cache. It exposes the task's identifiers,
// runtime, labels, state vectors and termination status.
type Task interface {
	// Identifier of the task.
	ID() uint32

	// Job identifier the task belongs to.
	JobID() *peloton.JobID

	// GetRuntime returns the task run time
	GetRuntime(ctx context.Context) (*pbtask.RuntimeInfo, error)

	// GetCacheRuntime returns the task run time stored in the cache.
	// It returns nil if there is no runtime in the cache.
	GetCacheRuntime() *pbtask.RuntimeInfo

	// GetLabels returns the task labels
	GetLabels(ctx context.Context) ([]*peloton.Label, error)

	// CurrentState of the task.
	CurrentState() TaskStateVector

	// GoalState of the task.
	GoalState() TaskStateVector

	// StateSummary of the task.
	StateSummary() TaskStateSummary

	// TerminationStatus of the task.
	TerminationStatus() *pbtask.TerminationStatus
}

Task in the cache.

type TaskMetrics

// TaskMetrics groups the tally timers and the gauge recorded for tasks.
type TaskMetrics struct {
	// Timers for task assignment, split by revocable / non-revocable
	// (presumably the time until a task is assigned — TODO confirm).
	TimeToAssignNonRevocable tally.Timer
	TimeToAssignRevocable    tally.Timer

	// Timers for task run, split by revocable / non-revocable
	// (presumably the time until a task is running — TODO confirm).
	TimeToRunNonRevocable tally.Timer
	TimeToRunRevocable    tally.Timer

	// MeanSpreadQuotient is a gauge; presumably related to the "spread"
	// placement counts — TODO confirm how it is computed.
	MeanSpreadQuotient tally.Gauge
}

TaskMetrics contains the timers and the gauge for tasks that are managed by the cache.

func NewTaskMetrics

func NewTaskMetrics(scope tally.Scope) *TaskMetrics

NewTaskMetrics returns a new TaskMetrics struct, with all metrics initialized and rooted at the given tally.Scope

type TaskStateSummary

// TaskStateSummary groups a task's current state, goal state and health
// state. It is returned by Task.StateSummary and is the map key type in the
// first result of Job.GetTaskStateCount.
type TaskStateSummary struct {
	// CurrentState is the task's current state.
	CurrentState pbtask.TaskState
	// GoalState is the task's goal state.
	GoalState    pbtask.TaskState
	// HealthState is the task's health state.
	HealthState  pbtask.HealthState
}

type TaskStateVector

// TaskStateVector describes a state of a task. It is the return type of
// both Task.CurrentState and Task.GoalState.
type TaskStateVector struct {
	// State is the task state.
	State         pbtask.TaskState
	// ConfigVersion is the configuration version associated with State
	// (presumably the task's config version — TODO confirm).
	ConfigVersion uint64
	// MesosTaskID is the mesos task ID associated with State; it is a
	// pointer, so it may be nil.
	MesosTaskID   *mesos.TaskID
}

TaskStateVector defines the state of a task. This encapsulates both the actual state and the goal state.

type Update

// Update is an update of a job held in the cache. It embeds
// WorkflowStrategy and adds lifecycle operations (create, modify, pause,
// resume, cancel, rollback, recover) and accessors for the update's state.
type Update interface {
	// WorkflowStrategy is embedded, so every Update also exposes the
	// per-instance strategy methods declared on that interface.
	WorkflowStrategy

	// Identifier of the update
	ID() *peloton.UpdateID

	// Job identifier the update belongs to
	JobID() *peloton.JobID

	// Create creates the update in DB and cache
	Create(
		ctx context.Context,
		jobID *peloton.JobID,
		jobConfig jobmgrcommon.JobConfig,
		prevJobConfig *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		instanceAdded []uint32,
		instanceUpdated []uint32,
		instanceRemoved []uint32,
		workflowType models.WorkflowType,
		updateConfig *pbupdate.UpdateConfig,
		opaqueData *peloton.OpaqueData,
	) error

	// Modify modifies the update in DB and cache
	Modify(
		ctx context.Context,
		instancesAdded []uint32,
		instancesUpdated []uint32,
		instancesRemoved []uint32,
	) error

	// WriteProgress updates the update in DB and cache with the given
	// state and the given done, failed and current instance lists.
	WriteProgress(ctx context.Context,
		state pbupdate.State,
		instancesDone []uint32,
		instanceFailed []uint32,
		instancesCurrent []uint32) error

	// Pause pauses the current update progress
	Pause(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Resume resumes a paused update, and update would change
	// to the state before pause
	Resume(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Recover recovers the update from DB into the cache
	Recover(ctx context.Context) error

	// Cancel is used to cancel the update
	Cancel(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Rollback is used to rollback the update.
	Rollback(
		ctx context.Context,
		currentConfig *pbjob.JobConfig,
		targetConfig *pbjob.JobConfig,
	) error

	// GetState returns the state of the update
	GetState() *UpdateStateVector

	// GetGoalState returns the goal state of the update
	GetGoalState() *UpdateStateVector

	// GetPrevState returns the previous state of the update
	GetPrevState() pbupdate.State

	// GetInstancesAdded returns the instances to be added with this update
	GetInstancesAdded() []uint32

	// GetInstancesUpdated returns the existing instances to be updated
	// with this update
	GetInstancesUpdated() []uint32

	// GetInstancesRemoved returns the existing instances to be removed
	// with this update
	GetInstancesRemoved() []uint32

	// GetInstancesCurrent returns the current set of instances being updated
	GetInstancesCurrent() []uint32

	// GetInstancesFailed returns the current set of instances marked as failed
	GetInstancesFailed() []uint32

	// GetInstancesDone returns the current set of instances updated
	GetInstancesDone() []uint32

	// GetUpdateConfig returns the *pbupdate.UpdateConfig of the update
	// (presumably the one supplied to Create — TODO confirm).
	GetUpdateConfig() *pbupdate.UpdateConfig

	// GetWorkflowType returns the models.WorkflowType of the update
	// (presumably the one supplied to Create — TODO confirm).
	GetWorkflowType() models.WorkflowType

	// IsTaskInUpdateProgress returns true if a given task is
	// in progress for the given update, else returns false
	IsTaskInUpdateProgress(instanceID uint32) bool

	// IsTaskInFailed returns true if a given task is in the
	// instancesFailed list for the given update, else returns false
	IsTaskInFailed(instanceID uint32) bool

	// GetLastUpdateTime returns the last update time of the update object
	GetLastUpdateTime() time.Time
}

Update of a job being stored in the cache.

type UpdateRequest

// UpdateRequest is an integer-backed selector passed to Job.Update; its
// named values are UpdateCacheOnly and UpdateCacheAndDB.
type UpdateRequest int

UpdateRequest is used to indicate whether the caller wants to update only cache or update both database and cache. This is used during job manager recovery as only cache needs to be updated during recovery.

// Values of UpdateRequest. Numbering starts at 1 (iota + 1), so the zero
// value of UpdateRequest matches neither constant.
const (

	// UpdateCacheOnly updates only the cache. It should be used only during
	// recovery. Also, it requires passing the complete runtime information.
	UpdateCacheOnly UpdateRequest = iota + 1
	// UpdateCacheAndDB updates both DB and cache. The caller can pass the
	// complete runtime info or just a diff.
	UpdateCacheAndDB
)

type UpdateStateVector

// UpdateStateVector describes either the state or the goal state of an
// update; the meaning of JobVersion and Instances depends on which of the
// two it represents. It is returned by Update.GetState and
// Update.GetGoalState.
type UpdateStateVector struct {
	// State is the current update state.
	State pbupdate.State
	// JobVersion: for state, it will be the old job config version;
	// for goal state, it will be the desired job config version.
	JobVersion uint64
	// Instances: for state, it will store the instances which have already
	// been updated, and for goal state, it will store all the instances
	// which need to be updated.
	Instances []uint32
}

UpdateStateVector is used to represent the state and goal state of an update to the goal state engine.

type WorkflowOps

// WorkflowOps declares the operations for creating, controlling, looking up
// and removing workflows on the calling object. It is embedded in Job.
type WorkflowOps interface {
	// CreateWorkflow creates a workflow associated with
	// the calling object
	CreateWorkflow(
		ctx context.Context,
		workflowType models.WorkflowType,
		updateConfig *pbupdate.UpdateConfig,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// PauseWorkflow pauses the current workflow, if any
	PauseWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// ResumeWorkflow resumes the current workflow, if any
	ResumeWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// AbortWorkflow aborts the current workflow, if any
	AbortWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// RollbackWorkflow rolls back the current workflow, if any
	RollbackWorkflow(ctx context.Context) error

	// AddWorkflow adds a workflow to the calling object
	AddWorkflow(updateID *peloton.UpdateID) Update

	// GetWorkflow gets the workflow with the given update ID from the
	// calling object. It should only be used in places such as handlers,
	// where a read operation should not mutate the cache.
	GetWorkflow(updateID *peloton.UpdateID) Update

	// ClearWorkflow removes a workflow from the calling object
	ClearWorkflow(updateID *peloton.UpdateID)

	// GetAllWorkflows returns all workflows for the job
	GetAllWorkflows() map[string]Update
}

WorkflowOps defines operations on workflow

type WorkflowStrategy

// WorkflowStrategy declares the per-instance checks and the runtime diff
// computation used to drive instances towards the state desired by a
// workflow. It is embedded in Update.
type WorkflowStrategy interface {
	// IsInstanceComplete returns if an instance has reached the state
	// desired by the workflow
	IsInstanceComplete(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool
	// IsInstanceInProgress returns if an instance is in the process of getting
	// to the state desired by the workflow
	IsInstanceInProgress(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool
	// IsInstanceFailed returns if an instance is failed when getting
	// to the state desired by the workflow
	// TODO: now a task can both get true for IsInstanceInProgress and
	// IsInstanceFailed, it should get true for only one of the func.
	// Now the correctness of code is guarded by order of func call.
	IsInstanceFailed(runtime *pbtask.RuntimeInfo, maxAttempts uint32) bool
	// GetRuntimeDiff accepts the desired job config and returns the
	// RuntimeDiff to move an instance to the state desired by the workflow.
	// Return nil if no action is needed.
	// NOTE(review): an earlier version of this comment also listed the
	// current task runtime as an input, but the signature only takes
	// jobConfig — confirm which is intended.
	GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff
}

WorkflowStrategy is the strategy of driving instances to the desired state of the workflow

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL