rdb

package
v0.12.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package rdb encapsulates the interactions with redis.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
	// Dequeue returns it when all of the given queues are empty.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
	// Methods such as DeleteDeadTask, KillRetryTask and RunScheduledTask return it
	// when no task matches the given id and score.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
	// EnqueueUnique and ScheduleUnique return it when the lock cannot be acquired.
	ErrDuplicateTask = errors.New("task already exists")
)

Functions

This section is empty.

Types

type DailyStats

// DailyStats holds aggregate data for a single queue on a given day.
type DailyStats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Total number of tasks processed during the given day.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the given day.
	Failed int
	// Date on which these stats were taken.
	Time time.Time
}

DailyStats holds aggregate data for a given day.

type ErrQueueNotEmpty

type ErrQueueNotEmpty struct {
	// contains filtered or unexported fields
}

ErrQueueNotEmpty indicates specified queue is not empty.

func (*ErrQueueNotEmpty) Error

func (e *ErrQueueNotEmpty) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	// contains filtered or unexported fields
}

ErrQueueNotFound indicates specified queue does not exist.

func (*ErrQueueNotFound) Error

func (e *ErrQueueNotFound) Error() string

type Pagination

// Pagination specifies the page size and page number for a list operation.
type Pagination struct {
	// Number of items in the page.
	Size int

	// Page number starting from zero (the first page is page 0).
	Page int
}

Pagination specifies the page size and page number for the list operation.

type RDB

type RDB struct {
	// contains filtered or unexported fields
}

RDB is a client interface to query and mutate task queues.

func NewRDB

func NewRDB(client redis.UniversalClient) *RDB

NewRDB returns a new instance of RDB.

func (*RDB) AllQueues

func (r *RDB) AllQueues() ([]string, error)

AllQueues returns a list of all queue names.

func (*RDB) CancelationPubSub

func (r *RDB) CancelationPubSub() (*redis.PubSub, error)

CancelationPubSub returns a pubsub for cancelation messages.

func (*RDB) CheckAndEnqueue

func (r *RDB) CheckAndEnqueue(qnames ...string) error

CheckAndEnqueue checks for scheduled/retry tasks for the given queues and enqueues any tasks that are ready to be processed.

func (*RDB) ClearServerState

func (r *RDB) ClearServerState(host string, pid int, serverID string) error

ClearServerState deletes server state data from redis.

func (*RDB) Close

func (r *RDB) Close() error

Close closes the connection with redis server.

func (*RDB) ClusterKeySlot

func (r *RDB) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*RDB) ClusterNodes

func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*RDB) CurrentStats

func (r *RDB) CurrentStats(qname string) (*Stats, error)

CurrentStats returns the current state of the given queue.

func (*RDB) DeleteActiveTask

func (r *RDB) DeleteActiveTask(qname string, id uuid.UUID, score int64) error

func (*RDB) DeleteAllActiveTasks

func (r *RDB) DeleteAllActiveTasks(qname string) (int64, error)

func (*RDB) DeleteAllDeadTasks

func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error)

DeleteAllDeadTasks deletes all dead tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllPendingTasks

func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error)

func (*RDB) DeleteAllRetryTasks

func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error)

DeleteAllRetryTasks deletes all retry tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteAllScheduledTasks

func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the given queue and returns the number of tasks deleted.

func (*RDB) DeleteDeadTask

func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error

DeleteDeadTask deletes a dead task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeletePendingTask

func (r *RDB) DeletePendingTask(qname string, id uuid.UUID, score int64) error

func (*RDB) DeleteRetryTask

func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error

DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) DeleteScheduledTask

func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error

DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Dequeue

func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error)

Dequeue queries given queues in order and pops a task message off a queue if one exists and returns the message and deadline. Dequeue skips a queue if the queue is paused. If all queues are empty, ErrNoProcessableTask error is returned.

func (*RDB) Done

func (r *RDB) Done(msg *base.TaskMessage) error

Done removes the task from active queue to mark the task as done. It removes a uniqueness lock acquired by the task, if any.

func (*RDB) Enqueue

func (r *RDB) Enqueue(msg *base.TaskMessage) error

Enqueue inserts the given task to the tail of the queue.

func (*RDB) EnqueueUnique

func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error

EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) HistoricalStats

func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error)

HistoricalStats returns a list of stats from the last n days for the given queue.

func (*RDB) Kill

func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error

Kill sends the task to "dead" queue from active queue, assigning the error message to the task. It also trims the set by timestamp and set size.

func (*RDB) KillAllRetryTasks

func (r *RDB) KillAllRetryTasks(qname string) (int64, error)

KillAllRetryTasks kills all retry tasks from the given queue and returns the number of tasks that were moved.

func (*RDB) KillAllScheduledTasks

func (r *RDB) KillAllScheduledTasks(qname string) (int64, error)

KillAllScheduledTasks kills all scheduled tasks from the given queue and returns the number of tasks that were moved.

func (*RDB) KillRetryTask

func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error

KillRetryTask finds a retry task that matches the given id and score from the given queue and kills it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) KillScheduledTask

func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error

KillScheduledTask finds a scheduled task that matches the given id and score from the given queue and kills it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) ListActive

func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error)

ListActive returns all tasks that are currently being processed for the given queue.

func (*RDB) ListDead

func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error)

ListDead returns all tasks from the given queue that have exhausted their retry limit.

func (*RDB) ListDeadlineExceeded

func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error)

ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.

func (*RDB) ListPending

func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error)

ListPending returns pending tasks that are ready to be processed.

func (*RDB) ListRetry

func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error)

ListRetry returns all tasks from the given queue that have failed before and will be retried in the future.

func (*RDB) ListScheduled

func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error)

ListScheduled returns all tasks from the given queue that are scheduled to be processed in the future.

func (*RDB) ListServers

func (r *RDB) ListServers() ([]*base.ServerInfo, error)

ListServers returns the list of server info.

func (*RDB) ListWorkers

func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error)

ListWorkers returns the list of worker stats.

func (*RDB) Pause

func (r *RDB) Pause(qname string) error

Pause pauses processing of tasks from the given queue.

func (*RDB) Ping

func (r *RDB) Ping() error

Ping checks the connection with redis server.

func (*RDB) PublishCancelation

func (r *RDB) PublishCancelation(id string) error

PublishCancelation publishes a cancelation message to all subscribers. The message is the ID of the task to be canceled.

func (*RDB) RedisClusterInfo

func (r *RDB) RedisClusterInfo() (map[string]string, error)

RedisClusterInfo returns a map of redis cluster info.

func (*RDB) RedisInfo

func (r *RDB) RedisInfo() (map[string]string, error)

RedisInfo returns a map of redis info.

func (*RDB) RemoveQueue

func (r *RDB) RemoveQueue(qname string, force bool) error

RemoveQueue removes the specified queue.

If force is set to true, it will remove the queue even if it is not empty, as long as no tasks are active for the queue. If force is set to false, it will only remove the queue if the queue is empty.

func (*RDB) Requeue

func (r *RDB) Requeue(msg *base.TaskMessage) error

Requeue moves the task from active queue to the specified queue.

func (*RDB) Retry

func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error

Retry moves the task from active to retry queue, incrementing retry count and assigning error message to the task message.

func (*RDB) RunAllDeadTasks

func (r *RDB) RunAllDeadTasks(qname string) (int64, error)

RunAllDeadTasks enqueues all tasks from dead queue and returns the number of tasks enqueued.

func (*RDB) RunAllRetryTasks

func (r *RDB) RunAllRetryTasks(qname string) (int64, error)

RunAllRetryTasks enqueues all retry tasks from the given queue and returns the number of tasks enqueued.

func (*RDB) RunAllScheduledTasks

func (r *RDB) RunAllScheduledTasks(qname string) (int64, error)

RunAllScheduledTasks enqueues all scheduled tasks from the given queue and returns the number of tasks enqueued.

func (*RDB) RunDeadTask

func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error

RunDeadTask finds a dead task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) RunRetryTask

func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error

RunRetryTask finds a retry task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) RunScheduledTask

func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error

RunScheduledTask finds a scheduled task that matches the given id and score from the given queue and enqueues it for processing. If a task that matches the id and score does not exist, it returns ErrTaskNotFound.

func (*RDB) Schedule

func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error

Schedule adds the task to the backlog queue to be processed in the future.

func (*RDB) ScheduleUnique

func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error

ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. It returns ErrDuplicateTask if the lock cannot be acquired.

func (*RDB) Unpause

func (r *RDB) Unpause(qname string) error

Unpause resumes processing of tasks from the given queue.

func (*RDB) WriteServerState

func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error

WriteServerState writes server state data to redis with expiration set to the value ttl.

type Stats

// Stats represents the state of a single queue at a certain point in time.
type Stats struct {
	// Name of the queue (e.g. "default", "critical").
	Queue string
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue should not be processed.
	Paused bool
	// Size is the total number of tasks in the queue.
	Size int
	// Number of tasks in each state.
	Pending   int
	Active    int
	Scheduled int
	Retry     int
	Dead      int
	// Total number of tasks processed during the current date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed during the current date.
	Failed int
	// Time at which these stats were taken.
	Timestamp time.Time
}

Stats represents the state of a queue at a certain time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL