redis

package
v0.0.0-...-3ff8f60 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2018 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartDelayTaskLifeCycle

func StartDelayTaskLifeCycle(ctx context.Context)

StartDelayTaskLifeCycle 启动延时任务迁移

func TaskChannelsKey

func TaskChannelsKey() string

TaskChannelsKey 返回所有channel信息存储的KEY

func TaskDelayQueueKey

func TaskDelayQueueKey() string

TaskDelayQueueKey 返回延时任务的key

func TaskFailedQueueKey

func TaskFailedQueueKey(channel string) string

TaskFailedQueueKey 返回失败任务队列KEY

func TaskPrepareQueueKey

func TaskPrepareQueueKey() string

TaskPrepareQueueKey returns the prepare key for the queue

func TaskQueueDistinctKey

func TaskQueueDistinctKey(channel string, command string) string

TaskQueueDistinctKey 返回任务的去重KEY

func TaskQueueExecKey

func TaskQueueExecKey(channel string) string

TaskQueueExecKey 返回执行中的任务队列KEY

func TaskQueueKey

func TaskQueueKey(channel string) string

TaskQueueKey 返回任务队列的KEY

func TransferPrepareTask

func TransferPrepareTask(ctx context.Context)

TransferPrepareTask 将prepare队列中的任务加入正式的任务队列

Types

type Retryer

type Retryer struct {
	// contains filtered or unexported fields
}

Retryer 重试器

func Retry

func Retry(f func(retryTimes int) error, max int) *Retryer

Retry 自动重试函数

func (*Retryer) Failed

func (r *Retryer) Failed(failedFunc func(err error)) *Retryer

Failed 注册执行失败后置函数(重试最大次数后,仍然失败才调用)

func (*Retryer) Finished

func (r *Retryer) Finished(finishFunc func(retryTimes int, err error) bool) *Retryer

Finished 注册无论成功失败,最终执行完毕后执行的后置函数

func (*Retryer) Run

func (r *Retryer) Run() (int, error)

Run 同步的方式运行

func (*Retryer) RunAsync

func (r *Retryer) RunAsync() <-chan struct{}

RunAsync 异步方式运行

func (*Retryer) Success

func (r *Retryer) Success(successFunc func(retryTimes int)) *Retryer

Success 注册执行成功后置函数

type TaskChannel

type TaskChannel struct {
	// contains filtered or unexported fields
}

TaskChannel is the queue object for redis broker

func CreateTaskChannel

func CreateTaskChannel(channel *brokers.Channel) *TaskChannel

CreateTaskChannel creates a redis queue

func (*TaskChannel) Close

func (queue *TaskChannel) Close()

Close closes the task queue

func (*TaskChannel) Listen

func (queue *TaskChannel) Listen(ctx context.Context, dispose func())

Listen to the redis queue

func (*TaskChannel) NewWorkerProcessID

func (queue *TaskChannel) NewWorkerProcessID() string

NewWorkerProcessID 为worker分配ID

func (*TaskChannel) RegisterWorker

func (queue *TaskChannel) RegisterWorker(callback func(command brokers.Task, processID string) (bool, error))

RegisterWorker 注册worker来消费队列

func (*TaskChannel) Work

func (queue *TaskChannel) Work(dispose func())

Work 执行消费者worker

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager 任务管理器

func GetTaskManager

func GetTaskManager() *TaskManager

GetTaskManager creates a redis manager

func (*TaskManager) AddChannel

func (manager *TaskManager) AddChannel(channel *brokers.Channel) error

AddChannel 新增一个channel,会持久化到Redis中

func (*TaskManager) AddDelayTask

func (manager *TaskManager) AddDelayTask(execTime time.Time, task brokers.Task) (id string, existence bool, err error)

AddDelayTask 新增一个延时任务到队列

func (*TaskManager) AddFailedTask

func (manager *TaskManager) AddFailedTask(task brokers.Task) error

AddFailedTask 添加失败的任务到失败任务队列

func (*TaskManager) AddTask

func (manager *TaskManager) AddTask(task brokers.Task) (id string, existence bool, err error)

AddTask 用于将任务加入到Channel

func (*TaskManager) Close

func (manager *TaskManager) Close()

Close closes the manager

func (*TaskManager) GetDelayTask

func (manager *TaskManager) GetDelayTask(taskID string) (brokers.Task, error)

GetDelayTask gets the specified delay task

func (*TaskManager) GetDelayTasks

func (manager *TaskManager) GetDelayTasks() (map[string]brokers.Task, error)

GetDelayTasks 获取所有延迟任务

func (*TaskManager) GetFailedTask

func (manager *TaskManager) GetFailedTask(channel, taskID string) (brokers.Task, error)

GetFailedTask 查询失败的任务

func (*TaskManager) GetFailedTasks

func (manager *TaskManager) GetFailedTasks(channel string) (map[string]brokers.Task, error)

GetFailedTasks 返回channel中所有失败的任务

func (*TaskManager) GetTaskChannel

func (manager *TaskManager) GetTaskChannel(channelName string) (channel brokers.Channel, err error)

GetTaskChannel 从Redis中查询某个channel信息

func (*TaskManager) GetTaskChannels

func (manager *TaskManager) GetTaskChannels() (channels map[string]*brokers.Channel, err error)

GetTaskChannels 返回所有的channel信息

func (*TaskManager) MigrateDelayTask

func (manager *TaskManager) MigrateDelayTask()

MigrateDelayTask 迁移延时任务到执行队列

func (*TaskManager) QueryTask

func (manager *TaskManager) QueryTask(channel string) (tasks []brokers.Task, err error)

QueryTask queries the task queue status

func (*TaskManager) RemoveChannel

func (manager *TaskManager) RemoveChannel(channelName string) error

RemoveChannel 从Redis中移除channel

func (*TaskManager) RemoveDelayTask

func (manager *TaskManager) RemoveDelayTask(taskID string) (brokers.Task, error)

RemoveDelayTask removes a delay task from the queue

func (*TaskManager) RemoveFailedTask

func (manager *TaskManager) RemoveFailedTask(channel string, taskID string) (brokers.Task, error)

RemoveFailedTask 从失败任务队列中移除任务

func (*TaskManager) RetryFailedTask

func (manager *TaskManager) RetryFailedTask(channel string, taskID string) error

RetryFailedTask 重试失败的任务

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL