Documentation ¶
Index ¶
- Variables
- func DebugCtlTaskFramwork(disable bool)
- type Condition
- func WithLimitCond(limit int) Condition
- func WithTaskEpochCond(op Op, value uint32) Condition
- func WithTaskExecutorCond(op Op, value uint32) Condition
- func WithTaskIDCond(op Op, value uint64) Condition
- func WithTaskIDDesc() Condition
- func WithTaskParentTaskIDCond(op Op, value string) Condition
- func WithTaskRunnerCond(op Op, value string) Condition
- func WithTaskStatusCond(op Op, value task.TaskStatus) Condition
- type Op
- type RunnerOption
- func WithOptions(queryLimit int, parallelism int, maxWaitTasks int, fetchInterval time.Duration, ...) RunnerOption
- func WithRunnerFetchInterval(interval time.Duration) RunnerOption
- func WithRunnerFetchLimit(limit int) RunnerOption
- func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption
- func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption
- func WithRunnerLogger(logger *zap.Logger) RunnerOption
- func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption
- func WithRunnerParallelism(parallelism int) RunnerOption
- func WithRunnerRetryInterval(interval time.Duration) RunnerOption
- type TaskExecutor
- type TaskRunner
- type TaskService
- type TaskServiceHolder
- type TaskStorage
- type TaskStorageFactory
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func DebugCtlTaskFramwork ¶
func DebugCtlTaskFramwork(disable bool)
DebugCtlTaskFramwork disable task framework
Types ¶
type Condition ¶
type Condition func(*conditions)
Condition options for query tasks
func WithTaskEpochCond ¶
WithTaskEpochCond set task epoch condition
func WithTaskExecutorCond ¶
WithTaskExecutorCond set task executor condition
func WithTaskIDCond ¶
WithTaskIDCond set task id condition
func WithTaskIDDesc ¶
func WithTaskIDDesc() Condition
WithTaskIDDesc set query with order by task id desc
func WithTaskParentTaskIDCond ¶
WithTaskParentTaskIDCond set task ParentTaskID condition
func WithTaskRunnerCond ¶
WithTaskRunnerCond set task runner condition
func WithTaskStatusCond ¶
func WithTaskStatusCond(op Op, value task.TaskStatus) Condition
WithTaskStatusCond set status condition
type RunnerOption ¶
type RunnerOption func(*taskRunner)
RunnerOption option for create task runner
func WithOptions ¶
func WithOptions( queryLimit int, parallelism int, maxWaitTasks int, fetchInterval time.Duration, fetchTimeout time.Duration, retryInterval time.Duration, heartbeatInterval time.Duration, ) RunnerOption
WithOptions set all options needed by taskRunner
func WithRunnerFetchInterval ¶
func WithRunnerFetchInterval(interval time.Duration) RunnerOption
WithRunnerFetchInterval set fetch tasks interval duration
func WithRunnerFetchLimit ¶
func WithRunnerFetchLimit(limit int) RunnerOption
WithRunnerFetchLimit set fetch tasks limit
func WithRunnerFetchTimeout ¶
func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption
WithRunnerFetchTimeout set fetch timeout
func WithRunnerHeartbeatInterval ¶
func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption
WithRunnerHeartbeatInterval set heartbeat duration
func WithRunnerLogger ¶
func WithRunnerLogger(logger *zap.Logger) RunnerOption
WithRunnerLogger set logger
func WithRunnerMaxWaitTasks ¶
func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption
WithRunnerMaxWaitTasks set the maximum number of tasks waiting to be executed, more than that will block fetching tasks.
func WithRunnerParallelism ¶
func WithRunnerParallelism(parallelism int) RunnerOption
WithRunnerParallelism set the parallelism for execute tasks.
func WithRunnerRetryInterval ¶
func WithRunnerRetryInterval(interval time.Duration) RunnerOption
WithRunnerRetryInterval set retry interval duration for operation
type TaskExecutor ¶
TaskExecutor is responsible for the execution logic of a specific Task; the task execution is considered complete when the function returns. A task may be interrupted at any time during execution, so the implementation needs to check the state of the Context frequently and exit as soon as possible once Context.Done() is signaled. An Epoch of 1 means the task is being executed for the first time; otherwise the task has been rescheduled, and it may or may not have been completed.
type TaskRunner ¶
type TaskRunner interface { // ID returns the TaskRunner ID ID() string // Start start the runner, after runner starts it will start to periodically load the tasks assigned to // the current executor, as well as periodically send heartbeats. Start() error // Stop stop the runner, all running tasks will be terminated Stop() error // Parallelism maximum number of concurrently executing Tasks Parallelism() int // RegisterExecutor register the task executor RegisterExecutor(code uint32, executor TaskExecutor) }
TaskRunner executes tasks; each runner can execute multiple tasks concurrently.
func NewTaskRunner ¶
func NewTaskRunner(runnerID string, service TaskService, opts ...RunnerOption) TaskRunner
NewTaskRunner new task runner. The TaskRunner can be created by CN nodes and pull tasks from TaskService to execute periodically.
type TaskService ¶
type TaskService interface { // Close close the task service Close() error // Create Creates an asynchronous task that executes a single time, this method is idempotent, the // same task is not created repeatedly based on multiple calls. Create(context.Context, task.TaskMetadata) error // CreateBatch is similar to Create, but with a batch task list CreateBatch(context.Context, []task.TaskMetadata) error // CreateCronTask is similar to Create, but create a task that runs periodically, with the period // described using a Cron expression. CreateCronTask(ctx context.Context, task task.TaskMetadata, cronExpr string) error // Allocate allocate task runner for the specified task. Allocate(ctx context.Context, value task.Task, taskRunner string) error // Complete task completed. The result used to indicate whether the execution succeeded or failed Complete(ctx context.Context, taskRunner string, task task.Task, result task.ExecuteResult) error // Heartbeat sending a heartbeat tells the scheduler that the specified task is running normally. // If the scheduler does not receive the heartbeat for a long time, it will reassign the task executor // to execute the task. Returning `ErrInvalidTask` means that the Task has been reassigned or has // ended, and the Task execution needs to be terminated immediately. Heartbeat(ctx context.Context, task task.Task) error // QueryTask query tasks by conditions QueryTask(context.Context, ...Condition) ([]task.Task, error) // QueryCronTask returns all cron task metadata QueryCronTask(context.Context) ([]task.CronTask, error) // StartScheduleCronTask start schedule cron tasks. A timer will be started to pull the latest CronTask // from the TaskStore at regular intervals, and a timer will be maintained in memory for all Cron's to be // triggered at regular intervals. StartScheduleCronTask() // StopScheduleCronTask stop schedule cron tasks. StopScheduleCronTask() // GetStorage returns the task storage GetStorage() TaskStorage }
TaskService Asynchronous Task Service, which provides scheduling execution and management of asynchronous tasks. CN, DN, HAKeeper, LogService will all hold this service.
func NewTaskService ¶
func NewTaskService(store TaskStorage, logger *zap.Logger) TaskService
NewTaskService create a task service based on a task storage.
type TaskServiceHolder ¶
type TaskServiceHolder interface { // Close close the holder Close() error // Get returns the taskservice Get() (TaskService, bool) // Create create the taskservice Create(command logservicepb.CreateTaskService) error }
TaskServiceHolder creates and holds the task service in the cn, dn and log node. It creates the TaskService from the heartbeat's CreateTaskService schedule command.
func NewTaskServiceHolder ¶
func NewTaskServiceHolder(logger *zap.Logger, addressFactory func() (string, error)) TaskServiceHolder
NewTaskServiceHolder creates a task service holder; it will create the task storage and task service from the hakeeper's schedule command.
func NewTaskServiceHolderWithTaskStorageFactorySelector ¶
func NewTaskServiceHolderWithTaskStorageFactorySelector(logger *zap.Logger, addressFactory func() (string, error), selector func(string, string, string) TaskStorageFactory) TaskServiceHolder
NewTaskServiceHolderWithTaskStorageFactorySelector is similar to NewTaskServiceHolder, but with a special task storage factory selector
type TaskStorage ¶
type TaskStorage interface { // Close close the task storage Close() error // Add add tasks and returns number of successful added Add(context.Context, ...task.Task) (int, error) // Update update tasks and returns number of successful updated Update(context.Context, []task.Task, ...Condition) (int, error) // Delete delete tasks and returns number of successful deleted Delete(context.Context, ...Condition) (int, error) // Query query tasks by conditions Query(context.Context, ...Condition) ([]task.Task, error) // AddCronTask add cron task and returns number of successful added AddCronTask(context.Context, ...task.CronTask) (int, error) // QueryCronTask query all cron tasks QueryCronTask(context.Context) ([]task.CronTask, error) // UpdateCronTask crontask generates tasks periodically, and this update // needs to be in a transaction. Update cron task and insert a new task. // This update must be transactional and needs to be done conditionally // using CronTask.TriggerTimes and the task.Metadata.ID field. UpdateCronTask(context.Context, task.CronTask, task.Task) (int, error) }
TaskStorage task storage
func NewMemTaskStorage ¶
func NewMemTaskStorage() TaskStorage
func NewMysqlTaskStorage ¶
func NewMysqlTaskStorage(dsn, dbname string) (TaskStorage, error)
type TaskStorageFactory ¶
type TaskStorageFactory interface {
Create(address string) (TaskStorage, error)
}
func NewFixedTaskStorageFactory ¶
func NewFixedTaskStorageFactory(store TaskStorage) TaskStorageFactory
NewFixedTaskStorageFactory creates a fixed task storage factory which always returns the specified task storage
func NewMySQLBasedTaskStorageFactory ¶
func NewMySQLBasedTaskStorageFactory(username, password, database string) TaskStorageFactory
NewMySQLBasedTaskStorageFactory creates a mysql based task storage factory using the specified username, password and database