inspeq

package
v0.17.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2021 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package inspeq provides helper types and functions to inspect queues and tasks managed by Asynq.

Inspector is used to query and mutate the state of queues and tasks.

Example:

inspector := inspeq.New(asynq.RedisClientOpt{Addr: "localhost:6379"})

tasks, err := inspector.ListArchivedTasks("my-queue")

for _, t := range tasks {
    if err := inspector.DeleteTaskByKey(t.Key()); err != nil {
        // handle error
    }
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActiveTask

type ActiveTask struct {
	*asynq.Task
	ID        string
	Queue     string
	MaxRetry  int
	Retried   int
	LastError string
}

ActiveTask is a task that's currently being processed.

type ArchivedTask

type ArchivedTask struct {
	*asynq.Task
	ID           string
	Queue        string
	MaxRetry     int
	Retried      int
	LastFailedAt time.Time
	LastError    string
	// contains filtered or unexported fields
}

ArchivedTask is a task archived for debugging and inspection purposes, and it won't be retried automatically. A task can be archived when the task exhausts its retry counts or manually archived by a user via the CLI or Inspector.

func (*ArchivedTask) Key

func (t *ArchivedTask) Key() string

Key returns a key used to delete and run the archived task.

type ClusterNode

type ClusterNode struct {
	// Node ID in the cluster.
	ID string

	// Address of the node.
	Addr string
}

ClusterNode describes a node in redis cluster.

type DailyStats

type DailyStats struct {
	// Name of the queue.
	Queue string
	// Total number of tasks being processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed to be processed during the given date.
	Failed int
	// Date this stats was taken.
	Date time.Time
}

DailyStats holds aggregate data for a given day for a given queue.

type ErrQueueNotEmpty

type ErrQueueNotEmpty struct {
	// contains filtered or unexported fields
}

ErrQueueNotEmpty indicates that the specified queue is not empty.

func (*ErrQueueNotEmpty) Error

func (e *ErrQueueNotEmpty) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	// contains filtered or unexported fields
}

ErrQueueNotFound indicates that the specified queue does not exist.

func (*ErrQueueNotFound) Error

func (e *ErrQueueNotFound) Error() string

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector is a client interface to inspect and mutate the state of queues and tasks.

func New

New returns a new instance of Inspector.

func (*Inspector) ArchiveAllPendingTasks

func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error)

ArchiveAllPendingTasks archives all pending tasks from the given queue, and reports the number of tasks archived.

func (*Inspector) ArchiveAllRetryTasks

func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error)

ArchiveAllRetryTasks archives all retry tasks from the given queue, and reports the number of tasks archiveed.

func (*Inspector) ArchiveAllScheduledTasks

func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error)

ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, and reports the number of tasks archiveed.

func (*Inspector) ArchiveTaskByKey

func (i *Inspector) ArchiveTaskByKey(qname, key string) error

ArchiveTaskByKey archives a task with the given key in the given queue.

func (*Inspector) CancelActiveTask

func (i *Inspector) CancelActiveTask(id string) error

CancelActiveTask sends a signal to cancel processing of the task with the given id. CancelActiveTask is best-effort, which means that it does not guarantee that the task with the given id will be canceled. The return value only indicates whether the cancelation signal has been sent.

func (*Inspector) Close

func (i *Inspector) Close() error

Close closes the connection with redis.

func (*Inspector) ClusterKeySlot

func (i *Inspector) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*Inspector) ClusterNodes

func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*Inspector) CurrentStats

func (i *Inspector) CurrentStats(qname string) (*QueueStats, error)

CurrentStats returns a current stats of the given queue.

func (*Inspector) DeleteAllArchivedTasks

func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error)

DeleteAllArchivedTasks deletes all archived tasks from the specified queue, and reports the number tasks deleted.

func (*Inspector) DeleteAllPendingTasks

func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error)

DeleteAllPendingTasks deletes all pending tasks from the specified queue, and reports the number tasks deleted.

func (*Inspector) DeleteAllRetryTasks

func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error)

DeleteAllRetryTasks deletes all retry tasks from the specified queue, and reports the number tasks deleted.

func (*Inspector) DeleteAllScheduledTasks

func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, and reports the number tasks deleted.

func (*Inspector) DeleteQueue

func (i *Inspector) DeleteQueue(qname string, force bool) error

DeleteQueue removes the specified queue.

If force is set to true, DeleteQueue will remove the queue regardless of the queue size as long as no tasks are active in the queue. If force is set to false, DeleteQueue will remove the queue only if the queue is empty.

If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound. If force is set to false and the specified queue is not empty, DeleteQueue returns ErrQueueNotEmpty.

func (*Inspector) DeleteTaskByKey

func (i *Inspector) DeleteTaskByKey(qname, key string) error

DeleteTaskByKey deletes a task with the given key from the given queue.

func (*Inspector) History

func (i *Inspector) History(qname string, n int) ([]*DailyStats, error)

History returns a list of stats from the last n days.

func (*Inspector) ListActiveTasks

func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error)

ListActiveTasks retrieves active tasks from the specified queue.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListArchivedTasks

func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error)

ListArchivedTasks retrieves archived tasks from the specified queue. Tasks are sorted by LastFailedAt field in descending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListPendingTasks

func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error)

ListPendingTasks retrieves pending tasks from the specified queue.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListRetryTasks

func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error)

ListRetryTasks retrieves retry tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListScheduledTasks

func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error)

ListScheduledTasks retrieves scheduled tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListSchedulerEnqueueEvents

func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error)

ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.

By default, it retrieves the first 30 tasks.

func (*Inspector) PauseQueue

func (i *Inspector) PauseQueue(qname string) error

PauseQueue pauses task processing on the specified queue. If the queue is already paused, it will return a non-nil error.

func (*Inspector) Queues

func (i *Inspector) Queues() ([]string, error)

Queues returns a list of all queue names.

func (*Inspector) RunAllArchivedTasks

func (i *Inspector) RunAllArchivedTasks(qname string) (int, error)

RunAllArchivedTasks transition all archived tasks to pending state from the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunAllRetryTasks

func (i *Inspector) RunAllRetryTasks(qname string) (int, error)

RunAllRetryTasks transition all retry tasks to pending state from the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunAllScheduledTasks

func (i *Inspector) RunAllScheduledTasks(qname string) (int, error)

RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunTaskByKey

func (i *Inspector) RunTaskByKey(qname, key string) error

RunTaskByKey transition a task to pending state given task key and queue name.

func (*Inspector) SchedulerEntries

func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error)

SchedulerEntries returns a list of all entries registered with currently running schedulers.

func (*Inspector) Servers

func (i *Inspector) Servers() ([]*ServerInfo, error)

Servers return a list of running servers' information.

func (*Inspector) UnpauseQueue

func (i *Inspector) UnpauseQueue(qname string) error

UnpauseQueue resumes task processing on the specified queue. If the queue is not paused, it will return a non-nil error.

type ListOption

type ListOption interface{}

ListOption specifies behavior of list operation.

func Page

func Page(n int) ListOption

Page returns an option to specify the page number for list operation. The value 1 fetches the first page.

Negative page number is treated as one.

func PageSize

func PageSize(n int) ListOption

PageSize returns an option to specify the page size for list operation.

Negative page size is treated as zero.

type PendingTask

type PendingTask struct {
	*asynq.Task
	ID        string
	Queue     string
	MaxRetry  int
	Retried   int
	LastError string
}

PendingTask is a task in a queue and is ready to be processed.

func (*PendingTask) Key

func (t *PendingTask) Key() string

Key returns a key used to delete, and archive the pending task.

type QueueStats

type QueueStats struct {
	// Name of the queue.
	Queue string
	// Total number of bytes that the queue and its tasks require to be stored in redis.
	MemoryUsage int64
	// Size is the total number of tasks in the queue.
	// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
	Size int
	// Number of pending tasks.
	Pending int
	// Number of active tasks.
	Active int
	// Number of scheduled tasks.
	Scheduled int
	// Number of retry tasks.
	Retry int
	// Number of archived tasks.
	Archived int
	// Total number of tasks being processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed to be processed during the given date.
	Failed int
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue will not be processed.
	Paused bool
	// Time when this stats was taken.
	Timestamp time.Time
}

QueueStats represents a state of queues at a certain time.

type RetryTask

type RetryTask struct {
	*asynq.Task
	ID            string
	Queue         string
	NextProcessAt time.Time
	MaxRetry      int
	Retried       int
	LastError     string
	// contains filtered or unexported fields
}

RetryTask is a task scheduled to be retried in the future.

func (*RetryTask) Key

func (t *RetryTask) Key() string

Key returns a key used to delete, run, and archive the retry task.

type ScheduledTask

type ScheduledTask struct {
	*asynq.Task
	ID            string
	Queue         string
	MaxRetry      int
	Retried       int
	LastError     string
	NextProcessAt time.Time
	// contains filtered or unexported fields
}

ScheduledTask is a task scheduled to be processed in the future.

func (*ScheduledTask) Key

func (t *ScheduledTask) Key() string

Key returns a key used to delete, run, and archive the scheduled task.

type SchedulerEnqueueEvent

type SchedulerEnqueueEvent struct {
	// ID of the task that was enqueued.
	TaskID string

	// Time the task was enqueued.
	EnqueuedAt time.Time
}

SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.

type SchedulerEntry

type SchedulerEntry struct {
	// Identifier of this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Periodic Task registered for this entry.
	Task *asynq.Task

	// Opts is the options for the periodic task.
	Opts []asynq.Option

	// Next shows the next time the task will be enqueued.
	Next time.Time

	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}

SchedulerEntry holds information about a periodic task registered with a scheduler.

type ServerInfo

type ServerInfo struct {
	// Unique Identifier for the server.
	ID string
	// Host machine on which the server is running.
	Host string
	// PID of the process in which the server is running.
	PID int

	// Server configuration details.
	// See Config doc for field descriptions.
	Concurrency    int
	Queues         map[string]int
	StrictPriority bool

	// Time the server started.
	Started time.Time
	// Status indicates the status of the server.
	// TODO: Update comment with more details.
	Status string
	// A List of active workers currently processing tasks.
	ActiveWorkers []*WorkerInfo
}

ServerInfo describes a running Server instance.

type WorkerInfo

type WorkerInfo struct {
	// The task the worker is processing.
	Task *ActiveTask
	// Time the worker started processing the task.
	Started time.Time
	// Time the worker needs to finish processing the task by.
	Deadline time.Time
}

WorkerInfo describes a running worker processing a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL