taskservice

package
v1.2.3-hotfix-20240916 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CondLimit condCode = iota
	CondTaskID
	CondTaskRunner
	CondTaskStatus
	CondTaskEpoch
	CondTaskParentTaskID
	CondTaskExecutor
	CondTaskType
	CondOrderByDesc
	CondAccountID
	CondAccount
	CondLastHeartbeat
	CondCronTaskId
	CondTaskMetadataId
)

Variables

View Source
var (
	// EQ record == condition
	EQ = Op(1)
	// GT record > condition
	GT = Op(2)
	// GE record >= condition
	GE = Op(3)
	// LT record < condition
	LT = Op(4)
	// LE record <= condition
	LE = Op(5)
	// IN record in condition
	IN = Op(6)
	// LIKE record LIKE condition
	LIKE = Op(7)

	OpName = map[Op]string{
		EQ:   "=",
		GT:   ">",
		GE:   ">=",
		LT:   "<",
		LE:   "<=",
		IN:   "IN",
		LIKE: "LIKE",
	}
)
View Source
var (
	ErrNotReady = moerr.NewInvalidStateNoCtx("task store not ready")
)

Functions

func DebugCtlTaskFramework added in v1.2.0

func DebugCtlTaskFramework(disable bool)

DebugCtlTaskFramework disable task framework

Types

type ActiveRoutine added in v1.1.0

type ActiveRoutine interface {
	// Resume resumes the go routine of the daemon task.
	Resume() error
	// Pause pauses the go routine of the daemon task.
	Pause() error
	// Cancel cancels the go routine of the daemon task.
	Cancel() error
}

ActiveRoutine is an interface that the go routine of the daemon task should implement.

type Condition

type Condition func(*conditions)

Condition options for query tasks

func WithAccount added in v1.1.0

func WithAccount(op Op, value string) Condition

WithAccount set task account condition.

func WithAccountID added in v1.1.0

func WithAccountID(op Op, value uint32) Condition

WithAccountID set task account ID condition.

func WithCronTaskId added in v1.1.2

func WithCronTaskId(op Op, value uint64) Condition

func WithLastHeartbeat added in v1.1.0

func WithLastHeartbeat(op Op, value int64) Condition

WithLastHeartbeat set last heartbeat condition.

func WithLimitCond

func WithLimitCond(limit int) Condition

WithLimitCond set query result limit

func WithTaskEpochCond

func WithTaskEpochCond(op Op, value uint32) Condition

WithTaskEpochCond set task epoch condition

func WithTaskExecutorCond

func WithTaskExecutorCond(op Op, value task.TaskCode) Condition

WithTaskExecutorCond set task executor condition

func WithTaskIDCond

func WithTaskIDCond(op Op, value uint64) Condition

WithTaskIDCond set task id condition

func WithTaskIDDesc

func WithTaskIDDesc() Condition

WithTaskIDDesc set query with order by task id desc

func WithTaskMetadataId added in v1.2.0

func WithTaskMetadataId(op Op, value string) Condition

func WithTaskParentTaskIDCond

func WithTaskParentTaskIDCond(op Op, value string) Condition

WithTaskParentTaskIDCond set task ParentTaskID condition

func WithTaskRunnerCond

func WithTaskRunnerCond(op Op, value string) Condition

WithTaskRunnerCond set task runner condition

func WithTaskStatusCond

func WithTaskStatusCond(value ...task.TaskStatus) Condition

WithTaskStatusCond set status condition

func WithTaskType added in v1.1.0

func WithTaskType(op Op, value string) Condition

WithTaskType set task type condition.

type Getter added in v1.2.0

type Getter func() (TaskService, bool)

type Op

type Op int

Op condition op

type RunnerOption

type RunnerOption func(*taskRunner)

RunnerOption option for create task runner

func WithOptions

func WithOptions(
	queryLimit int,
	parallelism int,
	maxWaitTasks int,
	fetchInterval time.Duration,
	fetchTimeout time.Duration,
	retryInterval time.Duration,
	heartbeatInterval time.Duration,
	heartbeatTimeout time.Duration,
) RunnerOption

WithOptions set all options needed by taskRunner

func WithRunnerFetchInterval

func WithRunnerFetchInterval(interval time.Duration) RunnerOption

WithRunnerFetchInterval set fetch tasks interval duration

func WithRunnerFetchLimit

func WithRunnerFetchLimit(limit int) RunnerOption

WithRunnerFetchLimit set fetch tasks limit

func WithRunnerFetchTimeout

func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption

WithRunnerFetchTimeout set fetch timeout

func WithRunnerHeartbeatInterval

func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption

WithRunnerHeartbeatInterval set heartbeat duration

func WithRunnerHeartbeatTimeout added in v1.1.0

func WithRunnerHeartbeatTimeout(timeout time.Duration) RunnerOption

WithRunnerHeartbeatTimeout set heartbeat timeout.

func WithRunnerLogger

func WithRunnerLogger(logger *zap.Logger) RunnerOption

WithRunnerLogger set logger

func WithRunnerMaxWaitTasks

func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption

WithRunnerMaxWaitTasks set the maximum number of tasks waiting to be executed, more than that will block fetching tasks.

func WithRunnerParallelism

func WithRunnerParallelism(parallelism int) RunnerOption

WithRunnerParallelism set the parallelism for execute tasks.

func WithRunnerRetryInterval

func WithRunnerRetryInterval(interval time.Duration) RunnerOption

WithRunnerRetryInterval set retry interval duration for operation

type TaskExecutor

type TaskExecutor func(ctx context.Context, task task.Task) error

TaskExecutor which is responsible for the execution logic of a specific Task, and the function exists to represent the completion of the task execution. In the process of task execution task may be interrupted at any time, so the implementation needs to frequently check the state of the Context, in the Context.Done(), as soon as possible to exit. Epoch is 1 means the task is executed for the first time, otherwise it means that the task is rescheduled, the task may be completed or not.

type TaskHandler added in v1.1.0

type TaskHandler interface {
	Handle(ctx context.Context) error
}

type TaskRunner

type TaskRunner interface {
	// ID returns the TaskRunner ID
	ID() string
	// Start start the runner, after runner starts it will start to periodically load the tasks assigned to
	// the current executor, as well as periodically send heartbeats.
	Start() error
	// Stop stop the runner, all running tasks will be terminated
	Stop() error
	// Parallelism maximum number of concurrently executing Tasks
	Parallelism() int
	// RegisterExecutor register the task executor
	RegisterExecutor(code task.TaskCode, executor TaskExecutor)
	// GetExecutor returns the task executor
	GetExecutor(code task.TaskCode) TaskExecutor
	// Attach attaches the active go-routine to the daemon task.
	Attach(ctx context.Context, taskID uint64, routine ActiveRoutine) error
}

TaskRunner each runner can execute multiple task concurrently

func NewTaskRunner

func NewTaskRunner(runnerID string, service TaskService, claimFn func(string) bool, opts ...RunnerOption) TaskRunner

NewTaskRunner new task runner. The TaskRunner can be created by CN nodes and pull tasks from TaskService to execute periodically.

type TaskService

type TaskService interface {
	// Close close the task service
	Close() error

	// CreateAsyncTask Creates an asynchronous task that executes a single time, this method is idempotent, the
	// same task is not created repeatedly based on multiple calls.
	CreateAsyncTask(context.Context, task.TaskMetadata) error
	// CreateBatch is similar to Create, but with a batch task list
	CreateBatch(context.Context, []task.TaskMetadata) error
	// CreateCronTask is similar to Create, but create a task that runs periodically, with the period
	// described using a Cron expression.
	CreateCronTask(ctx context.Context, task task.TaskMetadata, cronExpr string) error
	// Allocate allocate task runner fot spec task.
	Allocate(ctx context.Context, value task.AsyncTask, taskRunner string) error
	// Complete task completed. The result used to indicate whether the execution succeeded or failed
	Complete(ctx context.Context, taskRunner string, task task.AsyncTask, result task.ExecuteResult) error
	// Heartbeat sending a heartbeat tells the scheduler that the specified task is running normally.
	// If the scheduler does not receive the heartbeat for a long time, it will reassign the task executor
	// to execute the task. Returning `ErrInvalidTask` means that the Task has been reassigned or has
	// ended, and the Task execution needs to be terminated immediately。
	Heartbeat(ctx context.Context, task task.AsyncTask) error
	// QueryAsyncTask query tasks by conditions
	QueryAsyncTask(context.Context, ...Condition) ([]task.AsyncTask, error)
	// QueryCronTask query cron tasks by conditions
	QueryCronTask(context.Context, ...Condition) ([]task.CronTask, error)

	// CreateDaemonTask creates a daemon task that will run in background for long time.
	CreateDaemonTask(ctx context.Context, value task.TaskMetadata, details *task.Details) error
	// QueryDaemonTask returns all daemon tasks which match the conditions.
	QueryDaemonTask(ctx context.Context, conds ...Condition) ([]task.DaemonTask, error)
	// UpdateDaemonTask updates the daemon task record.
	UpdateDaemonTask(ctx context.Context, tasks []task.DaemonTask, cond ...Condition) (int, error)
	// HeartbeatDaemonTask sends heartbeat to daemon task.
	HeartbeatDaemonTask(ctx context.Context, task task.DaemonTask) error

	// StartScheduleCronTask start schedule cron tasks. A timer will be started to pull the latest CronTask
	// from the TaskStore at regular intervals, and a timer will be maintained in memory for all Cron's to be
	// triggered at regular intervals.
	StartScheduleCronTask()
	// StopScheduleCronTask stop schedule cron tasks.
	StopScheduleCronTask()

	// GetStorage returns the task storage
	GetStorage() TaskStorage
}

TaskService Asynchronous Task Service, which provides scheduling execution and management of asynchronous tasks. CN, DN, HAKeeper, LogService will all hold this service.

func NewTaskService

func NewTaskService(
	rt runtime.Runtime,
	store TaskStorage) TaskService

NewTaskService create a task service based on a task storage.

type TaskServiceHolder

type TaskServiceHolder interface {
	// Close close the holder
	Close() error
	// Get returns the taskservice
	Get() (TaskService, bool)
	// Create create the taskservice
	Create(command logservicepb.CreateTaskService) error
}

TaskServiceHolder create and hold the task service in the cn, tn and log node. Create the TaskService from the heartbeat's CreateTaskService schedule command.

func NewTaskServiceHolder

func NewTaskServiceHolder(
	rt runtime.Runtime,
	addressFactory func(context.Context, bool) (string, error)) TaskServiceHolder

NewTaskServiceHolder create a task service hold, it will create task storage and task service from the hakeeper's schedule command.

func NewTaskServiceHolderWithTaskStorageFactorySelector

func NewTaskServiceHolderWithTaskStorageFactorySelector(
	rt runtime.Runtime,
	addressFactory func(context.Context, bool) (string, error),
	selector func(string, string, string) TaskStorageFactory) TaskServiceHolder

NewTaskServiceHolderWithTaskStorageFactorySelector is similar to NewTaskServiceHolder, but with a special task storage facroty selector

type TaskStorage

type TaskStorage interface {
	// Close close the task storage
	Close() error
	// PingContext ping the db with context
	PingContext(context.Context) error

	// AddAsyncTask adds async tasks and returns number of successful added
	AddAsyncTask(context.Context, ...task.AsyncTask) (int, error)
	// UpdateAsyncTask updates async tasks and returns number of successful updated
	UpdateAsyncTask(context.Context, []task.AsyncTask, ...Condition) (int, error)
	// DeleteAsyncTask deletes tasks and returns number of successful deleted
	DeleteAsyncTask(context.Context, ...Condition) (int, error)
	// QueryAsyncTask queries tasks by conditions
	QueryAsyncTask(context.Context, ...Condition) ([]task.AsyncTask, error)

	// AddCronTask add cron task and returns number of successful added
	AddCronTask(context.Context, ...task.CronTask) (int, error)
	// QueryCronTask query all cron tasks
	QueryCronTask(context.Context, ...Condition) ([]task.CronTask, error)
	// UpdateCronTask crontask generates tasks periodically, and this update
	// needs to be in a transaction. Update cron task and insert a new task.
	// This update must be transactional and needs to be done conditionally
	// using CronTask.TriggerTimes and the task.Metadata.ID field.
	UpdateCronTask(context.Context, task.CronTask, task.AsyncTask) (int, error)

	// AddDaemonTask adds daemon tasks and returns number of successful added.
	AddDaemonTask(ctx context.Context, tasks ...task.DaemonTask) (int, error)
	// UpdateDaemonTask updates daemon tasks and returns number of successful updated.
	UpdateDaemonTask(ctx context.Context, tasks []task.DaemonTask, conds ...Condition) (int, error)
	// DeleteDaemonTask deletes daemon tasks and returns number of successful deleted.
	DeleteDaemonTask(ctx context.Context, condition ...Condition) (int, error)
	// QueryDaemonTask queries daemon tasks by conditions.
	QueryDaemonTask(ctx context.Context, condition ...Condition) ([]task.DaemonTask, error)
	// HeartbeatDaemonTask update the last heartbeat field of the task.
	HeartbeatDaemonTask(ctx context.Context, task []task.DaemonTask) (int, error)
}

TaskStorage task storage

func NewMemTaskStorage

func NewMemTaskStorage() TaskStorage

type TaskStorageFactory

type TaskStorageFactory interface {
	Create(address string) (TaskStorage, error)
}

func NewFixedTaskStorageFactory

func NewFixedTaskStorageFactory(store TaskStorage) TaskStorageFactory

NewFixedTaskStorageFactory creates a fixed task storage factory which always returns the special taskstorage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL