asyncer

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2024 License: MIT Imports: 11 Imported by: 2

README

asyncer

GitHub tag (latest SemVer) Go Reference License

Tests CodeQL Analysis GolangCI Lint Go Report Card

A type-safe distributed task queue in Go, built on top of hibiken/asynq.

Key Features

  • Type-safe task handlers
  • Support for immediate and scheduled tasks
  • Redis-based task queue
  • Built-in monitoring UI

Installation

go get github.com/dmitrymomot/asyncer

Usage Examples

Email Service Example

This example demonstrates how to implement an email service with different types of emails:

package email

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/dmitrymomot/asyncer"
    "github.com/redis/go-redis/v9"
)

// Task names
const (
    WelcomeEmailTask     = "email:welcome"
    PasswordResetTask    = "email:password_reset"
    WeeklyDigestTask    = "email:weekly_digest"
)

// Task payloads
type WelcomeEmail struct {
    UserID    int64  `json:"user_id"`
    Email     string `json:"email"`
    FirstName string `json:"first_name"`
}

type PasswordResetEmail struct {
    UserID      int64  `json:"user_id"`
    Email       string `json:"email"`
    ResetToken  string `json:"reset_token"`
    ExpiresAt   int64  `json:"expires_at"`
}

type WeeklyDigestEmail struct {
    UserID       int64    `json:"user_id"`
    Email        string   `json:"email"`
    ArticleIDs   []int64  `json:"article_ids"`
    WeekNumber   int      `json:"week_number"`
}

// EmailService handles email sending operations
type EmailService struct {
    enqueuer *asyncer.Enqueuer
}

// NewEmailService creates a new email service
func NewEmailService(redis *redis.Client) *EmailService {
    return &EmailService{
        enqueuer: asyncer.MustNewEnqueuer(redis),
    }
}

// SendWelcomeEmail enqueues a welcome email task
func (s *EmailService) SendWelcomeEmail(ctx context.Context, userID int64, email, firstName string) error {
    return s.enqueuer.EnqueueTask(ctx, WelcomeEmailTask, WelcomeEmail{
        UserID:    userID,
        Email:     email,
        FirstName: firstName,
    })
}

// SendPasswordResetEmail enqueues a password reset email task
func (s *EmailService) SendPasswordResetEmail(ctx context.Context, userID int64, email, token string, expiresAt int64) error {
    return s.enqueuer.EnqueueTask(ctx, PasswordResetTask, PasswordResetEmail{
        UserID:     userID,
        Email:      email,
        ResetToken: token,
        ExpiresAt:  expiresAt,
    })
}

// ScheduleWeeklyDigest schedules weekly digest emails
func (s *EmailService) ScheduleWeeklyDigest(ctx context.Context, userID int64, email string, articleIDs []int64, weekNum int) error {
    return s.enqueuer.EnqueueTask(ctx, WeeklyDigestTask, WeeklyDigestEmail{
        UserID:     userID,
        Email:      email,
        ArticleIDs: articleIDs,
        WeekNumber: weekNum,
    })
}
Email Worker Example

Implementation of the email processing worker:

package worker

import (
    "context"
    "fmt"

    "github.com/dmitrymomot/asyncer"
    "github.com/redis/go-redis/v9"
    "your/app/email"
    "your/app/mailer" // your email sending implementation
)

type EmailWorker struct {
    mailer mailer.Service
}

func NewEmailWorker(mailer mailer.Service) *EmailWorker {
    return &EmailWorker{mailer: mailer}
}

// HandleWelcomeEmail processes welcome emails
func (w *EmailWorker) HandleWelcomeEmail(ctx context.Context, payload email.WelcomeEmail) error {
    return w.mailer.Send(ctx, mailer.Email{
        To:      payload.Email,
        Subject: "Welcome to Our Platform!",
        Template: "welcome",
        Data: map[string]interface{}{
            "first_name": payload.FirstName,
        },
    })
}

// HandlePasswordReset processes password reset emails
func (w *EmailWorker) HandlePasswordReset(ctx context.Context, payload email.PasswordResetEmail) error {
    return w.mailer.Send(ctx, mailer.Email{
        To:      payload.Email,
        Subject: "Password Reset Request",
        Template: "password_reset",
        Data: map[string]interface{}{
            "reset_link": fmt.Sprintf("https://app.example.com/reset?token=%s", payload.ResetToken),
            "expires_at": payload.ExpiresAt,
        },
    })
}

// HandleWeeklyDigest processes weekly digest emails
func (w *EmailWorker) HandleWeeklyDigest(ctx context.Context, payload email.WeeklyDigestEmail) error {
    articles, err := fetchArticles(ctx, payload.ArticleIDs)
    if err != nil {
        return fmt.Errorf("failed to fetch articles: %w", err)
    }

    return w.mailer.Send(ctx, mailer.Email{
        To:      payload.Email,
        Subject: fmt.Sprintf("Your Weekly Digest - Week %d", payload.WeekNumber),
        Template: "weekly_digest",
        Data: map[string]interface{}{
            "articles": articles,
            "week_number": payload.WeekNumber,
        },
    })
}

// StartWorker initializes and runs the email worker
func StartWorker(ctx context.Context, redis *redis.Client, worker *EmailWorker) error {
    return asyncer.RunQueueServer(
        ctx,
        redis,
        nil, // default logger
        asyncer.HandlerFunc(email.WelcomeEmailTask, worker.HandleWelcomeEmail),
        asyncer.HandlerFunc(email.PasswordResetTask, worker.HandlePasswordReset),
        asyncer.HandlerFunc(email.WeeklyDigestTask, worker.HandleWeeklyDigest),
    )
}
Usage in Application

Example of using the email service in your application:

package main

import (
    "context"

    "your/app/email"
    "your/app/worker"
)

func main() {
    // Initialize your Redis client and other dependencies
    redisClient := initRedis()
    mailerService := initMailer()

    // Initialize email service for enqueueing tasks
    emailService := email.NewEmailService(redisClient)

    // Start the worker in a separate goroutine
    go func() {
        emailWorker := worker.NewEmailWorker(mailerService)
        if err := worker.StartWorker(context.Background(), redisClient, emailWorker); err != nil {
            log.Fatal(err)
        }
    }()

    // Use the email service in your application
    if err := emailService.SendWelcomeEmail(
        context.Background(),
        123,
        "user@example.com",
        "John",
    ); err != nil {
        log.Printf("Failed to send welcome email: %v", err)
    }
}
Scheduled Tasks Example

Example of setting up recurring notifications:

package notifications

import (
    "context"
    "time"

    "github.com/dmitrymomot/asyncer"
    "github.com/redis/go-redis/v9"
)

const DigestSchedulerTask = "scheduler:weekly_digest"

// StartScheduler initializes the task scheduler
func StartScheduler(ctx context.Context, redis *redis.Client) error {
    return asyncer.RunSchedulerServer(
        ctx,
        redis,
        nil, // default logger
        // Schedule weekly digest every Monday at 9 AM
        asyncer.NewTaskScheduler(
            "0 9 * * 1", // cron expression
            DigestSchedulerTask,
            asyncer.Unique(24*time.Hour), // prevent duplicate runs
        ),
    )
}

Configuration

Enqueuer Options
Option Description
WithTaskDeadline(d time.Duration) Sets maximum task execution time
WithMaxRetry(n int) Sets maximum retry attempts
WithQueue(name string) Specifies queue name
WithRetention(d time.Duration) Sets task retention period
// Enqueue with options
enqueuer.EnqueueTask(
    ctx,
    taskName,
    payload,
    asyncer.WithMaxRetry(3),
    asyncer.WithQueue("high"),
    asyncer.WithTaskDeadline(5*time.Minute),
)
Scheduler Options
Option Description
MaxRetry(n int) Sets retry attempts for scheduled tasks
Timeout(d time.Duration) Sets task timeout
Unique(d time.Duration) Prevents duplicate tasks within duration
Queue(name string) Specifies queue for scheduled tasks
// Schedule with options
asyncer.NewTaskScheduler(
    "*/30 * * * *", // every 30 minutes
    taskName,
    asyncer.MaxRetry(3),
    asyncer.Timeout(1*time.Minute),
    asyncer.Queue("scheduled"),
)

Monitoring

Asynqmon Web Interface

Access the monitoring dashboard at http://localhost:8181 to:

  • View active, pending, and completed tasks
  • Monitor queue statistics
  • Inspect task details and errors
  • Manage task queues
Logging

The package supports structured logging through the standard slog package:

asyncer.NewSlogAdapter(slog.Default().With(
    slog.String("component", "queue-server"),
))

Contributing

Contributions to the asyncer package are welcome! Here are some ways you can contribute:

  • Reporting bugs
  • Covering code with tests
  • Suggesting enhancements
  • Submitting pull requests
  • Sharing the love by telling others about this project

License

This project is licensed under the MIT License - see the LICENSE file for details. This project contains some code from hibiken/asynq package, which is also licensed under the MIT License.

Documentation

Index

Constants

View Source
const (
	LogLevelDebug = "debug"
	LogLevelInfo  = "info"
	LogLevelWarn  = "warn"
	LogLevelError = "error"
	LogLevelFatal = "fatal"
)

Log levels string representation.

Variables

View Source
var (
	ErrFailedToParseRedisURI            = errors.New("failed to parse redis connection string")
	ErrMissedAsynqClient                = errors.New("missed asynq client")
	ErrFailedToCreateEnqueuerWithClient = errors.New("failed to create enqueuer with asynq client")
	ErrFailedToEnqueueTask              = errors.New("failed to enqueue task")
	ErrFailedToCloseEnqueuer            = errors.New("failed to close enqueuer")
	ErrFailedToStartQueueServer         = errors.New("failed to start queue server")
	ErrFailedToUnmarshalPayload         = errors.New("failed to unmarshal payload")
	ErrFailedToRunQueueServer           = errors.New("failed to run queue server")
	ErrFailedToScheduleTask             = errors.New("failed to schedule task")
	ErrFailedToStartSchedulerServer     = errors.New("failed to start scheduler server")
	ErrCronSpecIsEmpty                  = errors.New("cron spec is empty")
	ErrTaskNameIsEmpty                  = errors.New("task name is empty")
	ErrFailedToRunSchedulerServer       = errors.New("failed to run scheduler server")
)

Predefined errors.

Functions

func NewClient

func NewClient(redisClient redis.UniversalClient) (*asynq.Client, error)

NewClient creates a new instance of the asynq client using the provided Redis connection string. It returns the created client, the Redis connection options, and any error encountered during the process.

func NewSlogAdapter added in v0.3.3

func NewSlogAdapter(log *slog.Logger) asynq.Logger

func RunQueueServer added in v0.2.0

func RunQueueServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, handlers ...TaskHandler) func() error

RunQueueServer starts the queue server and registers the provided task handlers. It returns a function that can be used to run server in a error group. E.g.:

eg, _ := errgroup.WithContext(context.Background())
eg.Go(asyncer.RunQueueServer(
	"redis://localhost:6379",
	logger,
	asyncer.HandlerFunc[PayloadStruct1]("task1", task1Handler),
	asyncer.HandlerFunc[PayloadStruct2]("task2", task2Handler),
))

func task1Handler(ctx context.Context, payload PayloadStruct1) error {
	// ... handle task here ...
}

func task2Handler(ctx context.Context, payload PayloadStruct2) error {
	// ... handle task here ...
}

The function panics if the redis connection string is invalid. The function returns an error if the server fails to start.

func RunSchedulerServer added in v0.2.0

func RunSchedulerServer(ctx context.Context, redisClient redis.UniversalClient, log asynq.Logger, schedulers ...TaskScheduler) func() error
eg.Go(asyncer.RunQueueServer(
	"redis://localhost:6379",
	logger,
	asyncer.ScheduledHandlerFunc("scheduled_task_1", scheduledTaskHandler),
))

func scheduledTaskHandler(ctx context.Context) error {
	// ...handle task here...
}

The function returns an error if the server fails to start. The function panics if the Redis connection string is invalid.

!!! Pay attention, that the scheduler just triggers the job, so you need to run queue server as well.

Types

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer is a helper struct for enqueuing tasks. You can encapsulate this struct in your own struct to add queue methods. See pkg/worker/_example/enqueuer.go for an example.

func MustNewEnqueuer added in v0.2.0

func MustNewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) *Enqueuer

MustNewEnqueuer creates a new Enqueuer with the given Redis connection string and options. It panics if an error occurs during the creation of the Enqueuer.

func MustNewEnqueuerWithAsynqClient added in v0.2.0

func MustNewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer

MustNewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It panics if an error occurs during the creation of the Enqueuer.

func NewEnqueuer

func NewEnqueuer(redisClient redis.UniversalClient, opt ...EnqueuerOption) (*Enqueuer, error)

NewEnqueuer creates a new Enqueuer with the given Redis connection string and options. Default values are used if no option is provided. It returns a pointer to the Enqueuer and an error if there was a problem creating the Enqueuer.

func NewEnqueuerWithAsynqClient added in v0.2.0

func NewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) (*Enqueuer, error)

NewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It returns a pointer to the Enqueuer and an error if the Asynq client is nil. The Enqueuer is responsible for enqueueing tasks to the Asynq server. Default values are used if no option is provided. Default values are:

  • queue name: "default"
  • task deadline: 1 minute
  • max retry: 3

func (*Enqueuer) Close added in v0.2.0

func (e *Enqueuer) Close() error

Close closes the Enqueuer and releases any resources associated with it. It returns an error if there was a problem closing the Enqueuer.

func (*Enqueuer) EnqueueTask

func (e *Enqueuer) EnqueueTask(ctx context.Context, taskName string, payload any, opts ...TaskOption) error

EnqueueTask enqueues a task to be processed asynchronously. It takes a context and a task as parameters. The task is enqueued with the specified queue name, deadline, maximum retry count, and uniqueness constraint. Returns an error if the task fails to enqueue.

type EnqueuerOption

type EnqueuerOption func(*Enqueuer)

EnqueuerOption is a function that configures an enqueuer.

func WithMaxRetry

func WithMaxRetry(n int) EnqueuerOption

WithMaxRetry configures the max retry. The max retry is the number of times the task will be retried if it fails.

func WithQueueNameEnq

func WithQueueNameEnq(name string) EnqueuerOption

WithQueueNameEnq configures the queue name for enqueuing. The queue name is the name of the queue where the task will be enqueued.

func WithTaskDeadline

func WithTaskDeadline(d time.Duration) EnqueuerOption

WithTaskDeadline configures the task deadline. The task deadline is the time limit for the task to be processed.

type QueueServer

type QueueServer struct {
	// contains filtered or unexported fields
}

QueueServer is a wrapper for asynq.Server.

func NewQueueServer

func NewQueueServer(redisClient redis.UniversalClient, opts ...QueueServerOption) *QueueServer

NewQueueServer creates a new instance of QueueServer. It takes a redis connection option and optional queue server options. The function returns a pointer to the created QueueServer.

func (*QueueServer) Run

func (srv *QueueServer) Run(handlers ...TaskHandler) func() error

Run starts the queue server and registers the provided task handlers. It returns a function that can be used to run server in a error group. E.g.:

eg, ctx := errgroup.WithContext(context.Background())
eg.Go(queueServer.Run(
	yourapp.NewTaskHandler1(),
	yourapp.NewTaskHandler2(),
))

The function returns an error if the server fails to start.

func (*QueueServer) Shutdown

func (srv *QueueServer) Shutdown()

Shutdown gracefully shuts down the queue server by waiting for all in-flight tasks to finish processing before shutdown.

type QueueServerOption

type QueueServerOption func(*asynq.Config)

QueueServerOption is a function that configures a QueueServer.

func WithQueueConcurrency

func WithQueueConcurrency(concurrency int) QueueServerOption

WithQueueConcurrency sets the queue concurrency.

func WithQueueLogLevel

func WithQueueLogLevel(level string) QueueServerOption

WithQueueLogLevel sets the queue log level.

func WithQueueLogger added in v0.1.1

func WithQueueLogger(logger asynq.Logger) QueueServerOption

WithQueueLogger sets the queue logger.

func WithQueueName

func WithQueueName(name string) QueueServerOption

WithQueueName sets the queue name.

func WithQueueShutdownTimeout

func WithQueueShutdownTimeout(timeout time.Duration) QueueServerOption

WithQueueShutdownTimeout sets the queue shutdown timeout.

func WithQueues

func WithQueues(queues map[string]int) QueueServerOption

WithQueues sets the queues. It panics if the sum of concurrency is not equal to the concurrency set in the config. If you want to increase the concurrency of a queue, you can use asyncer.WithQueueConcurrency before this option.

type SchedulerServer

type SchedulerServer struct {
	// contains filtered or unexported fields
}

SchedulerServer is a wrapper for asynq.Scheduler.

func NewSchedulerServer

func NewSchedulerServer(redisClient redis.UniversalClient, opts ...SchedulerServerOption) *SchedulerServer

NewSchedulerServer creates a new scheduler client and returns the server.

func (*SchedulerServer) Run

func (srv *SchedulerServer) Run() func() error

Run runs the scheduler with the provided handlers. It returns a function that can be used to run server in a error group. E.g.:

eg, ctx := errgroup.WithContext(context.Background())
eg.Go(schedulerServer.Run())

func (*SchedulerServer) ScheduleTask added in v0.3.0

func (srv *SchedulerServer) ScheduleTask(cronSpec, taskName string, opts ...TaskOption) error

ScheduleTask schedules a task based on the given cron specification and task name. It returns an error if the cron specification or task name is empty, or if there was an error registering the task.

func (*SchedulerServer) Shutdown

func (srv *SchedulerServer) Shutdown()

Shutdown gracefully shuts down the scheduler server by waiting for all pending tasks to be processed.

type SchedulerServerOption

type SchedulerServerOption func(*asynq.SchedulerOpts)

SchedulerServerOption is a function that configures a SchedulerServer.

func WithPostEnqueueFunc

func WithPostEnqueueFunc(fn func(info *asynq.TaskInfo, err error)) SchedulerServerOption

WithPostEnqueueFunc sets the scheduler post enqueue function.

func WithPreEnqueueFunc

func WithPreEnqueueFunc(fn func(task *asynq.Task, opts []asynq.Option)) SchedulerServerOption

WithPreEnqueueFunc sets the scheduler pre enqueue function.

func WithSchedulerLocation

func WithSchedulerLocation(timeZone string) SchedulerServerOption

WithSchedulerLocation sets the scheduler location.

func WithSchedulerLogLevel

func WithSchedulerLogLevel(level string) SchedulerServerOption

WithSchedulerLogLevel sets the scheduler log level.

func WithSchedulerLogger added in v0.2.0

func WithSchedulerLogger(logger asynq.Logger) SchedulerServerOption

WithSchedulerLogger sets the scheduler logger.

type TaskHandler added in v0.2.0

type TaskHandler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Handle handles the task. It takes a context and a payload as parameters.
	Handle(ctx context.Context, payload []byte) error
	// Options returns the options for the task handler.
	Options() []asynq.Option
}

TaskHandler is an interface for task handlers. It is used to register task handlers in the queue server.

func HandlerFunc added in v0.2.0

func HandlerFunc[Payload any](name string, fn handlerFunc[Payload], opts ...TaskOption) TaskHandler

HandlerFunc is a function that creates a TaskHandler for handling tasks of a specific payload type. It takes a name string and a handler function as parameters and returns a TaskHandler. The name parameter represents the name of the handler, while the fn parameter is the actual handler function. The TaskHandler returned by HandlerFunc is responsible for executing the handler function when a task of the specified payload type is received. The payload type is specified using the generic type parameter Payload.

func ScheduledHandlerFunc added in v0.2.0

func ScheduledHandlerFunc(name string, fn scheduledHandlerFunc) TaskHandler

ScheduledHandlerFunc is a function that creates a TaskHandler for a scheduled task. It takes a name string and a scheduledHandlerFunc as parameters and returns a TaskHandler. The name parameter specifies the name of the scheduled task, while the fn parameter is the function to be executed when the task is triggered. The returned TaskHandler can be used to register the scheduled task in the queue server.

type TaskOption added in v0.4.0

type TaskOption = asynq.Option

func Deadline added in v0.4.0

func Deadline(t time.Time) TaskOption

Deadline sets the deadline for the task. The task will not be processed if it is received after the specified date and time.

func Group added in v0.4.0

func Group(g string) TaskOption

Group returns an option to specify the group used for the task. Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.

func MaxRetry added in v0.4.0

func MaxRetry(n int) TaskOption

MaxRetry sets the maximum number of retries for the task. The task will be marked as failed after the specified number of failed attempts.

func TaskID added in v0.4.0

func TaskID(id string) TaskOption

TaskID sets the ID for the task. The task will be assigned the specified ID. Use this option to enqueue a task with a specific ID to prevent duplicate tasks. If a task with the same ID already exists in the queue, it will be replaced by the new task.

func Timeout added in v0.4.0

func Timeout(d time.Duration) TaskOption

Timeout sets the timeout for the task. The task will be marked as failed if it takes longer than the specified duration.

func Unique added in v0.4.0

func Unique(ttl time.Duration) TaskOption

Unique sets the uniqueness constraint for the task. The task will not be enqueued if there is an identical task already in the queue. The uniqueness constraint is based on the task type and payload. The uniqueness constraint is valid for the specified duration.

type TaskScheduler added in v0.3.0

type TaskScheduler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Schedule returns the cron spec for the task.
	// For more information about cron spec, see https://pkg.go.dev/github.com/robfig/cron/v3#hdr-CRON_Expression_Format.
	Schedule() string
	// Options returns the options for the task scheduler.
	Options() []TaskOption
}

TaskScheduler is an interface for task schedulers. It is used to register task schedulers in the queue server.

func NewTaskScheduler added in v0.3.0

func NewTaskScheduler(cronSpec, name string, opts ...TaskOption) TaskScheduler

NewTaskScheduler creates a new task scheduler with the given cron spec and name.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL