Documentation ¶
Index ¶
- Constants
- Variables
- func NewClient(redisClient redis.UniversalClient) (*asynq.Client, error)
- func NewSlogAdapter(log *slog.Logger) asynq.Logger
- func RunQueueServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, ...) func() error
- func RunSchedulerServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, ...) func() error
- type Enqueuer
- func MustNewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) *Enqueuer
- func MustNewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer
- func NewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) (*Enqueuer, error)
- func NewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) (*Enqueuer, error)
- type EnqueuerOption
- type QueueServer
- type QueueServerOption
- func WithQueueConcurrency(concurrency int) QueueServerOption
- func WithQueueLogLevel(level string) QueueServerOption
- func WithQueueLogger(logger asynq.Logger) QueueServerOption
- func WithQueueName(name string) QueueServerOption
- func WithQueueShutdownTimeout(timeout time.Duration) QueueServerOption
- func WithQueues(queues map[string]int) QueueServerOption
- type SchedulerServer
- type SchedulerServerOption
- func WithPostEnqueueFunc(fn func(info *asynq.TaskInfo, err error)) SchedulerServerOption
- func WithPreEnqueueFunc(fn func(task *asynq.Task, opts []asynq.Option)) SchedulerServerOption
- func WithSchedulerLocation(timeZone string) SchedulerServerOption
- func WithSchedulerLogLevel(level string) SchedulerServerOption
- func WithSchedulerLogger(logger asynq.Logger) SchedulerServerOption
- type TaskHandler
- type TaskOption
- type TaskScheduler
Constants ¶
const (
	LogLevelDebug = "debug"
	LogLevelInfo  = "info"
	LogLevelWarn  = "warn"
	LogLevelError = "error"
	LogLevelFatal = "fatal"
)
Log levels string representation.
Variables ¶
var (
	ErrFailedToParseRedisURI            = errors.New("failed to parse redis connection string")
	ErrMissedAsynqClient                = errors.New("missed asynq client")
	ErrFailedToCreateEnqueuerWithClient = errors.New("failed to create enqueuer with asynq client")
	ErrFailedToEnqueueTask              = errors.New("failed to enqueue task")
	ErrFailedToCloseEnqueuer            = errors.New("failed to close enqueuer")
	ErrFailedToStartQueueServer         = errors.New("failed to start queue server")
	ErrFailedToUnmarshalPayload         = errors.New("failed to unmarshal payload")
	ErrFailedToRunQueueServer           = errors.New("failed to run queue server")
	ErrFailedToScheduleTask             = errors.New("failed to schedule task")
	ErrFailedToStartSchedulerServer     = errors.New("failed to start scheduler server")
	ErrCronSpecIsEmpty                  = errors.New("cron spec is empty")
	ErrTaskNameIsEmpty                  = errors.New("task name is empty")
	ErrFailedToRunSchedulerServer       = errors.New("failed to run scheduler server")
)
Predefined errors.
Functions ¶
func NewClient ¶
func NewClient(redisClient redis.UniversalClient) (*asynq.Client, error)
NewClient creates a new instance of the asynq client using the provided Redis client. It returns the created client and any error encountered during the process.
func RunQueueServer ¶ added in v0.2.0
func RunQueueServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, handlers ...TaskHandler) func() error
RunQueueServer starts the queue server and registers the provided task handlers. It returns a function that can be used to run the server in an error group. E.g.:
// redisClient is a redis.UniversalClient and logger is an asynq.Logger.
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(asyncer.RunQueueServer(
	ctx, redisClient, logger,
	asyncer.HandlerFunc[PayloadStruct1]("task1", task1Handler),
	asyncer.HandlerFunc[PayloadStruct2]("task2", task2Handler),
))

func task1Handler(ctx context.Context, payload PayloadStruct1) error {
	// ... handle task here ...
	return nil
}

func task2Handler(ctx context.Context, payload PayloadStruct2) error {
	// ... handle task here ...
	return nil
}
The function panics if the redis connection string is invalid. The function returns an error if the server fails to start.
func RunSchedulerServer ¶ added in v0.2.0
func RunSchedulerServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, schedulers ...TaskScheduler) func() error
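RunSchedulerServer starts the scheduler server and registers the provided task schedulers. It returns a function that can be used to run the server in an error group. E.g.:
// The scheduler enqueues "scheduled_task_1"; "@every 1h" is an illustrative cron spec.
eg.Go(asyncer.RunSchedulerServer(
	ctx, redisClient, logger,
	asyncer.NewTaskScheduler("@every 1h", "scheduled_task_1"),
))
// The queue server runs the handler for the triggered task.
eg.Go(asyncer.RunQueueServer(
	ctx, redisClient, logger,
	asyncer.ScheduledHandlerFunc("scheduled_task_1", scheduledTaskHandler),
))

func scheduledTaskHandler(ctx context.Context) error {
	// ... handle task here ...
	return nil
}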
The function returns an error if the server fails to start. The function panics if the Redis connection string is invalid.
Note: the scheduler only triggers the job, so you need to run a queue server as well to process it.
Types ¶
type Enqueuer ¶
type Enqueuer struct {
// contains filtered or unexported fields
}
Enqueuer is a helper struct for enqueuing tasks. You can encapsulate this struct in your own struct to add queue methods. See pkg/worker/_example/enqueuer.go for an example.
func MustNewEnqueuer ¶ added in v0.2.0
func MustNewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) *Enqueuer
MustNewEnqueuer creates a new Enqueuer with the given Redis client and options. It panics if an error occurs during the creation of the Enqueuer.
func MustNewEnqueuerWithAsynqClient ¶ added in v0.2.0
func MustNewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer
MustNewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It panics if an error occurs during the creation of the Enqueuer.
func NewEnqueuer ¶
func NewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) (*Enqueuer, error)
NewEnqueuer creates a new Enqueuer with the given Redis client and options. Default values are used if no option is provided. It returns a pointer to the Enqueuer, or an error if there was a problem creating it.
func NewEnqueuerWithAsynqClient ¶ added in v0.2.0
func NewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) (*Enqueuer, error)
NewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It returns a pointer to the Enqueuer and an error if the Asynq client is nil. The Enqueuer is responsible for enqueueing tasks to the Asynq server. Default values are used if no option is provided. Default values are:
- queue name: "default"
- task deadline: 1 minute
- max retry: 3
func (*Enqueuer) Close ¶ added in v0.2.0
Close closes the Enqueuer and releases any resources associated with it. It returns an error if there was a problem closing the Enqueuer.
func (*Enqueuer) EnqueueTask ¶
func (e *Enqueuer) EnqueueTask(ctx context.Context, taskName string, payload any, opts ...TaskOption) error
EnqueueTask enqueues a task to be processed asynchronously. It takes a context, a task name, a payload, and optional task options as parameters. The task is enqueued with the specified queue name, deadline, maximum retry count, and uniqueness constraint. It returns an error if the task fails to enqueue.
type EnqueuerOption ¶
type EnqueuerOption func(*Enqueuer)
EnqueuerOption is a function that configures an enqueuer.
func WithMaxRetry ¶
func WithMaxRetry(n int) EnqueuerOption
WithMaxRetry configures the max retry. The max retry is the number of times the task will be retried if it fails.
func WithQueueNameEnq ¶
func WithQueueNameEnq(name string) EnqueuerOption
WithQueueNameEnq configures the queue name for enqueuing. The queue name is the name of the queue where the task will be enqueued.
func WithTaskDeadline ¶
func WithTaskDeadline(d time.Duration) EnqueuerOption
WithTaskDeadline configures the task deadline. The task deadline is the time limit for the task to be processed.
type QueueServer ¶
type QueueServer struct {
// contains filtered or unexported fields
}
QueueServer is a wrapper for asynq.Server.
func NewQueueServer ¶
func NewQueueServer(redisClient redis.UniversalClient, opts ...QueueServerOption) *QueueServer
NewQueueServer creates a new instance of QueueServer. It takes a Redis client and optional queue server options. The function returns a pointer to the created QueueServer.
func (*QueueServer) Run ¶
func (srv *QueueServer) Run(handlers ...TaskHandler) func() error
Run starts the queue server and registers the provided task handlers. It returns a function that can be used to run the server in an error group. E.g.:
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(queueServer.Run(
	yourapp.NewTaskHandler1(),
	yourapp.NewTaskHandler2(),
))
The function returns an error if the server fails to start.
func (*QueueServer) Shutdown ¶
func (srv *QueueServer) Shutdown()
Shutdown gracefully shuts down the queue server by waiting for all in-flight tasks to finish processing before shutdown.
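Run returns a func() error instead of blocking, so the caller decides where the server runs (for example, inside an error group), while Shutdown stops it from elsewhere. The sketch below shows only that shape using the standard library. The server type is a hypothetical stand-in and does not reflect how QueueServer is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// server is a hypothetical stand-in for a long-running server.
type server struct {
	done chan struct{}
	once sync.Once
}

func newServer() *server { return &server{done: make(chan struct{})} }

// Run does not start anything itself; it returns the function that blocks
// until the server is shut down.
func (s *server) Run() func() error {
	return func() error {
		<-s.done
		return nil
	}
}

// Shutdown unblocks the running function. sync.Once makes repeated calls safe.
func (s *server) Shutdown() { s.once.Do(func() { close(s.done) }) }

func main() {
	srv := newServer()
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Run()() }() // run the returned function in a goroutine
	srv.Shutdown()
	fmt.Println("server stopped, err:", <-errCh) // prints: server stopped, err: <nil>
}
```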
type QueueServerOption ¶
QueueServerOption is a function that configures a QueueServer.
func WithQueueConcurrency ¶
func WithQueueConcurrency(concurrency int) QueueServerOption
WithQueueConcurrency sets the queue concurrency.
func WithQueueLogLevel ¶
func WithQueueLogLevel(level string) QueueServerOption
WithQueueLogLevel sets the queue log level.
func WithQueueLogger ¶ added in v0.1.1
func WithQueueLogger(logger asynq.Logger) QueueServerOption
WithQueueLogger sets the queue logger.
func WithQueueName ¶
func WithQueueName(name string) QueueServerOption
WithQueueName sets the queue name.
func WithQueueShutdownTimeout ¶
func WithQueueShutdownTimeout(timeout time.Duration) QueueServerOption
WithQueueShutdownTimeout sets the queue shutdown timeout.
func WithQueues ¶
func WithQueues(queues map[string]int) QueueServerOption
WithQueues sets the queues. It panics if the sum of concurrency is not equal to the concurrency set in the config. If you want to increase the concurrency of a queue, you can use asyncer.WithQueueConcurrency before this option.
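One reading of the rule above is that the values in the queues map must add up to the server concurrency. The sketch below shows that arithmetic under this assumption. checkQueues is a hypothetical helper, and it returns an error rather than panicking so the example is easy to run.

```go
package main

import "fmt"

// checkQueues is a hypothetical helper: it reports an error when the per-queue
// values do not add up to the configured concurrency.
func checkQueues(concurrency int, queues map[string]int) error {
	sum := 0
	for _, n := range queues {
		sum += n
	}
	if sum != concurrency {
		return fmt.Errorf("queues sum to %d, want %d", sum, concurrency)
	}
	return nil
}

func main() {
	queues := map[string]int{"critical": 6, "default": 3, "low": 1}
	fmt.Println(checkQueues(10, queues)) // prints: <nil>
	fmt.Println(checkQueues(8, queues))  // prints: queues sum to 10, want 8
}
```

Under this reading, setting the concurrency to 10 with asyncer.WithQueueConcurrency before passing the map above to asyncer.WithQueues would satisfy the check.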
type SchedulerServer ¶
type SchedulerServer struct {
// contains filtered or unexported fields
}
SchedulerServer is a wrapper for asynq.Scheduler.
func NewSchedulerServer ¶
func NewSchedulerServer(redisClient redis.UniversalClient, opts ...SchedulerServerOption) *SchedulerServer
NewSchedulerServer creates a new scheduler client and returns the server.
func (*SchedulerServer) Run ¶
func (srv *SchedulerServer) Run() func() error
Run runs the scheduler. It returns a function that can be used to run the server in an error group. E.g.:
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(schedulerServer.Run())
func (*SchedulerServer) ScheduleTask ¶ added in v0.3.0
func (srv *SchedulerServer) ScheduleTask(cronSpec, taskName string, opts ...TaskOption) error
ScheduleTask schedules a task based on the given cron specification and task name. It returns an error if the cron specification or task name is empty, or if there was an error registering the task.
func (*SchedulerServer) Shutdown ¶
func (srv *SchedulerServer) Shutdown()
Shutdown gracefully shuts down the scheduler server by waiting for all pending tasks to be processed.
type SchedulerServerOption ¶
type SchedulerServerOption func(*asynq.SchedulerOpts)
SchedulerServerOption is a function that configures a SchedulerServer.
func WithPostEnqueueFunc ¶
func WithPostEnqueueFunc(fn func(info *asynq.TaskInfo, err error)) SchedulerServerOption
WithPostEnqueueFunc sets the scheduler post enqueue function.
func WithPreEnqueueFunc ¶
func WithPreEnqueueFunc(fn func(task *asynq.Task, opts []asynq.Option)) SchedulerServerOption
WithPreEnqueueFunc sets the scheduler pre enqueue function.
func WithSchedulerLocation ¶
func WithSchedulerLocation(timeZone string) SchedulerServerOption
WithSchedulerLocation sets the scheduler location.
func WithSchedulerLogLevel ¶
func WithSchedulerLogLevel(level string) SchedulerServerOption
WithSchedulerLogLevel sets the scheduler log level.
func WithSchedulerLogger ¶ added in v0.2.0
func WithSchedulerLogger(logger asynq.Logger) SchedulerServerOption
WithSchedulerLogger sets the scheduler logger.
type TaskHandler ¶ added in v0.2.0
type TaskHandler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Handle handles the task. It takes a context and a payload as parameters.
	Handle(ctx context.Context, payload []byte) error
	// Options returns the options for the task handler.
	Options() []asynq.Option
}
TaskHandler is an interface for task handlers. It is used to register task handlers in the queue server.
func HandlerFunc ¶ added in v0.2.0
func HandlerFunc[Payload any](name string, fn handlerFunc[Payload], opts ...TaskOption) TaskHandler
HandlerFunc is a function that creates a TaskHandler for handling tasks of a specific payload type. It takes a name string and a handler function as parameters and returns a TaskHandler. The name parameter represents the name of the handler, while the fn parameter is the actual handler function. The TaskHandler returned by HandlerFunc is responsible for executing the handler function when a task of the specified payload type is received. The payload type is specified using the generic type parameter Payload.
func ScheduledHandlerFunc ¶ added in v0.2.0
func ScheduledHandlerFunc(name string, fn scheduledHandlerFunc) TaskHandler
ScheduledHandlerFunc is a function that creates a TaskHandler for a scheduled task. It takes a name string and a scheduledHandlerFunc as parameters and returns a TaskHandler. The name parameter specifies the name of the scheduled task, while the fn parameter is the function to be executed when the task is triggered. The returned TaskHandler can be used to register the scheduled task in the queue server.
type TaskOption ¶ added in v0.4.0
func Deadline ¶ added in v0.4.0
func Deadline(t time.Time) TaskOption
Deadline sets the deadline for the task. The task will not be processed if it is received after the specified date and time.
func Group ¶ added in v0.4.0
func Group(g string) TaskOption
Group returns an option to specify the group used for the task. Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.
func MaxRetry ¶ added in v0.4.0
func MaxRetry(n int) TaskOption
MaxRetry sets the maximum number of retries for the task. The task will be marked as failed after the specified number of failed attempts.
func TaskID ¶ added in v0.4.0
func TaskID(id string) TaskOption
TaskID sets the ID for the task. The task will be assigned the specified ID. Use this option to enqueue a task with a specific ID to prevent duplicate tasks. If a task with the same ID already exists in the queue, it will be replaced by the new task.
func Timeout ¶ added in v0.4.0
func Timeout(d time.Duration) TaskOption
Timeout sets the timeout for the task. The task will be marked as failed if it takes longer than the specified duration.
func Unique ¶ added in v0.4.0
func Unique(ttl time.Duration) TaskOption
Unique sets the uniqueness constraint for the task. The task will not be enqueued if there is an identical task already in the queue. The uniqueness constraint is based on the task type and payload. The uniqueness constraint is valid for the specified duration.
type TaskScheduler ¶ added in v0.3.0
type TaskScheduler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Schedule returns the cron spec for the task.
	// For more information about cron spec, see https://pkg.go.dev/github.com/robfig/cron/v3#hdr-CRON_Expression_Format.
	Schedule() string
	// Options returns the options for the task scheduler.
	Options() []TaskOption
}
TaskScheduler is an interface for task schedulers. It is used to register task schedulers in the scheduler server.
func NewTaskScheduler ¶ added in v0.3.0
func NewTaskScheduler(cronSpec, name string, opts ...TaskOption) TaskScheduler
NewTaskScheduler creates a new task scheduler with the given cron spec and name.