Versions in this module Expand all Collapse all v1 v1.0.8 Apr 21, 2024 v1.0.7 Apr 21, 2024 v1.0.6 Apr 21, 2024 v1.0.5 Apr 21, 2024 v1.0.4 Apr 20, 2024 v1.0.3 Apr 20, 2024 v1.0.1 Apr 20, 2024 Changes in this version + var ErrDuplicateTask = errors.New("task already exists") + var ErrLeaseExpired = errors.New("asynq: task lease expired") + var ErrQueueNotEmpty = errors.New("queue is not empty") + var ErrQueueNotFound = errors.New("queue not found") + var ErrServerClosed = errors.New("asynq: Server closed") + var ErrTaskIDConflict = errors.New("task ID conflicts with another task") + var ErrTaskNotFound = errors.New("task not found") + var SkipRetry = errors.New("skip retry for the task") + func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration + func GetMaxRetry(ctx context.Context) (n int, ok bool) + func GetQueueName(ctx context.Context) (queue string, ok bool) + func GetRetryCount(ctx context.Context) (n int, ok bool) + func GetTaskID(ctx context.Context) (id string, ok bool) + func IsPanicError(err error) bool + func NotFound(ctx context.Context, task *Task) error + func SetGlobalPrefix(prefix string) + type Client struct + func NewClient(r RedisConnOpt) *Client + func (c *Client) Close() error + func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) + func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) + type ClusterNode struct + Addr string + ID string + type Config struct + BaseContext func() context.Context + Concurrency int + DelayedTaskCheckInterval time.Duration + ErrorHandler ErrorHandler + GroupAggregator GroupAggregator + GroupGracePeriod time.Duration + GroupMaxDelay time.Duration + GroupMaxSize int + HealthCheckFunc func(error) + HealthCheckInterval time.Duration + IsFailure func(error) bool + LogLevel LogLevel + Logger Logger + Queues map[string]int + RetryDelayFunc RetryDelayFunc + ShutdownTimeout time.Duration + StrictPriority bool + TaskCheckInterval time.Duration + type DailyStats struct + Date time.Time + Failed int + Processed int + Queue string + type ErrorHandler interface + HandleError func(ctx context.Context, task *Task, err error) + type ErrorHandlerFunc func(ctx context.Context, task *Task, err error) + func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err error) + type GroupAggregator interface + Aggregate func(group string, tasks []*Task) *Task + type GroupAggregatorFunc func(group string, tasks []*Task) *Task + func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task + type GroupInfo struct + Group string + Size int + type Handler interface + ProcessTask func(context.Context, *Task) error + func NotFoundHandler() Handler + type HandlerFunc func(context.Context, *Task) error + func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error + type Inspector struct + func NewInspector(r RedisConnOpt) *Inspector + func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) + func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) + func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) + func (i *Inspector) ArchiveTask(queue, id string) error + func (i *Inspector) CancelProcessing(id string) error + func (i *Inspector) Close() error + func (i *Inspector) ClusterKeySlot(queue string) (int64, error) + func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) + func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) + func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) + func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) + func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) + func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) + func (i *Inspector) DeleteQueue(queue string, force bool) error + func (i *Inspector) DeleteTask(queue, id string) error + func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) + func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) + func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) + func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) + func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) + func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) + func (i *Inspector) PauseQueue(queue string) error + func (i *Inspector) Queues() ([]string, error) + func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) + func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) + func (i *Inspector) RunAllRetryTasks(queue string) (int, error) + func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) + func (i *Inspector) RunTask(queue, id string) error + func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) + func (i *Inspector) Servers() ([]*ServerInfo, error) + func (i *Inspector) UnpauseQueue(queue string) error + type ListOption interface + func Page(n int) ListOption + func PageSize(n int) ListOption + type LogLevel int32 + const DebugLevel + const ErrorLevel + const FatalLevel + const InfoLevel + const WarnLevel + func (l *LogLevel) Set(val string) error + func (l *LogLevel) String() string + type Logger interface + Debug func(args ...interface{}) + Error func(args ...interface{}) + Fatal func(args ...interface{}) + Info func(args ...interface{}) + Warn func(args ...interface{}) + type MiddlewareFunc func(Handler) Handler + type Option interface + String func() string + Type func() OptionType + Value func() interface{} + func Deadline(t time.Time) Option + func Group(name string) Option + func MaxRetry(n int) Option + func ProcessAt(t time.Time) Option + func ProcessIn(d time.Duration) Option + func Queue(name string) Option + func Retention(d time.Duration) Option + func TaskID(id string) Option + func Timeout(d time.Duration) Option + func Unique(ttl time.Duration) Option + type OptionType int + const DeadlineOpt + const GroupOpt + const MaxRetryOpt + const ProcessAtOpt + const ProcessInOpt + const QueueOpt + const RetentionOpt + const TaskIDOpt + const TimeoutOpt + const UniqueOpt + type PeriodicTaskConfig struct + Cronspec string + Opts []Option + Task *Task + type PeriodicTaskConfigProvider interface + GetConfigs func() ([]*PeriodicTaskConfig, error) + type PeriodicTaskManager struct + func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, error) + func (mgr *PeriodicTaskManager) Run() error + func (mgr *PeriodicTaskManager) Shutdown() + func (mgr *PeriodicTaskManager) Start() error + type PeriodicTaskManagerOpts struct + PeriodicTaskConfigProvider PeriodicTaskConfigProvider + RedisConnOpt RedisConnOpt + SyncInterval time.Duration + type QueueInfo struct + Active int + Aggregating int + Archived int + Completed int + Failed int + FailedTotal int + Groups int + Latency time.Duration + MemoryUsage int64 + Paused bool + Pending int + Processed int + ProcessedTotal int + Queue string + Retry int + Scheduled int + Size int + Timestamp time.Time + type RedisClientOpt struct + Addr string + DB int + DialTimeout time.Duration + Network string + Password string + PoolSize int + ReadTimeout time.Duration + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisClientOpt) MakeRedisClient() interface{} + type RedisClusterClientOpt struct + Addrs []string + DialTimeout time.Duration + MaxRedirects int + Password string + ReadTimeout time.Duration + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisClusterClientOpt) MakeRedisClient() interface{} + type RedisConnOpt interface + MakeRedisClient func() interface{} + func ParseRedisURI(uri string) (RedisConnOpt, error) + type RedisFailoverClientOpt struct + DB int + DialTimeout time.Duration + MasterName string + Password string + PoolSize int + ReadTimeout time.Duration + SentinelAddrs []string + SentinelPassword string + TLSConfig *tls.Config + Username string + WriteTimeout time.Duration + func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} + type ResultWriter struct + func (w *ResultWriter) TaskID() string + func (w *ResultWriter) Write(data []byte) (n int, err error) + type RetryDelayFunc func(n int, e error, t *Task) time.Duration + type Scheduler struct + func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler + func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) + func (s *Scheduler) Run() error + func (s *Scheduler) Shutdown() + func (s *Scheduler) Start() error + func (s *Scheduler) Unregister(entryID string) error + type SchedulerEnqueueEvent struct + EnqueuedAt time.Time + TaskID string + type SchedulerEntry struct + ID string + Next time.Time + Opts []Option + Prev time.Time + Spec string + Task *Task + type SchedulerOpts struct + EnqueueErrorHandler func(task *Task, opts []Option, err error) + Location *time.Location + LogLevel LogLevel + Logger Logger + PostEnqueueFunc func(info *TaskInfo, err error) + PreEnqueueFunc func(task *Task, opts []Option) + type ServeMux struct + func NewServeMux() *ServeMux + func (mux *ServeMux) Handle(pattern string, handler Handler) + func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) + func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) + func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error + func (mux *ServeMux) Use(mws ...MiddlewareFunc) + type Server struct + func NewServer(r RedisConnOpt, cfg Config) *Server + func (srv *Server) Run(handler Handler) error + func (srv *Server) Shutdown() + func (srv *Server) Start(handler Handler) error + func (srv *Server) Stop() + type ServerInfo struct + ActiveWorkers []*WorkerInfo + Concurrency int + Host string + ID string + PID int + Queues map[string]int + Started time.Time + Status string + StrictPriority bool + type Task struct + func NewTask(typename string, payload []byte, opts ...Option) *Task + func (t *Task) Payload() []byte + func (t *Task) ResultWriter() *ResultWriter + func (t *Task) Type() string + type TaskInfo struct + CompletedAt time.Time + Deadline time.Time + Group string + ID string + IsOrphaned bool + LastErr string + LastFailedAt time.Time + MaxRetry int + NextProcessAt time.Time + Payload []byte + Queue string + Result []byte + Retention time.Duration + Retried int + State TaskState + Timeout time.Duration + Type string + type TaskState int + const TaskStateActive + const TaskStateAggregating + const TaskStateArchived + const TaskStateCompleted + const TaskStatePending + const TaskStateRetry + const TaskStateScheduled + func (s TaskState) String() string + type WorkerInfo struct + Deadline time.Time + Queue string + Started time.Time + TaskID string + TaskPayload []byte + TaskType string