Documentation ¶
Index ¶
- Variables
- func Register(key TaskKey, definition TaskDefinition)
- func Run(ctx context.Context, key TaskKey, arg interface{}) error
- func RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error
- func StartWithOptions(db *gorm.DB, table string, options ...Option)
- func Stop(wait bool)
- func Transaction(fc func(tx *gorm.DB) error) (err error)
- type CtxMarshaler
- type Logger
- type Option
- func WithCheckCallback(f func(logger Logger, abnormalTasks []Task)) Option
- func WithContext(ctx context.Context) Option
- func WithCtxMarshaler(m CtxMarshaler) Option
- func WithDryRun(flag bool) Option
- func WithInitializedTimeout(d time.Duration) Option
- func WithInstantScanInterval(d time.Duration) Option
- func WithLoggerFactory(f func(ctx context.Context) Logger) Option
- func WithPoolSize(size int) Option
- func WithRunningTimeout(d time.Duration) Option
- func WithScanInterval(d time.Duration) Option
- func WithStorageTimeout(d time.Duration) Option
- func WithWaitTimeout(d time.Duration) Option
- type Task
- type TaskDefinition
- type TaskExtra
- type TaskHandler
- type TaskKey
- type TaskManager
- func (s *TaskManager) ForceRerunTasks(taskIDs []uint64, status TaskStatus) (int64, error)
- func (s *TaskManager) QueryUnsuccessfulTasks(limit, offset int) ([]Task, error)
- func (s *TaskManager) Register(key TaskKey, definition TaskDefinition)
- func (s *TaskManager) Run(ctx context.Context, key TaskKey, arg interface{}) error
- func (s *TaskManager) RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error
- func (s *TaskManager) Start()
- func (s *TaskManager) Stop(wait bool)
- func (s *TaskManager) Transaction(fc func(tx *gorm.DB) error) (err error)
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrZeroRowsAffected represents zero rows affected in a database operation.
	ErrZeroRowsAffected = errors.New("zero rows affected")
	// ErrUnexpected represents unexpected error occurred.
	ErrUnexpected = errors.New("unexpected")
	// ErrTaskNotFound represents certain task not found.
	ErrTaskNotFound = errors.New("task not found")
	// ErrOption represents option is invalid.
	ErrOption = errors.New("option invalid")
	// ErrDefNilHandler represents Handler in the task definition is nil.
	ErrDefNilHandler = errors.New("definition handler is nil")
	// ErrDefEmptyPrimaryKey represents primaryKey in the task definition is empty.
	ErrDefEmptyPrimaryKey = errors.New("definition primary key is empty")
	// ErrDefInvalidLoopInterval represents loopInterval in the task definition is invalid.
	ErrDefInvalidLoopInterval = errors.New("definition loop interval is invalid")
	// ErrDefInvalidArgument represents argument in the task definition is invalid.
	ErrDefInvalidArgument = errors.New("definition argument is invalid")
)
Functions ¶
func Register ¶
func Register(key TaskKey, definition TaskDefinition)
Register binds a task definition to a certain task key.
func RunWithTx ¶
RunWithTx makes it possible to create a task along with other database operations in the same transaction.
func StartWithOptions ¶
StartWithOptions configures the default task manager and starts it. This function should be called before any other function is called.
Types ¶
type CtxMarshaler ¶
type CtxMarshaler interface {
	MarshalCtx(ctx context.Context) ([]byte, error)
	UnmarshalCtx(bytes []byte) (context.Context, error)
}
CtxMarshaler is used to marshal or unmarshal context.
type Logger ¶
type Logger interface {
	Printf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}
Logger is a logging interface for logging necessary messages.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is an interface.
func WithCheckCallback ¶
WithCheckCallback sets the checkCallback option.
func WithCtxMarshaler ¶
func WithCtxMarshaler(m CtxMarshaler) Option
WithCtxMarshaler sets the ctxMarshaler option.
func WithInitializedTimeout ¶
WithInitializedTimeout sets the initializedTimeout option.
func WithInstantScanInterval ¶
WithInstantScanInterval sets the instantScanInterval option.
func WithLoggerFactory ¶
WithLoggerFactory sets the loggerFactory option.
func WithRunningTimeout ¶
WithRunningTimeout sets the runningTimeout option.
func WithScanInterval ¶
WithScanInterval sets the scanInterval option.
func WithStorageTimeout ¶
WithStorageTimeout sets the storageTimeout option.
func WithWaitTimeout ¶
WithWaitTimeout sets the waitTimeout option.
type Task ¶
type Task struct {
	ID         uint64
	TaskKey    TaskKey
	TaskStatus TaskStatus
	Context    []byte
	Argument   []byte
	Extra      TaskExtra
	CreatedAt  time.Time
	UpdatedAt  time.Time
}
Task is an entity in database.
type TaskDefinition ¶
type TaskDefinition struct {
	// must provide, task handler
	Handler TaskHandler
	// optional, task argument type in the handler
	ArgType reflect.Type
	// optional, to replace default config
	CtxMarshaler CtxMarshaler
	// optional, max retry times before fail
	RetryTimes int
	// optional, retry interval
	RetryInterval func(times int) time.Duration
	// optional, determine whether the task will be cleaned immediately once succeeded
	CleanSucceeded bool
	// optional, determine whether the initialized task can still be scheduled after timeout
	InitTimeoutSensitive bool
	// contains filtered or unexported fields
}
TaskDefinition is the definition of a certain task.
type TaskHandler ¶
TaskHandler is the handler for a certain task.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is the overall task processor; it includes the scheduler, the scanner, and other components.
func DefaultManager ¶
func DefaultManager() *TaskManager
DefaultManager returns the default task manager.
func NewTaskManager ¶
func NewTaskManager(db *gorm.DB, table string, options ...Option) *TaskManager
NewTaskManager generates a new instance of TaskManager.
The database and task table must be provided because this tool relies heavily on the database. For more information about the table schema, please refer to 'model.sql'.
func (*TaskManager) ForceRerunTasks ¶
func (s *TaskManager) ForceRerunTasks(taskIDs []uint64, status TaskStatus) (int64, error)
ForceRerunTasks changes specific tasks to 'initialized'.
func (*TaskManager) QueryUnsuccessfulTasks ¶
func (s *TaskManager) QueryUnsuccessfulTasks(limit, offset int) ([]Task, error)
QueryUnsuccessfulTasks queries tasks that are initialized, running, or failed.
func (*TaskManager) Register ¶
func (s *TaskManager) Register(key TaskKey, definition TaskDefinition)
Register binds a task definition to a certain task key. Tasks of the same type usually share the same task key.
A task key is a unique ID for a set of tasks with the same definition. The task handler should be idempotent because a task may be scheduled more than once in some cases.
A Handler must be provided in the task definition. It is also advisable to provide the argument type, unless you want to use the default argument type (i.e. map[string]interface{} for a struct) inside the handler.
func (*TaskManager) Run ¶
func (s *TaskManager) Run(ctx context.Context, key TaskKey, arg interface{}) error
Run provides the ability to asynchronously run a registered task reliably. It's an alternative to using 'go func() {}' when the eventual success of the task matters.
Run returns an error if creating the task fails; otherwise, the task is scheduled asynchronously later. If an error or panic occurs while the task is running, it is rescheduled according to the 'RetryTimes' value. Once the number of retries exceeds the configured maximum, the task is marked 'failed' in the database and the error logs are recorded. In such cases, manual intervention may be required.
The context passed in should be consistent with the 'ctxMarshaler' value defined in the overall configuration or the task definition.
func (*TaskManager) RunWithTx ¶
func (s *TaskManager) RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error
RunWithTx makes it possible to create a task along with other database operations in the same transaction. The task is scheduled if the transaction commits successfully and canceled if the transaction is rolled back. This makes it a simple implementation of BASE that can be used in distributed-transaction scenarios.
The task will be scheduled immediately after the transaction is committed if you use the builtin 'Transaction' function below. Otherwise, it will be scheduled later in the scan process.
You can create more than one task in a single transaction, like this:
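_ = Transaction(func(tx *gorm.DB) error {
	// Ordinary database work that belongs to the same transaction.
	if err := doSomething(); err != nil {
		return err
	}
	// Create task1; ctx, key1 and arg1 are placeholders for your own values.
	if err := RunWithTx(tx, ctx, key1, arg1); err != nil {
		return err
	}
	// Create task2 in the same transaction; key2 and arg2 are placeholders too.
	if err := RunWithTx(tx, ctx, key2, arg2); err != nil {
		return err
	}
	return nil
})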
func (*TaskManager) Start ¶
func (s *TaskManager) Start()
Start starts the TaskManager. It should be called before any other method of the TaskManager.
func (*TaskManager) Stop ¶
func (s *TaskManager) Stop(wait bool)
Stop provides the ability to gracefully stop the currently running tasks. If you cannot tolerate task failure or loss when a termination signal is received or the pod is migrated, call this function explicitly before the main process exits. Otherwise, these tasks are likely to be killed and will be reported later by the abnormal task check process.
The wait parameter determines whether to wait for all running tasks to complete.
func (*TaskManager) Transaction ¶
func (s *TaskManager) Transaction(fc func(tx *gorm.DB) error) (err error)
Transaction wraps the 'Transaction' function of *gorm.DB, providing the ability to schedule the tasks created inside once the transaction is committed successfully.
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of a task.
const (
	TaskStatusUnKnown     TaskStatus = ""
	TaskStatusInitialized TaskStatus = "initialized"
	TaskStatusRunning     TaskStatus = "running"
	TaskStatusSucceeded   TaskStatus = "succeeded"
	TaskStatusFailed      TaskStatus = "failed"
)
These are the constants for task status.
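As a small illustration of how these values relate to QueryUnsuccessfulTasks, which is documented as covering initialized, running and failed tasks, the sketch below classifies a status. The type and constants are redeclared so the example compiles alone, and isUnsuccessful is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// TaskStatus and its constants mirror the documented declarations.
type TaskStatus string

const (
	TaskStatusUnKnown     TaskStatus = ""
	TaskStatusInitialized TaskStatus = "initialized"
	TaskStatusRunning     TaskStatus = "running"
	TaskStatusSucceeded   TaskStatus = "succeeded"
	TaskStatusFailed      TaskStatus = "failed"
)

// isUnsuccessful reports whether s is one of the statuses that
// QueryUnsuccessfulTasks is documented to cover.
func isUnsuccessful(s TaskStatus) bool {
	switch s {
	case TaskStatusInitialized, TaskStatusRunning, TaskStatusFailed:
		return true
	}
	return false
}

func main() {
	statuses := []TaskStatus{
		TaskStatusInitialized, TaskStatusRunning, TaskStatusSucceeded, TaskStatusFailed,
	}
	for _, s := range statuses {
		fmt.Printf("%-12s unsuccessful=%v\n", s, isUnsuccessful(s))
	}
}
```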