Versions in this module Expand all Collapse all v0 v0.24.1 May 20, 2024 Changes in this version + var ErrDuplicateTask = errors.New("task already exists") + var ErrLeaseExpired = errors.New("asynq: task lease expired") + var ErrQueueNotEmpty = errors.New("queue is not empty") + var ErrQueueNotFound = errors.New("queue not found") + var ErrServerClosed = errors.New("asynq: Server closed") + var ErrTaskIDConflict = errors.New("task ID conflicts with another task") + var ErrTaskNotFound = errors.New("task not found") + var SkipRetry = errors.New("skip retry for the task") + func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration + func GetMaxRetry(ctx context.Context) (n int, ok bool) + func GetQueueName(ctx context.Context) (queue string, ok bool) + func GetRetryCount(ctx context.Context) (n int, ok bool) + func GetTaskID(ctx context.Context) (id string, ok bool) + func NotFound(ctx context.Context, task *Task) error + type Client struct + func NewClient(r RedisConnOpt) *Client + func (c *Client) Close() error + func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) + func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) + type ClusterNode struct + Addr string + ID string + type Config struct + BaseContext func() context.Context + Concurrency int + DelayedTaskCheckInterval time.Duration + ErrorHandler ErrorHandler + GroupAggregator GroupAggregator + GroupGracePeriod time.Duration + GroupMaxDelay time.Duration + GroupMaxSize int + HealthCheckFunc func(error) + HealthCheckInterval time.Duration + IsFailure func(error) bool + LogLevel LogLevel + Logger Logger + Queues map[string]int + RetryDelayFunc RetryDelayFunc + ShutdownTimeout time.Duration + StrictPriority bool + type DailyStats struct + Date time.Time + Failed int + Processed int + Queue string + type ErrorHandler interface + HandleError func(ctx context.Context, task *Task, err error) + type ErrorHandlerFunc func(ctx context.Context, task *Task, err error) + func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err error) + type GroupAggregator interface + Aggregate func(group string, tasks []*Task) *Task + type GroupAggregatorFunc func(group string, tasks []*Task) *Task + func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task + type GroupInfo struct + Group string + Size int + type Handler interface + ProcessTask func(context.Context, *Task) error + func NotFoundHandler() Handler + type HandlerFunc func(context.Context, *Task) error + func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error + type Heartbeater struct + Host string + Pid int + ServerID string + type Inspector struct + func NewInspector(r RedisConnOpt) *Inspector + func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) + func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) + func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) + func (i *Inspector) ArchiveTask(queue, id string) error + func (i *Inspector) CancelProcessing(id string) error + func (i *Inspector) Close() error + func (i *Inspector) ClusterKeySlot(queue string) (int64, error) + func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) + func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) + func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) + func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) + func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) + func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) + func (i *Inspector) DeleteQueue(queue string, force bool) error + func (i *Inspector) DeleteTask(queue, id string) error + func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) + func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) + func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) + func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) + func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) + func (i *Inspector) PauseQueue(queue string) error + func (i *Inspector) Queues() ([]string, error) + func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) + func (i *Inspector) RunAllRetryTasks(queue string) (int, error) + func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) + func (i *Inspector) RunTask(queue, id string) error + func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) + func (i *Inspector) Servers() ([]*ServerInfo, error) + func (i *Inspector) UnpauseQueue(queue string) error + type ListOption interface + func Page(n int) ListOption + func PageSize(n int) ListOption + type LogLevel int32 + const DebugLevel + const ErrorLevel + const FatalLevel + const InfoLevel + const WarnLevel + func (l *LogLevel) Set(val string) error + func (l *LogLevel) String() string + type Logger interface + Debug func(args ...interface{}) + Error func(args ...interface{}) + Fatal func(args ...interface{}) + Info func(args ...interface{}) + Warn func(args ...interface{}) + type MiddlewareFunc func(Handler) Handler + type Option interface + String func() string + Type func() OptionType + Value func() interface{} + func Deadline(t time.Time) Option + func Group(name string) Option + func MaxRetry(n int) Option + func ProcessAt(t time.Time) Option + func ProcessIn(d time.Duration) Option + func Queue(name string) Option + func Retention(d time.Duration) Option + func TaskID(id string) Option + func Timeout(d time.Duration) Option + func Unique(ttl time.Duration) Option + type OptionType int + const DeadlineOpt + const GroupOpt + const MaxRetryOpt + const ProcessAtOpt + const ProcessInOpt + const QueueOpt + const RetentionOpt + const TaskIDOpt + const TimeoutOpt + const UniqueOpt + type PeriodicTaskConfig struct + Cronspec string + Opts []Option + Task *Task + type PeriodicTaskConfigProvider interface + GetConfigs func() ([]*PeriodicTaskConfig, error) + type PeriodicTaskManager struct + func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, error) + func (mgr *PeriodicTaskManager) Run() error + func (mgr *PeriodicTaskManager) Shutdown() + func (mgr *PeriodicTaskManager) Start() error + type PeriodicTaskManagerOpts struct + PeriodicTaskConfigProvider PeriodicTaskConfigProvider + RedisConnOpt RedisConnOpt + SyncInterval time.Duration + type QueueInfo struct + Active int + Aggregating int + Archived int + Completed int + Failed int + FailedTotal int + Groups int + Latency time.Duration + MemoryUsage int64 + Paused bool + Pending int + Processed int + ProcessedTotal int + Queue string + Retry int + Scheduled int + Size int + Timestamp time.Time + type RedisClientOpt struct + Addr string + DB int + DialTimeout time.Duration + Network string + Password string + PoolSize int + ReadTimeout time.Duration + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisClientOpt) MakeRedisClient() interface{} + type RedisClusterClientOpt struct + Addrs []string + DialTimeout time.Duration + MaxRedirects int + Password string + ReadTimeout time.Duration + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisClusterClientOpt) MakeRedisClient() interface{} + type RedisConnOpt interface + MakeRedisClient func() interface{} + func ParseRedisURI(uri string) (RedisConnOpt, error) + type RedisFailoverClientOpt struct + DB int + DialTimeout time.Duration + MasterName string + Password string + PoolSize int + ReadTimeout time.Duration + SentinelAddrs []string + SentinelPassword string + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} + type ResultWriter struct + func (w *ResultWriter) TaskID() string + func (w *ResultWriter) Write(data []byte) (n int, err error) + type RetryDelayFunc func(n int, e error, t *Task) time.Duration + type Scheduler struct + func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler + func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) + func (s *Scheduler) Run() error + func (s *Scheduler) Shutdown() + func (s *Scheduler) Start() error + func (s *Scheduler) Unregister(entryID string) error + type SchedulerEnqueueEvent struct + EnqueuedAt time.Time + TaskID string + type SchedulerEntry struct + ID string + Next time.Time + Opts []Option + Prev time.Time + Spec string + Task *Task + type SchedulerOpts struct + EnqueueErrorHandler func(task *Task, opts []Option, err error) + Location *time.Location + LogLevel LogLevel + Logger Logger + PostEnqueueFunc func(info *TaskInfo, err error) + PreEnqueueFunc func(task *Task, opts []Option) + type ServeMux struct + func NewServeMux() *ServeMux + func (mux *ServeMux) Handle(pattern string, handler Handler) + func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) + func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) + func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error + func (mux *ServeMux) Use(mws ...MiddlewareFunc) + type Server struct + Heartbeater *Heartbeater + State *serverState + func NewServer(r RedisConnOpt, cfg Config) *Server + func (srv *Server) Run(handler Handler) error + func (srv *Server) Shutdown() + func (srv *Server) Start(handler Handler) error + func (srv *Server) Stop() + type ServerInfo struct + ActiveWorkers []*WorkerInfo + Concurrency int + Host string + ID string + PID int + Queues map[string]int + Started time.Time + Status string + StrictPriority bool + type ServerStateValue int + const SrvStateActive + const SrvStateClosed + const SrvStateNew + const SrvStateStopped + func (s ServerStateValue) String() string + type Task struct + func NewTask(typename string, payload []byte, opts ...Option) *Task + func (t *Task) Payload() []byte + func (t *Task) ResultWriter() *ResultWriter + func (t *Task) Type() string + type TaskInfo struct + CompletedAt time.Time + Deadline time.Time + Group string + ID string + IsOrphaned bool + LastErr string + LastFailedAt time.Time + MaxRetry int + NextProcessAt time.Time + Payload []byte + Queue string + Result []byte + Retention time.Duration + Retried int + State TaskState + Timeout time.Duration + Type string + type TaskState int + const TaskStateActive + const TaskStateAggregating + const TaskStateArchived + const TaskStateCompleted + const TaskStatePending + const TaskStateRetry + const TaskStateScheduled + func (s TaskState) String() string + type WorkerInfo struct + Deadline time.Time + Queue string + Started time.Time + TaskID string + TaskPayload []byte + TaskType string