Documentation ¶
Index ¶
- func StartDelayTaskLifeCycle(ctx context.Context)
- func TaskChannelsKey() string
- func TaskDelayQueueKey() string
- func TaskFailedQueueKey(channel string) string
- func TaskPrepareQueueKey() string
- func TaskQueueDistinctKey(channel string, command string) string
- func TaskQueueExecKey(channel string) string
- func TaskQueueKey(channel string) string
- func TransferPrepareTask(ctx context.Context)
- type Retryer
- type TaskChannel
- func (queue *TaskChannel) Close()
- func (queue *TaskChannel) Listen(ctx context.Context, dispose func())
- func (queue *TaskChannel) NewWorkerProcessID() string
- func (queue *TaskChannel) RegisterWorker(callback func(command brokers.Task, processID string) (bool, error))
- func (queue *TaskChannel) Work(dispose func())
- type TaskManager
- func (manager *TaskManager) AddChannel(channel *brokers.Channel) error
- func (manager *TaskManager) AddDelayTask(execTime time.Time, task brokers.Task) (id string, existence bool, err error)
- func (manager *TaskManager) AddFailedTask(task brokers.Task) error
- func (manager *TaskManager) AddTask(task brokers.Task) (id string, existence bool, err error)
- func (manager *TaskManager) Close()
- func (manager *TaskManager) GetDelayTask(taskID string) (brokers.Task, error)
- func (manager *TaskManager) GetDelayTasks() (map[string]brokers.Task, error)
- func (manager *TaskManager) GetFailedTask(channel, taskID string) (brokers.Task, error)
- func (manager *TaskManager) GetFailedTasks(channel string) (map[string]brokers.Task, error)
- func (manager *TaskManager) GetTaskChannel(channelName string) (channel brokers.Channel, err error)
- func (manager *TaskManager) GetTaskChannels() (channels map[string]*brokers.Channel, err error)
- func (manager *TaskManager) MigrateDelayTask()
- func (manager *TaskManager) QueryTask(channel string) (tasks []brokers.Task, err error)
- func (manager *TaskManager) RemoveChannel(channelName string) error
- func (manager *TaskManager) RemoveDelayTask(taskID string) (brokers.Task, error)
- func (manager *TaskManager) RemoveFailedTask(channel string, taskID string) (brokers.Task, error)
- func (manager *TaskManager) RetryFailedTask(channel string, taskID string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StartDelayTaskLifeCycle ¶
StartDelayTaskLifeCycle 启动延时任务迁移
func TaskFailedQueueKey ¶
TaskFailedQueueKey 返回失败任务队列KEY
func TaskPrepareQueueKey ¶
func TaskPrepareQueueKey() string
TaskPrepareQueueKey return the prepare key for queue
func TaskQueueDistinctKey ¶
TaskQueueDistinctKey 返回任务的去重KEY
func TransferPrepareTask ¶
TransferPrepareTask 将prepare队列中的任务加入正式的任务队列
Types ¶
type Retryer ¶
type Retryer struct {
// contains filtered or unexported fields
}
Retryer 重试器
type TaskChannel ¶
type TaskChannel struct {
// contains filtered or unexported fields
}
TaskChannel is the queue object for redis broker
func CreateTaskChannel ¶
func CreateTaskChannel(channel *brokers.Channel) *TaskChannel
CreateTaskChannel create a redis queue
func (*TaskChannel) Listen ¶
func (queue *TaskChannel) Listen(ctx context.Context, dispose func())
Listen to the redis queue
func (*TaskChannel) NewWorkerProcessID ¶
func (queue *TaskChannel) NewWorkerProcessID() string
NewWorkerProcessID 为worker分配ID
func (*TaskChannel) RegisterWorker ¶
func (queue *TaskChannel) RegisterWorker(callback func(command brokers.Task, processID string) (bool, error))
RegisterWorker 注册worker来消费队列
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager 任务管理器
func (*TaskManager) AddChannel ¶
func (manager *TaskManager) AddChannel(channel *brokers.Channel) error
AddChannel 新增一个channel,会持久化到Redis中
func (*TaskManager) AddDelayTask ¶
func (manager *TaskManager) AddDelayTask(execTime time.Time, task brokers.Task) (id string, existence bool, err error)
AddDelayTask 新增一个延时任务到队列
func (*TaskManager) AddFailedTask ¶
func (manager *TaskManager) AddFailedTask(task brokers.Task) error
AddFailedTask 添加失败的任务到失败任务队列
func (*TaskManager) GetDelayTask ¶
func (manager *TaskManager) GetDelayTask(taskID string) (brokers.Task, error)
GetDelayTask Get specified delay task
func (*TaskManager) GetDelayTasks ¶
func (manager *TaskManager) GetDelayTasks() (map[string]brokers.Task, error)
GetDelayTasks 获取所有延迟任务
func (*TaskManager) GetFailedTask ¶
func (manager *TaskManager) GetFailedTask(channel, taskID string) (brokers.Task, error)
GetFailedTask 查询失败的任务
func (*TaskManager) GetFailedTasks ¶
GetFailedTasks 返回channel中所有失败的任务
func (*TaskManager) GetTaskChannel ¶
func (manager *TaskManager) GetTaskChannel(channelName string) (channel brokers.Channel, err error)
GetTaskChannel 从Redis中查询某个channel信息
func (*TaskManager) GetTaskChannels ¶
func (manager *TaskManager) GetTaskChannels() (channels map[string]*brokers.Channel, err error)
GetTaskChannels 返回偶有的channel信息
func (*TaskManager) MigrateDelayTask ¶
func (manager *TaskManager) MigrateDelayTask()
MigrateDelayTask 迁移延时任务到执行队列
func (*TaskManager) QueryTask ¶
func (manager *TaskManager) QueryTask(channel string) (tasks []brokers.Task, err error)
QueryTask function query task queue status
func (*TaskManager) RemoveChannel ¶
func (manager *TaskManager) RemoveChannel(channelName string) error
RemoveChannel 从Redis中移除channel
func (*TaskManager) RemoveDelayTask ¶
func (manager *TaskManager) RemoveDelayTask(taskID string) (brokers.Task, error)
RemoveDelayTask Remove a delay task from queue
func (*TaskManager) RemoveFailedTask ¶
RemoveFailedTask 从失败任务队列中移除任务
func (*TaskManager) RetryFailedTask ¶
func (manager *TaskManager) RetryFailedTask(channel string, taskID string) error
RetryFailedTask 重试失败的任务