Documentation ¶
Index ¶
- Constants
- Variables
- func DebugCtlTaskFramework(disable bool)
- type ActiveRoutine
- type Condition
- func WithAccount(op Op, value string) Condition
- func WithAccountID(op Op, value uint32) Condition
- func WithCronTaskId(op Op, value uint64) Condition
- func WithLastHeartbeat(op Op, value int64) Condition
- func WithLimitCond(limit int) Condition
- func WithTaskEpochCond(op Op, value uint32) Condition
- func WithTaskExecutorCond(op Op, value task.TaskCode) Condition
- func WithTaskIDCond(op Op, value uint64) Condition
- func WithTaskIDDesc() Condition
- func WithTaskMetadataId(op Op, value string) Condition
- func WithTaskParentTaskIDCond(op Op, value string) Condition
- func WithTaskRunnerCond(op Op, value string) Condition
- func WithTaskStatusCond(value ...task.TaskStatus) Condition
- func WithTaskType(op Op, value string) Condition
- type Getter
- type Op
- type RunnerOption
- func WithOptions(queryLimit int, parallelism int, maxWaitTasks int, fetchInterval time.Duration, ...) RunnerOption
- func WithRunnerFetchInterval(interval time.Duration) RunnerOption
- func WithRunnerFetchLimit(limit int) RunnerOption
- func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption
- func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption
- func WithRunnerHeartbeatTimeout(timeout time.Duration) RunnerOption
- func WithRunnerLogger(logger *zap.Logger) RunnerOption
- func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption
- func WithRunnerParallelism(parallelism int) RunnerOption
- func WithRunnerRetryInterval(interval time.Duration) RunnerOption
- type TaskExecutor
- type TaskHandler
- type TaskRunner
- type TaskService
- type TaskServiceHolder
- type TaskStorage
- type TaskStorageFactory
Constants ¶
const ( CondLimit condCode = iota CondTaskID CondTaskRunner CondTaskStatus CondTaskEpoch CondTaskParentTaskID CondTaskExecutor CondTaskType CondOrderByDesc CondAccountID CondAccount CondLastHeartbeat CondCronTaskId CondTaskMetadataId )
Variables ¶
var ( // EQ record == condition EQ = Op(1) // GT record > condition GT = Op(2) // GE record >= condition GE = Op(3) // LT record < condition LT = Op(4) // LE record <= condition LE = Op(5) // IN record in condition IN = Op(6) // LIKE record LIKE condition LIKE = Op(7) OpName = map[Op]string{ EQ: "=", GT: ">", GE: ">=", LT: "<", LE: "<=", IN: "IN", LIKE: "LIKE", } )
var (
ErrNotReady = moerr.NewInvalidStateNoCtx("task store not ready")
)
Functions ¶
func DebugCtlTaskFramework ¶ added in v1.2.0
func DebugCtlTaskFramework(disable bool)
DebugCtlTaskFramework disables the task framework.
Types ¶
type ActiveRoutine ¶ added in v1.1.0
type ActiveRoutine interface { // Resume resumes the go routine of the daemon task. Resume() error // Pause pauses the go routine of the daemon task. Pause() error // Cancel cancels the go routine of the daemon task. Cancel() error }
ActiveRoutine is an interface that the go routine of the daemon task should implement.
type Condition ¶
type Condition func(*conditions)
Condition options for query tasks
func WithAccount ¶ added in v1.1.0
WithAccount set task account condition.
func WithAccountID ¶ added in v1.1.0
WithAccountID set task account ID condition.
func WithCronTaskId ¶ added in v1.1.2
func WithLastHeartbeat ¶ added in v1.1.0
WithLastHeartbeat set last heartbeat condition.
func WithTaskEpochCond ¶
WithTaskEpochCond set task epoch condition
func WithTaskExecutorCond ¶
WithTaskExecutorCond set task executor condition
func WithTaskIDCond ¶
WithTaskIDCond set task id condition
func WithTaskIDDesc ¶
func WithTaskIDDesc() Condition
WithTaskIDDesc set query with order by task id desc
func WithTaskMetadataId ¶ added in v1.2.0
func WithTaskParentTaskIDCond ¶
WithTaskParentTaskIDCond set task ParentTaskID condition
func WithTaskRunnerCond ¶
WithTaskRunnerCond set task runner condition
func WithTaskStatusCond ¶
func WithTaskStatusCond(value ...task.TaskStatus) Condition
WithTaskStatusCond set status condition
func WithTaskType ¶ added in v1.1.0
WithTaskType set task type condition.
type Getter ¶ added in v1.2.0
type Getter func() (TaskService, bool)
type RunnerOption ¶
type RunnerOption func(*taskRunner)
RunnerOption option for create task runner
func WithOptions ¶
func WithOptions( queryLimit int, parallelism int, maxWaitTasks int, fetchInterval time.Duration, fetchTimeout time.Duration, retryInterval time.Duration, heartbeatInterval time.Duration, heartbeatTimeout time.Duration, ) RunnerOption
WithOptions set all options needed by taskRunner
func WithRunnerFetchInterval ¶
func WithRunnerFetchInterval(interval time.Duration) RunnerOption
WithRunnerFetchInterval set fetch tasks interval duration
func WithRunnerFetchLimit ¶
func WithRunnerFetchLimit(limit int) RunnerOption
WithRunnerFetchLimit set fetch tasks limit
func WithRunnerFetchTimeout ¶
func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption
WithRunnerFetchTimeout set fetch timeout
func WithRunnerHeartbeatInterval ¶
func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption
WithRunnerHeartbeatInterval set heartbeat duration
func WithRunnerHeartbeatTimeout ¶ added in v1.1.0
func WithRunnerHeartbeatTimeout(timeout time.Duration) RunnerOption
WithRunnerHeartbeatTimeout set heartbeat timeout.
func WithRunnerLogger ¶
func WithRunnerLogger(logger *zap.Logger) RunnerOption
WithRunnerLogger set logger
func WithRunnerMaxWaitTasks ¶
func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption
WithRunnerMaxWaitTasks set the maximum number of tasks waiting to be executed, more than that will block fetching tasks.
func WithRunnerParallelism ¶
func WithRunnerParallelism(parallelism int) RunnerOption
WithRunnerParallelism set the parallelism for execute tasks.
func WithRunnerRetryInterval ¶
func WithRunnerRetryInterval(interval time.Duration) RunnerOption
WithRunnerRetryInterval set retry interval duration for operation
type TaskExecutor ¶
TaskExecutor is responsible for the execution logic of a specific Task; the function exiting represents the completion of the task execution. A task may be interrupted at any time during execution, so the implementation needs to check the state of the Context frequently and exit as soon as possible once Context.Done() is signaled. An Epoch of 1 means the task is being executed for the first time; otherwise the task has been rescheduled, and the previous execution may or may not have completed.
type TaskHandler ¶ added in v1.1.0
type TaskRunner ¶
type TaskRunner interface { // ID returns the TaskRunner ID ID() string // Start start the runner, after runner starts it will start to periodically load the tasks assigned to // the current executor, as well as periodically send heartbeats. Start() error // Stop stop the runner, all running tasks will be terminated Stop() error // Parallelism maximum number of concurrently executing Tasks Parallelism() int // RegisterExecutor register the task executor RegisterExecutor(code task.TaskCode, executor TaskExecutor) // GetExecutor returns the task executor GetExecutor(code task.TaskCode) TaskExecutor // Attach attaches the active go-routine to the daemon task. Attach(ctx context.Context, taskID uint64, routine ActiveRoutine) error }
TaskRunner: each runner can execute multiple tasks concurrently.
func NewTaskRunner ¶
func NewTaskRunner(runnerID string, service TaskService, claimFn func(string) bool, opts ...RunnerOption) TaskRunner
NewTaskRunner creates a new task runner. The TaskRunner can be created by CN nodes and periodically pulls tasks from TaskService to execute.
type TaskService ¶
type TaskService interface { // Close close the task service Close() error // CreateAsyncTask Creates an asynchronous task that executes a single time, this method is idempotent, the // same task is not created repeatedly based on multiple calls. CreateAsyncTask(context.Context, task.TaskMetadata) error // CreateBatch is similar to Create, but with a batch task list CreateBatch(context.Context, []task.TaskMetadata) error // CreateCronTask is similar to Create, but create a task that runs periodically, with the period // described using a Cron expression. CreateCronTask(ctx context.Context, task task.TaskMetadata, cronExpr string) error // Allocate allocate task runner for the specified task. Allocate(ctx context.Context, value task.AsyncTask, taskRunner string) error // Complete task completed. The result used to indicate whether the execution succeeded or failed Complete(ctx context.Context, taskRunner string, task task.AsyncTask, result task.ExecuteResult) error // Heartbeat sending a heartbeat tells the scheduler that the specified task is running normally. // If the scheduler does not receive the heartbeat for a long time, it will reassign the task executor // to execute the task. Returning `ErrInvalidTask` means that the Task has been reassigned or has // ended, and the Task execution needs to be terminated immediately. Heartbeat(ctx context.Context, task task.AsyncTask) error // QueryAsyncTask query tasks by conditions QueryAsyncTask(context.Context, ...Condition) ([]task.AsyncTask, error) // QueryCronTask query cron tasks by conditions QueryCronTask(context.Context, ...Condition) ([]task.CronTask, error) // CreateDaemonTask creates a daemon task that will run in background for long time. CreateDaemonTask(ctx context.Context, value task.TaskMetadata, details *task.Details) error // QueryDaemonTask returns all daemon tasks which match the conditions. 
QueryDaemonTask(ctx context.Context, conds ...Condition) ([]task.DaemonTask, error) // UpdateDaemonTask updates the daemon task record. UpdateDaemonTask(ctx context.Context, tasks []task.DaemonTask, cond ...Condition) (int, error) // HeartbeatDaemonTask sends heartbeat to daemon task. HeartbeatDaemonTask(ctx context.Context, task task.DaemonTask) error // StartScheduleCronTask start schedule cron tasks. A timer will be started to pull the latest CronTask // from the TaskStore at regular intervals, and a timer will be maintained in memory for all Cron's to be // triggered at regular intervals. StartScheduleCronTask() // StopScheduleCronTask stop schedule cron tasks. StopScheduleCronTask() // GetStorage returns the task storage GetStorage() TaskStorage }
TaskService Asynchronous Task Service, which provides scheduling execution and management of asynchronous tasks. CN, DN, HAKeeper, LogService will all hold this service.
func NewTaskService ¶
func NewTaskService( rt runtime.Runtime, store TaskStorage) TaskService
NewTaskService create a task service based on a task storage.
type TaskServiceHolder ¶
type TaskServiceHolder interface { // Close close the holder Close() error // Get returns the taskservice Get() (TaskService, bool) // Create create the taskservice Create(command logservicepb.CreateTaskService) error }
TaskServiceHolder creates and holds the task service in the cn, tn and log node. It creates the TaskService from the heartbeat's CreateTaskService schedule command.
func NewTaskServiceHolder ¶
func NewTaskServiceHolder( rt runtime.Runtime, addressFactory func(context.Context, bool) (string, error)) TaskServiceHolder
NewTaskServiceHolder creates a task service holder; it will create task storage and task service from the hakeeper's schedule command.
func NewTaskServiceHolderWithTaskStorageFactorySelector ¶
func NewTaskServiceHolderWithTaskStorageFactorySelector( rt runtime.Runtime, addressFactory func(context.Context, bool) (string, error), selector func(string, string, string) TaskStorageFactory) TaskServiceHolder
NewTaskServiceHolderWithTaskStorageFactorySelector is similar to NewTaskServiceHolder, but with a special task storage factory selector.
type TaskStorage ¶
type TaskStorage interface { // Close close the task storage Close() error // PingContext ping the db with context PingContext(context.Context) error // AddAsyncTask adds async tasks and returns number of successful added AddAsyncTask(context.Context, ...task.AsyncTask) (int, error) // UpdateAsyncTask updates async tasks and returns number of successful updated UpdateAsyncTask(context.Context, []task.AsyncTask, ...Condition) (int, error) // DeleteAsyncTask deletes tasks and returns number of successful deleted DeleteAsyncTask(context.Context, ...Condition) (int, error) // QueryAsyncTask queries tasks by conditions QueryAsyncTask(context.Context, ...Condition) ([]task.AsyncTask, error) // AddCronTask add cron task and returns number of successful added AddCronTask(context.Context, ...task.CronTask) (int, error) // QueryCronTask query all cron tasks QueryCronTask(context.Context, ...Condition) ([]task.CronTask, error) // UpdateCronTask crontask generates tasks periodically, and this update // needs to be in a transaction. Update cron task and insert a new task. // This update must be transactional and needs to be done conditionally // using CronTask.TriggerTimes and the task.Metadata.ID field. UpdateCronTask(context.Context, task.CronTask, task.AsyncTask) (int, error) // AddDaemonTask adds daemon tasks and returns number of successful added. AddDaemonTask(ctx context.Context, tasks ...task.DaemonTask) (int, error) // UpdateDaemonTask updates daemon tasks and returns number of successful updated. UpdateDaemonTask(ctx context.Context, tasks []task.DaemonTask, conds ...Condition) (int, error) // DeleteDaemonTask deletes daemon tasks and returns number of successful deleted. DeleteDaemonTask(ctx context.Context, condition ...Condition) (int, error) // QueryDaemonTask queries daemon tasks by conditions. QueryDaemonTask(ctx context.Context, condition ...Condition) ([]task.DaemonTask, error) // HeartbeatDaemonTask update the last heartbeat field of the task. 
HeartbeatDaemonTask(ctx context.Context, task []task.DaemonTask) (int, error) }
TaskStorage task storage
func NewMemTaskStorage ¶
func NewMemTaskStorage() TaskStorage
type TaskStorageFactory ¶
type TaskStorageFactory interface {
Create(address string) (TaskStorage, error)
}
func NewFixedTaskStorageFactory ¶
func NewFixedTaskStorageFactory(store TaskStorage) TaskStorageFactory
NewFixedTaskStorageFactory creates a fixed task storage factory which always returns the specified task storage.