Documentation ¶
Overview ¶
Package inspeq provides helper types and functions to inspect queues and tasks managed by Asynq.
Inspector is used to query and mutate the state of queues and tasks.
Example:
inspector := inspeq.New(asynq.RedisClientOpt{Addr: "localhost:6379"}) tasks, err := inspector.ListArchivedTasks("my-queue") for _, t := range tasks { if err := inspector.DeleteTaskByKey(t.Key()); err != nil { // handle error } }
Index ¶
- type ActiveTask
- type ArchivedTask
- type ClusterNode
- type DailyStats
- type ErrQueueNotEmpty
- type ErrQueueNotFound
- type Inspector
- func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error)
- func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error)
- func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error)
- func (i *Inspector) ArchiveTaskByKey(qname, key string) error
- func (i *Inspector) CancelActiveTask(id string) error
- func (i *Inspector) Close() error
- func (i *Inspector) ClusterKeySlot(qname string) (int64, error)
- func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error)
- func (i *Inspector) CurrentStats(qname string) (*QueueStats, error)
- func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error)
- func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error)
- func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error)
- func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error)
- func (i *Inspector) DeleteQueue(qname string, force bool) error
- func (i *Inspector) DeleteTaskByKey(qname, key string) error
- func (i *Inspector) History(qname string, n int) ([]*DailyStats, error)
- func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error)
- func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error)
- func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error)
- func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error)
- func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error)
- func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error)
- func (i *Inspector) PauseQueue(qname string) error
- func (i *Inspector) Queues() ([]string, error)
- func (i *Inspector) RunAllArchivedTasks(qname string) (int, error)
- func (i *Inspector) RunAllRetryTasks(qname string) (int, error)
- func (i *Inspector) RunAllScheduledTasks(qname string) (int, error)
- func (i *Inspector) RunTaskByKey(qname, key string) error
- func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error)
- func (i *Inspector) Servers() ([]*ServerInfo, error)
- func (i *Inspector) UnpauseQueue(qname string) error
- type ListOption
- type PendingTask
- type QueueStats
- type RetryTask
- type ScheduledTask
- type SchedulerEnqueueEvent
- type SchedulerEntry
- type ServerInfo
- type WorkerInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActiveTask ¶
type ActiveTask struct { *asynq.Task ID string Queue string MaxRetry int Retried int LastError string }
ActiveTask is a task that's currently being processed.
type ArchivedTask ¶
type ArchivedTask struct { *asynq.Task ID string Queue string MaxRetry int Retried int LastFailedAt time.Time LastError string // contains filtered or unexported fields }
ArchivedTask is a task archived for debugging and inspection purposes, and it won't be retried automatically. A task can be archived when the task exhausts its retry counts or manually archived by a user via the CLI or Inspector.
func (*ArchivedTask) Key ¶
func (t *ArchivedTask) Key() string
Key returns a key used to delete and run the archived task.
type ClusterNode ¶
type ClusterNode struct { // Node ID in the cluster. ID string // Address of the node. Addr string }
ClusterNode describes a node in redis cluster.
type DailyStats ¶
type DailyStats struct { // Name of the queue. Queue string // Total number of tasks being processed during the given date. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed to be processed during the given date. Failed int // Date this stats was taken. Date time.Time }
DailyStats holds aggregate data for a given day for a given queue.
type ErrQueueNotEmpty ¶
type ErrQueueNotEmpty struct {
// contains filtered or unexported fields
}
ErrQueueNotEmpty indicates that the specified queue is not empty.
func (*ErrQueueNotEmpty) Error ¶
func (e *ErrQueueNotEmpty) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
// contains filtered or unexported fields
}
ErrQueueNotFound indicates that the specified queue does not exist.
func (*ErrQueueNotFound) Error ¶
func (e *ErrQueueNotFound) Error() string
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector is a client interface to inspect and mutate the state of queues and tasks.
func (*Inspector) ArchiveAllPendingTasks ¶
ArchiveAllPendingTasks archives all pending tasks from the given queue, and reports the number of tasks archived.
func (*Inspector) ArchiveAllRetryTasks ¶
ArchiveAllRetryTasks archives all retry tasks from the given queue, and reports the number of tasks archiveed.
func (*Inspector) ArchiveAllScheduledTasks ¶
ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, and reports the number of tasks archiveed.
func (*Inspector) ArchiveTaskByKey ¶
ArchiveTaskByKey archives a task with the given key in the given queue.
func (*Inspector) CancelActiveTask ¶
CancelActiveTask sends a signal to cancel processing of the task with the given id. CancelActiveTask is best-effort, which means that it does not guarantee that the task with the given id will be canceled. The return value only indicates whether the cancelation signal has been sent.
func (*Inspector) ClusterKeySlot ¶
ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (*Inspector) ClusterNodes ¶
func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error)
ClusterNodes returns a list of nodes the given queue belongs to.
func (*Inspector) CurrentStats ¶
func (i *Inspector) CurrentStats(qname string) (*QueueStats, error)
CurrentStats returns a current stats of the given queue.
func (*Inspector) DeleteAllArchivedTasks ¶
DeleteAllArchivedTasks deletes all archived tasks from the specified queue, and reports the number tasks deleted.
func (*Inspector) DeleteAllPendingTasks ¶
DeleteAllPendingTasks deletes all pending tasks from the specified queue, and reports the number tasks deleted.
func (*Inspector) DeleteAllRetryTasks ¶
DeleteAllRetryTasks deletes all retry tasks from the specified queue, and reports the number tasks deleted.
func (*Inspector) DeleteAllScheduledTasks ¶
DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, and reports the number tasks deleted.
func (*Inspector) DeleteQueue ¶
DeleteQueue removes the specified queue.
If force is set to true, DeleteQueue will remove the queue regardless of the queue size as long as no tasks are active in the queue. If force is set to false, DeleteQueue will remove the queue only if the queue is empty.
If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound. If force is set to false and the specified queue is not empty, DeleteQueue returns ErrQueueNotEmpty.
func (*Inspector) DeleteTaskByKey ¶
DeleteTaskByKey deletes a task with the given key from the given queue.
func (*Inspector) History ¶
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error)
History returns a list of stats from the last n days.
func (*Inspector) ListActiveTasks ¶
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error)
ListActiveTasks retrieves active tasks from the specified queue.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListArchivedTasks ¶
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error)
ListArchivedTasks retrieves archived tasks from the specified queue. Tasks are sorted by LastFailedAt field in descending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListPendingTasks ¶
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error)
ListPendingTasks retrieves pending tasks from the specified queue.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListRetryTasks ¶
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error)
ListRetryTasks retrieves retry tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListScheduledTasks ¶
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error)
ListScheduledTasks retrieves scheduled tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListSchedulerEnqueueEvents ¶
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error)
ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
By default, it retrieves the first 30 tasks.
func (*Inspector) PauseQueue ¶
PauseQueue pauses task processing on the specified queue. If the queue is already paused, it will return a non-nil error.
func (*Inspector) RunAllArchivedTasks ¶
RunAllArchivedTasks transition all archived tasks to pending state from the given queue, and reports the number of tasks transitioned.
func (*Inspector) RunAllRetryTasks ¶
RunAllRetryTasks transition all retry tasks to pending state from the given queue, and reports the number of tasks transitioned.
func (*Inspector) RunAllScheduledTasks ¶
RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, and reports the number of tasks transitioned.
func (*Inspector) RunTaskByKey ¶
RunTaskByKey transition a task to pending state given task key and queue name.
func (*Inspector) SchedulerEntries ¶
func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error)
SchedulerEntries returns a list of all entries registered with currently running schedulers.
func (*Inspector) Servers ¶
func (i *Inspector) Servers() ([]*ServerInfo, error)
Servers return a list of running servers' information.
func (*Inspector) UnpauseQueue ¶
UnpauseQueue resumes task processing on the specified queue. If the queue is not paused, it will return a non-nil error.
type ListOption ¶
type ListOption interface{}
ListOption specifies behavior of list operation.
func Page ¶
func Page(n int) ListOption
Page returns an option to specify the page number for list operation. The value 1 fetches the first page.
Negative page number is treated as one.
func PageSize ¶
func PageSize(n int) ListOption
PageSize returns an option to specify the page size for list operation.
Negative page size is treated as zero.
type PendingTask ¶
type PendingTask struct { *asynq.Task ID string Queue string MaxRetry int Retried int LastError string }
PendingTask is a task in a queue and is ready to be processed.
func (*PendingTask) Key ¶
func (t *PendingTask) Key() string
Key returns a key used to delete, and archive the pending task.
type QueueStats ¶
type QueueStats struct { // Name of the queue. Queue string // Total number of bytes that the queue and its tasks require to be stored in redis. MemoryUsage int64 // Size is the total number of tasks in the queue. // The value is the sum of Pending, Active, Scheduled, Retry, and Archived. Size int // Number of pending tasks. Pending int // Number of active tasks. Active int // Number of scheduled tasks. Scheduled int // Number of retry tasks. Retry int // Number of archived tasks. Archived int // Total number of tasks being processed during the given date. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed to be processed during the given date. Failed int // Paused indicates whether the queue is paused. // If true, tasks in the queue will not be processed. Paused bool // Time when this stats was taken. Timestamp time.Time }
QueueStats represents a state of queues at a certain time.
type RetryTask ¶
type RetryTask struct { *asynq.Task ID string Queue string NextProcessAt time.Time MaxRetry int Retried int LastError string // contains filtered or unexported fields }
RetryTask is a task scheduled to be retried in the future.
type ScheduledTask ¶
type ScheduledTask struct { *asynq.Task ID string Queue string MaxRetry int Retried int LastError string NextProcessAt time.Time // contains filtered or unexported fields }
ScheduledTask is a task scheduled to be processed in the future.
func (*ScheduledTask) Key ¶
func (t *ScheduledTask) Key() string
Key returns a key used to delete, run, and archive the scheduled task.
type SchedulerEnqueueEvent ¶
type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. TaskID string // Time the task was enqueued. EnqueuedAt time.Time }
SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEntry ¶
type SchedulerEntry struct { // Identifier of this entry. ID string // Spec describes the schedule of this entry. Spec string // Periodic Task registered for this entry. Task *asynq.Task // Opts is the options for the periodic task. Opts []asynq.Option // Next shows the next time the task will be enqueued. Next time.Time // Prev shows the last time the task was enqueued. // Zero time if task was never enqueued. Prev time.Time }
SchedulerEntry holds information about a periodic task registered with a scheduler.
type ServerInfo ¶
type ServerInfo struct { // Unique Identifier for the server. ID string // Host machine on which the server is running. Host string // PID of the process in which the server is running. PID int // Server configuration details. // See Config doc for field descriptions. Concurrency int Queues map[string]int StrictPriority bool // Time the server started. Started time.Time // Status indicates the status of the server. // TODO: Update comment with more details. Status string // A List of active workers currently processing tasks. ActiveWorkers []*WorkerInfo }
ServerInfo describes a running Server instance.
type WorkerInfo ¶
type WorkerInfo struct { // The task the worker is processing. Task *ActiveTask // Time the worker started processing the task. Started time.Time // Time the worker needs to finish processing the task by. Deadline time.Time }
WorkerInfo describes a running worker processing a task.