Documentation
¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Variables
- type DailyStats
- type ErrQueueNotEmpty
- type ErrQueueNotFound
- type Pagination
- type RDB
- func (r *RDB) AllQueues() ([]string, error)
- func (r *RDB) CancelationPubSub() (*redis.PubSub, error)
- func (r *RDB) CheckAndEnqueue(qnames ...string) error
- func (r *RDB) ClearServerState(host string, pid int, serverID string) error
- func (r *RDB) Close() error
- func (r *RDB) ClusterKeySlot(qname string) (int64, error)
- func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)
- func (r *RDB) CurrentStats(qname string) (*Stats, error)
- func (r *RDB) DeleteActiveTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeleteAllActiveTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeletePendingTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)
- func (r *RDB) Done(msg *base.TaskMessage) error
- func (r *RDB) Enqueue(msg *base.TaskMessage) error
- func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
- func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
- func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
- func (r *RDB) KillAllRetryTasks(qname string) (int64, error)
- func (r *RDB) KillAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)
- func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)
- func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error)
- func (r *RDB) ListServers() ([]*base.ServerInfo, error)
- func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
- func (r *RDB) Pause(qname string) error
- func (r *RDB) Ping() error
- func (r *RDB) PublishCancelation(id string) error
- func (r *RDB) RedisClusterInfo() (map[string]string, error)
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) RemoveQueue(qname string, force bool) error
- func (r *RDB) Requeue(msg *base.TaskMessage) error
- func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
- func (r *RDB) RunAllDeadTasks(qname string) (int64, error)
- func (r *RDB) RunAllRetryTasks(qname string) (int64, error)
- func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error
- func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error
- func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
- func (r *RDB) Unpause(qname string) error
- func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoProcessableTask indicates that there are no tasks ready to be processed. ErrNoProcessableTask = errors.New("no tasks are ready for processing") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. ErrDuplicateTask = errors.New("task already exists") )
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
type DailyStats struct { // Name of the queue (e.g. "default", "critical"). Queue string // Total number of tasks processed during the given day. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed during the given day. Failed int // Date this stats was taken. Time time.Time }
DailyStats holds aggregate data for a given day.
type ErrQueueNotEmpty ¶
type ErrQueueNotEmpty struct {
// contains filtered or unexported fields
}
ErrQueueNotEmpty indicates specified queue is not empty.
func (*ErrQueueNotEmpty) Error ¶
func (e *ErrQueueNotEmpty) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
// contains filtered or unexported fields
}
ErrQueueNotFound indicates specified queue does not exist.
func (*ErrQueueNotFound) Error ¶
func (e *ErrQueueNotFound) Error() string
type Pagination ¶
type Pagination struct { // Number of items in the page. Size int // Page number starting from zero. Page int }
Pagination specifies the page size and page number for the list operation.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) CancelationPubSub ¶
CancelationPubSub returns a pubsub for cancelation messages.
func (*RDB) CheckAndEnqueue ¶
CheckAndEnqueue checks for scheduled/retry tasks for the given queues and enqueues any tasks that are ready to be processed.
func (*RDB) ClearServerState ¶
ClearServerState deletes server state data from redis.
func (*RDB) ClusterKeySlot ¶
ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (*RDB) ClusterNodes ¶
ClusterNodes returns a list of nodes the given queue belongs to.
func (*RDB) CurrentStats ¶
CurrentStats returns a current state of the queues.
func (*RDB) DeleteActiveTask ¶
func (*RDB) DeleteAllDeadTasks ¶
DeleteAllDeadTasks deletes all dead tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllPendingTasks ¶
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteDeadTask ¶
DeleteDeadTask deletes a dead task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeletePendingTask ¶
func (*RDB) DeleteRetryTask ¶
DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) DeleteScheduledTask ¶
DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) Dequeue ¶
Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.
func (*RDB) Done ¶
func (r *RDB) Done(msg *base.TaskMessage) error
Done removes the task from active queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) Enqueue ¶
func (r *RDB) Enqueue(msg *base.TaskMessage) error
Enqueue inserts the given task to the tail of the queue.
func (*RDB) EnqueueUnique ¶
EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days for the given queue.
func (*RDB) Kill ¶
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error
Kill sends the task to "dead" queue from active queue, assigning the error message to the task. It also trims the set by timestamp and set size.
func (*RDB) KillAllRetryTasks ¶
KillAllRetryTasks kills all retry tasks from the given queue and returns the number of tasks that were moved.
func (*RDB) KillAllScheduledTasks ¶
KillAllScheduledTasks kills all scheduled tasks from the given queue and returns the number of tasks that were moved.
func (*RDB) KillRetryTask ¶
KillRetryTask finds a retry task that matches the given id and score from the given queue and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) KillScheduledTask ¶
KillScheduledTask finds a scheduled task that matches the given id and score from the given queue and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ListActive ¶
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)
ListActive returns all tasks that are currently being processed for the given queue.
func (*RDB) ListDead ¶
ListDead returns all tasks from the given queue that have exhausted its retry limit.
func (*RDB) ListDeadlineExceeded ¶
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)
ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
func (*RDB) ListPending ¶
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)
ListPending returns pending tasks that are ready to be processed.
func (*RDB) ListRetry ¶
ListRetry returns all tasks from the given queue that have failed before and willl be retried in the future.
func (*RDB) ListScheduled ¶
ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.
func (*RDB) ListServers ¶
func (r *RDB) ListServers() ([]*base.ServerInfo, error)
ListServers returns the list of server info.
func (*RDB) ListWorkers ¶
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
ListWorkers returns the list of worker stats.
func (*RDB) PublishCancelation ¶
PublishCancelation publish cancelation message to all subscribers. The message is the ID for the task to be canceled.
func (*RDB) RedisClusterInfo ¶
RedisClusterInfo returns a map of redis cluster info.
func (*RDB) RemoveQueue ¶
RemoveQueue removes the specified queue.
If force is set to true, it will remove the queue regardless as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.
func (*RDB) Requeue ¶
func (r *RDB) Requeue(msg *base.TaskMessage) error
Requeue moves the task from active queue to the specified queue.
func (*RDB) Retry ¶
Retry moves the task from active to retry queue, incrementing retry count and assigning error message to the task message.
func (*RDB) RunAllDeadTasks ¶
RunAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.
func (*RDB) RunAllRetryTasks ¶
RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued.
func (*RDB) RunAllScheduledTasks ¶
RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued.
func (*RDB) RunDeadTask ¶
RunDeadTask finds a dead task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) RunRetryTask ¶
RunRetryTask finds a retry task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) RunScheduledTask ¶
RunScheduledTask finds a scheduled task that matches the given id and score from from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (*RDB) ScheduleUnique ¶
ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) WriteServerState ¶
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
WriteServerState writes server state data to redis with expiration set to the value ttl.
type Stats ¶
type Stats struct { // Name of the queue (e.g. "default", "critical"). Queue string // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool // Size is the total number of tasks in the queue. Size int // Number of tasks in each state. Pending int Active int Scheduled int Retry int Dead int // Total number of tasks processed during the current date. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed during the current date. Failed int // Time this stats was taken. Timestamp time.Time }
Stats represents a state of queues at a certain time.