Documentation ¶
Index ¶
- Variables
- func AddArgs(args ...Arg) options.Option
- func AddCallbackOnError(tasks ...*Signature) options.Option
- func AddCallbackOnSuccess(tasks ...*Signature) options.Option
- func ConvertResult(result []*Result) ([]reflect.Value, error)
- func FormatResult(values []reflect.Value) string
- func HumanReadableResults(results []reflect.Value) string
- func NewErrNonsupportType(valueType string) error
- func ReflectTaskResults(taskResults []*Result) ([]reflect.Value, error)
- func ReflectValue(valueType string, value interface{}) (reflect.Value, error)
- func SetArgs(args ...Arg) options.Option
- func SetCallbackOnError(tasks ...*Signature) options.Option
- func SetCallbackOnSuccess(tasks ...*Signature) options.Option
- func SetETATime(after *time.Time) options.Option
- func SetGroupID(id string) options.Option
- func SetIgnoreNotRegisteredTask(register bool) options.Option
- func SetMeta(meta *Meta) options.Option
- func SetMetaSafe(safe bool) options.Option
- func SetPriority(priority uint8) options.Option
- func SetRetryCount(count int) options.Option
- func SetRetryInterval(interval int) options.Option
- func SetRouter(router string) options.Option
- func SetStopTaskDeletionOnError(deleteOnErr bool) options.Option
- func SetTriggerChord(task *Signature) options.Option
- func ValidateTask(task interface{}) error
- type Arg
- type Chain
- type ErrRetryTaskLater
- type Group
- type GroupCallback
- type GroupMeta
- type Meta
- type Processor
- type Result
- type Results
- type Retrievable
- type Signature
- type State
- type Status
- func NewFailureState(task *Signature, err string) *Status
- func NewPendingState(task *Signature) *Status
- func NewReceivedState(task *Signature) *Status
- func NewRetryState(task *Signature) *Status
- func NewStartedState(task *Signature) *Status
- func NewSuccessState(task *Signature, results ...*Result) *Status
- type StringSlice
- type Task
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskMustFunc = errors.New("task type must func") ErrTaskReturnNoValue = errors.New("task return is no value") ErrTaskReturnNoErr = errors.New("task return last values is must be error") )
var (
ErrDispatching = errors.New("dispatch task err")
)
Functions ¶
func AddCallbackOnError ¶
AddCallbackOnError 追加失败后回调
func AddCallbackOnSuccess ¶
AddCallbackOnSuccess 追加成功后回调
func ConvertResult ¶
ConvertResult 将Result类型转换成reflect.Value
func FormatResult ¶
FormatResult 将reflect.Value转换为可读答案
func HumanReadableResults ¶
HumanReadableResults ...
func NewErrNonsupportType ¶
func ReflectTaskResults ¶
ReflectTaskResults ...
func ReflectValue ¶
ReflectValue converts interface{} to reflect.Value based on string type
func SetCallbackOnError ¶
SetCallbackOnError 设置失败后回调
func SetCallbackOnSuccess ¶
SetCallbackOnSuccess 设置成功后回调
func SetIgnoreNotRegisteredTask ¶
SetIgnoreNotRegisteredTask 设置任务未注册是否忽略
func SetStopTaskDeletionOnError ¶
SetStopTaskDeletionOnError 设置任务出错后是否删除
func ValidateTask ¶
func ValidateTask(task interface{}) error
Types ¶
type Arg ¶
type Arg struct { Type string `json:"type" bson:"type"` Key string `json:"key" bson:"key"` Value interface{} `json:"value" bson:"value"` }
Arg task中的参数
type ErrRetryTaskLater ¶
type ErrRetryTaskLater struct {
// contains filtered or unexported fields
}
ErrRetryTaskLater 重试错误
func NewErrRetryTaskLater ¶
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater
NewErrRetryTaskLater 生成重试错误
func (ErrRetryTaskLater) RetryIn ¶
func (e ErrRetryTaskLater) RetryIn() time.Duration
RetryIn 返回重试时间,从现在开始到执行的间隔
type GroupCallback ¶
GroupCallback 具有回调任务的任务组
func NewGroupCallback ¶
func NewGroupCallback(group *Group, name string, callback *Signature) (*GroupCallback, error)
NewGroupCallback 创建具有回调任务的个任务组
type GroupMeta ¶
type GroupMeta struct { ID uint `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"` // GroupID 组的唯一标识 GroupID string `json:"group_id" bson:"_id" gorm:"column:id;index;comment:id"` // 组名称 Name string `json:"name" bson:"name" gorm:"column:name;comment:组名称"` // TaskIDs 接管的任务id TaskIDs StringSlice `json:"task_ids" bson:"task_ids" gorm:"column:task_ids;comment:接管的任务id;type:text"` // TriggerCompleted 是否触发完成 TriggerCompleted bool `json:"trigger_chord" bson:"trigger_chord" gorm:"column:trigger_chord;comment:是否触发完成"` // Lock 是否锁定 Lock bool `json:"lock" gorm:"column:lock;comment:锁"` // TTL 有效时间 TTL int64 `json:"ttl,omitempty" bson:"ttl,omitempty" gorm:"column:ttl;comment:过期时间"` // CreateAt 创建时间 CreateAt time.Time `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"` DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"` }
GroupMeta 组详情
type Processor ¶
type Processor interface { // Process 处理程序 Process(t *Signature) error // ConsumeQueue 消费队列 ConsumeQueue() string // PreConsumeHandler 是否进行预处理 PreConsumeHandler() bool }
Processor 任务处理器
type Result ¶
type Result struct { // Type 标注返回的类型 Type string `json:"type" bson:"type"` // Value 根据type解压value Value interface{} `json:"value" bson:"value"` }
Result 任务返回携带的kv键值对
type Retrievable ¶
type Signature ¶
type Signature struct { // ID 任务唯一id,要保证多实例中id唯一 ID string `json:"id" bson:"_id"` // Name 任务名称 Name string `json:"name" bson:"name"` // GroupID 多集群中组id GroupID string `json:"group_id" bson:"groupID"` // GroupTaskCount 组中任务计数 GroupTaskCount int `json:"group_task_count" bson:"group_task_count"` // Priority 任务优先级 Priority uint8 `json:"priority" bson:"priority"` // RetryCount 重试次数 RetryCount int `json:"retry_count" bson:"retry_count"` // RetryInterval 重试间隔时间 RetryInterval int `json:"retry_timeout" bson:"retry_timeout"` // StopTaskDeletionOnError 任务出错后删除 StopTaskDeletionOnError bool `json:"stop_task_deletion_on_error" bson:"stop_task_deletion_on_error"` // IgnoreNotRegisteredTask 忽略未注册的任务 IgnoreNotRegisteredTask bool `json:"not_registered" bson:"not_registered"` // Router 路由 Router string `json:"router" bson:"router"` // Args 携带参数 Args []Arg `json:"args" bson:"args"` // MetaSafe 安全的Meta MetaSafe bool `json:"meta_safe" bson:"meta_safe"` // Meta 携带原信息 Meta *Meta `json:"meta" bson:"meta"` // ETA 延时任务 ETA *time.Time `json:"eta" bson:"eta"` // CallbackChord 组任务回调 CallbackChord *Signature `json:"callback_chord" bson:"callback_chord"` // CallbackOnSuccess 任务成功后回调 CallbackOnSuccess []*Signature `json:"callback_on_success" bson:"callback_on_success"` // CallbackOnError 任务失败后回调 CallbackOnError []*Signature `json:"callback_on_error" bson:"callback_on_error"` }
func CopySignature ¶
func CopySignatures ¶
func NewSignature ¶
NewSignature 创建Signature
func SignatureFromContext ¶
SignatureFromContext 获取上下文任务签名
type Status ¶
type Status struct { ID uint `json:"-" bson:"-" gorm:"column:_id;primarykey;comment:_id"` TaskID string `json:"task_id" bson:"_id" gorm:"column:id;index;comment:id"` GroupID string `json:"group_id" bson:"group_id" gorm:"column:group_id;comment:组唯一标识"` Name string `json:"name" bson:"name" gorm:"column:name;comment:组名称"` Status State `json:"status" bson:"status" gorm:"column:status;comment:任务状态"` TTL int64 `json:"ttl" bson:"ttl" gorm:"column:ttl;comment:过期时间"` Error string `json:"error" bson:"error" gorm:"column:error;comment:错误"` Results Results `json:"results" bson:"results" gorm:"column:results;comment:结果;type:text"` CreateAt time.Time `json:"create_at" bson:"create_at" gorm:"column:create_at;comment:创建时间"` DeletedAt gorm.DeletedAt `json:"-" bson:"-" gorm:"index"` }
Status 任务状态
func NewFailureState ¶
NewFailureState 创建Failure状态
func NewReceivedState ¶
NewReceivedState 创建Received状态
func NewSuccessState ¶
NewSuccessState 创建Success状态
func (*Status) IsCompleted ¶
type StringSlice ¶
type StringSlice []string
func (*StringSlice) Scan ¶
func (s *StringSlice) Scan(src interface{}) error
type Task ¶
type Task struct { // TaskFunc 执行任务的函数 TaskFunc reflect.Value // UseContext 是否使用上下文 UseContext bool // Context 上下文信息 Context context.Context // Args 执行任务需要的参数 Args []reflect.Value }
func NewTaskWithSignature ¶
NewTaskWithSignature 初始化Task通过Signature
func (*Task) TransformArgs ¶
TransformArgs 将[]Arg转化为[]reflect.Value