Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterCheckInProcessor(vendorType string, processor CheckInProcessor) error
- func RegisterExecutionStatusChangePostFunc(vendorType string, fc ExecutionStatusChangePostFunc) error
- func RegisterTaskStatusChangePostFunc(vendorType string, fc StatusChangePostFunc) error
- type CheckInProcessor
- type Execution
- type ExecutionManager
- type ExecutionStatusChangePostFunc
- type HookHandler
- type Job
- type Manager
- type StatusChangePostFunc
- type SweepJob
- type SweepManager
- type Task
Constants ¶
const (
	ExecutionTriggerManual = "MANUAL"
	ExecutionTriggerSchedule = "SCHEDULE"
	ExecutionTriggerEvent = "EVENT"
)
const definitions
const (
// ExecRetainCounts is the params key of execution retain count
ExecRetainCounts = "execution_retain_counts"
)
Variables ¶
var (
	// ExecMgr is a global execution manager instance
	ExecMgr = NewExecutionManager()

	ErrTimeOut = errors.New("stopping the execution timeout")
)
var (
	// HkHandler is a global instance of the HookHandler
	HkHandler = NewHookHandler()
)
var (
	// Mgr is a global task manager instance
	Mgr = NewManager()
)
var (
	// SweepMgr is a global sweep manager instance.
	SweepMgr = NewSweepManager()
)
Functions ¶
func RegisterCheckInProcessor ¶
func RegisterCheckInProcessor(vendorType string, processor CheckInProcessor) error
RegisterCheckInProcessor registers a check-in processor for the specific vendor type
func RegisterExecutionStatusChangePostFunc ¶
func RegisterExecutionStatusChangePostFunc(vendorType string, fc ExecutionStatusChangePostFunc) error
RegisterExecutionStatusChangePostFunc registers an execution status change post function for the specific vendor type
func RegisterTaskStatusChangePostFunc ¶
func RegisterTaskStatusChangePostFunc(vendorType string, fc StatusChangePostFunc) error
RegisterTaskStatusChangePostFunc registers a task status change post function for the specific vendor type
Types ¶
type CheckInProcessor ¶
CheckInProcessor processes the check-in data that jobservice sends via webhook
type Execution ¶
type Execution struct {
	ID int64 `json:"id"`
	// indicate the execution type: replication/GC/retention/scan/etc.
	VendorType string `json:"vendor_type"`
	// the ID of vendor policy/rule/etc. e.g. replication policy ID
	VendorID int64 `json:"vendor_id"`
	Status string `json:"status"`
	// the detail message to explain the status in some cases. e.g.
	// 1. After creating the execution, there may be some errors before creating tasks, the
	// "StatusMessage" can contain the error message
	// 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case
	StatusMessage string `json:"status_message"`
	Metrics *dao.Metrics `json:"metrics"`
	// trigger type: manual/schedule/event
	Trigger string `json:"trigger"`
	// the customized attributes for different kinds of consumers
	ExtraAttrs map[string]interface{} `json:"extra_attrs"`
	StartTime time.Time `json:"start_time"`
	UpdateTime time.Time `json:"update_time"`
	EndTime time.Time `json:"end_time"`
}
Execution is one run for one action. It contains one or more tasks and provides the summary view of the tasks
type ExecutionManager ¶
type ExecutionManager interface {
	// Create an execution. The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.),
	// and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention).
	// The "extraAttrs" can be used to set the customized attributes
	Create(ctx context.Context, vendorType string, vendorID int64, trigger string, extraAttrs ...map[string]interface{}) (id int64, err error)
	// Update the extra attributes of the specified execution
	UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
	// MarkDone marks the status of the specified execution as success.
	// It must be called to update the execution status if the created execution contains no tasks.
	// In other cases, the execution status can be calculated from the referenced tasks automatically
	// and no need to update it explicitly
	MarkDone(ctx context.Context, id int64, message string) (err error)
	// MarkError marks the status of the specified execution as error.
	// It must be called to update the execution status when failed to create tasks.
	// In other cases, the execution status can be calculated from the referenced tasks automatically
	// and no need to update it explicitly
	MarkError(ctx context.Context, id int64, message string) (err error)
	// Stop all linked tasks of the specified execution
	Stop(ctx context.Context, id int64) (err error)
	// StopAndWait stops all linked tasks of the specified execution and waits until all tasks are stopped
	// or get an error
	StopAndWait(ctx context.Context, id int64, timeout time.Duration) (err error)
	// StopAndWaitWithError calls the StopAndWait first, if it doesn't return error, then it call MarkError if the origError is not empty
	StopAndWaitWithError(ctx context.Context, id int64, timeout time.Duration, origError error) (err error)
	// Delete the specified execution and its tasks
	Delete(ctx context.Context, id int64) (err error)
	// Delete all executions and tasks of the specific vendor. They can be deleted only when all the executions/tasks
	// of the vendor are in final status
	DeleteByVendor(ctx context.Context, vendorType string, vendorID int64) (err error)
	// Get the specified execution
	Get(ctx context.Context, id int64) (execution *Execution, err error)
	// List executions according to the query
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
	List(ctx context.Context, query *q.Query) (executions []*Execution, err error)
	// Count counts total of executions according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
	Count(ctx context.Context, query *q.Query) (int64, error)
}
ExecutionManager manages executions. The execution and task managers provide an execution-task model that abstracts the interaction with jobservice. All operations involving jobservice should be delegated to them
func NewExecutionManager ¶
func NewExecutionManager() ExecutionManager
NewExecutionManager returns an instance of the default execution manager
type ExecutionStatusChangePostFunc ¶
type ExecutionStatusChangePostFunc func(ctx context.Context, executionID int64, status string) (err error)
ExecutionStatusChangePostFunc is the function called after the execution status changes
type HookHandler ¶
type HookHandler struct {
// contains filtered or unexported fields
}
HookHandler handles the job status changing webhook
func NewHookHandler ¶
func NewHookHandler() *HookHandler
NewHookHandler creates a hook handler instance
func (*HookHandler) Handle ¶
func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error
Handle handles the job status changing webhook
type Job ¶
type Job struct {
	Name string
	Parameters job.Parameters
	Metadata *job.Metadata
}
Job is the model that represents the requested jobservice job
type Manager ¶
type Manager interface {
	// Create submits the job to jobservice and creates a corresponding task record.
	// An execution must be created first and the task will be linked to it.
	// The "extraAttrs" can be used to set the customized attributes
	Create(ctx context.Context, executionID int64, job *Job, extraAttrs ...map[string]interface{}) (id int64, err error)
	// Stop the specified task
	Stop(ctx context.Context, id int64) (err error)
	// Get the specified task
	Get(ctx context.Context, id int64) (task *Task, err error)
	// List the tasks according to the query
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
	List(ctx context.Context, query *q.Query) (tasks []*Task, err error)
	// Update the extra attributes of the specified task
	UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
	// Get the log of the specified task
	GetLog(ctx context.Context, id int64) (log []byte, err error)
	// GetLogByJobID get the log of specified job id
	GetLogByJobID(ctx context.Context, jobID string) (log []byte, err error)
	// Count counts total of tasks according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
	Count(ctx context.Context, query *q.Query) (int64, error)
	// Update the status of the specified task
	Update(ctx context.Context, task *Task, props ...string) error
	// UpdateStatusInBatch updates the status of tasks in batch
	UpdateStatusInBatch(ctx context.Context, jobIDs []string, status string, batchSize int) error
	// ExecutionIDsByVendorAndStatus retrieve execution id by vendor type and status
	ExecutionIDsByVendorAndStatus(ctx context.Context, vendorType, status string) ([]int64, error)
	// ListScanTasksByReportUUID lists scan tasks by report uuid, although it's a specific case but it will be
	// more suitable to support multi database in the future.
	ListScanTasksByReportUUID(ctx context.Context, uuid string) (tasks []*Task, err error)
	// RetrieveStatusFromTask retrieve status from task
	RetrieveStatusFromTask(ctx context.Context, reportID string) string
	// IsTaskFinished checks if the scan task is finished by report UUID
	IsTaskFinished(ctx context.Context, reportID string) bool
}
Manager manages tasks. The execution and task managers provide an execution-task model that abstracts the interaction with jobservice. All operations involving jobservice should be delegated to them
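The List and Count comments describe a key convention for filtering on custom attributes: a keyword named "ExtraAttrs.key" matches the entry "key" inside ExtraAttrs. The sketch below only illustrates how such keys can be interpreted; it filters an in-memory slice, whereas the real managers evaluate queries elsewhere. `Query` is a minimal stand-in for `q.Query` that models just a `Keywords` map, `Task` is cut down to two fields, and `filter`, `matches` and the attribute names are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Query is a minimal stand-in for q.Query; only Keywords is modelled here.
type Query struct {
	Keywords map[string]interface{}
}

// Task is reduced to the fields this sketch needs.
type Task struct {
	ID         int64
	ExtraAttrs map[string]interface{}
}

const extraAttrsPrefix = "ExtraAttrs."

// matches reports whether t satisfies every "ExtraAttrs.<key>" keyword in query.
// Keywords without the prefix are ignored by this sketch.
func matches(t *Task, query *Query) bool {
	if query == nil {
		return true
	}
	for name, want := range query.Keywords {
		if !strings.HasPrefix(name, extraAttrsPrefix) {
			continue
		}
		key := strings.TrimPrefix(name, extraAttrsPrefix)
		got, ok := t.ExtraAttrs[key]
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}

// filter returns the tasks that match query, preserving their order.
func filter(tasks []*Task, query *Query) []*Task {
	var out []*Task
	for _, t := range tasks {
		if matches(t, query) {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	tasks := []*Task{
		{ID: 1, ExtraAttrs: map[string]interface{}{"artifact": "library/nginx"}},
		{ID: 2, ExtraAttrs: map[string]interface{}{"artifact": "library/redis"}},
		{ID: 3},
	}
	query := &Query{Keywords: map[string]interface{}{"ExtraAttrs.artifact": "library/redis"}}
	for _, t := range filter(tasks, query) {
		fmt.Println("matched task", t.ID)
	}
}
```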
func NewManager ¶
func NewManager() Manager
NewManager creates an instance of the default task manager
type StatusChangePostFunc ¶
StatusChangePostFunc is the function called after the task status changes
type SweepJob ¶
type SweepJob struct {
// contains filtered or unexported fields
}
SweepJob is used to clean up the executions and tasks for different vendors.
func (*SweepJob) MaxCurrency ¶
MaxCurrency limits the sweep job to a concurrency of 1.
func (*SweepJob) ShouldRetry ¶
ShouldRetry indicates that the sweep job does not need to be retried.
type SweepManager ¶
type SweepManager interface {
	// ListCandidates lists the candidate execution ids which met the sweep criteria.
	ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error)
	// Clean deletes the tasks belonging to the execution which in final status and deletes executions.
	Clean(ctx context.Context, execID []int64) (err error)
	// FixDanglingStateExecution fixes the dangling state execution.
	FixDanglingStateExecution(ctx context.Context) error
}
func NewSweepManager ¶
func NewSweepManager() SweepManager
type Task ¶
type Task struct {
	ID int64 `json:"id"`
	// indicate the task type: replication/GC/retention/scan/etc.
	VendorType string `json:"vendor_type"`
	ExecutionID int64 `json:"execution_id"`
	Status string `json:"status"`
	// the detail message to explain the status in some cases. e.g.
	// When the job is failed to submit to jobservice, this field can be used to explain the reason
	StatusMessage string `json:"status_message"`
	// the underlying job may retry several times
	RunCount int32 `json:"run_count"`
	// the ID of jobservice job
	JobID string `json:"job_id"`
	// the customized attributes for different kinds of consumers
	ExtraAttrs map[string]interface{} `json:"extra_attrs"`
	// the time that the task record created
	CreationTime time.Time `json:"creation_time"`
	// the time that the underlying job starts
	StartTime time.Time `json:"start_time"`
	UpdateTime time.Time `json:"update_time"`
	EndTime time.Time `json:"end_time"`
	StatusRevision int64 `json:"status_revision"`
}
Task is the unit for running. It stores the jobservice job records and related information
func (*Task) GetBoolFromExtraAttrs ¶
GetBoolFromExtraAttrs returns the bool value specified by key
func (*Task) GetNumFromExtraAttrs ¶
GetNumFromExtraAttrs returns the numeric value specified by key
func (*Task) GetStringFromExtraAttrs ¶
GetStringFromExtraAttrs returns the string value specified by key