Documentation
¶
Index ¶
- Constants
- Variables
- func DelayTime(delay int, delayUnit DelayUnit) int64
- func GetDelayJobKey(slot, topic string) string
- func GetDelayQueueKey(slot, topic string) string
- func GetReadyQueueKey(slot, topic string) string
- type Consumer
- type ConsumerOption
- type ConsumerOptions
- type DelayUnit
- type Delayed
- func (d *Delayed) Cancel(topic, jobId string) error
- func (d *Delayed) Close() error
- func (d *Delayed) Dispatch(topic string, payload []byte) (string, error)
- func (d *Delayed) DispatchDelay(topic string, payload []byte, delay int, delayUnit DelayUnit) (string, error)
- func (d *Delayed) Provide(ctx context.Context) interface{}
- func (d *Delayed) Register(worker *Worker)
- func (d *Delayed) Run() error
- type IConsumer
- type IProduct
- type Job
- type JobHandler
- type Producer
- type Queue
- type RedisQueue
- func (rq *RedisQueue) AddTopic(topic string) error
- func (rq *RedisQueue) Cancel(topic, jobId string) error
- func (rq *RedisQueue) Close() error
- func (rq *RedisQueue) DelayLen(topic string) (int, error)
- func (rq *RedisQueue) Exists(topic, jobId string) (bool, error)
- func (rq *RedisQueue) FetchDelayJob(topic string) ([]*Job, error)
- func (rq *RedisQueue) FetchReadyJob(topic string) ([]*Job, error)
- func (rq *RedisQueue) Pop(topic string) (*Job, error)
- func (rq *RedisQueue) Provide(context.Context) interface{}
- func (rq *RedisQueue) Push(job *Job) error
- func (rq *RedisQueue) PushDelay(job *Job) error
- func (rq *RedisQueue) ReadyLen(topic string) (int, error)
- func (rq *RedisQueue) Run() error
- type Worker
Constants ¶
View Source
const ( ReadyQueueKey = "delayed:ready:topic.%s" // 就绪队列的 key DelayQueueKey = "delayed:delay:topic.%s" // 延迟SortedSet的 key DelayJob = "delayed:job:topic.%s" // 延迟任务详情存放的 key )
Variables ¶
View Source
var (
ErrGetJobIdRetries = fmt.Errorf("获取 JobId 重试次数超过 5 次, 请检查 Redis 连接是否正常")
)
Functions ¶
func GetDelayQueueKey ¶
GetDelayQueueKey 获取延迟SortedSet的 key
func GetReadyQueueKey ¶
GetReadyQueueKey 获取就绪队列的 key
Types ¶
type ConsumerOption ¶
type ConsumerOption func(*ConsumerOptions)
func WithClosedWait ¶
func WithClosedWait(closedWait int) ConsumerOption
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) ConsumerOption
func WithSleep ¶
func WithSleep(sleep int) ConsumerOption
func WithWorkerNum ¶
func WithWorkerNum(workerNum int) ConsumerOption
type ConsumerOptions ¶
type ConsumerOptions struct { Sleep int // 当就绪队列为空时, 休眠时间, 单位为秒 ClosedWait int // 关闭队列时, 等待队列中的任务执行完成的时间, 单位为秒 MaxRetries int // 任务失败后的重试次数 WorkerNum int // 任务处理器的数量 }
ConsumerOptions 代表消费者的配置选项
type Delayed ¶
type Delayed struct {
// contains filtered or unexported fields
}
type Job ¶
type Job struct { Id string `msgpack:"1"` // 任务ID Topic string `msgpack:"2"` // 任务主题 Delay int64 `msgpack:"3"` // 任务延迟执行的时间戳 Payload []byte `msgpack:"4"` // 任务内容 Timestamp int64 `msgpack:"5"` // 任务投递时间 Retries int `msgpack:"6"` // 任务重试次数 }
Job 代表一个定时任务
type JobHandler ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
type Queue ¶
type Queue interface { Run() error // 启动队列 AddTopic(string) error // 添加一个 topic Push(*Job) error // 添加 Job 到队列中, 投递的方向为队列尾部 Pop(string) (*Job, error) // 从队列中获取 Job PushDelay(*Job) error // 将 Job 添加到延迟队列中 Cancel(string, string) error // 通过指定 topic 和 jobId 取消延迟任务 FetchReadyJob(string) ([]*Job, error) // 获取队列中已就绪的 Job FetchDelayJob(string) ([]*Job, error) // 获取延迟队列中的所有 Job ReadyLen(string) (int, error) // 获取队列中已就绪的 Job 数量 DelayLen(string) (int, error) // 获取延迟队列中的 Job 数量 Exists(string, string) (bool, error) // 判断 JobID 是否存在 Close() error // 关闭队列 }
Queue 代表一个队列, 用于存储 Job, 并提供 Job 的增删改查等操作, 以及 Job 的投递和消费, 以及 Job 的迁移, 以及队列的关闭等操作 延迟 Job 会被存储在延迟队列中, 就绪 Job 会被存储在就绪队列中, 就绪 Job 会被消费, 延迟 Job 在延迟时间到达后会被迁移到就绪队列中
func NewRedisQueue ¶
func NewRedisQueue() Queue
type RedisQueue ¶
func (*RedisQueue) AddTopic ¶
func (rq *RedisQueue) AddTopic(topic string) error
func (*RedisQueue) Close ¶
func (rq *RedisQueue) Close() error
func (*RedisQueue) DelayLen ¶
func (rq *RedisQueue) DelayLen(topic string) (int, error)
DelayLen 获取延迟队列长度
func (*RedisQueue) Exists ¶
func (rq *RedisQueue) Exists(topic, jobId string) (bool, error)
Exists 判断任务是否存在
func (*RedisQueue) FetchDelayJob ¶
func (rq *RedisQueue) FetchDelayJob(topic string) ([]*Job, error)
FetchDelayJob 获取延迟队列中的任务
func (*RedisQueue) FetchReadyJob ¶
func (rq *RedisQueue) FetchReadyJob(topic string) ([]*Job, error)
FetchReadyJob 获取就绪队列中的任务
func (*RedisQueue) Provide ¶
func (rq *RedisQueue) Provide(context.Context) interface{}
func (*RedisQueue) PushDelay ¶
func (rq *RedisQueue) PushDelay(job *Job) error
func (*RedisQueue) ReadyLen ¶
func (rq *RedisQueue) ReadyLen(topic string) (int, error)
ReadyLen 获取就绪队列长度
func (*RedisQueue) Run ¶
func (rq *RedisQueue) Run() error
Click to show internal directories.
Click to hide internal directories.