Versions in this module Expand all Collapse all v1 v1.0.1 Jun 23, 2022 Changes in this version + const KeyReadyQueueLocker + const KeyRunnerAlivePrefix + const KeyWaitingQueue + const KeyWaitingQueueLocker + const KeyWorkers + const KeyWorking + const KeyWorkingCheckLocker + const NeedPullThresholdRatio + const Prefix + const ReadyQueueLockTerm + const ReadyQueuePullBatchSize + const RedisTimeout + const RunnerAliveStatusTTL + const WaitingQueueCatchBatchSize + const WaitingQueueCatchEmptyWaiting + const WaitingQueueCatchMissingWaiting + const WaitingQueueDataIDSeparator + const WaitingQueueLockTerm + const WorkingCheckLockerTerm + var ErrLockerAlreadySet = errors.New("locker has been occupied") + var ErrWorkerNotRegistry = errors.New("unregistry worker") + var KeyReadyQueueHigh = QueueKey(QueueHigh) + var KeyReadyQueueLow = QueueKey(QueueLow) + func QueueKey(queue string) string + func RedisLock(cli *redis.Client, key string, ttl time.Duration) (unlocker func(), err error) + func RedisLockV(cli *redis.Client, key string, val string, ttl time.Duration) (unlocker func(), err error) + func RedisLockerE(cli *redis.Client, key, val string, ttl time.Duration, f func() error) (bool, error) + type Context interface + Meta func() *Meta + func NewContext(ctx context.Context, m *Meta) Context + type ContextKey string + const ContextKeyMeta + type Logger interface + Debug func(args ...interface{}) + Debugf func(template string, args ...interface{}) + Error func(args ...interface{}) + Errorf func(template string, args ...interface{}) + Info func(args ...interface{}) + Infof func(template string, args ...interface{}) + Warn func(args ...interface{}) + Warnf func(template string, args ...interface{}) + type Meta struct + CreatedAt time.Time + Error string + ID string + Name string + PerformAt *time.Time + Queue Queue + Raw []byte + Retry int + RetryCount int + Success bool + func NewMetaByWorker(w Worker, opts ...Option) (*Meta, error) + func (m *Meta) String() string + type Option func(c *Meta) + func WithPerformAt(performAt time.Time) Option + func WithQueue(q Queue) Option + func WithRetry(retry int) Option + type Queue = string + const DefaultRetryCount + const QueueHigh + const QueueLow + type RedisRunner struct + ID string + RegistryWorkers map[string]reflect.Type + func NewRunner(redisCli *redis.Client, threads uint, logger Logger) (*RedisRunner, error) + func (r *RedisRunner) Declare(work Worker, opts ...Option) (*Meta, error) + func (r *RedisRunner) GetQueueLen(queue string) (int64, error) + func (r *RedisRunner) RegistryWorker(work Worker) error + func (r *RedisRunner) Run(ctx context.Context) error + type RunnerStatus struct + ExecCount int64 + FailCount int64 + type Worker interface + Perform func(ctx Context) error + WorkerName func() string