Documentation
¶
Index ¶
- Constants
- Variables
- func QueueKey(queue string) string
- func RedisLock(cli *redis.Client, key string, ttl time.Duration) (unlocker func(), err error)
- func RedisLockV(cli *redis.Client, key string, val string, ttl time.Duration) (unlocker func(), err error)
- func RedisLockerE(cli *redis.Client, key, val string, ttl time.Duration, f func() error) (bool, error)
- type Context
- type ContextKey
- type Logger
- type Meta
- type Option
- type Queue
- type RedisRunner
- type RunnerStatus
- type Worker
Constants ¶
View Source
const ( RunnerAliveStatusTTL = 30 // runner存活状态持续时间 RedisTimeout = time.Second * 3 Prefix = "worker:" KeyWorkers = Prefix + "workers" // 存储work数据 KeyRunnerAlivePrefix = Prefix + "alive#" // 设置runner存活状态 KeyWaitingQueue = Prefix + "waiting" // 等待队列 ReadyQueueLockTerm = 60 * time.Second // 就绪队列锁有效期 KeyReadyQueueLocker = Prefix + "readyQueueLocker" // 就绪队列锁 KeyWaitingQueueLocker = Prefix + "waitingLocker" // 等待队列锁 KeyWorkingCheckLocker = Prefix + "workingLocker" // 工作空间状态检查锁 KeyWorking = Prefix + "working" // 工作空间 WorkingCheckLockerTerm = 6 * time.Minute // 工作空间状态检查锁超时时间 WaitingQueueCatchMissingWaiting = 30 * time.Second // 等待队列线程未获取锁时,等待时间 WaitingQueueCatchEmptyWaiting = 1 * time.Second // 等待队列线程 WaitingQueueLockTerm = 60 * time.Second // 等待队列锁有效期 WaitingQueueCatchBatchSize = 100 // 等待队列转移批次大小 WaitingQueueDataIDSeparator = "#" // 等待队列内存储队列名称和ID,使用分隔符连接 ReadyQueuePullBatchSize = 30 // 就绪队列请求批量大小 NeedPullThresholdRatio = 3 // 工作空间数量小于NeedPullThresholdRatio * Threads 时,触发请求就绪队列逻辑 )
Variables ¶
View Source
var ( ErrWorkerNotRegistry = errors.New("unregistry worker") KeyReadyQueueHigh = QueueKey(QueueHigh) // 就绪队列高优先级 KeyReadyQueueLow = QueueKey(QueueLow) // 就绪队列低优先级 )
View Source
var (
ErrLockerAlreadySet = errors.New("locker has been occupied")
)
Functions ¶
func RedisLockV ¶
Types ¶
type Logger ¶
type Logger interface { Debugf(template string, args ...interface{}) Infof(template string, args ...interface{}) Warnf(template string, args ...interface{}) Errorf(template string, args ...interface{}) Debug(args ...interface{}) Info(args ...interface{}) Warn(args ...interface{}) Error(args ...interface{}) }
type Meta ¶
type RedisRunner ¶
type RedisRunner struct { ID string RegistryWorkers map[string]reflect.Type // contains filtered or unexported fields }
保证消息不丢失,但是可能出现消息重复消费情况, 有需要可以业务端确保幂等消费逻辑 等待队列:延时执行的worker到等待队列,时间到以后,被转移到就绪队列 就绪队列:待执行的worker,有多个优先级(优先级调度策略,如何处理饥饿情况) 工作空间:为了保证多进程下数据安全,每个worker当前处理任务会分发到工作空间,工作空间即为分配给当前进程的任务,成功执行后才从工作空间删除 支持任务错误重试逻辑,失败任务会根据重试次数来确定重新调度时间,并发布到等待队列 任务失败次数超过重试阈值后,任务丢弃
func NewRunner ¶
func NewRunner(redisCli *redis.Client, threads uint, logger Logger) (*RedisRunner, error)
func (*RedisRunner) Declare ¶
func (r *RedisRunner) Declare(work Worker, opts ...Option) (*Meta, error)
Declare should used before worker Registry
func (*RedisRunner) GetQueueLen ¶
func (r *RedisRunner) GetQueueLen(queue string) (int64, error)
func (*RedisRunner) RegistryWorker ¶
func (r *RedisRunner) RegistryWorker(work Worker) error
worker should registry before worker loop lanch
type RunnerStatus ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.