taskservice

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// EQ record == condition
	EQ = Op(1)
	// GT record > condition
	GT = Op(2)
	// GE record >= condition
	GE = Op(3)
	// LT record < condition
	LT = Op(4)
	// LE record <= condition
	LE = Op(5)

	// OpName maps each Op to the textual comparison operator it stands for
	// ("=", ">", ">=", "<", "<=").
	OpName = map[Op]string{
		EQ: "=",
		GT: ">",
		GE: ">=",
		LT: "<",
		LE: "<=",
	}
)

Functions

func DebugCtlTaskFramwork

func DebugCtlTaskFramwork(disable bool)

DebugCtlTaskFramwork disables the task framework.

Types

type Condition

type Condition func(*conditions)

Condition options for query tasks

func WithLimitCond

func WithLimitCond(limit int) Condition

WithLimitCond set query result limit

func WithTaskEpochCond

func WithTaskEpochCond(op Op, value uint32) Condition

WithTaskEpochCond set task epoch condition

func WithTaskExecutorCond

func WithTaskExecutorCond(op Op, value uint32) Condition

WithTaskExecutorCond set task executor condition

func WithTaskIDCond

func WithTaskIDCond(op Op, value uint64) Condition

WithTaskIDCond set task id condition

func WithTaskIDDesc

func WithTaskIDDesc() Condition

WithTaskIDDesc set query with order by task id desc

func WithTaskParentTaskIDCond

func WithTaskParentTaskIDCond(op Op, value string) Condition

WithTaskParentTaskIDCond set task ParentTaskID condition

func WithTaskRunnerCond

func WithTaskRunnerCond(op Op, value string) Condition

WithTaskRunnerCond set task runner condition

func WithTaskStatusCond

func WithTaskStatusCond(op Op, value task.TaskStatus) Condition

WithTaskStatusCond set status condition

type Op

type Op int

Op condition op

type RunnerOption

type RunnerOption func(*taskRunner)

RunnerOption option for create task runner

func WithOptions

func WithOptions(
	queryLimit int,
	parallelism int,
	maxWaitTasks int,
	fetchInterval time.Duration,
	fetchTimeout time.Duration,
	retryInterval time.Duration,
	heartbeatInterval time.Duration,
) RunnerOption

WithOptions set all options needed by taskRunner

func WithRunnerFetchInterval

func WithRunnerFetchInterval(interval time.Duration) RunnerOption

WithRunnerFetchInterval set fetch tasks interval duration

func WithRunnerFetchLimit

func WithRunnerFetchLimit(limit int) RunnerOption

WithRunnerFetchLimit set fetch tasks limit

func WithRunnerFetchTimeout

func WithRunnerFetchTimeout(timeout time.Duration) RunnerOption

WithRunnerFetchTimeout set fetch timeout

func WithRunnerHeartbeatInterval

func WithRunnerHeartbeatInterval(interval time.Duration) RunnerOption

WithRunnerHeartbeatInterval set heartbeat duration

func WithRunnerLogger

func WithRunnerLogger(logger *zap.Logger) RunnerOption

WithRunnerLogger set logger

func WithRunnerMaxWaitTasks

func WithRunnerMaxWaitTasks(maxWaitTasks int) RunnerOption

WithRunnerMaxWaitTasks sets the maximum number of tasks waiting to be executed; once that number is exceeded, fetching tasks is blocked.

func WithRunnerParallelism

func WithRunnerParallelism(parallelism int) RunnerOption

WithRunnerParallelism set the parallelism for execute tasks.

func WithRunnerRetryInterval

func WithRunnerRetryInterval(interval time.Duration) RunnerOption

WithRunnerRetryInterval set retry interval duration for operation

type TaskExecutor

type TaskExecutor func(ctx context.Context, task task.Task) error

TaskExecutor is responsible for the execution logic of a specific Task; the function returning represents the completion of the task's execution. A task may be interrupted at any time while it is executing, so the implementation needs to check the state of the Context frequently and exit as soon as possible once Context.Done() is signaled. An Epoch of 1 means the task is being executed for the first time; otherwise the task has been rescheduled, and its earlier execution may or may not have completed.

type TaskRunner

type TaskRunner interface {
	// ID returns the TaskRunner ID.
	ID() string
	// Start starts the runner. Once started, the runner periodically loads the
	// tasks assigned to the current executor and periodically sends heartbeats.
	Start() error
	// Stop stops the runner; all running tasks will be terminated.
	Stop() error
	// Parallelism returns the maximum number of concurrently executing tasks.
	Parallelism() int
	// RegisterExecutor registers the task executor under the given code.
	RegisterExecutor(code uint32, executor TaskExecutor)
}

TaskRunner executes tasks; each runner can execute multiple tasks concurrently.

func NewTaskRunner

func NewTaskRunner(runnerID string, service TaskService, opts ...RunnerOption) TaskRunner

NewTaskRunner new task runner. The TaskRunner can be created by CN nodes and pull tasks from TaskService to execute periodically.

type TaskService

type TaskService interface {
	// Close closes the task service.
	Close() error

	// Create creates an asynchronous task that executes a single time. This
	// method is idempotent: calling it multiple times for the same task does
	// not create the task repeatedly.
	Create(context.Context, task.TaskMetadata) error
	// CreateBatch is similar to Create, but takes a batch of tasks.
	CreateBatch(context.Context, []task.TaskMetadata) error
	// CreateCronTask is similar to Create, but creates a task that runs
	// periodically, with the period described by a Cron expression.
	CreateCronTask(ctx context.Context, task task.TaskMetadata, cronExpr string) error
	// Allocate allocates a task runner for the specified task.
	Allocate(ctx context.Context, value task.Task, taskRunner string) error
	// Complete reports that the task has completed. The result indicates
	// whether the execution succeeded or failed.
	Complete(ctx context.Context, taskRunner string, task task.Task, result task.ExecuteResult) error
	// Heartbeat sends a heartbeat to tell the scheduler that the specified task
	// is running normally. If the scheduler does not receive a heartbeat for a
	// long time, it will reassign the task to a task executor. Returning
	// `ErrInvalidTask` means that the Task has been reassigned or has ended,
	// and the Task execution needs to be terminated immediately.
	Heartbeat(ctx context.Context, task task.Task) error
	// QueryTask queries tasks by conditions.
	QueryTask(context.Context, ...Condition) ([]task.Task, error)
	// QueryCronTask returns all cron task metadata.
	QueryCronTask(context.Context) ([]task.CronTask, error)

	// StartScheduleCronTask starts scheduling cron tasks. A timer will be
	// started to pull the latest CronTask from the TaskStore at regular
	// intervals, and an in-memory timer will be maintained for every Cron so
	// that it is triggered at regular intervals.
	StartScheduleCronTask()
	// StopScheduleCronTask stops scheduling cron tasks.
	StopScheduleCronTask()

	// GetStorage returns the task storage.
	GetStorage() TaskStorage
}

TaskService is an asynchronous task service that provides scheduling, execution, and management of asynchronous tasks. CN, DN, HAKeeper, and LogService will all hold this service.

func NewTaskService

func NewTaskService(store TaskStorage, logger *zap.Logger) TaskService

NewTaskService create a task service based on a task storage.

type TaskServiceHolder

type TaskServiceHolder interface {
	// Close closes the holder.
	Close() error
	// Get returns the task service.
	// NOTE(review): the bool presumably reports whether a task service is
	// currently available — confirm against the implementation.
	Get() (TaskService, bool)
	// Create creates the task service from the given CreateTaskService command.
	Create(command logservicepb.CreateTaskService) error
}

TaskServiceHolder create and hold the task service in the cn, dn and log node. Create the TaskService from the heartbeat's CreateTaskService schedule command.

func NewTaskServiceHolder

func NewTaskServiceHolder(logger *zap.Logger,
	addressFactory func() (string, error)) TaskServiceHolder

NewTaskServiceHolder creates a task service holder; it will create the task storage and task service from the HAKeeper's schedule command.

func NewTaskServiceHolderWithTaskStorageFactorySelector

func NewTaskServiceHolderWithTaskStorageFactorySelector(logger *zap.Logger,
	addressFactory func() (string, error),
	selector func(string, string, string) TaskStorageFactory) TaskServiceHolder

NewTaskServiceHolderWithTaskStorageFactorySelector is similar to NewTaskServiceHolder, but with a specified task storage factory selector.

type TaskStorage

type TaskStorage interface {
	// Close closes the task storage.
	Close() error

	// Add adds tasks and returns the number of tasks successfully added.
	Add(context.Context, ...task.Task) (int, error)
	// Update updates tasks and returns the number of tasks successfully updated.
	Update(context.Context, []task.Task, ...Condition) (int, error)
	// Delete deletes tasks and returns the number of tasks successfully deleted.
	Delete(context.Context, ...Condition) (int, error)
	// Query queries tasks by conditions.
	Query(context.Context, ...Condition) ([]task.Task, error)

	// AddCronTask adds cron tasks and returns the number successfully added.
	AddCronTask(context.Context, ...task.CronTask) (int, error)
	// QueryCronTask queries all cron tasks.
	QueryCronTask(context.Context) ([]task.CronTask, error)

	// UpdateCronTask updates a cron task and inserts a new task. A cron task
	// generates tasks periodically, so this update must be transactional and
	// needs to be done conditionally using CronTask.TriggerTimes and the
	// task.Metadata.ID field.
	UpdateCronTask(context.Context, task.CronTask, task.Task) (int, error)
}

TaskStorage task storage

func NewMemTaskStorage

func NewMemTaskStorage() TaskStorage

func NewMysqlTaskStorage

func NewMysqlTaskStorage(dsn, dbname string) (TaskStorage, error)

type TaskStorageFactory

// TaskStorageFactory creates TaskStorage instances.
type TaskStorageFactory interface {
	// Create returns a TaskStorage for the given address, or an error.
	// NOTE(review): address is presumably the address of the storage backend
	// to connect to — confirm against the implementations.
	Create(address string) (TaskStorage, error)
}

func NewFixedTaskStorageFactory

func NewFixedTaskStorageFactory(store TaskStorage) TaskStorageFactory

NewFixedTaskStorageFactory creates a fixed task storage factory which always returns the specified task storage.

func NewMySQLBasedTaskStorageFactory

func NewMySQLBasedTaskStorageFactory(username, password, database string) TaskStorageFactory

NewMySQLBasedTaskStorageFactory creates a MySQL-based task storage factory using the specified username, password and database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL