task

package
v0.5.0-beta2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2019 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrExecuting is a sentinel error with the message "is executing".
// NOTE(review): presumably returned when an operation targets a task that is
// currently executing — confirm against the callers.
var ErrExecuting = errors.New("is executing")
View Source
// ErrOverMaxRetry is a sentinel error with the message "over max retry".
// NOTE(review): presumably returned once a task has used up its allowed
// retries/recoveries — confirm against the callers.
var ErrOverMaxRetry = errors.New("over max retry")

Functions

This section is empty.

Types

type Awaken added in v0.2.0

// Awaken is the value returned by Scheduler.WaitTaskAwaken. It carries only
// the key of a task.
type Awaken struct {
	// TaskKey is a task key; presumably the same value as Task.Key — confirm.
	TaskKey string
}

type Context

// Context is the first argument passed to Executor.Execute. It embeds a
// context.Context, so it can be used wherever a context.Context is expected,
// and it additionally exposes the KeepLive and Task methods.
type Context struct {
	context.Context
	// contains filtered or unexported fields
}

func (Context) KeepLive

func (cb Context) KeepLive(ctx context.Context) error

func (Context) Task added in v0.2.0

func (cb Context) Task() Task

type ExecOption

// ExecOption holds the optional settings of one execution. Every field is a
// pointer tagged with omitempty, so a field left nil is presumably omitted
// from the BSON document (standard omitempty handling — confirm for the
// driver in use).
type ExecOption struct {
	// ExpectStartTime is the expected start time.
	// NOTE(review): stored under the bson key "expectRetryTime", which does
	// not match the field name; existing documents depend on the key, so do
	// not rename it without a migration.
	ExpectStartTime    *time.Time     `bson:"expectRetryTime,omitempty"`
	// MaxExecDuration is the maximum duration of an execution; presumably the
	// limit checked by Execution.OverExecTime — confirm.
	MaxExecDuration    *time.Duration `bson:"maxExecDuration,omitempty"`
	// MaxRecover is a recover count (set through SetMaxRecoverCount).
	// NOTE(review): stored under the bson key "remainExec", which does not
	// match the field name; confirm the intended meaning before renaming.
	MaxRecover         *int32         `bson:"remainExec,omitempty"`
	// CompensateDuration is a duration that is also present on Schedule;
	// its exact effect on scheduling is not visible here — confirm.
	CompensateDuration *time.Duration `bson:"compensateDuration,omitempty"`
}

func (ExecOption) AddRemainExecCount

func (e ExecOption) AddRemainExecCount(delta int32) ExecOption

func (ExecOption) GetExecutingSchedule

func (e ExecOption) GetExecutingSchedule() Schedule

func (ExecOption) GetExpectStartSchedule

func (e ExecOption) GetExpectStartSchedule() Schedule

func (ExecOption) GetExpectTime

func (e ExecOption) GetExpectTime() time.Time

func (ExecOption) Merge

func (e ExecOption) Merge(option ExecOption) ExecOption

func (ExecOption) SetCompensateDuration

func (e ExecOption) SetCompensateDuration(dur time.Duration) ExecOption

func (ExecOption) SetExpectStartTime

func (e ExecOption) SetExpectStartTime(expect time.Time) ExecOption

func (ExecOption) SetFinished

func (e ExecOption) SetFinished() ExecOption

func (ExecOption) SetMaxExecDuration

func (e ExecOption) SetMaxExecDuration(dur time.Duration) ExecOption

func (ExecOption) SetMaxRecoverCount

func (e ExecOption) SetMaxRecoverCount(count int32) ExecOption

type Execution

// Execution is the execution record stored on a Task (Task.Execution): its
// configuration, timestamps and result.
type Execution struct {
	// Available is a flag whose exact meaning is not visible here;
	// presumably false corresponds to StatusUnavailable — confirm.
	Available    bool       `bson:"available"`
	// Config is the ExecOption attached to this execution.
	Config       ExecOption `bson:"config"`
	// CreateTime is the creation timestamp of this record.
	CreateTime   time.Time  `bson:"createTime,omitempty"`
	// StartTime is presumably set by Start — confirm.
	StartTime    time.Time  `bson:"startTime,omitempty"`
	// EndTime is presumably set by End — confirm.
	EndTime      time.Time  `bson:"endTime,omitempty"`
	// Result is the Result of the execution; presumably set by End — confirm.
	Result       Result     `bson:"result,omitempty"`
	// LastKeepLive is presumably the time of the most recent keep-live call
	// and the input to IsDead — confirm.
	LastKeepLive time.Time  `bson:"lastKeepLive,omitempty"`
}

func (*Execution) End added in v0.1.2

func (er *Execution) End(result Result, t time.Time)

func (*Execution) Ended

func (er *Execution) Ended() bool

func (*Execution) Executing added in v0.1.2

func (er *Execution) Executing() bool

func (*Execution) IsDead

func (er *Execution) IsDead(t time.Time) bool

func (*Execution) OverExecTime added in v0.1.5

func (er *Execution) OverExecTime(t time.Time) bool

func (*Execution) ReadyToStart

func (er *Execution) ReadyToStart() bool

func (*Execution) Start added in v0.1.2

func (er *Execution) Start(t time.Time)

func (*Execution) WaitingStart

func (er *Execution) WaitingStart() bool

type Executor

// Executor is the interface a Manager is given (see NewManager) to run tasks.
type Executor interface {
	// Execute receives the task Context and a *Result. It has no return
	// value, so the outcome is presumably reported by mutating res — confirm.
	Execute(ctx Context, res *Result)
}

type ExecutorFunc

// ExecutorFunc is a function type with the same signature as
// Executor.Execute. It declares an Execute method, so a plain function can be
// used as an Executor (presumably Execute just calls the function — confirm).
type ExecutorFunc func(cb Context, res *Result)

func (ExecutorFunc) Execute

func (fe ExecutorFunc) Execute(cb Context, res *Result)

type Manager

// Manager is built by NewManager from a Scheduler, an Executor and a
// ManagerOption. All of its state is unexported; it is used through its
// methods (ApplyNewTask, CloseTask, KeepLive, TaskCallback, Close).
type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(scheduler Scheduler, executor Executor, opt ManagerOption) *Manager

func (*Manager) ApplyNewTask

func (svc *Manager) ApplyNewTask(ctx context.Context, taskKey string, option ...Option) error

func (*Manager) Close added in v0.3.0

func (svc *Manager) Close(ctx context.Context) error

func (*Manager) CloseTask added in v0.2.0

func (svc *Manager) CloseTask(ctx context.Context, taskKey string) error

func (*Manager) KeepLive

func (svc *Manager) KeepLive(ctx context.Context, task Task) error

func (*Manager) TaskCallback

func (svc *Manager) TaskCallback(ctx context.Context, task Task, result Result) error

type ManagerOption

// ManagerOption is the configuration passed to NewManager.
// DefaultManagerOption returns a ready-made value and CompleteWith combines
// two values (presumably filling unset fields from its argument — confirm).
type ManagerOption struct {
	// MaxBusterTask is an integer limit whose exact meaning is not visible
	// here — NOTE(review): confirm what it bounds.
	MaxBusterTask           int
	// DefaultExecMaxDuration is presumably the default for
	// ExecOption.MaxExecDuration — confirm.
	DefaultExecMaxDuration  time.Duration
	// DefaultRecoverCount is presumably the default for
	// ExecOption.MaxRecover — confirm.
	DefaultRecoverCount     int32
	// DefaultKeepLiveDuration is presumably the default keep-live interval
	// — confirm.
	DefaultKeepLiveDuration time.Duration
	// WaitCloseDuration is presumably how long Close waits — confirm.
	WaitCloseDuration       time.Duration
	// Middle lists Middle values; each one can wrap the Scheduler and the
	// Executor (see the Middle interface).
	Middle                  []Middle
}

func DefaultManagerOption

func DefaultManagerOption() ManagerOption

func (ManagerOption) CompleteWith

func (mOpt ManagerOption) CompleteWith(dmp ManagerOption) ManagerOption

func (ManagerOption) Option

func (mOpt ManagerOption) Option() Option

type MemoScheduler

// MemoScheduler is created by NewMemoScheduler. Its methods (ScheduleTask,
// RemoveTaskSchedule, WaitTaskAwaken, ReadTask, NewStageID, Close) match the
// Scheduler interface; it additionally offers TaskSummery.
// NOTE(review): presumably an in-memory Scheduler implementation — confirm.
type MemoScheduler struct {
	// contains filtered or unexported fields
}

func NewMemoScheduler

func NewMemoScheduler(defaultKeepLiveDuration time.Duration) *MemoScheduler

func (*MemoScheduler) Close added in v0.3.0

func (scheduler *MemoScheduler) Close(ctx context.Context) error

func (*MemoScheduler) NewStageID added in v0.3.0

func (scheduler *MemoScheduler) NewStageID(ctx context.Context, taskKey string) (seq int64, err error)

func (*MemoScheduler) ReadTask added in v0.1.5

func (scheduler *MemoScheduler) ReadTask(ctx context.Context, taskKey string) (*Task, error)

func (*MemoScheduler) RemoveTaskSchedule

func (scheduler *MemoScheduler) RemoveTaskSchedule(ctx context.Context, task Task) error

func (*MemoScheduler) ScheduleTask added in v0.1.5

func (scheduler *MemoScheduler) ScheduleTask(ctx context.Context, task Task, cover bool) error

func (*MemoScheduler) TaskSummery added in v0.1.5

func (scheduler *MemoScheduler) TaskSummery(ctx context.Context) (*Summery, error)

func (*MemoScheduler) WaitTaskAwaken

func (scheduler *MemoScheduler) WaitTaskAwaken(ctx context.Context) (awaken Awaken, err error)

type Meta

// Meta is the meta information stored on a Task (Task.Meta).
type Meta struct {
	// Tags are the string tags attached to the task.
	Tags       []string   `bson:"tags"`
	// ExecOption is the ExecOption kept at task level.
	// NOTE(review): stored under the bson key "execDesc", which does not
	// match the field name; existing documents depend on the key.
	ExecOption ExecOption `bson:"execDesc"`
	// CreateTime is the creation timestamp of the task.
	CreateTime time.Time  `bson:"createTime"`
	// StartCount is a start counter.
	// NOTE(review): stored under the bson key "restartCount", which does not
	// match the field name; confirm whether it counts starts or restarts.
	StartCount int32      `bson:"restartCount"`
	// ExecCount is an execution counter.
	ExecCount  int32      `bson:"execCount"`
	// Exclusive is a flag whose effect is not visible here — confirm.
	Exclusive  bool       `bson:"exclusive"`
}

func (Meta) CanRestart

func (info Meta) CanRestart() bool

type Middle

// Middle is a middleware hook: it takes a Scheduler or an Executor and
// returns the value to use in its place. A list of Middle values is carried
// in ManagerOption.Middle.
type Middle interface {
	// WrapScheduler returns the Scheduler to use in place of scheduler.
	WrapScheduler(scheduler Scheduler) Scheduler
	// WrapExecutor returns the Executor to use in place of executor.
	WrapExecutor(executor Executor) Executor
}

func NewRecorderMiddle

func NewRecorderMiddle(factory record.Factory) Middle

type Option

// Option is the per-task option type accepted (variadically) by
// Manager.ApplyNewTask.
type Option struct {
	// Exec holds the execution settings.
	Exec    ExecOption
	// Overlap is set through SetOverLap; nil presumably means "not set"
	// — confirm.
	Overlap *bool
	// Tags is set through SetTags; nil presumably means "not set" — confirm.
	Tags    *[]string
}

func (Option) Merge

func (opt Option) Merge(option Option) Option

func (Option) SetExpectStartTime

func (opt Option) SetExpectStartTime(expect time.Time) Option

func (Option) SetKeepLiveDuration

func (opt Option) SetKeepLiveDuration(duration time.Duration) Option

func (Option) SetMaxExecDuration

func (opt Option) SetMaxExecDuration(maxExec time.Duration) Option

func (Option) SetMaxRecoverCount

func (opt Option) SetMaxRecoverCount(count int32) Option

func (Option) SetOverLap

func (opt Option) SetOverLap(over bool) Option

func (Option) SetTags

func (opt Option) SetTags(tags []string) Option

type Result

// Result is the value an Executor receives by pointer in Execute and that is
// stored on Execution.Result.
type Result struct {
	// ResultInfo is a free-form string describing the result.
	ResultInfo string     `bson:"resultInfo"`
	// ResultCode is a numeric result code; its value space is not visible
	// here.
	ResultCode int64      `bson:"resultCode"`
	// Continue presumably tells the manager whether the task should run
	// again — confirm.
	Continue   bool       `bson:"continue"`
	// NextExec is an ExecOption stored under the bson key "next"; presumably
	// the settings for the next execution — confirm.
	NextExec   ExecOption `bson:"next,omitempty"`
}

func (*Result) Finish

func (er *Result) Finish()

func (*Result) ReturnWithLive

func (er *Result) ReturnWithLive(keepLive time.Duration) *Result

ReturnWithLive keeps the current task alive and returns.

func (*Result) SetMaxDuration

func (er *Result) SetMaxDuration(maxDuration time.Duration) *Result

func (*Result) SetMaxRecover

func (er *Result) SetMaxRecover(maxRecover int32) *Result

func (*Result) WaitAndReDo

func (er *Result) WaitAndReDo(wait time.Duration) *Result

type Schedule added in v0.1.5

// Schedule is the scheduling information stored on a Task (Task.Schedule).
// ExecOption.GetExecutingSchedule and ExecOption.GetExpectStartSchedule both
// return a Schedule.
type Schedule struct {
	// AwakenTime is stored under the bson key "awaken"; presumably the time
	// at which the scheduler should awaken the task — confirm.
	AwakenTime         time.Time     `bson:"awaken"`
	// CompensateDuration is stored under the bson key "compensate"; its exact
	// effect is not visible here — confirm.
	CompensateDuration time.Duration `bson:"compensate"`
}

type ScheduleTestSuit

// ScheduleTestSuit bundles tests (methods taking *testing.T) that exercise a
// Scheduler; presumably meant to be reused for every Scheduler implementation
// — confirm.
type ScheduleTestSuit struct {
	// New returns the Scheduler under test.
	New func() Scheduler
}

func (*ScheduleTestSuit) TestSchedulerBasic

func (suit *ScheduleTestSuit) TestSchedulerBasic(t *testing.T)

func (*ScheduleTestSuit) TestSchedulerOverWriteTime

func (suit *ScheduleTestSuit) TestSchedulerOverWriteTime(t *testing.T)

type Scheduler

// Scheduler is the scheduling backend handed to NewManager. MemoScheduler in
// this package has a matching method set.
type Scheduler interface {
	// ScheduleTask schedules task. NOTE(review): cover presumably allows an
	// existing schedule for the same task to be overwritten — confirm.
	ScheduleTask(ctx context.Context, task Task, cover bool) error
	// RemoveTaskSchedule removes the schedule of task.
	RemoveTaskSchedule(ctx context.Context, task Task) error
	// WaitTaskAwaken returns an Awaken (a task key) or an error; presumably
	// it blocks until a task is due or ctx is done — confirm.
	WaitTaskAwaken(ctx context.Context) (awaken Awaken, err error)
	// ReadTask returns the Task stored under taskKey.
	ReadTask(ctx context.Context, taskKey string) (*Task, error)
	// NewStageID returns a new stage ID for taskKey.
	NewStageID(ctx context.Context, taskKey string) (stageID int64, err error)
	// Close closes the scheduler.
	Close(ctx context.Context) error
}

type StatusCode added in v0.1.5

// StatusCode enumerates task states. It is the return type of Task.Status and
// the key type of Summery.StatusCount.
type StatusCode int
// Known StatusCode values. StatusUnknown is 0 and is therefore the zero
// value of StatusCode.
const (
	StatusUnknown         StatusCode = 0
	StatusUnavailable     StatusCode = 1
	StatusWaitingExec     StatusCode = 2
	StatusExecuting       StatusCode = 3
	StatusExecuteFinished StatusCode = 4
)

func (StatusCode) String added in v0.1.5

func (status StatusCode) String() string

type Summery

// Summery is the value returned by MemoScheduler.TaskSummery.
// NOTE(review): the name looks like a misspelling of "Summary"; it is part of
// the exported API, so renaming it would break callers.
type Summery struct {
	// StatusCount maps a StatusCode to a count; presumably the number of
	// tasks currently in that status — confirm.
	StatusCount map[StatusCode]int64
}

type Task

// Task is the task record exchanged between Manager and Scheduler. It groups
// the task key, the current stage, and the schedule, meta and execution
// sub-records.
type Task struct {
	// Key identifies the task and is stored as the BSON "_id".
	Key       string    `bson:"_id"`
	// Stage is an int64 stage value; presumably the ID produced by
	// Scheduler.NewStageID and passed to NewExec — confirm.
	Stage     int64     `bson:"stage"`
	Schedule  Schedule  `bson:"schedule"`  // schedule info for scheduler to schedule
	Meta      Meta      `bson:"meta"`      // task meta info
	Execution Execution `bson:"execution"` // current execution info

}

func (*Task) NewExec added in v0.2.0

func (t *Task) NewExec(stageID int64, option ExecOption)

func (*Task) Status

func (t *Task) Status() StatusCode

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL