tasks

package
v0.5.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrDispatcherNotFound is a sentinel error whose message is
// "tae sched: dispatcher not found".
var ErrDispatcherNotFound = errors.New("tae sched: dispatcher not found")

// ErrSchedule is a sentinel error whose message is
// "tae sched: cannot schedule".
var ErrSchedule = errors.New("tae sched: cannot schedule")
View Source
// ErrBadTaskRequestPara is a sentinel error whose message is
// "tae scheduler: bad task request parameters".
var ErrBadTaskRequestPara = errors.New("tae scheduler: bad task request parameters")

// ErrScheduleScopeConflict is a sentinel error whose message is
// "tae scheduler: scope conflict".
var ErrScheduleScopeConflict = errors.New("tae scheduler: scope conflict")
View Source
// DefaultScopeSharder derives an integer shard key from a scope.
// A nil scope yields 0; any other scope yields the sum of its TableID and
// SegmentID fields converted to int.
var DefaultScopeSharder = func(scope *common.ID) int {
	if scope != nil {
		// The two IDs are added in their own field type first; only the
		// resulting sum is converted to int.
		return int(scope.TableID + scope.SegmentID)
	}
	return 0
}
View Source
// ErrDispatchWrongTask is a sentinel error whose message is
// "tae: wrong task type".
var ErrDispatchWrongTask = errors.New("tae: wrong task type")
View Source
// ErrTaskHandleEnqueue is a sentinel error whose message is
// "tae: task handle enqueue".
var ErrTaskHandleEnqueue = errors.New("tae: task handle enqueue")
View Source
// WaitableCtx is a shared, package-level *Context whose Waitable field is
// true and whose DoneCB is left at its zero value.
// NOTE(review): it is a pointer to mutable state; callers presumably treat
// it as read-only — confirm.
var WaitableCtx = &Context{Waitable: true}

Functions

func IsSameScope

func IsSameScope(left, right *common.ID) bool

func NewPoolHandler

func NewPoolHandler(num int) *poolHandler

func NewSingleWorkerHandler

func NewSingleWorkerHandler(name string) *singleWorkerHandler

func NextTaskId

func NextTaskId() uint64

func RegisterType

func RegisterType(t TaskType, name string)

func TaskName

func TaskName(t TaskType) string

Types

type BaseDispatcher

type BaseDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseDispatcher

func NewBaseDispatcher() *BaseDispatcher

func (*BaseDispatcher) Close

func (d *BaseDispatcher) Close() error

func (*BaseDispatcher) Dispatch

func (d *BaseDispatcher) Dispatch(task Task)

func (*BaseDispatcher) RegisterHandler

func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler)

type BaseScheduler

// BaseScheduler embeds ops.OpWorker and exposes a table of dispatchers
// keyed by task type.
type BaseScheduler struct {
	ops.OpWorker

	// Dispatchers maps a TaskType to a Dispatcher.
	// NOTE(review): presumably populated through RegisterDispatcher — confirm.
	Dispatchers map[TaskType]Dispatcher
	// contains filtered or unexported fields
}

func NewBaseScheduler

func NewBaseScheduler(name string) *BaseScheduler

func (*BaseScheduler) RegisterDispatcher

func (s *BaseScheduler) RegisterDispatcher(t TaskType, dispatcher Dispatcher)

func (*BaseScheduler) Schedule

func (s *BaseScheduler) Schedule(task Task) error

func (*BaseScheduler) Stop

func (s *BaseScheduler) Stop()

type BaseScopedDispatcher

type BaseScopedDispatcher struct {
	// contains filtered or unexported fields
}

func NewBaseScopedDispatcher

func NewBaseScopedDispatcher(sharder ScopedTaskSharder) *BaseScopedDispatcher

func (*BaseScopedDispatcher) AddHandle

func (d *BaseScopedDispatcher) AddHandle(h TaskHandler)

func (*BaseScopedDispatcher) Close

func (d *BaseScopedDispatcher) Close() error

func (*BaseScopedDispatcher) Dispatch

func (d *BaseScopedDispatcher) Dispatch(task Task)

type BaseTask

// BaseTask embeds ops.Op; its remaining fields are unexported and not
// shown in this view.
type BaseTask struct {
	ops.Op
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(impl Task, taskType TaskType, ctx *Context) *BaseTask

func (*BaseTask) Cancel

func (task *BaseTask) Cancel() (err error)

func (*BaseTask) Execute

func (task *BaseTask) Execute() (err error)

func (*BaseTask) ID

func (task *BaseTask) ID() uint64

func (*BaseTask) Name

func (task *BaseTask) Name() string

func (*BaseTask) Type

func (task *BaseTask) Type() TaskType

type BaseTaskHandler

// BaseTaskHandler is a task handler whose only field is an embedded
// ops.OpWorker.
type BaseTaskHandler struct {
	ops.OpWorker
}

func NewBaseEventHandler

func NewBaseEventHandler(name string) *BaseTaskHandler

func (*BaseTaskHandler) Close

func (h *BaseTaskHandler) Close() error

func (*BaseTaskHandler) Enqueue

func (h *BaseTaskHandler) Enqueue(task Task)

func (*BaseTaskHandler) Execute

func (h *BaseTaskHandler) Execute(task Task)

type Context

// Context bundles the options passed when constructing or scheduling a task.
type Context struct {
	// DoneCB is a callback of type ops.OpDoneCB.
	// NOTE(review): presumably invoked when the task's op finishes — confirm in package ops.
	DoneCB   ops.OpDoneCB
	// Waitable is a boolean option on the task.
	// NOTE(review): presumably marks the task as one the caller may wait on — confirm.
	Waitable bool
}

type Dispatcher

// Dispatcher receives tasks through Dispatch and is closable via the
// embedded io.Closer.
type Dispatcher interface {
	io.Closer
	// Dispatch hands a task to the dispatcher. It returns nothing, so any
	// failure is not reported through this call.
	Dispatch(Task)
}

type FnTask

// FnTask pairs an embedded *BaseTask with a function value.
type FnTask struct {
	*BaseTask
	// Fn is the func() error carried by the task.
	// NOTE(review): presumably what Execute runs — confirm.
	Fn FuncT
}

func NewFnTask

func NewFnTask(ctx *Context, taskType TaskType, fn FuncT) *FnTask

func (*FnTask) Execute

func (task *FnTask) Execute() error

type FuncT

// FuncT is an alias for a parameterless function that returns an error.
type FuncT = func() error

type MScopedTask

// MScopedTask is a Task that additionally reports a list of scopes.
type MScopedTask interface {
	Task
	// Scopes returns the task's scopes as a slice of common.ID values.
	Scopes() []common.ID
}

type MultiScopedFnTask

// MultiScopedFnTask embeds *FnTask; its remaining fields are unexported
// and not shown in this view.
type MultiScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewMultiScopedFnTask

func NewMultiScopedFnTask(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) *MultiScopedFnTask

func (*MultiScopedFnTask) Scopes

func (task *MultiScopedFnTask) Scopes() []common.ID

type Scheduler

// Scheduler is the minimal scheduling interface: lifecycle control plus
// task submission.
type Scheduler interface {
	// Start starts the scheduler.
	Start()
	// Stop stops the scheduler.
	Stop()
	// Schedule submits a task and returns an error if it cannot be accepted.
	Schedule(Task) error
}

type ScopedFnTask

// ScopedFnTask embeds *FnTask; its remaining fields are unexported and
// not shown in this view.
type ScopedFnTask struct {
	*FnTask
	// contains filtered or unexported fields
}

func NewScopedFnTask

func NewScopedFnTask(ctx *Context, taskType TaskType, scope *common.ID, fn FuncT) *ScopedFnTask

func (*ScopedFnTask) Scope

func (task *ScopedFnTask) Scope() *common.ID

type ScopedTask

// ScopedTask is a Task that additionally reports a single scope.
type ScopedTask interface {
	Task
	// Scope returns a pointer to the task's scope.
	// NOTE(review): whether it may be nil is not visible here.
	Scope() *common.ID
}

type ScopedTaskSharder

// ScopedTaskSharder is an alias for a function that maps a scope to an
// int. DefaultScopeSharder has this signature.
type ScopedTaskSharder = func(scope *common.ID) int

type Task

// Task is a unit of schedulable work: a base.IOp extended with identity,
// type, cancellation and naming.
type Task interface {
	base.IOp
	// ID returns the task's numeric identifier.
	ID() uint64
	// Type returns the task's TaskType.
	Type() TaskType
	// Cancel requests cancellation and reports any error from doing so.
	Cancel() error
	// Name returns a string name for the task.
	Name() string
}

type TaskHandler

// TaskHandler is a closable handler that can be started and given tasks
// either to enqueue or to execute.
type TaskHandler interface {
	io.Closer
	// Start starts the handler.
	Start()
	// Enqueue submits a task to the handler.
	// NOTE(review): presumably for asynchronous processing — confirm.
	Enqueue(Task)
	// Execute hands a task to the handler for execution.
	// NOTE(review): presumably synchronous — confirm.
	Execute(Task)
}

type TaskScheduler

// TaskScheduler extends Scheduler with helpers that schedule work from a
// factory or a plain function and return the scheduled Task, plus
// checkpoint-related accessors.
type TaskScheduler interface {
	Scheduler
	// ScheduleTxnTask schedules a task of the given type produced by factory.
	ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error)
	// ScheduleMultiScopedTxnTask is like ScheduleTxnTask but also takes a list of scopes.
	ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error)
	// ScheduleMultiScopedFn schedules fn as a task of the given type with a list of scopes.
	ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error)
	// ScheduleFn schedules fn as a task of the given type.
	ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error)
	// ScheduleScopedFn schedules fn as a task of the given type with a single scope.
	ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error)
	// Checkpoint takes a slice of WAL indexes and returns an error on failure.
	Checkpoint(indexes []*wal.Index) error

	// GetCheckpointedLSN returns an LSN as uint64.
	// NOTE(review): presumably the latest checkpointed one — confirm.
	GetCheckpointedLSN() uint64
	// GetPenddingLSNCnt returns a count as uint64.
	// NOTE(review): "Pendding" is spelled this way in the API; presumably a pending-LSN count — confirm.
	GetPenddingLSNCnt() uint64
	// GetSafeTS returns a timestamp as uint64.
	GetSafeTS() uint64
}

type TaskType

// TaskType identifies a kind of task as a 16-bit unsigned value.
type TaskType uint16
// Predeclared task types. Values are assigned by iota in declaration order;
// the blank line between the two groups does not reset or skip the counter.
const (
	NoopTask TaskType = iota // 0
	MockTask                 // 1
	CustomizedTask           // 2

	DataCompactionTask // 3
	CheckpointTask     // 4
	GCTask             // 5
	IOTask             // 6
)

type TxnTaskFactory

// TxnTaskFactory is an alias for a function that builds a Task from a
// Context and an asynchronous transaction, or returns an error.
type TxnTaskFactory = func(ctx *Context, txn txnif.AsyncTxn) (Task, error)

Directories

Path Synopsis
ops

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL