rdb

package
v0.25.2
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Constants

const LeaseDuration = 30 * time.Second

LeaseDuration is the duration used to initially create a lease and to extend it thereafter.

Variables

This section is empty.

Functions

This section is empty.

Types

type DailyStats

type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date these stats were taken.
	Time time.Time
}

DailyStats holds aggregate data for a given day.
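Because Processed counts both succeeded and failed tasks, the number of successes and the failure rate have to be derived. The following is a minimal sketch; the DailyStats type is redeclared locally so the example is self-contained, and the helper names succeeded and failureRate are hypothetical, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// DailyStats is a local copy of the struct documented above.
type DailyStats struct {
	Queue     string
	Processed int
	Failed    int
	Time      time.Time
}

// succeeded returns the number of tasks that succeeded on the given day.
// Processed includes failures, so they are subtracted.
func succeeded(s DailyStats) int {
	return s.Processed - s.Failed
}

// failureRate returns the fraction of processed tasks that failed,
// or 0 when nothing was processed.
func failureRate(s DailyStats) float64 {
	if s.Processed == 0 {
		return 0
	}
	return float64(s.Failed) / float64(s.Processed)
}

func main() {
	s := DailyStats{Queue: "default", Processed: 200, Failed: 50, Time: time.Now()}
	fmt.Println(succeeded(s), failureRate(s))
}
```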

type GroupStat

type GroupStat struct {
	// Name of the group.
	Group string

	// Size of the group.
	Size int
}

type Pagination

type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero.
	Page int
}

Pagination specifies the page size and page number for the list operation.
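Since Page starts from zero, a page maps to a contiguous, zero-based index range. The sketch below shows one way to compute that range; the start and stop helpers are assumptions made for illustration, and how RDB translates a Pagination into redis range queries is not stated on this page.

```go
package main

import "fmt"

// Pagination is a local copy of the struct documented above.
type Pagination struct {
	Size int
	Page int
}

// start returns the zero-based index of the first item in the page.
func (p Pagination) start() int {
	return p.Size * p.Page
}

// stop returns the zero-based index of the last item in the page (inclusive).
func (p Pagination) stop() int {
	return p.Size*p.Page + p.Size - 1
}

func main() {
	pgn := Pagination{Size: 20, Page: 2}
	// The third page of 20 items covers indices 40 through 59.
	fmt.Println(pgn.start(), pgn.stop())
}
```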

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client redis.UniversalClient) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) AddToGroup

func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string) error

func (*RDB) AddToGroupUnique

func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, groupKey string, ttl time.Duration) error

func (*RDB) AggregationCheck

func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error)

AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the group are ready to be aggregated. If so, it moves the tasks to be aggregated to an aggregation set and returns the set ID. If not, it returns an empty string for the set ID. The time for gracePeriod and maxDelay is computed relative to the time t.

Note: AggregationCheck assumes that it is called at intervals no longer than gracePeriod. In other words, the function only checks the most recently added task against the given gracePeriod.

func (*RDB) AllQueues

func (r *RDB) AllQueues() ([]string, error)

AllQueues returns a list of all queue names.

func (*RDB) Archive

func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error

Archive sends the given task to archive, attaching the error message to the task. It also trims the archive by timestamp and set size.

func (*RDB) ArchiveAllAggregatingTasks

func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error)

ArchiveAllAggregatingTasks archives all aggregating tasks from the given group and returns the number of tasks archived. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllPendingTasks

func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error)

ArchiveAllPendingTasks archives all pending tasks from the given queue and returns the number of tasks moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllRetryTasks

func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error)

ArchiveAllRetryTasks archives all retry tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveAllScheduledTasks

func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error)

ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and returns the number of tasks that were moved. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) ArchiveTask

func (r *RDB) ArchiveTask(qname, id string) error

ArchiveTask finds a task that matches the id from the given queue and archives it. It returns nil if it successfully archived the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If the task is already archived, it returns TaskAlreadyArchivedError. If the task is in the active state, it returns a non-nil error with the FailedPrecondition code.

func (*RDB) CancelationPubSub

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) ClearSchedulerEntries

func (r *RDB) ClearSchedulerEntries(schedulerID string) error

ClearSchedulerEntries deletes scheduler entries data from redis.

func (*RDB) ClearSchedulerHistory

func (r *RDB) ClearSchedulerHistory(entryID string) error

ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.

func (*RDB) ClearServerState

func (r *RDB) ClearServerState(host string, pid int, serverID string) error

ClearServerState deletes server state data from redis.

func (*RDB) Client

func (r *RDB) Client() redis.UniversalClient

Client returns a reference to the underlying redis client.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with the redis server.

func (*RDB) ClusterKeySlot

func (r *RDB) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*RDB) ClusterNodes

func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats(qname string) (*Stats, error)

CurrentStats returns the current state of the given queue.

func (*RDB) DeleteAggregationSet

func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error

DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.

func (*RDB) DeleteAllAggregatingTasks

func (r *RDB) DeleteAllAggregatingTasks(qname, gname string) (int64, error)

DeleteAllAggregatingTasks deletes all aggregating tasks from the given group and returns the number of tasks deleted.

func (*RDB) DeleteAllArchivedTasks

func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error)

DeleteAllArchivedTasks deletes all archived tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllCompletedTasks

func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error)

DeleteAllCompletedTasks deletes all completed tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllPendingTasks

func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)

DeleteAllPendingTasks deletes all pending tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)

DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteExpiredCompletedTasks

func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error

DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set and deletes all expired tasks.

func (*RDB) DeleteTask

func (r *RDB) DeleteTask(qname, id string) error

DeleteTask finds a task that matches the id from the given queue and deletes it. It returns nil if it successfully deleted the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If the task is in the active state, it returns a non-nil error with the FailedPrecondition code.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error)

Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and its lease expiration time. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.

func (*RDB) Done

func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error

Done removes the task from active queue and deletes the task. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error

Enqueue adds the given task to the pending list of the queue.

func (*RDB) EnqueueUnique

func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) ExtendLease

func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error)

ExtendLease extends the lease for the given tasks by LeaseDuration (30s). It returns a new expiration time if the operation was successful.

func (*RDB) ForwardIfReady

func (r *RDB) ForwardIfReady(qnames ...string) error

ForwardIfReady checks the scheduled and retry sets of the given queues and moves any tasks that are ready to be processed to the pending set.

func (*RDB) GetTaskInfo

func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error)

GetTaskInfo returns a TaskInfo describing the task from the given queue.

func (*RDB) GroupStats

func (r *RDB) GroupStats(qname string) ([]*GroupStat, error)

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days for the given queue.

func (*RDB) ListActive

func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListActive returns all tasks that are currently being processed for the given queue.

func (*RDB) ListAggregating

func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error)

ListAggregating returns all tasks from the given group.

func (*RDB) ListArchived

func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListArchived returns all tasks from the given queue that have exhausted their retry limit.

func (*RDB) ListCompleted

func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListCompleted returns all tasks from the given queue that have completed successfully.

func (*RDB) ListGroups

func (r *RDB) ListGroups(qname string) ([]string, error)

ListGroups returns a list of all known groups in the given queue.

func (*RDB) ListLeaseExpired

func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error)

ListLeaseExpired returns a list of task messages with an expired lease from the given queues.

func (*RDB) ListPending

func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListPending returns pending tasks that are ready to be processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error)

ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.

func (*RDB) ListSchedulerEnqueueEvents

func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error)

ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.

func (*RDB) ListSchedulerEntries

func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error)

ListSchedulerEntries returns the list of scheduler entries.

func (*RDB) ListServers

func (r *RDB) ListServers() ([]*base.ServerInfo, error)

ListServers returns the list of server info.

func (*RDB) ListWorkers

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) MarkAsComplete

func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error

MarkAsComplete removes the task from active queue to mark the task as completed. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Pause

func (r *RDB) Pause(qname string) error

Pause pauses processing of tasks from the given queue.

func (*RDB) Ping

func (r *RDB) Ping() error

Ping checks the connection with the redis server.

func (*RDB) PublishCancelation

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publishes a cancelation message to all subscribers. The message is the ID of the task to be canceled.

func (*RDB) ReadAggregationSet

func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error)

ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and the deadline for aggregating those tasks.

func (*RDB) ReclaimStaleAggregationSets

func (r *RDB) ReclaimStaleAggregationSets(qname string) error

ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue and reclaims the tasks in each stale aggregation set by putting them back in the group.

func (*RDB) RecordSchedulerEnqueueEvent

func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error

RecordSchedulerEnqueueEvent records the time when the given task was enqueued.

func (*RDB) RedisClusterInfo

func (r *RDB) RedisClusterInfo() (map[string]string, error)

RedisClusterInfo returns a map of redis cluster info.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it will remove the queue even if it is not empty, as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.

func (*RDB) Requeue

func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error

Requeue moves the task from active queue to the specified queue.

func (*RDB) Retry

func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error

Retry moves the task from the active set to the retry queue. It also annotates the message with the given error message and, if isFailure is true, increments the retried counter.

func (*RDB) RunAllAggregatingTasks

func (r *RDB) RunAllAggregatingTasks(qname, gname string) (int64, error)

RunAllAggregatingTasks schedules all tasks from the given group to run and returns the number of tasks scheduled to run. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllArchivedTasks

func (r *RDB) RunAllArchivedTasks(qname string) (int64, error)

RunAllArchivedTasks enqueues all archived tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllRetryTasks

func (r *RDB) RunAllRetryTasks(qname string) (int64, error)

RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunAllScheduledTasks

func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)

RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued. If a queue with the given name doesn't exist, it returns QueueNotFoundError.

func (*RDB) RunTask

func (r *RDB) RunTask(qname, id string) error

RunTask finds a task that matches the id from the given queue and updates it to pending state. It returns nil if it successfully updated the task.

If a queue with the given name doesn't exist, it returns QueueNotFoundError. If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError. If the task is in the active or pending state, it returns a non-nil error with the FailedPrecondition code.

func (*RDB) Schedule

func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the scheduled set to be processed in the future.

func (*RDB) ScheduleUnique

func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) SetClock

func (r *RDB) SetClock(c timeutil.Clock)

SetClock sets the clock used by RDB to the given clock.

Use this function to set the clock to SimulatedClock in tests.

func (*RDB) Unpause

func (r *RDB) Unpause(qname string) error

Unpause resumes processing of tasks from the given queue.

func (*RDB) WriteResult

func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error)

WriteResult writes the given result data for the specified task.

func (*RDB) WriteSchedulerEntries

func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error

WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.

func (*RDB) WriteServerState

func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error

WriteServerState writes server state data to redis with expiration set to the value ttl.

type Stats

type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// MemoryUsage is the total number of bytes the queue and its tasks require
	// to be stored in redis. It is an approximate memory usage value in bytes
	// since the value is computed by sampling.
	MemoryUsage int64
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int

	// Groups is the total number of groups in the queue.
	Groups int

	// Number of tasks in each state.
	Pending     int
	Active      int
	Scheduled   int
	Retry       int
	Archived    int
	Completed   int
	Aggregating int

	// Number of tasks processed within the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Number of tasks failed within the current date.
	Failed int

	// Total number of tasks processed (both succeeded and failed) from this queue.
	ProcessedTotal int
	// Total number of tasks failed.
	FailedTotal int

	// Latency of the queue, measured by the oldest pending task in the queue.
	Latency time.Duration
	// Time these stats were taken.
	Timestamp time.Time
}

Stats represents the state of a queue at a certain time.
