Documentation
¶
Overview ¶
Package asynq provides a framework for asynchronous task processing.
Asynq uses Redis as a message broker. To connect to redis server, specify the options using one of RedisConnOpt types.
redis = &asynq.RedisClientOpt{ Addr: "127.0.0.1:6379", Password: "xxxxx", DB: 3, }
The Client is used to enqueue a task to be processed at the specified time.
Task is created with two parameters: its type and payload.
client := asynq.NewClient(redis) t := asynq.NewTask( "send_email", map[string]interface{}{"user_id": 42}) // Enqueue the task to be processed immediately. res, err := client.Enqueue(t) // Schedule the task to be processed after one minute. res, err = client.EnqueueIn(time.Minute, t)
The Server is used to run the background task processing with a given handler.
srv := asynq.NewServer(redis, asynq.Config{ Concurrency: 10, }) srv.Run(handler)
Handler is an interface type with a method which takes a task and returns an error. Handler should return nil if the processing is successful, otherwise return a non-nil error. If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct { // ... } func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { switch task.Type { case "send_email": id, err := task.Payload.GetInt("user_id") // send email //... default: return fmt.Errorf("unexpected task type %q", task.Type) } return nil }
Index ¶
- Variables
- func GetMaxRetry(ctx context.Context) (n int, ok bool)
- func GetRetryCount(ctx context.Context) (n int, ok bool)
- func GetTaskID(ctx context.Context) (id string, ok bool)
- func NotFound(ctx context.Context, task *Task) error
- func SetRedisBasePrefix(prefix string)
- type Client
- func (c *Client) Close() error
- func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error)
- func (c *Client) EnqueueAsync(t time.Time, task *Task, opts ...Option) (*Result, <-chan error)
- func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error)
- func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) (*Result, error)
- func (c *Client) SetDefaultOptions(taskType string, opts ...Option)
- type Config
- type DailyStats
- type DeadTask
- type EnqueuedTask
- type ErrorHandler
- type ErrorHandlerFunc
- type Handler
- type HandlerFunc
- type InProgressTask
- type Inspector
- func (i *Inspector) CurrentStats() (*Stats, error)
- func (i *Inspector) DeleteAllDeadTasks() (int, error)
- func (i *Inspector) DeleteAllRetryTasks() (int, error)
- func (i *Inspector) DeleteAllScheduledTasks() (int, error)
- func (i *Inspector) DeleteTaskByKey(key string) error
- func (i *Inspector) EnqueueAllDeadTasks() (int, error)
- func (i *Inspector) EnqueueAllRetryTasks() (int, error)
- func (i *Inspector) EnqueueAllScheduledTasks() (int, error)
- func (i *Inspector) EnqueueTaskByKey(key string) error
- func (i *Inspector) History(n int) ([]*DailyStats, error)
- func (i *Inspector) KillAllRetryTasks() (int, error)
- func (i *Inspector) KillAllScheduledTasks() (int, error)
- func (i *Inspector) KillTaskByKey(key string) error
- func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error)
- func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error)
- func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error)
- func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error)
- func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error)
- func (i *Inspector) PauseQueue(qname string) error
- func (i *Inspector) UnpauseQueue(qname string) error
- type ListOption
- type LogLevel
- type Logger
- type MiddlewareFunc
- type Option
- type Payload
- func (p Payload) Bind(data interface{}) error
- func (p Payload) GetBool(key string) (bool, error)
- func (p Payload) GetDuration(key string) (time.Duration, error)
- func (p Payload) GetFloat64(key string) (float64, error)
- func (p Payload) GetInt(key string) (int, error)
- func (p Payload) GetIntSlice(key string) ([]int, error)
- func (p Payload) GetString(key string) (string, error)
- func (p Payload) GetStringMap(key string) (map[string]interface{}, error)
- func (p Payload) GetStringMapBool(key string) (map[string]bool, error)
- func (p Payload) GetStringMapInt(key string) (map[string]int, error)
- func (p Payload) GetStringMapString(key string) (map[string]string, error)
- func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error)
- func (p Payload) GetStringSlice(key string) ([]string, error)
- func (p Payload) GetTime(key string) (time.Time, error)
- func (p Payload) Has(key string) bool
- type QueueInfo
- type RedisClientOpt
- type RedisConnOpt
- type RedisFailoverClientOpt
- type Result
- type RetryTask
- type ScheduledTask
- type ServeMux
- func (mux *ServeMux) Handle(pattern string, handler Handler)
- func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)
- func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string)
- func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error
- func (mux *ServeMux) Use(mws ...MiddlewareFunc)
- type Server
- type Stats
- type Task
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicateTask = errors.New("task already exists")
ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
var ErrServerStopped = errors.New("asynq: the server has been stopped")
ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
Functions ¶
func GetMaxRetry ¶
GetMaxRetry extracts maximum retry from a context, if any.
Return value n indicates the maximum number of times the assoicated task can be retried if ProcessTask returns a non-nil error.
func GetRetryCount ¶
GetRetryCount extracts retry count from a context, if any.
Return value n indicates the number of times associated task has been retried so far.
func GetTaskID ¶
GetTaskID extracts a task ID from a context, if any.
ID of a task is guaranteed to be unique. ID of a task doesn't change if the task is being retried.
func NotFound ¶
NotFound returns an error indicating that the handler was not found for the given task.
func SetRedisBasePrefix ¶
func SetRedisBasePrefix(prefix string)
set redis base prefix only can set once before client or server launch
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
A Client is responsible for scheduling tasks.
A Client is used to register tasks that should be processed immediately or some time in the future.
Clients are safe for concurrent use by multiple goroutines.
func NewClient ¶
func NewClient(r RedisConnOpt) *Client
NewClient and returns a new Client given a redis connection option.
func (*Client) Enqueue ¶
Enqueue enqueues task to be processed immediately.
Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others. By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (*Client) EnqueueAsync ¶ added in v0.11.2
func (*Client) EnqueueAt ¶
EnqueueAt schedules task to be enqueued at the specified time.
EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others. By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (*Client) EnqueueIn ¶
EnqueueIn schedules task to be enqueued after the specified delay.
EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others. By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (*Client) SetDefaultOptions ¶
SetDefaultOptions sets options to be used for a given task type. The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others.
Default options can be overridden by options passed at enqueue time.
type Config ¶
type Config struct { // Maximum number of concurrent processing of tasks. // // If set to a zero or negative value, NewServer will overwrite the value // to the number of CPUs usable by the currennt process. Concurrency int // Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. // // n is the number of times the task has been retried. // e is the error returned by the task handler. // t is the task in question. RetryDelayFunc func(n int, e error, t *Task) time.Duration // List of queues to process with given priority value. Keys are the names of the // queues and values are associated priority value. // // If set to nil or not specified, the server will process only the "default" queue. // // Priority is treated as follows to avoid starving low priority queues. // // Example: // Queues: map[string]int{ // "critical": 6, // "default": 3, // "low": 1, // } // With the above config and given that all queues are not empty, the tasks // in "critical", "default", "low" should be processed 60%, 30%, 10% of // the time respectively. // // If a queue has a zero or negative priority value, the queue will be ignored. Queues map[string]int // StrictPriority indicates whether the queue priority should be treated strictly. // // If set to true, tasks in the queue with the highest priority is processed first. // The tasks in lower priority queues are processed only when those queues with // higher priorities are empty. StrictPriority bool // ErrorHandler handles errors returned by the task handler. // // HandleError is invoked only if the task handler returns a non-nil error. // // Example: // func reportError(task *asynq.Task, err error, retried, maxRetry int) { // if retried >= maxRetry { // err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err) // } // errorReportingService.Notify(err) // }) // // ErrorHandler: asynq.ErrorHandlerFunc(reportError) ErrorHandler ErrorHandler // Logger specifies the logger used by the server instance. // // If unset, default logger is used. Logger Logger // LogLevel specifies the minimum log level to enable. // // If unset, InfoLevel is used by default. LogLevel LogLevel // ShutdownTimeout specifies the duration to wait to let workers finish their tasks // before forcing them to abort when stopping the server. // // If unset or zero, default timeout of 8 seconds is used. ShutdownTimeout time.Duration // HealthCheckFunc is called periodically with any errors encountered during ping to the // connected redis server. HealthCheckFunc func(error) // HealthCheckInterval specifies the interval between healthchecks. // // If unset or zero, the interval is set to 15 seconds. HealthCheckInterval time.Duration }
Config specifies the server's background-task processing behavior.
type DailyStats ¶ added in v0.11.1
DailyStats holds aggregate data for a given day.
type DeadTask ¶ added in v0.11.1
type DeadTask struct { *Task ID string Queue string MaxRetry int Retried int LastFailedAt time.Time ErrorMsg string // contains filtered or unexported fields }
DeadTask is a task exhausted its retries. DeadTask won't be retried automatically.
type EnqueuedTask ¶ added in v0.11.1
EnqueuedTask is a task in a queue and is ready to be processed.
type ErrorHandler ¶
An ErrorHandler handles an error occured during task processing.
type ErrorHandlerFunc ¶
The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler. If f is a function with the appropriate signature, ErrorHandlerFunc(f) is a ErrorHandler that calls f.
func (ErrorHandlerFunc) HandleError ¶
func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err error)
HandleError calls fn(ctx, task, err)
type Handler ¶
A Handler processes tasks.
ProcessTask should return nil if the processing of a task is successful.
If ProcessTask return a non-nil error or panics, the task will be retried after delay.
func NotFoundHandler ¶
func NotFoundHandler() Handler
NotFoundHandler returns a simple task handler that returns a “not found“ error.
type HandlerFunc ¶
The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.
func (HandlerFunc) ProcessTask ¶
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
ProcessTask calls fn(ctx, task)
type InProgressTask ¶ added in v0.11.1
InProgressTask is a task that's currently being processed.
type Inspector ¶ added in v0.11.1
type Inspector struct {
// contains filtered or unexported fields
}
Inspector is a client interface to inspect and mutate the state of queues and tasks.
func NewInspector ¶ added in v0.11.1
func NewInspector(r RedisConnOpt) *Inspector
New returns a new instance of Inspector.
func (*Inspector) CurrentStats ¶ added in v0.11.1
CurrentStats returns a current stats of the queues.
func (*Inspector) DeleteAllDeadTasks ¶ added in v0.11.1
DeleteAllDeadTasks deletes all tasks in dead state, and reports the number tasks deleted.
func (*Inspector) DeleteAllRetryTasks ¶ added in v0.11.1
DeleteAllRetryTasks deletes all tasks in retry state, and reports the number tasks deleted.
func (*Inspector) DeleteAllScheduledTasks ¶ added in v0.11.1
DeleteAllScheduledTasks deletes all tasks in scheduled state, and reports the number tasks deleted.
func (*Inspector) DeleteTaskByKey ¶ added in v0.11.1
DeleteTaskByKey deletes a task with the given key.
func (*Inspector) EnqueueAllDeadTasks ¶ added in v0.11.1
EnqueueAllDeadTasks enqueues all tasks in the dead state, and reports the number of tasks enqueued.
func (*Inspector) EnqueueAllRetryTasks ¶ added in v0.11.1
EnqueueAllRetryTasks enqueues all tasks in the retry state, and reports the number of tasks enqueued.
func (*Inspector) EnqueueAllScheduledTasks ¶ added in v0.11.1
EnqueueAllScheduledTasks enqueues all tasks in the scheduled state, and reports the number of tasks enqueued.
func (*Inspector) EnqueueTaskByKey ¶ added in v0.11.1
EnqueueTaskByKey enqueues a task with the given key.
func (*Inspector) History ¶ added in v0.11.1
func (i *Inspector) History(n int) ([]*DailyStats, error)
History returns a list of stats from the last n days.
func (*Inspector) KillAllRetryTasks ¶ added in v0.11.1
KillAllRetryTasks kills all tasks in retry state, and reports the number of tasks killed.
func (*Inspector) KillAllScheduledTasks ¶ added in v0.11.1
KillAllScheduledTasks kills all tasks in scheduled state, and reports the number of tasks killed.
func (*Inspector) KillTaskByKey ¶ added in v0.11.1
KillTaskByKey kills a task with the given key.
func (*Inspector) ListDeadTasks ¶ added in v0.11.1
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error)
ListScheduledTasks retrieves tasks in retry state. Tasks are sorted by LastFailedAt field in descending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListEnqueuedTasks ¶ added in v0.11.1
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error)
ListScheduledTasks retrieves tasks in the specified queue.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListInProgressTasks ¶ added in v0.11.1
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error)
ListScheduledTasks retrieves tasks currently being processed.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListRetryTasks ¶ added in v0.11.1
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error)
ListScheduledTasks retrieves tasks in retry state. Tasks are sorted by NextEnqueueAt field in ascending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) ListScheduledTasks ¶ added in v0.11.1
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error)
ListScheduledTasks retrieves tasks in scheduled state. Tasks are sorted by NextEnqueueAt field in ascending order.
By default, it retrieves the first 30 tasks.
func (*Inspector) PauseQueue ¶ added in v0.11.1
PauseQueue pauses task processing on the specified queue. If the queue is already paused, it will return a non-nil error.
func (*Inspector) UnpauseQueue ¶ added in v0.11.1
UnpauseQueue resumes task processing on the specified queue. If the queue is not paused, it will return a non-nil error.
type ListOption ¶ added in v0.11.1
type ListOption interface{}
ListOption specifies behavior of list operation.
func Page ¶ added in v0.11.1
func Page(n int) ListOption
Page returns an option to specify the page number for list operation. The value 1 fetches the first page.
Negative page number is treated as one.
func PageSize ¶ added in v0.11.1
func PageSize(n int) ListOption
PageSize returns an option to specify the page size for list operation.
Negative page size is treated as zero.
type LogLevel ¶
type LogLevel int32
LogLevel represents logging level.
It satisfies flag.Value interface.
const ( // DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes. DebugLevel LogLevel // InfoLevel is used for general informational log messages. InfoLevel // WarnLevel is used for undesired but relatively expected events, // which may indicate a problem. WarnLevel // ErrorLevel is used for undesired and unexpected events that // the program can recover from. ErrorLevel // FatalLevel is used for undesired and unexpected events that // the program cannot recover from. FatalLevel )
type Logger ¶
type Logger interface { // Debug logs a message at Debug level. Debug(args ...interface{}) // Info logs a message at Info level. Info(args ...interface{}) // Warn logs a message at Warning level. Warn(args ...interface{}) // Error logs a message at Error level. Error(args ...interface{}) // Fatal logs a message at Fatal level // and process will exit with status set to 1. Fatal(args ...interface{}) }
Logger supports logging at various log levels.
type MiddlewareFunc ¶
MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler. Typically, the returned handler is a closure which does something with the context and task passed to it, and then calls the handler passed as parameter to the MiddlewareFunc.
type Option ¶
type Option interface{}
Option specifies the task processing behavior.
func Deadline ¶
Deadline returns an option to specify the deadline for the given task. If it reaches the deadline before the Handler returns, then the task will be retried.
If there's a conflicting Timeout option, whichever comes earliest will be used.
func MaxRetry ¶
MaxRetry returns an option to specify the max number of times the task will be retried.
Negative retry count is treated as zero retry.
func Queue ¶
Queue returns an option to specify the queue to enqueue the task into.
Queue name is case-insensitive and the lowercased version is used.
func Timeout ¶
Timeout returns an option to specify how long a task may run. If the timeout elapses before the Handler returns, then the task will be retried.
Zero duration means no limit.
If there's a conflicting Deadline option, whichever comes earliest will be used.
func Unique ¶
Unique returns an option to enqueue a task only if the given task is unique. Task enqueued with this option is guaranteed to be unique within the given ttl. Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued. ErrDuplicateTask error is returned when enqueueing a duplicate task.
Uniqueness of a task is based on the following properties:
- Task Type
- Task Payload
- Queue Name
type Payload ¶
type Payload struct {
// contains filtered or unexported fields
}
Payload holds arbitrary data needed for task execution.
func NewPayloadFormStruct ¶ added in v0.10.3
func (Payload) GetBool ¶
GetBool returns a boolean value if a boolean type is associated with the key, otherwise reports an error.
func (Payload) GetDuration ¶
GetDuration returns a duration value if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetFloat64 ¶
GetFloat64 returns a float64 value if a numeric type is associated with the key, otherwise reports an error.
func (Payload) GetInt ¶
GetInt returns an int value if a numeric type is associated with the key, otherwise reports an error.
func (Payload) GetIntSlice ¶
GetIntSlice returns a slice of ints if a int slice type is associated with the key, otherwise reports an error.
func (Payload) GetString ¶
GetString returns a string value if a string type is associated with the key, otherwise reports an error.
func (Payload) GetStringMap ¶
GetStringMap returns a map of string to empty interface if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetStringMapBool ¶
GetStringMapBool returns a map of string to boolean if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetStringMapInt ¶
GetStringMapInt returns a map of string to int if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetStringMapString ¶
GetStringMapString returns a map of string to string if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetStringMapStringSlice ¶
GetStringMapStringSlice returns a map of string to string slice if a correct map type is associated with the key, otherwise reports an error.
func (Payload) GetStringSlice ¶
GetStringSlice returns a slice of strings if a string slice type is associated with the key, otherwise reports an error.
type QueueInfo ¶ added in v0.11.1
type QueueInfo struct { // Name of the queue (e.g. "default", "critical"). // Note: It doesn't include the prefix "asynq:queues:". Name string // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool // Size is the number of tasks in the queue. Size int }
QueueInfo holds information about a queue.
type RedisClientOpt ¶
type RedisClientOpt struct { // Network type to use, either tcp or unix. // Default is tcp. Network string // Redis server address in "host:port" format. Addr string // Redis server password. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config }
RedisClientOpt is used to create a redis client that connects to a redis server directly.
type RedisConnOpt ¶
type RedisConnOpt interface{}
RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.
RedisConnOpt represents a sum of following types:
RedisClientOpt | *RedisClientOpt | RedisFailoverClientOpt | *RedisFailoverClientOpt
func ParseRedisURI ¶
func ParseRedisURI(uri string) (RedisConnOpt, error)
ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. It returns a non-nil error if uri cannot be parsed.
Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:. Supported formats are:
redis://[:password@]host[:port][/dbnumber] redis-socket://[:password@]path[?db=dbnumber] redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
Example ¶
package main import ( "fmt" "log" "github.com/cloudjjcc/asynq" ) func main() { rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10") if err != nil { log.Fatal(err) } r, ok := rconn.(asynq.RedisClientOpt) if !ok { log.Fatal("unexpected type") } fmt.Println(r.Addr) fmt.Println(r.DB) }
Output: localhost:6379 10
type RedisFailoverClientOpt ¶
type RedisFailoverClientOpt struct { // Redis master name that monitored by sentinels. MasterName string // Addresses of sentinels in "host:port" format. // Use at least three sentinels to avoid problems described in // https://redis.io/topics/sentinel. SentinelAddrs []string // Redis sentinel password. SentinelPassword string // Redis server password. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config }
RedisFailoverClientOpt is used to creates a redis client that talks to redis sentinels for service discovery and has an automatic failover capability.
type Result ¶
type Result struct { // ID is a unique identifier for the task. ID string // Retry is the maximum number of retry for the task. Retry int // Queue is a name of the queue the task is enqueued to. Queue string // Timeout is the timeout value for the task. // Counting for timeout starts when a worker starts processing the task. // If task processing doesn't complete within the timeout, the task will be retried. // The value zero means no timeout. // // If deadline is set, min(now+timeout, deadline) is used, where the now is the time when // a worker starts processing the task. Timeout time.Duration // Deadline is the deadline value for the task. // If task processing doesn't complete before the deadline, the task will be retried. // The value time.Unix(0, 0) means no deadline. // // If timeout is set, min(now+timeout, deadline) is used, where the now is the time when // a worker starts processing the task. Deadline time.Time }
A Result holds enqueued task's metadata.
type RetryTask ¶ added in v0.11.1
type RetryTask struct { *Task ID string Queue string NextEnqueueAt time.Time MaxRetry int Retried int ErrorMsg string // contains filtered or unexported fields }
RetryTask is a task scheduled to be retried in the future.
type ScheduledTask ¶ added in v0.11.1
type ScheduledTask struct { *Task ID string Queue string NextEnqueueAt time.Time // contains filtered or unexported fields }
ScheduledTask is a task scheduled to be processed in the future.
func (*ScheduledTask) Key ¶ added in v0.11.1
func (t *ScheduledTask) Key() string
Key returns a key used to delete, enqueue, and kill the task.
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is a multiplexer for asynchronous tasks. It matches the type of each task against a list of registered patterns and calls the handler for the pattern that most closely matches the task's type name.
Longer patterns take precedence over shorter ones, so that if there are handlers registered for both "images" and "images:thumbnails", the latter handler will be called for tasks with a type name beginning with "images:thumbnails" and the former will receive tasks with type name beginning with "images".
func (*ServeMux) Handle ¶
Handle registers the handler for the given pattern. If a handler already exists for pattern, Handle panics.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given pattern.
func (*ServeMux) Handler ¶
Handler returns the handler to use for the given task. It always return a non-nil handler.
Handler also returns the registered pattern that matches the task.
If there is no registered handler that applies to the task, handler returns a 'not found' handler which returns an error.
func (*ServeMux) ProcessTask ¶
ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.
func (*ServeMux) Use ¶
func (mux *ServeMux) Use(mws ...MiddlewareFunc)
Use appends a MiddlewareFunc to the chain. Middlewares are executed in the order that they are applied to the ServeMux.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is responsible for managing the background-task processing.
Server pulls tasks off queues and processes them. If the processing of a task is unsuccessful, server will schedule it for a retry. A task will be retried until either the task gets processed successfully or until it reaches its max retry count.
If a task exhausts its retries, it will be moved to the "dead" queue and will be kept in the queue for some time until a certain condition is met (e.g., queue size reaches a certain limit, or the task has been in the queue for a certain amount of time).
func NewServer ¶
func NewServer(r RedisConnOpt, cfg Config) *Server
NewServer returns a new Server given a redis connection option and background processing configuration.
func (*Server) Quiet ¶
func (srv *Server) Quiet()
Quiet signals the server to stop pulling new tasks off queues. Quiet should be used before stopping the server.
Example ¶
package main import ( "log" "os" "os/signal" "github.com/cloudjjcc/asynq" "golang.org/x/sys/unix" ) func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: ":6379"}, asynq.Config{Concurrency: 20}, ) h := asynq.NewServeMux() // ... Register handlers if err := srv.Start(h); err != nil { log.Fatal(err) } sigs := make(chan os.Signal, 1) signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) // Handle SIGTERM, SIGINT to exit the program. // Handle SIGTSTP to stop processing new tasks. for { s := <-sigs if s == unix.SIGTSTP { srv.Quiet() // stop processing new tasks continue } break } srv.Stop() }
Output:
func (*Server) Run ¶
Run starts the background-task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.
Run returns any error encountered during server startup time. If the server has already been stopped, ErrServerStopped is returned.
Example ¶
package main import ( "log" "github.com/cloudjjcc/asynq" ) func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: ":6379"}, asynq.Config{Concurrency: 20}, ) h := asynq.NewServeMux() // ... Register handlers // Run blocks and waits for os signal to terminate the program. if err := srv.Run(h); err != nil { log.Fatal(err) } }
Output:
func (*Server) Start ¶
Start starts the worker server. Once the server has started, it pulls tasks off queues and starts a worker goroutine for each task. Tasks are processed concurrently by the workers up to the number of concurrency specified at the initialization time.
Start returns any error encountered during server startup time. If the server has already been stopped, ErrServerStopped is returned.
func (*Server) Stop ¶
func (srv *Server) Stop()
Stop stops the worker server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
Example ¶
package main import ( "log" "os" "os/signal" "github.com/cloudjjcc/asynq" "golang.org/x/sys/unix" ) func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: ":6379"}, asynq.Config{Concurrency: 20}, ) h := asynq.NewServeMux() // ... Register handlers if err := srv.Start(h); err != nil { log.Fatal(err) } sigs := make(chan os.Signal, 1) signal.Notify(sigs, unix.SIGTERM, unix.SIGINT) <-sigs // wait for termination signal srv.Stop() }
Output:
type Stats ¶ added in v0.11.1
type Stats struct { Enqueued int InProgress int Scheduled int Retry int Dead int Processed int Failed int Queues []*QueueInfo Timestamp time.Time }
Stats represents a state of queues at a certain time.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
internal
|
|
asynqtest
Package asynqtest defines test helpers for asynq and its internal packages.
|
Package asynqtest defines test helpers for asynq and its internal packages. |
base
Package base defines foundational types and constants used in asynq package.
|
Package base defines foundational types and constants used in asynq package. |
log
Package log exports logging related types and functions.
|
Package log exports logging related types and functions. |
rdb
Package rdb encapsulates the interactions with redis.
|
Package rdb encapsulates the interactions with redis. |
testbroker
Package testbroker exports a broker implementation that should be used in package testing.
|
Package testbroker exports a broker implementation that should be used in package testing. |
tools
module
|