Documentation ¶
Index ¶
- Variables
- func GetInstancesToProcessForUpdate(ctx context.Context, jobID *peloton.JobID, prevJobConfig *pbjob.JobConfig, ...) (instancesAdded []uint32, instancesUpdated []uint32, instancesRemoved []uint32, ...)
- func GetResourceManagerProcessingStates() []string
- func GetUpdateProgress(ctx context.Context, jobID *peloton.JobID, cachedUpdate Update, ...) (instancesCurrent []uint32, instancesDone []uint32, instancesFailed []uint32, ...)
- func HasControllerTask(config jobmgrcommon.JobConfig) bool
- func IsMesosOwnedState(state pbtask.TaskState) bool
- func IsResMgrOwnedState(state pbtask.TaskState) bool
- func IsUpdateStateActive(state pbupdate.State) bool
- func IsUpdateStateTerminal(state pbupdate.State) bool
- type Job
- type JobConfigCache
- type JobFactory
- type JobSpreadCounts
- type JobStateVector
- type JobTaskListener
- type Metrics
- type Option
- type Task
- type TaskMetrics
- type TaskStateSummary
- type TaskStateVector
- type Update
- type UpdateRequest
- type UpdateStateVector
- type WorkflowOps
- type WorkflowStrategy
Constants ¶
This section is empty.
Variables ¶
var InstanceIDExceedsInstanceCountError = yarpcerrors.InvalidArgumentErrorf(
"instance-id is beyond the instance count")
InstanceIDExceedsInstanceCountError is the error returned when an operation is requested for an instance whose instanceID is larger than the job instance count.
Functions ¶
func GetInstancesToProcessForUpdate ¶
func GetInstancesToProcessForUpdate(
	ctx context.Context,
	jobID *peloton.JobID,
	prevJobConfig *pbjob.JobConfig,
	newJobConfig *pbjob.JobConfig,
	taskStore storage.TaskStore,
	taskConfigV2Ops objects.TaskConfigV2Ops,
) (
	instancesAdded []uint32,
	instancesUpdated []uint32,
	instancesRemoved []uint32,
	instancesUnchanged []uint32,
	err error,
)
GetInstancesToProcessForUpdate determines which instances are affected by a given job update. Both the old and the new job configurations are provided as inputs, and it returns the instances which have been added, the existing instances which have been updated or removed, and the instances which are unchanged.
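The four result slices in the signature above can be pictured with a small sketch. This is only an illustration, not the package's implementation: `classifyInstances` is a hypothetical helper, and each instance's configuration is assumed to be reducible to a comparable string.

```go
package main

import (
	"fmt"
	"sort"
)

// classifyInstances is a hypothetical helper. prev and next map an instance ID
// to an opaque config fingerprint (assumed here to be a plain string).
func classifyInstances(prev, next map[uint32]string) (added, updated, removed, unchanged []uint32) {
	for id, cfg := range next {
		old, ok := prev[id]
		switch {
		case !ok:
			// Present only in the new config.
			added = append(added, id)
		case old != cfg:
			// Present in both, but the config differs.
			updated = append(updated, id)
		default:
			unchanged = append(unchanged, id)
		}
	}
	for id := range prev {
		if _, ok := next[id]; !ok {
			// Present only in the old config.
			removed = append(removed, id)
		}
	}
	// Map iteration order is random, so sort each slice for stable results.
	for _, s := range [][]uint32{added, updated, removed, unchanged} {
		sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
	}
	return
}

func main() {
	prev := map[uint32]string{0: "v1", 1: "v1", 2: "v1"}
	next := map[uint32]string{0: "v1", 1: "v2", 3: "v2"}
	added, updated, removed, unchanged := classifyInstances(prev, next)
	fmt.Println(added, updated, removed, unchanged)
}
```

The real function also takes a task store and task config ops, so it presumably compares stored per-instance configurations rather than in-memory strings.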
func GetResourceManagerProcessingStates ¶
func GetResourceManagerProcessingStates() []string
GetResourceManagerProcessingStates returns the active task states in the Resource Manager.
func GetUpdateProgress ¶
func GetUpdateProgress(
	ctx context.Context,
	jobID *peloton.JobID,
	cachedUpdate Update,
	desiredConfigVersion uint64,
	instancesToCheck []uint32,
	taskStore storage.TaskStore,
) (instancesCurrent []uint32, instancesDone []uint32, instancesFailed []uint32, err error)
GetUpdateProgress iterates through instancesToCheck and checks whether they are running and whether their current config version is the same as the desired config version. TODO: find the right place to put the func.
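As a rough sketch of that check, the snippet below sorts instances into current, done and failed buckets. The `instanceRuntime` type, the `updateProgress` helper and the exact classification rules are assumptions made for illustration; the real function reads runtimes from the task store and delegates to the cached update.

```go
package main

import "fmt"

// instanceRuntime is a hypothetical, simplified stand-in for a task runtime.
type instanceRuntime struct {
	Running       bool
	Failed        bool
	ConfigVersion uint64
}

// updateProgress is an illustrative helper, not the package's implementation.
// An instance is "done" when it is running at the desired config version,
// "failed" when it is marked failed, and "current" (in progress) otherwise.
func updateProgress(
	desired uint64,
	toCheck []uint32,
	runtimes map[uint32]instanceRuntime,
) (current, done, failed []uint32) {
	for _, id := range toCheck {
		rt, ok := runtimes[id]
		switch {
		case !ok:
			// Assumption: an instance with no runtime yet is still in progress.
			current = append(current, id)
		case rt.Running && rt.ConfigVersion == desired:
			done = append(done, id)
		case rt.Failed:
			failed = append(failed, id)
		default:
			current = append(current, id)
		}
	}
	return current, done, failed
}

func main() {
	runtimes := map[uint32]instanceRuntime{
		0: {Running: true, ConfigVersion: 2},
		1: {Failed: true, ConfigVersion: 2},
		2: {Running: true, ConfigVersion: 1},
	}
	current, done, failed := updateProgress(2, []uint32{0, 1, 2}, runtimes)
	fmt.Println(current, done, failed)
}
```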
func HasControllerTask ¶
func HasControllerTask(config jobmgrcommon.JobConfig) bool
HasControllerTask returns whether a job has a controller task in it. It accepts both a cachedConfig and a full JobConfig.
func IsMesosOwnedState ¶
IsMesosOwnedState returns true if the task state indicates that the task is present in mesos.
func IsResMgrOwnedState ¶
IsResMgrOwnedState returns true if the task state indicates that the task is either waiting for admission or being placed or being preempted.
func IsUpdateStateActive ¶
IsUpdateStateActive returns true if the update is in an active state.
func IsUpdateStateTerminal ¶
IsUpdateStateTerminal returns true if the update has reached a terminal state.
Types ¶
type Job ¶
type Job interface {
	WorkflowOps

	// Identifier of the job.
	ID() *peloton.JobID

	// CreateTaskConfigs creates task configurations in the DB
	CreateTaskConfigs(
		ctx context.Context,
		jobID *peloton.JobID,
		jobConfig *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) error

	// CreateTaskRuntimes creates the task runtimes in cache and DB.
	// Create and Update need to be different functions as the backing
	// storage calls are different.
	CreateTaskRuntimes(ctx context.Context, runtimes map[uint32]*pbtask.RuntimeInfo, owner string) error

	// PatchTasks patches runtime diff to the existing task cache. runtimeDiffs
	// is a kv map with key as the instance_id of the task to be updated.
	// Value of runtimeDiffs is RuntimeDiff, of which key is the field name
	// to be updated, and value is the new value of the field. PatchTasks
	// would save the change in both cache and DB. If persisting to DB fails,
	// cache would be invalidated as well. The `force` flag affects only stateless
	// jobs. By default (with force flag unset), stateless jobs are patched in
	// a SLA aware manner i.e. only the tasks in the runtimeDiff which do not
	// violate the job SLA will be patched. If `force` flag is set, the diff
	// will be patched even if it violates job SLA. PatchTasks returns 2 lists
	// 1. list of instance_ids which were successfully patched and
	// 2. a list of instance_ids that should be retried.
	PatchTasks(
		ctx context.Context,
		runtimeDiffs map[uint32]jobmgrcommon.RuntimeDiff,
		force bool,
	) (instancesSucceeded []uint32, instancesToBeRetried []uint32, err error)

	// ReplaceTasks replaces task runtime and config in cache.
	// If forceReplace is false, it would check Revision version
	// and decide whether to replace the runtime and config.
	// If forceReplace is true, the func would always replace the runtime and config.
	ReplaceTasks(taskInfos map[uint32]*pbtask.TaskInfo, forceReplace bool) error

	// AddTask adds a new task to the job, and if already present,
	// just returns it. In addition if the task is not present, then
	// the runtime is recovered from DB as well. And
	// if the recovery does not succeed, the task is not
	// added to the cache either.
	AddTask(ctx context.Context, id uint32) (Task, error)

	// GetTask from the task id.
	GetTask(id uint32) Task

	// RemoveTask clears the task out of cache.
	RemoveTask(id uint32)

	// GetAllTasks returns all tasks for the job
	GetAllTasks() map[uint32]Task

	// Create will be used to create the job configuration and runtime in DB.
	// Create and Update need to be different functions as the backing
	// storage calls are different.
	Create(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) error

	// RollingCreate is used to create the job configuration and runtime in DB.
	// It would create a workflow to manage the job creation, therefore the creation
	// process can be paused/resumed/aborted.
	RollingCreate(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
		updateConfig *pbupdate.UpdateConfig,
		opaqueData *peloton.OpaqueData,
	) error

	// Update updates job with the new runtime and config. If the request is to update
	// both DB and cache, it first attempts to persist the request in storage,
	// If that fails, it just returns back the error for now.
	// If successful, the cache is updated as well.
	// TODO: no config update should go through this API, divide this API into
	// config and runtime part
	Update(
		ctx context.Context,
		jobInfo *pbjob.JobInfo,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
		req UpdateRequest,
	) error

	// CompareAndSetRuntime replaces the existing job runtime in cache and DB with
	// the job runtime supplied. CompareAndSetRuntime would use
	// RuntimeInfo.Revision.Version for concurrency control, and it would
	// update RuntimeInfo.Revision.Version automatically upon success. Caller
	// should not manually modify the value of RuntimeInfo.Revision.Version.
	// It returns the resultant jobRuntime with version updated.
	CompareAndSetRuntime(ctx context.Context, jobRuntime *pbjob.RuntimeInfo) (*pbjob.RuntimeInfo, error)

	// CompareAndSetConfig compares the version of config supplied and the
	// version of config in cache. If the version matches, it would update
	// the config in cache and DB with the config supplied (Notice: it does
	// NOT mean job would use the new job config, job would still use the
	// config which its runtime.ConfigurationVersion points to).
	// CompareAndSetConfig would update JobConfig.ChangeLog.Version
	// automatically upon success. Caller should not manually modify
	// the value of JobConfig.ChangeLog.Version.
	// It returns the resultant jobConfig with version updated.
	// JobSpec is also passed along so that it can be written as is to the DB
	CompareAndSetConfig(
		ctx context.Context,
		config *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		spec *stateless.JobSpec,
	) (jobmgrcommon.JobConfig, error)

	// CompareAndSetTask replaces the existing task runtime in DB and cache.
	// It uses RuntimeInfo.Revision.Version for concurrency control, and it would
	// update RuntimeInfo.Revision.Version automatically upon success.
	// Caller should not manually modify the value of RuntimeInfo.Revision.Version.
	// The `force` flag affects only stateless jobs. By default (with force flag
	// not set), for stateless job, if the task is becoming unavailable due to
	// host maintenance and update, then runtime is set only if it does not
	// violate the job SLA. If `force` flag is set, the task runtime will
	// be set even if it violates job SLA.
	CompareAndSetTask(
		ctx context.Context,
		id uint32,
		runtime *pbtask.RuntimeInfo,
		force bool,
	) (*pbtask.RuntimeInfo, error)

	// IsPartiallyCreated returns if job has not been fully created yet
	IsPartiallyCreated(config jobmgrcommon.JobConfig) bool

	// ValidateEntityVersion validates the entity version of the job is the
	// same as provided in the input, and if not, then returns an error.
	ValidateEntityVersion(ctx context.Context, version *v1alphapeloton.EntityVersion) error

	// GetRuntime returns the runtime of the job
	GetRuntime(ctx context.Context) (*pbjob.RuntimeInfo, error)

	// GetConfig returns the current config of the job
	GetConfig(ctx context.Context) (jobmgrcommon.JobConfig, error)

	// GetCachedConfig returns the job config if
	// present in the cache. Returns nil otherwise.
	GetCachedConfig() jobmgrcommon.JobConfig

	// GetJobType returns the job type in the job config stored in the cache
	// The type can be nil when we read it. It should be only used for
	// non-critical purpose (e.g calculate delay).
	// Logically this should be part of JobConfig
	// TODO(zhixin): remove GetJobType from the interface after
	// EnqueueJobWithDefaultDelay does not take cached job
	GetJobType() pbjob.JobType

	// SetTaskUpdateTime updates the task update times in the job cache
	SetTaskUpdateTime(t *float64)

	// GetFirstTaskUpdateTime gets the first task update time
	GetFirstTaskUpdateTime() float64

	// GetLastTaskUpdateTime gets the last task update time
	GetLastTaskUpdateTime() float64

	// UpdateResourceUsage adds the task resource usage from a terminal task
	// to the resource usage map for this job
	UpdateResourceUsage(taskResourceUsage map[string]float64)

	// GetResourceUsage gets the resource usage map for this job
	GetResourceUsage() map[string]float64

	// RecalculateResourceUsage recalculates the resource usage of a job
	// by adding together resource usage of all terminal tasks of this job.
	RecalculateResourceUsage(ctx context.Context)

	// CurrentState of the job.
	CurrentState() JobStateVector

	// GoalState of the job.
	GoalState() JobStateVector

	// Delete deletes the job from DB and clears the cache
	Delete(ctx context.Context) error

	// GetTaskStateCount returns the state/goal state count of all
	// tasks in a job, the total number of throttled tasks in
	// stateless jobs and the spread counts of a job
	GetTaskStateCount() (
		map[TaskStateSummary]int,
		int,
		JobSpreadCounts)

	// GetWorkflowStateCount returns the state count of all workflows in the cache
	GetWorkflowStateCount() map[pbupdate.State]int

	// RepopulateInstanceAvailabilityInfo repopulates the SLA information in the job cache
	RepopulateInstanceAvailabilityInfo(ctx context.Context) error

	// GetInstanceAvailabilityType returns the instance availability type per instance
	// for the specified instances. If `instances` is empty then the instance
	// availability type for all instances of the job is returned
	GetInstanceAvailabilityType(
		ctx context.Context,
		instances ...uint32,
	) map[uint32]jobmgrcommon.InstanceAvailability_Type
}
Job in the cache. TODO: there are a lot of methods in this interface; determine whether it can be broken up into smaller pieces.
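The CompareAndSetRuntime and CompareAndSetTask comments describe version-based concurrency control: a write is accepted only if the caller's version matches the stored one, and the version is bumped on success. A minimal sketch of that pattern follows. The `jobRuntime` and `runtimeStore` types and the `errVersionMismatch` error are hypothetical; the real methods work on `RuntimeInfo.Revision.Version` and also persist to the DB.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// jobRuntime is a hypothetical, simplified runtime with a revision version.
type jobRuntime struct {
	State   string
	Version uint64
}

// errVersionMismatch is an assumed error for a stale write.
var errVersionMismatch = errors.New("runtime version mismatch")

// runtimeStore holds the current runtime, standing in for cache plus DB.
type runtimeStore struct {
	mu  sync.Mutex
	cur jobRuntime
}

// compareAndSet accepts next only if its version equals the stored version.
// On success it increments the version itself, so callers never set it.
func (s *runtimeStore) compareAndSet(next jobRuntime) (jobRuntime, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if next.Version != s.cur.Version {
		return jobRuntime{}, errVersionMismatch
	}
	next.Version++
	s.cur = next
	return next, nil
}

func main() {
	s := &runtimeStore{cur: jobRuntime{State: "PENDING", Version: 1}}

	// A writer that read version 1 succeeds and gets version 2 back.
	updated, err := s.compareAndSet(jobRuntime{State: "RUNNING", Version: 1})
	fmt.Println(updated, err)

	// A second writer still holding version 1 is rejected and must re-read.
	_, err = s.compareAndSet(jobRuntime{State: "KILLED", Version: 1})
	fmt.Println(err)
}
```

A caller that hits the mismatch would typically re-read the runtime, reapply its change and retry.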
type JobConfigCache ¶
type JobConfigCache interface {
	jobmgrcommon.JobConfig
	HasControllerTask() bool
}
JobConfigCache is a union of JobConfig and helper methods only available for cached config.
type JobFactory ¶
type JobFactory interface {
	// AddJob will create a Job if not present in cache,
	// else returns the current cached Job.
	AddJob(id *peloton.JobID) Job

	// ClearJob cleans up the job from the cache.
	ClearJob(jobID *peloton.JobID)

	// GetJob will return the current cached Job,
	// and nil if currently not in cache.
	GetJob(id *peloton.JobID) Job

	// GetAllJobs returns the list of all jobs in cache.
	GetAllJobs() map[string]Job

	// Start emitting metrics.
	Start()

	// Stop clears the current jobs and tasks in cache, stops metrics.
	Stop()
}
JobFactory is the entrypoint object into the cache which stores job and tasks. This only runs in the job manager leader.
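The AddJob, GetJob and ClearJob comments describe get-or-create cache semantics. The sketch below shows one way such a cache could behave, assuming a mutex-guarded map keyed by a string job ID. The `job` and `jobCache` types are hypothetical and much simpler than the real factory, which also wires in stores, metrics and listeners.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical placeholder for a cached job object.
type job struct{ id string }

// jobCache is an illustrative in-memory cache keyed by job ID.
type jobCache struct {
	mu   sync.RWMutex
	jobs map[string]*job
}

func newJobCache() *jobCache {
	return &jobCache{jobs: make(map[string]*job)}
}

// addJob returns the cached job for id, creating it first if it is absent.
func (c *jobCache) addJob(id string) *job {
	c.mu.Lock()
	defer c.mu.Unlock()
	if j, ok := c.jobs[id]; ok {
		return j
	}
	j := &job{id: id}
	c.jobs[id] = j
	return j
}

// getJob returns the cached job, or nil if it is not in the cache.
// Unlike addJob, it never mutates the cache.
func (c *jobCache) getJob(id string) *job {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.jobs[id]
}

// clearJob removes the job from the cache.
func (c *jobCache) clearJob(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.jobs, id)
}

func main() {
	cache := newJobCache()
	first := cache.addJob("job-1")
	second := cache.addJob("job-1")
	fmt.Println(first == second) // same cached object both times

	cache.clearJob("job-1")
	fmt.Println(cache.getJob("job-1") == nil)
}
```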
func InitJobFactory ¶
func InitJobFactory(
	jobStore storage.JobStore,
	taskStore storage.TaskStore,
	updateStore storage.UpdateStore,
	volumeStore storage.PersistentVolumeStore,
	ormStore *ormobjects.Store,
	parentScope tally.Scope,
	listeners []JobTaskListener,
) JobFactory
InitJobFactory initializes the job factory object.
type JobSpreadCounts ¶
type JobSpreadCounts struct {
// contains filtered or unexported fields
}
JobSpreadCounts contains task and host counts for jobs that use the "spread" placement strategy. Counts are set to zero for invalid or inapplicable cases.
type JobStateVector ¶
JobStateVector defines the state of a job. This encapsulates both the actual state and the goal state.
type JobTaskListener ¶
type JobTaskListener interface {
	// Name returns a user-friendly name for the listener
	Name() string

	// JobRuntimeChanged is invoked when the runtime for a job is updated
	// in cache and persistent store.
	JobRuntimeChanged(
		jobID *peloton.JobID,
		jobType pbjob.JobType,
		runtime *pbjob.RuntimeInfo,
	)

	// TaskRuntimeChanged is invoked when the runtime for a task is updated
	// in cache and persistent store.
	TaskRuntimeChanged(
		jobID *peloton.JobID,
		instanceID uint32,
		jobType pbjob.JobType,
		runtime *pbtask.RuntimeInfo,
		labels []*peloton.Label,
	)
}
JobTaskListener defines an interface that must be implemented by a listener interested in job and task changes. The callbacks are invoked after updates to the cache are written through to the persistent store. Note that callbacks may not be invoked in the same order as the changes to objects in cache; the version field of the changed object (e.g. Changelog) is a better indicator of order. To keep things simple, the callbacks are invoked synchronously when the cached object is changed. Slow listeners can therefore slow down operations on the cache, which must be avoided. Implementations must not:
- modify the provided objects in any way
- do processing that can take a long time, such as blocking on locks or making remote calls. Such activities must be done in separate goroutines that are managed by the listener.
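One way to respect these constraints is to have the callback copy what it needs into a buffered channel and return, leaving the slow work to a goroutine owned by the listener. The sketch below shows that hand-off. The `asyncListener` type, its simplified `TaskRuntimeChanged` signature and the drop-when-full policy are assumptions for illustration; a real listener would implement the full JobTaskListener interface.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runtimeEvent is a hypothetical copy of the data a listener cares about.
type runtimeEvent struct {
	JobID      string
	InstanceID uint32
	State      string
}

// asyncListener queues events and processes them in its own goroutine.
type asyncListener struct {
	events  chan runtimeEvent
	wg      sync.WaitGroup
	mu      sync.Mutex
	seen    []runtimeEvent
	dropped int64
}

func newAsyncListener(buffer int) *asyncListener {
	l := &asyncListener{events: make(chan runtimeEvent, buffer)}
	l.wg.Add(1)
	go l.run()
	return l
}

// run is where slow work (remote calls, heavy processing) would happen.
func (l *asyncListener) run() {
	defer l.wg.Done()
	for ev := range l.events {
		l.mu.Lock()
		l.seen = append(l.seen, ev)
		l.mu.Unlock()
	}
}

// TaskRuntimeChanged never blocks: it enqueues a copy or, if the buffer is
// full, counts the event as dropped (an assumed policy for this sketch).
func (l *asyncListener) TaskRuntimeChanged(jobID string, instanceID uint32, state string) {
	select {
	case l.events <- runtimeEvent{JobID: jobID, InstanceID: instanceID, State: state}:
	default:
		atomic.AddInt64(&l.dropped, 1)
	}
}

// Close stops accepting events and waits for the worker to drain the queue.
func (l *asyncListener) Close() {
	close(l.events)
	l.wg.Wait()
}

func main() {
	l := newAsyncListener(16)
	l.TaskRuntimeChanged("job-1", 0, "RUNNING")
	l.TaskRuntimeChanged("job-1", 1, "PENDING")
	l.Close()
	fmt.Println(len(l.seen), atomic.LoadInt64(&l.dropped))
}
```

Dropping events on a full buffer trades completeness for never stalling the cache; a listener that cannot lose events would need a different policy.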
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics is the struct containing all the counters that track internal state of the cache.
func NewMetrics ¶
NewMetrics returns a new Metrics struct, with all metrics initialized and rooted at the given tally.Scope.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is an option used when creating a workflow.
func WithConfig ¶
func WithConfig(
	jobConfig *pbjob.JobConfig,
	prevJobConfig *pbjob.JobConfig,
	configAddOn *models.ConfigAddOn,
	jobSpec *stateless.JobSpec,
) Option
WithConfig defines the original config and target config for the workflow. The workflow can use the configs to calculate the instances it needs to work on, as well as to verify whether the update is a no-op. It also includes the target job spec, which is stored to the DB as part of workflow creation.
func WithInstanceToProcess ¶
func WithInstanceToProcess(
	instancesAdded []uint32,
	instancesUpdated []uint32,
	instancesRemoved []uint32,
) Option
WithInstanceToProcess defines the instances the workflow works on. When it is provided, the workflow does not calculate the instances to process based on config.
func WithOpaqueData ¶
func WithOpaqueData(opaqueData *peloton.OpaqueData) Option
WithOpaqueData defines the opaque data provided by the user to be stored with the update.
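Option together with the With* constructors follows the shape of the functional options pattern. The sketch below is a generic illustration of that pattern, assuming an options struct filled in by each option. All lower-case names (`workflowOptions`, `option`, `withInstancesToProcess`, `withOpaqueData`, `buildOptions`) are hypothetical and say nothing about the package's unexported internals.

```go
package main

import "fmt"

// workflowOptions is a hypothetical bag of settings collected from options.
type workflowOptions struct {
	added, updated, removed []uint32
	explicitInstances       bool
	opaqueData              string
}

// option mirrors an interface with an unexported method, so only this
// package can define new options.
type option interface {
	apply(*workflowOptions)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*workflowOptions)

func (f optionFunc) apply(o *workflowOptions) { f(o) }

// withInstancesToProcess records an explicit instance set, so a workflow
// would not need to derive it from configs.
func withInstancesToProcess(added, updated, removed []uint32) option {
	return optionFunc(func(o *workflowOptions) {
		o.added, o.updated, o.removed = added, updated, removed
		o.explicitInstances = true
	})
}

// withOpaqueData attaches user-provided data (a string in this sketch).
func withOpaqueData(data string) option {
	return optionFunc(func(o *workflowOptions) { o.opaqueData = data })
}

// buildOptions applies each option in order to a zero-valued struct.
func buildOptions(opts ...option) workflowOptions {
	var o workflowOptions
	for _, opt := range opts {
		opt.apply(&o)
	}
	return o
}

func main() {
	o := buildOptions(
		withInstancesToProcess([]uint32{3}, []uint32{0, 1}, nil),
		withOpaqueData("canary"),
	)
	fmt.Println(o.added, o.updated, o.removed, o.explicitInstances, o.opaqueData)
}
```

This shape lets methods such as CreateWorkflow accept a variadic `option ...Option` parameter without growing their fixed argument list.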
type Task ¶
type Task interface {
	// Identifier of the task.
	ID() uint32

	// Job identifier the task belongs to.
	JobID() *peloton.JobID

	// GetRuntime returns the task run time
	GetRuntime(ctx context.Context) (*pbtask.RuntimeInfo, error)

	// GetCacheRuntime returns the task run time stored in the cache.
	// It returns nil if there is no runtime in the cache.
	GetCacheRuntime() *pbtask.RuntimeInfo

	// GetLabels returns the task labels
	GetLabels(ctx context.Context) ([]*peloton.Label, error)

	// CurrentState of the task.
	CurrentState() TaskStateVector

	// GoalState of the task.
	GoalState() TaskStateVector

	// StateSummary of the task.
	StateSummary() TaskStateSummary

	// TerminationStatus of the task.
	TerminationStatus() *pbtask.TerminationStatus
}
Task in the cache.
type TaskMetrics ¶
type TaskMetrics struct {
	TimeToAssignNonRevocable tally.Timer
	TimeToAssignRevocable    tally.Timer
	TimeToRunNonRevocable    tally.Timer
	TimeToRunRevocable       tally.Timer
	MeanSpreadQuotient       tally.Gauge
}
TaskMetrics contains the metrics for tasks that are managed by the cache.
func NewTaskMetrics ¶
func NewTaskMetrics(scope tally.Scope) *TaskMetrics
NewTaskMetrics returns a new TaskMetrics struct, with all metrics initialized and rooted at the given tally.Scope.
type TaskStateSummary ¶
type TaskStateVector ¶
type TaskStateVector struct {
	State         pbtask.TaskState
	ConfigVersion uint64
	MesosTaskID   *mesos.TaskID
}
TaskStateVector defines the state of a task. This encapsulates both the actual state and the goal state.
type Update ¶
type Update interface {
	WorkflowStrategy

	// Identifier of the update
	ID() *peloton.UpdateID

	// Job identifier the update belongs to
	JobID() *peloton.JobID

	// Create creates the update in DB and cache
	Create(
		ctx context.Context,
		jobID *peloton.JobID,
		jobConfig jobmgrcommon.JobConfig,
		prevJobConfig *pbjob.JobConfig,
		configAddOn *models.ConfigAddOn,
		instanceAdded []uint32,
		instanceUpdated []uint32,
		instanceRemoved []uint32,
		workflowType models.WorkflowType,
		updateConfig *pbupdate.UpdateConfig,
		opaqueData *peloton.OpaqueData,
	) error

	// Modify modifies the update in DB and cache
	Modify(
		ctx context.Context,
		instancesAdded []uint32,
		instancesUpdated []uint32,
		instancesRemoved []uint32,
	) error

	// WriteProgress updates the update in DB and cache
	WriteProgress(ctx context.Context,
		state pbupdate.State,
		instancesDone []uint32,
		instanceFailed []uint32,
		instancesCurrent []uint32) error

	// Pause pauses the current update progress
	Pause(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Resume resumes a paused update, and update would change
	// to the state before pause
	Resume(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Recover recovers the update from DB into the cache
	Recover(ctx context.Context) error

	// Cancel is used to cancel the update
	Cancel(ctx context.Context, opaqueData *peloton.OpaqueData) error

	// Rollback is used to rollback the update.
	Rollback(
		ctx context.Context,
		currentConfig *pbjob.JobConfig,
		targetConfig *pbjob.JobConfig,
	) error

	// GetState returns the state of the update
	GetState() *UpdateStateVector

	// GetGoalState returns the goal state of the update
	GetGoalState() *UpdateStateVector

	// GetPrevState returns the previous state of the update
	GetPrevState() pbupdate.State

	// GetInstancesAdded returns the instances to be added with this update
	GetInstancesAdded() []uint32

	// GetInstancesUpdated returns the existing instances to be updated
	// with this update
	GetInstancesUpdated() []uint32

	// GetInstancesRemoved returns the existing instances to be removed
	// with this update
	GetInstancesRemoved() []uint32

	// GetInstancesCurrent returns the current set of instances being updated
	GetInstancesCurrent() []uint32

	// GetInstancesFailed returns the current set of instances marked as failed
	GetInstancesFailed() []uint32

	// GetInstancesDone returns the current set of instances updated
	GetInstancesDone() []uint32

	GetUpdateConfig() *pbupdate.UpdateConfig

	GetWorkflowType() models.WorkflowType

	// IsTaskInUpdateProgress returns true if a given task is
	// in progress for the given update, else returns false
	IsTaskInUpdateProgress(instanceID uint32) bool

	// IsTaskInFailed returns true if a given task is in the
	// instancesFailed list for the given update, else returns false
	IsTaskInFailed(instanceID uint32) bool

	// GetLastUpdateTime returns the last update time of the update object
	GetLastUpdateTime() time.Time
}
Update of a job being stored in the cache.
type UpdateRequest ¶
type UpdateRequest int
UpdateRequest is used to indicate whether the caller wants to update only cache or update both database and cache. This is used during job manager recovery as only cache needs to be updated during recovery.
const (
	// UpdateCacheOnly updates only the cache. It should be used only during
	// recovery. Also, it requires passing the complete runtime information.
	UpdateCacheOnly UpdateRequest = iota + 1
	// UpdateCacheAndDB updates both DB and cache. The caller can pass the
	// complete runtime info or just a diff.
	UpdateCacheAndDB
)
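Because the constants start at `iota + 1`, the zero value of the type matches neither of them. The sketch below mirrors the declaration with a hypothetical lower-case type and adds an assumed `valid` helper to make that visible; the helper is not part of the package.

```go
package main

import "fmt"

// updateRequest mirrors the shape of UpdateRequest for illustration only.
type updateRequest int

const (
	updateCacheOnly  updateRequest = iota + 1 // 1
	updateCacheAndDB                          // 2
)

// valid is a hypothetical helper that reports whether r is a known value.
func (r updateRequest) valid() bool {
	return r == updateCacheOnly || r == updateCacheAndDB
}

func main() {
	var unset updateRequest // zero value, never assigned
	fmt.Println(int(updateCacheOnly), int(updateCacheAndDB))
	fmt.Println(unset.valid(), updateCacheAndDB.valid())
}
```

A forgotten or uninitialized request therefore cannot silently be read as UpdateCacheOnly.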
type UpdateStateVector ¶
type UpdateStateVector struct {
	// current update state
	State pbupdate.State
	// for state, it will be the old job config version
	// for goal state, it will be the desired job config version
	JobVersion uint64
	// For state, it will store the instances which have already been updated,
	// and for goal state, it will store all the instances which
	// need to be updated.
	Instances []uint32
}
UpdateStateVector is used to represent the state and goal state of an update to the goal state engine.
type WorkflowOps ¶
type WorkflowOps interface {
	// CreateWorkflow creates a workflow associated with
	// the calling object
	CreateWorkflow(
		ctx context.Context,
		workflowType models.WorkflowType,
		updateConfig *pbupdate.UpdateConfig,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// PauseWorkflow pauses the current workflow, if any
	PauseWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// ResumeWorkflow resumes the current workflow, if any
	ResumeWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// AbortWorkflow aborts the current workflow, if any
	AbortWorkflow(
		ctx context.Context,
		entityVersion *v1alphapeloton.EntityVersion,
		option ...Option,
	) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error)

	// RollbackWorkflow rolls back the current workflow, if any
	RollbackWorkflow(ctx context.Context) error

	// AddWorkflow adds a workflow to the calling object
	AddWorkflow(updateID *peloton.UpdateID) Update

	// GetWorkflow gets the workflow from the calling object.
	// It should only be used in places like handlers, where
	// a read operation should not mutate the cache
	GetWorkflow(updateID *peloton.UpdateID) Update

	// ClearWorkflow removes a workflow from the calling object
	ClearWorkflow(updateID *peloton.UpdateID)

	// GetAllWorkflows returns all workflows for the job
	GetAllWorkflows() map[string]Update
}
WorkflowOps defines operations on a workflow.
type WorkflowStrategy ¶
type WorkflowStrategy interface {
	// IsInstanceComplete returns if an instance has reached the state
	// desired by the workflow
	IsInstanceComplete(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool

	// IsInstanceInProgress returns if an instance is in the process of getting
	// to the state desired by the workflow
	IsInstanceInProgress(desiredConfigVersion uint64, runtime *pbtask.RuntimeInfo) bool

	// IsInstanceFailed returns if an instance has failed while getting
	// to the state desired by the workflow
	// TODO: now a task can both get true for IsInstanceInProgress and
	// IsInstanceFailed, it should get true for only one of the func.
	// Now the correctness of code is guarded by order of func call.
	IsInstanceFailed(runtime *pbtask.RuntimeInfo, maxAttempts uint32) bool

	// GetRuntimeDiff accepts the current task runtime of an instance and the desired
	// job config, it returns the RuntimeDiff to move the instance to the state desired
	// by the workflow. Return nil if no action is needed.
	GetRuntimeDiff(jobConfig *pbjob.JobConfig) jobmgrcommon.RuntimeDiff
}
WorkflowStrategy is the strategy for driving instances to the desired state of the workflow.