Documentation ¶
Index ¶
- Constants
- func NewEngine(redisName string, cfg *config.RedisConf, conn *go_redis.Client) (engine.Engine, error)
- func PoolJobKey(j engine.Job) string
- func PoolJobKey2(namespace, queue, jobID string) string
- func PoolJobKeyPrefix(namespace, queue string) string
- func PreloadDeadLetterLuaScript(redis *RedisInstance) error
- func PreloadQueueLuaScript(redis *RedisInstance) error
- func RedisInstanceMonitor(redis *RedisInstance)
- func SetLogger(l *logrus.Logger)
- func Setup(conf *config.Config) error
- type DeadLetter
- func (dl *DeadLetter) Add(jobID string) error
- func (dl *DeadLetter) Delete(limit int64) (count int64, err error)
- func (dl *DeadLetter) Name() string
- func (dl *DeadLetter) Peek() (size int64, jobID string, err error)
- func (dl *DeadLetter) Respawn(limit, ttlSecond int64) (count int64, err error)
- func (dl *DeadLetter) Size() (size int64, err error)
- type Engine
- func (e *Engine) BatchConsume(namespace string, queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []engine.Job, err error)
- func (e *Engine) Consume(namespace string, queues []string, ttrSecond, timeoutSecond uint32) (job engine.Job, err error)
- func (e *Engine) Delete(namespace, queue, jobID string) error
- func (e *Engine) DeleteDeadLetter(namespace, queue string, limit int64) (count int64, err error)
- func (e *Engine) Destroy(namespace, queue string) (count int64, err error)
- func (e *Engine) DumpInfo(out io.Writer) error
- func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, err error)
- func (e *Engine) PeekDeadLetter(namespace, queue string) (size int64, jobID string, err error)
- func (e *Engine) Publish(job engine.Job) (jobID string, err error)
- func (e *Engine) RespawnDeadLetter(namespace, queue string, limit, ttlSecond int64) (count int64, err error)
- func (e *Engine) Shutdown()
- func (e *Engine) Size(namespace, queue string) (size int64, err error)
- func (e *Engine) SizeOfDeadLetter(namespace, queue string) (size int64, err error)
- type MetaManager
- func (m *MetaManager) Dump() (map[string][]string, error)
- func (m *MetaManager) ListNamespaces() (namespaces []string, err error)
- func (m *MetaManager) ListQueues(namespace string) (queues []string, err error)
- func (m *MetaManager) RecordIfNotExist(namespace, queue string)
- func (m *MetaManager) Remove(namespace, queue string)
- type Metrics
- type Pool
- type Queue
- func (q *Queue) Destroy() (count int64, err error)
- func (q *Queue) Name() string
- func (q *Queue) Peek() (jobID string, tries uint16, err error)
- func (q *Queue) Poll(timeoutSecond, ttrSecond uint32) (jobID string, tries uint16, err error)
- func (q *Queue) Push(j engine.Job) error
- func (q *Queue) Size() (size int64, err error)
- type QueueName
- type RedisInfo
- type RedisInstance
- type SizeMonitor
- type SizeProvider
- type Timer
Constants ¶
const ( PoolPrefix = "j2" QueuePrefix = "q2" DeadLetterPrefix = "d2" MetaPrefix = "m2" BatchSize = int64(100) )
const ( Namespace = "infra" Subsystem = "lmstfy_redis_v2" )
const ( MaxRedisConnections = 5000 VersionV2 = "v2" )
Variables ¶
This section is empty.
Functions ¶
func PoolJobKey ¶
func PoolJobKey2 ¶
func PoolJobKeyPrefix ¶
func PreloadDeadLetterLuaScript ¶
func PreloadDeadLetterLuaScript(redis *RedisInstance) error
Because the DeadLetter is not like Timer which is a singleton, DeadLetters are transient objects like Queue. So we have to preload the lua scripts separately.
func PreloadQueueLuaScript ¶
func PreloadQueueLuaScript(redis *RedisInstance) error
func RedisInstanceMonitor ¶
func RedisInstanceMonitor(redis *RedisInstance)
Types ¶
type DeadLetter ¶
type DeadLetter struct {
// contains filtered or unexported fields
}
DeadLetter is where dead job will be buried, the job can be respawned into ready queue
func NewDeadLetter ¶
func NewDeadLetter(namespace, queue string, redis *RedisInstance) (*DeadLetter, error)
NewDeadLetter return an instance of DeadLetter storage
func (*DeadLetter) Add ¶
func (dl *DeadLetter) Add(jobID string) error
Add a job to dead letter. NOTE the data format is the same as the ready queue (lua struct `HHc0`), by doing this we could directly pop the dead job back to the ready queue.
NOTE: this method is not called any where except in tests, but this logic is implement in the timer's pump script. please refer to that.
func (*DeadLetter) Name ¶
func (dl *DeadLetter) Name() string
func (*DeadLetter) Respawn ¶
func (dl *DeadLetter) Respawn(limit, ttlSecond int64) (count int64, err error)
func (*DeadLetter) Size ¶
func (dl *DeadLetter) Size() (size int64, err error)
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine that connects all the dots including: - store jobs to timer set or ready queue - deliver jobs to clients - manage dead letters
func (*Engine) BatchConsume ¶
func (e *Engine) BatchConsume(namespace string, queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []engine.Job, err error)
BatchConsume consume some jobs of a queue
func (*Engine) Consume ¶
func (e *Engine) Consume(namespace string, queues []string, ttrSecond, timeoutSecond uint32) (job engine.Job, err error)
Consume multiple queues under the same namespace. the queue order implies priority: the first queue in the list is of the highest priority when that queue has job ready to be consumed. if none of the queues has any job, then consume wait for any queue that has job first.
func (*Engine) DeleteDeadLetter ¶
func (*Engine) PeekDeadLetter ¶
func (*Engine) RespawnDeadLetter ¶
type MetaManager ¶
type MetaManager struct {
// contains filtered or unexported fields
}
func NewMetaManager ¶
func NewMetaManager(redis *RedisInstance) *MetaManager
func (*MetaManager) ListNamespaces ¶
func (m *MetaManager) ListNamespaces() (namespaces []string, err error)
func (*MetaManager) ListQueues ¶
func (m *MetaManager) ListQueues(namespace string) (queues []string, err error)
func (*MetaManager) RecordIfNotExist ¶
func (m *MetaManager) RecordIfNotExist(namespace, queue string)
func (*MetaManager) Remove ¶
func (m *MetaManager) Remove(namespace, queue string)
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool stores all the jobs' data. this is a global singleton per engine note: this `Pool` is NOT the same terminology as the EnginePool
func NewPool ¶
func NewPool(redis *RedisInstance) *Pool
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the "ready queue" that has all the jobs that can be consumed right now
func (*Queue) Poll ¶
Pop a job. If the tries > 0, add job to the "in-flight" timer with timestamp set to `TTR + now()`; Or we might just move the job to "dead-letter".
type QueueName ¶
func PollQueues ¶
func PollQueues(redis *RedisInstance, timer *Timer, queueNames []QueueName, timeoutSecond, ttrSecond uint32) (queueName *QueueName, jobID string, retries uint16, err error)
Poll from multiple queues using blocking method; OR pop a job from one queue using non-blocking method
type RedisInfo ¶
type RedisInfo struct { MemUsed int64 // used_memory MemMax int64 // maxmemory NKeys int64 // total keys NExpires int64 // keys with TTL NClients int64 // connected_clients NBlocking int64 // blocked_clients }
func GetRedisInfo ¶
func GetRedisInfo(redis *RedisInstance) *RedisInfo
type RedisInstance ¶
type SizeMonitor ¶
type SizeMonitor struct {
// contains filtered or unexported fields
}
func NewSizeMonitor ¶
func NewSizeMonitor(redis *RedisInstance, timer *Timer, preloadData map[string][]string) *SizeMonitor
func (*SizeMonitor) Loop ¶
func (m *SizeMonitor) Loop()
func (*SizeMonitor) MonitorIfNotExist ¶
func (m *SizeMonitor) MonitorIfNotExist(namespace, queue string)
func (*SizeMonitor) Remove ¶
func (m *SizeMonitor) Remove(namespace, queue string)
type SizeProvider ¶
type Timer ¶
type Timer struct {
// contains filtered or unexported fields
}
Timer is the other way of saying "delay queue". timer kick jobs into ready queue when it's ready.
func NewTimer ¶
func NewTimer(name string, redis *RedisInstance, interval, checkBackupInterval time.Duration) (*Timer, error)
NewTimer return an instance of delay queue