Documentation ¶
Index ¶
- Constants
- Variables
- func IsSameScope(left, right *common.ID) bool
- func JobName(jt JobType) string
- func NewParallelJobScheduler(parallism int) *parallelJobScheduler
- func NewPoolHandler(ctx context.Context, num int) *poolHandler
- func NewSingleWorkerHandler(ctx context.Context, name string) *singleWorkerHandler
- func NextTaskId() uint64
- func RegisterJobType(jt JobType, jn string)
- func RegisterType(t TaskType, name string)
- func TaskName(t TaskType) string
- type BaseDispatcher
- type BaseScheduler
- type BaseScopedDispatcher
- type BaseTask
- type BaseTaskHandler
- type Context
- type Dispatcher
- type FnTask
- type FuncT
- type Job
- func (job *Job) Close()
- func (job *Job) DoneWithErr(err error)
- func (job *Job) GetResult() *JobResult
- func (job *Job) ID() string
- func (job *Job) Init(ctx context.Context, id string, typ JobType, exec JobExecutor)
- func (job *Job) Reset()
- func (job *Job) Run()
- func (job *Job) String() string
- func (job *Job) Type() JobType
- func (job *Job) WaitDone() *JobResult
- type JobExecutor
- type JobResult
- type JobScheduler
- type JobType
- type MScopedTask
- type MultiScopedFnTask
- type Scheduler
- type ScopedFnTask
- type ScopedTask
- type ScopedTaskSharder
- type Task
- type TaskHandler
- type TaskScheduler
- type TaskType
- type TxnTaskFactory
Constants ¶
View Source
const (
	JTAny JobType = iota
	JTCustomizedStart = 100
	JTInvalid = 10000
)
Variables ¶
View Source
var (
	ErrDispatcherNotFound = moerr.NewInternalErrorNoCtx("tae sched: dispatcher not found")
	ErrSchedule = moerr.NewInternalErrorNoCtx("tae sched: cannot schedule")
)
View Source
var (
	ErrBadTaskRequestPara = moerr.NewInternalErrorNoCtx("tae scheduler: bad task request parameters")
	ErrScheduleScopeConflict = moerr.NewInternalErrorNoCtx("tae scheduler: scope conflict")
)
View Source
var DefaultScopeSharder = func(scope *common.ID) int {
	if scope == nil {
		return 0
	}
	hasher := fnv.New64a()
	hasher.Write(types.EncodeUint64(&scope.TableID))
	hasher.Write(types.EncodeUuid(scope.SegmentID()))
	return int(hasher.Sum64())
}
View Source
var (
ErrDispatchWrongTask = moerr.NewInternalErrorNoCtx("tae: wrong task type")
)
View Source
var (
ErrTaskHandleEnqueue = moerr.NewInternalErrorNoCtx("tae: task handle enqueue")
)
View Source
var SerialJobScheduler = new(simpleJobSceduler)
View Source
var WaitableCtx = &Context{Waitable: true}
Functions ¶
func IsSameScope ¶
func NewParallelJobScheduler ¶ added in v0.7.0
func NewParallelJobScheduler(parallism int) *parallelJobScheduler
func NewPoolHandler ¶
func NewSingleWorkerHandler ¶
func NextTaskId ¶
func NextTaskId() uint64
func RegisterJobType ¶ added in v0.8.0
func RegisterType ¶
Types ¶
type BaseDispatcher ¶
type BaseDispatcher struct {
// contains filtered or unexported fields
}
func NewBaseDispatcher ¶
func NewBaseDispatcher() *BaseDispatcher
func (*BaseDispatcher) Close ¶
func (d *BaseDispatcher) Close() error
func (*BaseDispatcher) Dispatch ¶
func (d *BaseDispatcher) Dispatch(task Task)
func (*BaseDispatcher) RegisterHandler ¶
func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler)
type BaseScheduler ¶
type BaseScheduler struct {
	ops.OpWorker
	Dispatchers map[TaskType]Dispatcher
	// contains filtered or unexported fields
}
func NewBaseScheduler ¶
func NewBaseScheduler(ctx context.Context, name string) *BaseScheduler
func (*BaseScheduler) RegisterDispatcher ¶
func (s *BaseScheduler) RegisterDispatcher(t TaskType, dispatcher Dispatcher)
func (*BaseScheduler) Schedule ¶
func (s *BaseScheduler) Schedule(task Task) error
func (*BaseScheduler) Stop ¶
func (s *BaseScheduler) Stop()
type BaseScopedDispatcher ¶
type BaseScopedDispatcher struct {
// contains filtered or unexported fields
}
func NewBaseScopedDispatcher ¶
func NewBaseScopedDispatcher(sharder ScopedTaskSharder) *BaseScopedDispatcher
func (*BaseScopedDispatcher) AddHandle ¶
func (d *BaseScopedDispatcher) AddHandle(h TaskHandler)
func (*BaseScopedDispatcher) Close ¶
func (d *BaseScopedDispatcher) Close() error
func (*BaseScopedDispatcher) Dispatch ¶
func (d *BaseScopedDispatcher) Dispatch(task Task)
type BaseTaskHandler ¶
func NewBaseEventHandler ¶
func NewBaseEventHandler(ctx context.Context, name string) *BaseTaskHandler
func (*BaseTaskHandler) Close ¶
func (h *BaseTaskHandler) Close() error
func (*BaseTaskHandler) Enqueue ¶
func (h *BaseTaskHandler) Enqueue(task Task)
func (*BaseTaskHandler) Execute ¶
func (h *BaseTaskHandler) Execute(task Task)
type Dispatcher ¶
type Job ¶ added in v0.7.0
type Job struct {
// contains filtered or unexported fields
}
func (*Job) DoneWithErr ¶ added in v0.8.0
type JobExecutor ¶ added in v0.7.0
type JobScheduler ¶ added in v0.7.0
type MScopedTask ¶
type MultiScopedFnTask ¶
type MultiScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}
func NewMultiScopedFnTask ¶
func (*MultiScopedFnTask) Scopes ¶
func (task *MultiScopedFnTask) Scopes() []common.ID
type ScopedFnTask ¶
type ScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}
func NewScopedFnTask ¶
func (*ScopedFnTask) Scope ¶
func (task *ScopedFnTask) Scope() *common.ID
type ScopedTask ¶
type ScopedTaskSharder ¶
type TaskScheduler ¶
type TaskScheduler interface {
	Scheduler
	ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error)
	ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error)
	ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error)
	ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error)
	GetCheckpointedLSN() uint64
	GetPenddingLSNCnt() uint64
	GetCheckpointTS() types.TS
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.