Documentation ¶
Index ¶
- Constants
- Variables
- func FakeUniqueID() string
- func IFaceToString(value interface{}) string
- type DefaultTaskSetting
- type DefaultTaskSettingWithoutTimeout
- type FailedJobHandler
- type JobIFace
- type JobMemory
- func (job *JobMemory) Attempts() (attempt int64)
- func (job *JobMemory) Delete() (err error)
- func (job *JobMemory) Failed(err error)
- func (job *JobMemory) GetName() (queueName string)
- func (job *JobMemory) HasFailed() (hasFail bool)
- func (job *JobMemory) IsDeleted() (deleted bool)
- func (job *JobMemory) IsReleased() (released bool)
- func (job *JobMemory) MarkAsFailed()
- func (job *JobMemory) Payload() (payload *Payload)
- func (job *JobMemory) PopTime() (time time.Time)
- func (job *JobMemory) Queue() (queue QueueIFace)
- func (job *JobMemory) Release(delay int64) (err error)
- func (job *JobMemory) Timeout() (time time.Duration)
- func (job *JobMemory) TimeoutAt() (time time.Time)
- type JobRedis
- func (job *JobRedis) Attempts() (attempt int64)
- func (job *JobRedis) Delete() (err error)
- func (job *JobRedis) Failed(err error)
- func (job *JobRedis) GetName() (queueName string)
- func (job *JobRedis) HasFailed() (hasFail bool)
- func (job *JobRedis) IsDeleted() (deleted bool)
- func (job *JobRedis) IsReleased() (released bool)
- func (job *JobRedis) MarkAsFailed()
- func (job *JobRedis) Payload() (payload *Payload)
- func (job *JobRedis) PopTime() (time time.Time)
- func (job *JobRedis) Queue() (queue QueueIFace)
- func (job *JobRedis) Release(delay int64) (err error)
- func (job *JobRedis) Timeout() (time time.Duration)
- func (job *JobRedis) TimeoutAt() (time time.Time)
- type Logger
- type Payload
- type Queue
- func (q *Queue) Bootstrap(tasks []TaskIFace) error
- func (q *Queue) BootstrapOne(task TaskIFace) error
- func (q *Queue) Delay(task TaskIFace, payload interface{}, duration time.Duration) error
- func (q *Queue) DelayAt(task TaskIFace, payload interface{}, delay time.Time) error
- func (q *Queue) DelayAtByName(name string, payload interface{}, delay time.Time) error
- func (q *Queue) DelayByName(name string, payload interface{}, duration time.Duration) error
- func (q *Queue) Dispatch(task TaskIFace, payload interface{}) error
- func (q *Queue) DispatchByName(name string, payload interface{}) error
- func (q *Queue) SetFailedJobHandler(failedJobHandler FailedJobHandler)
- func (q *Queue) SetHighPriorityTask(task TaskIFace) error
- func (q *Queue) ShutDown(ctx context.Context) error
- func (q *Queue) Size(task TaskIFace) int64
- func (q *Queue) Start() error
- type QueueIFace
- type RawBody
- type TaskIFace
Constants ¶
const ( DefaultMaxExecuteDuration = 900 * time.Second // job任务执行时长极限预警值:15分钟 DefaultMaxTries = 1 // 默认最大重试次数:1次<即不重试> DefaultRetryInterval = 60 // 默认下次任务重试间隔:1分钟<即可多次执行任务失败后下一次尝试是在60秒后> )
定义常量
const ( Redis = "redis" Memory = "memory" )
queue队列支持的底层驱动名称常量 后续扩充mq、sqs、db等在此添加常量并实现 QueueIFace 接口予以关联
Variables ¶
var ( // ErrQueueClosed 队列处于优雅关闭或关闭状态错误 ErrQueueClosed = errors.New("queue.error.queue.closed") // ErrMaxAttemptsExceeded 尝试执行次数超限 ErrMaxAttemptsExceeded = errors.New("queue.max.execute.attempts") // ErrAbortForWaitingPrevJobFinish 等待上一次任务执行结束退出 ErrAbortForWaitingPrevJobFinish = errors.New("queue.abort.for.waiting.prev.job.finish") )
Functions ¶
func FakeUniqueID ¶
func FakeUniqueID() string
FakeUniqueID 生成一个V4版本的uuid字符串,生成失败返回时间戳纳秒 UUID单机足以保障唯一,生成失败场景下纳秒时间戳也可以一定程度上保障单机唯一
Types ¶
type DefaultTaskSetting ¶
type DefaultTaskSetting struct{}
DefaultTaskSetting 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长
func (*DefaultTaskSetting) MaxTries ¶
func (task *DefaultTaskSetting) MaxTries() int64
MaxTries 默认最大尝试次数1,即投递的任务仅执行1次
func (*DefaultTaskSetting) RateAllow ¶
func (task *DefaultTaskSetting) RateAllow() bool
RateAllow 任务限流方法,默认不限流
func (*DefaultTaskSetting) RetryInterval ¶
func (task *DefaultTaskSetting) RetryInterval() int64
RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认60秒后重试
func (*DefaultTaskSetting) Timeout ¶
func (task *DefaultTaskSetting) Timeout() time.Duration
Timeout 任务最大执行超时时长:默认超时时长为900秒
type DefaultTaskSettingWithoutTimeout ¶
type DefaultTaskSettingWithoutTimeout struct{}
DefaultTaskSettingWithoutTimeout 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长
func (*DefaultTaskSettingWithoutTimeout) MaxTries ¶
func (task *DefaultTaskSettingWithoutTimeout) MaxTries() int64
MaxTries 默认最大尝试次数1,即投递的任务仅执行1次
func (*DefaultTaskSettingWithoutTimeout) RateAllow ¶
func (task *DefaultTaskSettingWithoutTimeout) RateAllow() bool
RateAllow 任务限流方法,默认不限流
func (*DefaultTaskSettingWithoutTimeout) RetryInterval ¶
func (task *DefaultTaskSettingWithoutTimeout) RetryInterval() int64
RetryInterval 当任务执行失败后再次尝试执行的间隔时长,默认立即重试,即间隔时长为0秒
type FailedJobHandler ¶
FailedJobHandler 失败任务记录|处理回调方法 @param *Payload 失败job的对象信息 @param error job任务失败的error报错信息
type JobIFace ¶
type JobIFace interface { Release(delay int64) (err error) // 释放任务:将任务重新放入队列 Delete() (err error) // 删除任务:任务不再执行 IsDeleted() (deleted bool) // 检查任务是否已删除 IsReleased() (released bool) // 检查任务是否已释放 Attempts() (attempt int64) // 获取任务已尝试执行过的次数 PopTime() (time time.Time) // 获取任务首次被pop取出的时刻 Timeout() (time time.Duration) // 任务超时时长 TimeoutAt() (time time.Time) // 任务执行超时的时刻 HasFailed() (hasFail bool) // 检测当前job任务执行是否出现了错误 MarkAsFailed() // 设置当前job任务执行出现了错误 Failed(err error) // 设置任务执行失败 Queue() (queue QueueIFace) // 获取job任务所属队列queue句柄 GetName() (queueName string) // 获取job所属队列名称 Payload() (payload *Payload) // 获取任务执行参数payload }
JobIFace 基于不同技术栈的队列任务Job实现契约
type JobMemory ¶
type JobMemory struct {
// contains filtered or unexported fields
}
func (*JobMemory) IsReleased ¶
func (*JobMemory) MarkAsFailed ¶
func (job *JobMemory) MarkAsFailed()
func (*JobMemory) Queue ¶
func (job *JobMemory) Queue() (queue QueueIFace)
type JobRedis ¶
type JobRedis struct {
// contains filtered or unexported fields
}
func (*JobRedis) IsReleased ¶
func (*JobRedis) MarkAsFailed ¶
func (job *JobRedis) MarkAsFailed()
func (*JobRedis) Queue ¶
func (job *JobRedis) Queue() (queue QueueIFace)
type Logger ¶
type Logger interface { // Debug debug级别输出的日志 // - msg 日志消息文本描述 // - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数> Debug(msg string, keyValue ...string) // Info info级别输出的日志 // - msg 日志消息文本描述 // - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数> Info(msg string, keyValue ...string) // Warn warn级别输出的日志 // - msg 日志消息文本描述 // - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数> Warn(msg string, keyValue ...string) // Error error级别输出的日志 // - msg 日志消息文本描述 // - keyValue 按顺序一个key一个value,len(keyValue)一定是偶数<注意0也是偶数> Error(msg string, keyValue ...string) }
Logger 日志接口定义
type Payload ¶
type Payload struct { Name string `json:"Name"` // 队列名称 ID string `json:"ID"` // 任务ID MaxTries int64 `json:"MaxTries"` // 任务最大尝试次数,默认1 RetryInterval int64 `json:"RetryInterval"` // 当任务最大允许尝试次数大于0时,下次尝试之前的间隔时长,单位:秒 Attempts int64 `json:"Attempts"` // 任务已被尝试执行的的次数 Payload []byte `json:"Payload"` // 任务参数比特字面量,可decode成具体job被execute时的类型 PopTime int64 `json:"PopTime"` // 任务首次被取出执行的时间戳,取出的时候才去设置 Timeout int64 `json:"Timeout"` // 任务最大执行超时时长,单位:秒 TimeoutAt int64 `json:"TimeoutAt"` // 任务超时时刻时间戳,被执行时刻才会去设置 }
Payload 存储于队列中的job任务结构
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue 队列struct
func New ¶
New 初始化一个队列
@param driver 队列实现底层驱动,可选值见上方14行附近位置的常量 @param conn driver对应底层驱动连接器句柄,具体类型参考 QueueIFace 实体类 @param logger 实现 Logger 接口的结构体实例的指针对象 @param concurrent 单个队列最大并发消费数
func (*Queue) DelayAtByName ¶
DelayAtByName 按任务name投递一个延迟队列Job任务
- 投递一个异步延迟执行的任务
- 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用DelayAt方法
func (*Queue) DelayByName ¶
DelayByName 按任务name投递一个将来时刻执行的延迟队列Job任务
- 投递一个异步延迟执行的任务
- 重要提示:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Delay方法
func (*Queue) DispatchByName ¶
DispatchByName 按任务name投递一个队列Job任务
- 投递一个异步立即执行的任务
- 重要:使用该方法则意味着投递任务之前必须bootstrap任务类,新项目请尽量使用Dispatch方法
func (*Queue) SetFailedJobHandler ¶
func (q *Queue) SetFailedJobHandler(failedJobHandler FailedJobHandler)
SetFailedJobHandler 设置失败任务的收尾处理器
1、尝试了指定的最大尝试次数后仍然失败的任务善后方法 2、此时通过此处设置的处理器可记录到底哪个任务失败了以及失败任务的payload参数情况 3、以及后续的重试等逻辑等
func (*Queue) SetHighPriorityTask ¶
SetHighPriorityTask 指定高优先级的Job任务,多次调用可以设置多个高优先级Job任务
- ① 当队列消费者消费速度过慢,任务堆积时被指定的高优先级Job将尽量保障优先执行
- ② 虽然此处可以指定队列job的高优先级执行,但也不保障待执行任务过多堆积时优先级任务一定会被执行,所以高优先级Job不要指定的过多
type QueueIFace ¶
type QueueIFace interface { // Size 获取当前队列长度方法 // @param queue 队列的名称 Size(queue string) (size int64) // Push 投递一条任务到队列方法 // @param queue 队列的名称 // @param payload 投递进队列的参数负载 Push(queue string, payload interface{}) (err error) // Later 投递一条指定延长时长的延迟任务到队列的方法 // @param queue 延迟队列的名称 // @param durationTo 相对于投递任务时刻延迟的时长 // @param payload 投递进队列的多个参数负载 Later(queue string, durationTo time.Duration, payload interface{}) (err error) // LaterAt 投递一条指定执行时间的延迟任务到队列的方法 // @param queue 延迟队列的名称 // @param timeAt 延迟执行的时刻 // @param payload 投递进队列的多个参数负载 LaterAt(queue string, timeAt time.Time, payload interface{}) (err error) // Pop 从队尾取出一条任务的方法 // @param queue 队列的名称 Pop(queue string) (job JobIFace, exist bool) // SetConnection 设置队列底层连接器 // @param connection 底层连接器实例 SetConnection(connection interface{}) (err error) // GetConnection 获取队列底层连接器 GetConnection() (connection interface{}, err error) }
QueueIFace 基于不同技术栈的队列实现契约
type RawBody ¶
type RawBody struct { ID string // 队列内部唯一标识符ID // contains filtered or unexported fields }
RawBody 队列execute执行时传递给执行方法的参数Raw结构:job任务参数的包装器
- ID 内部标记队列任务的唯一ID,使用UUID生成
type TaskIFace ¶
type TaskIFace interface { MaxTries() int64 // 定义队列任务最大尝试次数:任务执行的最大尝试次数 RetryInterval() int64 // 定义队列任务最大尝试间隔:当任务执行失败后再次尝试执行的间隔时长,单位:秒 Timeout() time.Duration // 定义队列超时方法:返回超时时长 RateAllow() bool // 定义队列限流方法:1秒内会多次尝试去执行,返回true则执行返回false则不执行留待下一轮 Name() string // 定义队列名称方法:返回队列名称 Execute(ctx context.Context, job *RawBody) error // 定义队列任务执行时的方法:执行成功返回nil,执行失败返回error }
TaskIFace 定义队列Job任务执行逻辑的契约(队列任务执行类)