task

package
v0.0.0-...-ec03ccd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Trigger values for an execution: "MANUAL", "SCHEDULE" and "EVENT".
// NOTE(review): presumably these are the values carried by Execution.Trigger
// (documented there as manual/schedule/event) and passed as the "trigger"
// argument of ExecutionManager.Create — confirm against the callers.
const (
	ExecutionTriggerManual   = "MANUAL"
	ExecutionTriggerSchedule = "SCHEDULE"
	ExecutionTriggerEvent    = "EVENT"
)

const definitions

View Source
const (
	// ExecRetainCounts is the parameter key of the execution retain counts.
	// NOTE(review): presumably looked up in the job.Parameters handed to
	// SweepJob.Run — confirm against the sweep job implementation.
	ExecRetainCounts = "execution_retain_counts"
)

Variables

View Source
var (
	// ExecMgr is a global execution manager instance, created with
	// NewExecutionManager.
	//
	// ErrTimeOut is the error whose message reports that stopping the
	// execution timed out.
	// NOTE(review): presumably returned by ExecutionManager.StopAndWait when
	// its timeout elapses — confirm against the implementation.
	ExecMgr    = NewExecutionManager()
	ErrTimeOut = errors.New("stopping the execution timeout")
)
View Source
var (
	// HkHandler is a global instance of the HookHandler, created with
	// NewHookHandler.
	HkHandler = NewHookHandler()
)
View Source
var (
	// Mgr is a global task manager instance, created with NewManager.
	Mgr = NewManager()
)
View Source
var (
	// SweepMgr is a global sweep manager instance, created with
	// NewSweepManager.
	SweepMgr = NewSweepManager()
)

Functions

func RegisterCheckInProcessor

func RegisterCheckInProcessor(vendorType string, processor CheckInProcessor) error

RegisterCheckInProcessor registers check in processor for the specific vendor type

func RegisterExecutionStatusChangePostFunc

func RegisterExecutionStatusChangePostFunc(vendorType string, fc ExecutionStatusChangePostFunc) error

RegisterExecutionStatusChangePostFunc registers an execution status change post function for the specific vendor type

func RegisterTaskStatusChangePostFunc

func RegisterTaskStatusChangePostFunc(vendorType string, fc StatusChangePostFunc) error

RegisterTaskStatusChangePostFunc registers a task status change post function for the specific vendor type

Types

type CheckInProcessor

// CheckInProcessor is the processor to process the check in data which is
// sent by jobservice via webhook. It receives a context, a task and a job
// status change, and returns an error. Processors are registered per vendor
// type with RegisterCheckInProcessor.
type CheckInProcessor func(ctx context.Context, task *Task, sc *job.StatusChange) (err error)

CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook

type Execution

// Execution is one run for one action. It contains one or more tasks and
// provides the summary view of the tasks.
type Execution struct {
	// ID is the ID of the execution.
	ID int64 `json:"id"`
	// VendorType indicates the execution type: replication/GC/retention/scan/etc.
	VendorType string `json:"vendor_type"`
	// VendorID is the ID of the vendor policy/rule/etc., e.g. the replication policy ID.
	VendorID int64  `json:"vendor_id"`
	Status   string `json:"status"`
	// StatusMessage is the detail message to explain the status in some cases, e.g.
	// 1. After creating the execution, there may be some errors before creating tasks; the
	// "StatusMessage" can contain the error message.
	// 2. The execution may contain no tasks; "StatusMessage" can be used to explain the case.
	StatusMessage string       `json:"status_message"`
	Metrics       *dao.Metrics `json:"metrics"`
	// Trigger is the trigger type: manual/schedule/event.
	Trigger string `json:"trigger"`
	// ExtraAttrs holds the customized attributes for different kinds of consumers.
	ExtraAttrs map[string]interface{} `json:"extra_attrs"`
	StartTime  time.Time              `json:"start_time"`
	UpdateTime time.Time              `json:"update_time"`
	EndTime    time.Time              `json:"end_time"`
}

Execution is one run for one action. It contains one or more tasks and provides the summary view of the tasks

func (*Execution) IsOnGoing

func (exec *Execution) IsOnGoing() bool

IsOnGoing returns true when the execution is running

type ExecutionManager

// ExecutionManager manages executions. The execution and task managers
// provide an execution-task model to abstract the interaction with
// jobservice. All of the operations with jobservice should be delegated to
// them.
type ExecutionManager interface {
	// Create creates an execution. The "vendorType" specifies the type of vendor (e.g. replication, scan, gc,
	// retention, etc.), and the "vendorID" specifies the ID of the vendor if needed (e.g. the policy ID for
	// replication and retention).
	// The "extraAttrs" can be used to set the customized attributes.
	Create(ctx context.Context, vendorType string, vendorID int64, trigger string,
		extraAttrs ...map[string]interface{}) (id int64, err error)
	// UpdateExtraAttrs updates the extra attributes of the specified execution.
	UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
	// MarkDone marks the status of the specified execution as success.
	// It must be called to update the execution status if the created execution contains no tasks.
	// In other cases, the execution status can be calculated from the referenced tasks automatically
	// and there is no need to update it explicitly.
	MarkDone(ctx context.Context, id int64, message string) (err error)
	// MarkError marks the status of the specified execution as error.
	// It must be called to update the execution status when creating tasks failed.
	// In other cases, the execution status can be calculated from the referenced tasks automatically
	// and there is no need to update it explicitly.
	MarkError(ctx context.Context, id int64, message string) (err error)
	// Stop stops all linked tasks of the specified execution.
	Stop(ctx context.Context, id int64) (err error)
	// StopAndWait stops all linked tasks of the specified execution and waits until all tasks are stopped
	// or an error occurs.
	StopAndWait(ctx context.Context, id int64, timeout time.Duration) (err error)
	// StopAndWaitWithError calls StopAndWait first; if that doesn't return an error, it then calls
	// MarkError if the origError is not empty.
	StopAndWaitWithError(ctx context.Context, id int64, timeout time.Duration, origError error) (err error)
	// Delete deletes the specified execution and its tasks.
	Delete(ctx context.Context, id int64) (err error)
	// DeleteByVendor deletes all executions and tasks of the specific vendor. They can be deleted only
	// when all the executions/tasks of the vendor are in final status.
	DeleteByVendor(ctx context.Context, vendorType string, vendorID int64) (err error)
	// Get gets the specified execution.
	Get(ctx context.Context, id int64) (execution *Execution, err error)
	// List lists executions according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'.
	List(ctx context.Context, query *q.Query) (executions []*Execution, err error)
	// Count counts the total of executions according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'.
	Count(ctx context.Context, query *q.Query) (int64, error)
}

ExecutionManager manages executions. The execution and task managers provide an execution-task model to abstract the interaction with jobservice. All of the operations with jobservice should be delegated to them.

func NewExecutionManager

func NewExecutionManager() ExecutionManager

NewExecutionManager returns an instance of the default execution manager

type ExecutionStatusChangePostFunc

// ExecutionStatusChangePostFunc is the function called after the execution
// status changed. It receives the execution ID and a status string, and
// returns an error. Functions are registered per vendor type with
// RegisterExecutionStatusChangePostFunc.
// NOTE(review): "status" is presumably the new status of the execution — confirm.
type ExecutionStatusChangePostFunc func(ctx context.Context, executionID int64, status string) (err error)

ExecutionStatusChangePostFunc is the function called after the execution status changed

type HookHandler

type HookHandler struct {
	// contains filtered or unexported fields
}

HookHandler handles the job status changing webhook

func NewHookHandler

func NewHookHandler() *HookHandler

NewHookHandler creates a hook handler instance

func (*HookHandler) Handle

func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error

Handle the job status changing webhook

type Job

// Job is the model that represents the requested jobservice job.
type Job struct {
	// Name, Parameters and Metadata describe the requested job.
	// NOTE(review): presumably these are forwarded to jobservice when the job
	// is submitted through Manager.Create — confirm against the implementation.
	Name       string
	Parameters job.Parameters
	Metadata   *job.Metadata
}

Job is the model that represents the requested jobservice job

type Manager

// Manager manages tasks. The execution and task managers provide an
// execution-task model to abstract the interaction with jobservice. All of
// the operations with jobservice should be delegated to them.
type Manager interface {
	// Create submits the job to jobservice and creates a corresponding task record.
	// An execution must be created first and the task will be linked to it.
	// The "extraAttrs" can be used to set the customized attributes.
	Create(ctx context.Context, executionID int64, job *Job, extraAttrs ...map[string]interface{}) (id int64, err error)
	// Stop stops the specified task.
	Stop(ctx context.Context, id int64) (err error)
	// Get gets the specified task.
	Get(ctx context.Context, id int64) (task *Task, err error)
	// List lists the tasks according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'.
	List(ctx context.Context, query *q.Query) (tasks []*Task, err error)
	// UpdateExtraAttrs updates the extra attributes of the specified task.
	UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
	// GetLog gets the log of the specified task.
	GetLog(ctx context.Context, id int64) (log []byte, err error)
	// GetLogByJobID gets the log of the job with the specified job ID.
	GetLogByJobID(ctx context.Context, jobID string) (log []byte, err error)
	// Count counts the total of tasks according to the query.
	// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'.
	Count(ctx context.Context, query *q.Query) (int64, error)
	// Update updates the status of the specified task.
	// NOTE(review): presumably "props" names the task properties to be updated — confirm.
	Update(ctx context.Context, task *Task, props ...string) error
	// UpdateStatusInBatch updates the status of tasks in batch.
	UpdateStatusInBatch(ctx context.Context, jobIDs []string, status string, batchSize int) error
	// ExecutionIDsByVendorAndStatus retrieves the execution IDs by vendor type and status.
	ExecutionIDsByVendorAndStatus(ctx context.Context, vendorType, status string) ([]int64, error)
	// ListScanTasksByReportUUID lists scan tasks by report UUID. Although it is a specific case, it will be
	// more suitable for supporting multiple databases in the future.
	ListScanTasksByReportUUID(ctx context.Context, uuid string) (tasks []*Task, err error)
	// RetrieveStatusFromTask retrieves the status from the task.
	// NOTE(review): presumably the scan task identified by reportID — confirm.
	RetrieveStatusFromTask(ctx context.Context, reportID string) string
	// IsTaskFinished checks whether the scan task identified by the report UUID is finished.
	IsTaskFinished(ctx context.Context, reportID string) bool
}

Manager manages tasks. The execution and task managers provide an execution-task model to abstract the interaction with jobservice. All of the operations with jobservice should be delegated to them.

func NewManager

func NewManager() Manager

NewManager creates an instance of the default task manager

type StatusChangePostFunc

// StatusChangePostFunc is the function called after the task status changed.
// It receives the task ID and a status string, and returns an error.
// Functions are registered per vendor type with
// RegisterTaskStatusChangePostFunc.
// NOTE(review): "status" is presumably the new status of the task — confirm.
type StatusChangePostFunc func(ctx context.Context, taskID int64, status string) (err error)

StatusChangePostFunc is the function called after the task status changed

type SweepJob

type SweepJob struct {
	// contains filtered or unexported fields
}

SweepJob is used to clean up the executions and tasks for different vendors.

func (*SweepJob) MaxCurrency

func (sj *SweepJob) MaxCurrency() uint

MaxCurrency limits the sweep job to a concurrency of 1.

func (*SweepJob) MaxFails

func (sj *SweepJob) MaxFails() uint

MaxFails of sweep job. Don't need to retry.

func (*SweepJob) Run

func (sj *SweepJob) Run(ctx job.Context, params job.Parameters) error

Run the sweep process.

func (*SweepJob) ShouldRetry

func (sj *SweepJob) ShouldRetry() bool

ShouldRetry indicates no need to retry sweep job.

func (*SweepJob) Validate

func (sj *SweepJob) Validate(_ job.Parameters) error

Validate the parameters of the sweep job.

type SweepManager

// SweepManager defines the sweep operations on executions: listing the
// candidate executions, cleaning them together with their tasks, and fixing
// executions in a dangling state.
type SweepManager interface {
	// ListCandidates lists the candidate execution IDs which meet the sweep criteria.
	// NOTE(review): "retainCnt" is presumably the number of executions to keep per vendor — confirm.
	ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error)
	// Clean deletes the tasks belonging to the executions which are in final status, and deletes the executions.
	Clean(ctx context.Context, execID []int64) (err error)
	// FixDanglingStateExecution fixes the executions which are in a dangling state.
	FixDanglingStateExecution(ctx context.Context) error
}

func NewSweepManager

func NewSweepManager() SweepManager

type Task

// Task is the unit for running. It stores the jobservice job records and
// related information.
type Task struct {
	// ID is the ID of the task.
	ID int64 `json:"id"`
	// VendorType indicates the task type: replication/GC/retention/scan/etc.
	VendorType  string `json:"vendor_type"`
	ExecutionID int64  `json:"execution_id"`
	Status      string `json:"status"`
	// StatusMessage is the detail message to explain the status in some cases, e.g.
	// when the job failed to be submitted to jobservice, this field can be used to explain the reason.
	StatusMessage string `json:"status_message"`
	// RunCount is the run count of the task; the underlying job may retry several times.
	RunCount int32 `json:"run_count"`
	// JobID is the ID of the jobservice job.
	JobID string `json:"job_id"`
	// ExtraAttrs holds the customized attributes for different kinds of consumers.
	ExtraAttrs map[string]interface{} `json:"extra_attrs"`
	// CreationTime is the time when the task record was created.
	CreationTime time.Time `json:"creation_time"`
	// StartTime is the time when the underlying job starts.
	StartTime      time.Time `json:"start_time"`
	UpdateTime     time.Time `json:"update_time"`
	EndTime        time.Time `json:"end_time"`
	StatusRevision int64     `json:"status_revision"`
}

Task is the unit for running. It stores the jobservice job records and related information

func (*Task) From

func (t *Task) From(task *dao.Task)

From constructs a task from DAO model

func (*Task) GetBoolFromExtraAttrs

func (t *Task) GetBoolFromExtraAttrs(key string) bool

GetBoolFromExtraAttrs returns the bool value specified by key

func (*Task) GetNumFromExtraAttrs

func (t *Task) GetNumFromExtraAttrs(key string) float64

GetNumFromExtraAttrs returns the num value specified by key

func (*Task) GetStringFromExtraAttrs

func (t *Task) GetStringFromExtraAttrs(key string) string

GetStringFromExtraAttrs returns the string value specified by key

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL