Documentation ¶
Overview ¶
Package rdb encapsulates the interactions with redis.
Index ¶
- Constants
- type DailyStats
- type GroupStat
- type Pagination
- type RDB
- func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string) error
- func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, groupKey string, ttl time.Duration) error
- func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, ...) (string, error)
- func (r *RDB) AllQueues() ([]string, error)
- func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error
- func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error)
- func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error)
- func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error)
- func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) ArchiveTask(qname, id string) error
- func (r *RDB) CancelationPubSub() (*redis.PubSub, error)
- func (r *RDB) ClearSchedulerEntries(schedulerID string) error
- func (r *RDB) ClearSchedulerHistory(entryID string) error
- func (r *RDB) ClearServerState(host string, pid int, serverID string) error
- func (r *RDB) Client() redis.UniversalClient
- func (r *RDB) Close() error
- func (r *RDB) ClusterKeySlot(qname string) (int64, error)
- func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)
- func (r *RDB) CurrentStats(qname string) (*Stats, error)
- func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error
- func (r *RDB) DeleteAllAggregatingTasks(qname, gname string) (int64, error)
- func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)
- func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error
- func (r *RDB) DeleteTask(qname, id string) error
- func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error)
- func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error
- func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error
- func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error
- func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error)
- func (r *RDB) ForwardIfReady(qnames ...string) error
- func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error)
- func (r *RDB) GroupStats(qname string) ([]*GroupStat, error)
- func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
- func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListGroups(qname string) ([]string, error)
- func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error)
- func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error)
- func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)
- func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)
- func (r *RDB) ListServers() ([]*base.ServerInfo, error)
- func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
- func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error
- func (r *RDB) Pause(qname string) error
- func (r *RDB) Ping() error
- func (r *RDB) PublishCancelation(id string) error
- func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error)
- func (r *RDB) ReclaimStaleAggregationSets(qname string) error
- func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error
- func (r *RDB) RedisClusterInfo() (map[string]string, error)
- func (r *RDB) RedisInfo() (map[string]string, error)
- func (r *RDB) RemoveQueue(qname string, force bool) error
- func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error
- func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, ...) error
- func (r *RDB) RunAllAggregatingTasks(qname, gname string) (int64, error)
- func (r *RDB) RunAllArchivedTasks(qname string) (int64, error)
- func (r *RDB) RunAllRetryTasks(qname string) (int64, error)
- func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)
- func (r *RDB) RunTask(qname, id string) error
- func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error
- func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ...) error
- func (r *RDB) SetClock(c timeutil.Clock)
- func (r *RDB) Unpause(qname string) error
- func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error)
- func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error
- func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
- type Stats
Constants ¶
const LeaseDuration = 30 * time.Second
LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DailyStats ¶
type DailyStats struct { // Name of the queue (e.g. "default", "critical"). Queue string // Total number of tasks processed during the given day. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed during the given day. Failed int // Date this stats was taken. Time time.Time }
DailyStats holds aggregate data for a given day.
type Pagination ¶
type Pagination struct { // Number of items in the page. Size int // Page number starting from zero. Page int }
Pagination specifies the page size and page number for the list operation.
type RDB ¶
type RDB struct {
// contains filtered or unexported fields
}
RDB is a client interface to query and mutate task queues.
func (*RDB) AddToGroup ¶
func (*RDB) AddToGroupUnique ¶
func (*RDB) AggregationCheck ¶
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error)
AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the group are ready to be aggregated. If so, it moves the tasks to be aggregated to an aggregation set and returns the set ID. If not, it returns an empty string for the set ID. The time for gracePeriod and maxDelay is computed relative to the time t.
Note: It assumes that this function is called at an interval less than or equal to the gracePeriod. In other words, the function only checks the most recently added task against the given gracePeriod.
func (*RDB) Archive ¶
Archive sends the given task to archive, attaching the error message to the task. It also trims the archive by timestamp and set size.
func (*RDB) ArchiveAllAggregatingTasks ¶
ArchiveAllAggregatingTasks archives all aggregating tasks from the given group and returns the number of tasks archived. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) ArchiveAllPendingTasks ¶
ArchiveAllPendingTasks archives all pending tasks from the given queue and returns the number of tasks moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) ArchiveAllRetryTasks ¶
ArchiveAllRetryTasks archives all retry tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) ArchiveAllScheduledTasks ¶
ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) ArchiveTask ¶
ArchiveTask finds a task that matches the id from the given queue and archives it. It returns nil if it successfully archived the task.
If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is already archived, it returns TaskAlreadyArchivedError. If a task is in active state, it returns a non-nil error with FailedPrecondition code.
func (*RDB) CancelationPubSub ¶
CancelationPubSub returns a pubsub for cancelation messages.
func (*RDB) ClearSchedulerEntries ¶
ClearSchedulerEntries deletes scheduler entries data from redis.
func (*RDB) ClearSchedulerHistory ¶
ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
func (*RDB) ClearServerState ¶
ClearServerState deletes server state data from redis.
func (*RDB) Client ¶
func (r *RDB) Client() redis.UniversalClient
Client returns the reference to underlying redis client.
func (*RDB) ClusterKeySlot ¶
ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (*RDB) ClusterNodes ¶
ClusterNodes returns a list of nodes the given queue belongs to.
func (*RDB) CurrentStats ¶
CurrentStats returns a current state of the queues.
func (*RDB) DeleteAggregationSet ¶
DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
func (*RDB) DeleteAllAggregatingTasks ¶
DeleteAllAggregatingTasks deletes all aggregating tasks from the given group and returns the number of tasks deleted.
func (*RDB) DeleteAllArchivedTasks ¶
DeleteAllArchivedTasks deletes all archived tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllCompletedTasks ¶
DeleteAllCompletedTasks deletes all completed tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllPendingTasks ¶
DeleteAllPendingTasks deletes all pending tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.
func (*RDB) DeleteExpiredCompletedTasks ¶
DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, and deletes all expired tasks.
func (*RDB) DeleteTask ¶
DeleteTask finds a task that matches the id from the given queue and deletes it. It returns nil if it successfully deleted the task.
If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is in active state, it returns a non-nil error with Code FailedPrecondition.
func (*RDB) Dequeue ¶
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error)
Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and its lease expiration time. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.
func (*RDB) Done ¶
Done removes the task from active queue and deletes the task. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) EnqueueUnique ¶
EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) ExtendLease ¶
ExtendLease extends the lease for the given tasks by LeaseDuration (30s). It returns a new expiration time if the operation was successful.
func (*RDB) ForwardIfReady ¶
ForwardIfReady checks scheduled and retry sets of the given queues and moves any tasks that are ready to be processed to the pending set.
func (*RDB) GetTaskInfo ¶
GetTaskInfo returns a TaskInfo describing the task from the given queue.
func (*RDB) HistoricalStats ¶
func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)
HistoricalStats returns a list of stats from the last n days for the given queue.
func (*RDB) ListActive ¶
ListActive returns all tasks that are currently being processed for the given queue.
func (*RDB) ListAggregating ¶
ListAggregating returns all tasks from the given group.
func (*RDB) ListArchived ¶
ListArchived returns all tasks from the given queue that have exhausted their retry limit.
func (*RDB) ListCompleted ¶
ListCompleted returns all tasks from the given queue that have completed successfully.
func (*RDB) ListGroups ¶
ListGroups returns a list of all known groups in the given queue.
func (*RDB) ListLeaseExpired ¶
ListLeaseExpired returns a list of task messages with an expired lease from the given queues.
func (*RDB) ListPending ¶
ListPending returns pending tasks that are ready to be processed.
func (*RDB) ListRetry ¶
ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.
func (*RDB) ListScheduled ¶
ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.
func (*RDB) ListSchedulerEnqueueEvents ¶
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)
ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (*RDB) ListSchedulerEntries ¶
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)
ListSchedulerEntries returns the list of scheduler entries.
func (*RDB) ListServers ¶
func (r *RDB) ListServers() ([]*base.ServerInfo, error)
ListServers returns the list of server info.
func (*RDB) ListWorkers ¶
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)
ListWorkers returns the list of worker stats.
func (*RDB) MarkAsComplete ¶
MarkAsComplete removes the task from active queue to mark the task as completed. It removes a uniqueness lock acquired by the task, if any.
func (*RDB) PublishCancelation ¶
PublishCancelation publishes a cancelation message to all subscribers. The message is the ID for the task to be canceled.
func (*RDB) ReadAggregationSet ¶
func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error)
ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and the deadline for aggregating those tasks.
func (*RDB) ReclaimStaleAggregationSets ¶
ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and reclaims tasks in the stale aggregation set by putting them back in the group.
func (*RDB) RecordSchedulerEnqueueEvent ¶
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error
RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (*RDB) RedisClusterInfo ¶
RedisClusterInfo returns a map of redis cluster info.
func (*RDB) RemoveQueue ¶
RemoveQueue removes the specified queue.
If force is set to true, it will remove the queue regardless as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.
func (*RDB) Retry ¶
func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
Retry moves the task from active to retry queue. It also annotates the message with the given error message and if isFailure is true increments the retried counter.
func (*RDB) RunAllAggregatingTasks ¶
RunAllAggregatingTasks schedules all aggregating tasks from the given group to run and returns the number of tasks scheduled to run. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) RunAllArchivedTasks ¶
RunAllArchivedTasks enqueues all archived tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) RunAllRetryTasks ¶
RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) RunAllScheduledTasks ¶
RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.
func (*RDB) RunTask ¶
RunTask finds a task that matches the id from the given queue and updates it to pending state. It returns nil if it successfully updated the task.
If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If a task is in active or pending state, it returns a non-nil error with Code FailedPrecondition.
func (*RDB) ScheduleUnique ¶
func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.
func (*RDB) SetClock ¶
SetClock sets the clock used by RDB to the given clock.
Use this function to set the clock to SimulatedClock in tests.
func (*RDB) WriteResult ¶
WriteResult writes the given result data for the specified task.
func (*RDB) WriteSchedulerEntries ¶
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error
WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
func (*RDB) WriteServerState ¶
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error
WriteServerState writes server state data to redis with expiration set to the value ttl.
type Stats ¶
type Stats struct { // Name of the queue (e.g. "default", "critical"). Queue string // MemoryUsage is the total number of bytes the queue and its tasks require // to be stored in redis. It is an approximate memory usage value in bytes // since the value is computed by sampling. MemoryUsage int64 // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool // Size is the total number of tasks in the queue. Size int // Groups is the total number of groups in the queue. Groups int // Number of tasks in each state. Pending int Active int Scheduled int Retry int Archived int Completed int Aggregating int // Number of tasks processed within the current date. // The number includes both succeeded and failed tasks. Processed int // Number of tasks failed within the current date. Failed int // Total number of tasks processed (both succeeded and failed) from this queue. ProcessedTotal int // Total number of tasks failed. FailedTotal int // Latency of the queue, measured by the oldest pending task in the queue. Latency time.Duration // Time this stats was taken. Timestamp time.Time }
Stats represents a state of queues at a certain time.