async

package
v0.0.0-...-bf444b6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2024 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PathRoot    = "/"
	PathMetrics = "/metrics"
)

Variables

View Source
var (
	ErrNoEmailService       = errors.New("no email service set")             // no email service set
	ErrNoRateLimiter        = errors.New("no rate limiter set")              // no rate limiter set
	ErrNoTaskHandler        = errors.New("no task handler set")              // no task handler set
	ErrRateLimitExceeded    = errors.New("rate limit exceeded")              // rate limit exceeded
	ErrTaskPayloadUnmarshal = errors.New("failed to unmarshal task payload") // failed to unmarshal task payload
)

Functions

func NewWorkerMetricsServer

func NewWorkerMetricsServer(serverConfig *config.ServerConfig, tracer tracing.Tracer) (http.Handler, error)

NewWorkerMetricsServer creates a new metrics server to export prometheus metrics.

func SetRateLimiter

func SetRateLimiter(limit float64, burst int)

SetRateLimiter configures the rate limiter if not yet configured.

func WithErrorLogger

func WithErrorLogger(tracer tracing.Tracer) func(next asynq.Handler) asynq.Handler

WithErrorLogger logs task processing errors.

func WithMetricsExporter

func WithMetricsExporter(tracer tracing.Tracer) func(next asynq.Handler) asynq.Handler

WithMetricsExporter middleware exports prometheus metrics for asynq.

func WithRateLimiter

func WithRateLimiter(tracer tracing.Tracer, r RateLimiter) func(next asynq.Handler) asynq.Handler

WithRateLimiter middleware limits the number of tasks processed per second.

Types

type RateLimiter

type RateLimiter interface {
	Limit() rate.Limit
	Burst() int
	TokensAt(t time.Time) float64
	Tokens() float64
	Allow() bool
	AllowN(t time.Time, n int) bool
}

RateLimiter is an interface for rate limiter.

type SystemHealthCheckTaskHandler

type SystemHealthCheckTaskHandler struct {
	// contains filtered or unexported fields
}

SystemHealthCheckTaskHandler is the health check task. The health check task is used to check the health of the async worker. If the async worker is unhealthy, the task won't be processed.

func NewSystemHealthCheckTaskHandler

func NewSystemHealthCheckTaskHandler(opts ...TaskHandlerOption) (*SystemHealthCheckTaskHandler, error)

NewSystemHealthCheckTaskHandler creates a new health check task handler.

func (*SystemHealthCheckTaskHandler) ProcessTask

func (h *SystemHealthCheckTaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error

ProcessTask unmarshals the task payload and returns an error if the task payload is invalid. Otherwise, it returns nil, indicating that the task has been processed successfully.

type SystemLicenseExpiryTaskHandler

type SystemLicenseExpiryTaskHandler struct {
	// contains filtered or unexported fields
}

SystemLicenseExpiryTaskHandler is the license expiry check task. If the license is about to expire, it sends an email to the licensee.

func NewSystemLicenseExpiryTaskHandler

func NewSystemLicenseExpiryTaskHandler(opts ...TaskHandlerOption) (*SystemLicenseExpiryTaskHandler, error)

NewSystemLicenseExpiryTaskHandler creates a new license expiry check task handler.

func (*SystemLicenseExpiryTaskHandler) ProcessTask

func (h *SystemLicenseExpiryTaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error

ProcessTask unmarshals the task payload and checks if the license is about to expire. If the license is about to expire, it sends an email to the licensee. Otherwise, it skips the task.

type TaskHandlerOption

type TaskHandlerOption func(*baseTaskHandler) error

TaskHandlerOption is a function that can be used to configure a task handler.

func WithTaskEmailService

func WithTaskEmailService(emailService service.EmailService) TaskHandlerOption

WithTaskEmailService sets the email service for the worker.

func WithTaskLogger

func WithTaskLogger(logger log.Logger) TaskHandlerOption

WithTaskLogger sets the logger for the task handler.

func WithTaskTracer

func WithTaskTracer(tracer tracing.Tracer) TaskHandlerOption

WithTaskTracer sets the tracer for the task handler.

type Worker

type Worker struct {
	*asynq.ServeMux
	// contains filtered or unexported fields
}

Worker is the async worker.

func NewWorker

func NewWorker(opts ...WorkerOption) (*Worker, error)

NewWorker returns a new async worker. Before creating a worker, the rate limiter should be initialized first, otherwise the worker will not be able to start and will return an error.

func (*Worker) Shutdown

func (w *Worker) Shutdown()

Shutdown gracefully shuts down the async worker.

func (*Worker) Start

func (w *Worker) Start() error

Start starts the async worker.

type WorkerOption

type WorkerOption func(*Worker) error

WorkerOption is a function that can be used to configure an async worker.

func WithWorkerConfig

func WithWorkerConfig(conf *config.WorkerConfig) WorkerOption

WithWorkerConfig sets the config for the worker.

func WithWorkerLogger

func WithWorkerLogger(logger log.Logger) WorkerOption

WithWorkerLogger sets the logger for the worker.

func WithWorkerTaskHandler

func WithWorkerTaskHandler(taskType queue.TaskType, handler asynq.Handler) WorkerOption

WithWorkerTaskHandler sets a task handler for the worker.

func WithWorkerTracer

func WithWorkerTracer(tracer tracing.Tracer) WorkerOption

WithWorkerTracer sets the tracer for the worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL