Documentation ¶
Index ¶
- type Executor
- type IJobPool
- type Option
- func WithConcurrencyFactor(factor *int) Option
- func WithJobPoolClient(client *asynq.Client) Option
- func WithLogger(logger *zap.Logger) Option
- func WithMaxRetry(maxRetry *int) Option
- func WithRedisAddress(addr *string) Option
- func WithRedisClient(client *redis.Client) Option
- func WithRedisPassword(password *string) Option
- func WithRedisUsername(username *string) Option
- func WithTaskResultTTL(ttl *time.Duration) Option
- func WithTaskTimeout(timeout *time.Duration) Option
- type TaskPoolProcessor
- func NewTaskPoolProcessor(opts ...Option) (*TaskPoolProcessor, error)
- func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
- func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)
- func (t *TaskPoolProcessor) ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error
- func (p *TaskPoolProcessor) Run(handler asynq.Handler) error
- func (p *TaskPoolProcessor) Validate() error
- type TaskStatus
- type TaskType
- type WorkerPool
- func NewWorkerPool(maxWorkers int) *WorkerPool
- func (w *WorkerPool) ExecuteTask(task func())
- func (w *WorkerPool) Start()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Executor ¶
type Executor interface {
	// Execute runs the task-specific logic with the given payload.
	//
	// Parameters:
	// - ctx: context for execution
	// - payload: task-specific data needed for execution
	//
	// Returns:
	// - error: if execution fails
	Execute(ctx context.Context, payload any) error
}
Executor defines the interface for task execution logic. Implementations should contain the actual business logic for processing specific types of tasks.
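For illustration, a minimal Executor implementation might look like the sketch below; ReportExecutor and ReportPayload are hypothetical names used only for this example.

```go
type ReportPayload struct {
	UserID string
}

type ReportExecutor struct{}

// Execute contains the business logic for a single task.
func (e *ReportExecutor) Execute(ctx context.Context, payload any) error {
	p, ok := payload.(*ReportPayload)
	if !ok {
		return fmt.Errorf("unexpected payload type %T", payload)
	}
	// Run the task-specific work here, e.g. generate a report for p.UserID.
	fmt.Printf("generating report for user %s\n", p.UserID)
	return nil
}
```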
type IJobPool ¶
type IJobPool interface {
	// NewTask creates a new task with the specified parameters.
	//
	// Parameters:
	// - taskId: unique identifier for the task
	// - taskType: classification of the task (e.g., "email", "notification")
	// - scheduledTime: optional time when the task should be executed
	// - taskPayload: data required for task execution
	//
	// Returns:
	// - *asynq.Task: the created task object
	// - error: if task creation fails
	NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)

	// ProcessTask executes the given task using the provided executor.
	//
	// Parameters:
	// - ctx: context for task execution
	// - task: the task to be processed
	// - f: executor implementing the task logic
	// - payload: data needed for task execution
	//
	// Returns:
	// - error: if task processing fails
	ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error

	// EnqueueTask schedules a task for future execution.
	//
	// Parameters:
	// - ctx: context for task enqueuing
	// - task: the task to be scheduled
	// - delay: optional duration to delay task execution
	//
	// Returns:
	// - *asynq.TaskInfo: information about the enqueued task
	// - error: if task enqueuing fails
	EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
}
IJobPool defines the interface for task pool operations. Implementations must provide methods for creating, processing, and enqueueing tasks in a distributed task processing system.
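Taken together, a typical task lifecycle through an IJobPool implementation looks roughly like the following sketch; the task type "email:send", EmailPayload, EmailExecutor, and the 30-second delay are illustrative values borrowed from the method examples further down, not fixed by the interface.

```go
var pool IJobPool // e.g. a configured *TaskPoolProcessor

// Create a task carrying a JSON-encoded payload.
task, err := pool.NewTask("task-123", "email:send", nil, EmailPayload{UserID: "123"})
if err != nil {
	return err
}

// Schedule it for execution after an optional delay.
delay := 30 * time.Second
if _, err := pool.EnqueueTask(ctx, task, &delay); err != nil {
	return err
}

// Inside a worker handler, hand the task to an Executor implementation.
var payload EmailPayload
if err := pool.ProcessTask(ctx, task, &EmailExecutor{}, &payload); err != nil {
	return err
}
```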
type Option ¶
type Option func(processor *TaskPoolProcessor)
`Option` is a functional option used to configure a `TaskPoolProcessor`. Each option is a function that receives a pointer to a `TaskPoolProcessor` instance and modifies its fields. This pattern is commonly used in Go to provide flexible and extensible APIs.
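As a sketch of how such an option is typically written inside the package (WithExampleRetryLimit and the unexported field retryLimit are hypothetical names used only for illustration):

```go
// Hypothetical option; the field name retryLimit is assumed for illustration.
func WithExampleRetryLimit(limit *int) Option {
	return func(processor *TaskPoolProcessor) {
		processor.retryLimit = limit
	}
}
```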
func WithConcurrencyFactor ¶
func WithConcurrencyFactor(factor *int) Option
func WithJobPoolClient ¶
func WithJobPoolClient(client *asynq.Client) Option
WithJobPoolClient sets the asynq client used by the TaskPoolProcessor.
func WithLogger ¶
func WithLogger(logger *zap.Logger) Option
WithLogger sets the logger used by the TaskPoolProcessor.
func WithMaxRetry ¶
func WithMaxRetry(maxRetry *int) Option
WithMaxRetry sets the maximum number of retry attempts for a given task.
func WithRedisAddress ¶
func WithRedisAddress(addr *string) Option
func WithRedisClient ¶
func WithRedisClient(client *redis.Client) Option
WithRedisClient sets the Redis client used by the TaskPoolProcessor.
func WithRedisPassword ¶
func WithRedisPassword(password *string) Option
func WithRedisUsername ¶
func WithRedisUsername(username *string) Option
func WithTaskResultTTL ¶
func WithTaskResultTTL(ttl *time.Duration) Option
WithTaskResultTTL sets the time-to-live for task results.
func WithTaskTimeout ¶
func WithTaskTimeout(timeout *time.Duration) Option
type TaskPoolProcessor ¶
type TaskPoolProcessor struct {
	RedisAddress      *string
	RedisUserName     *string
	RedisPassword     *string
	ConcurrencyFactor *int
	// contains filtered or unexported fields
}
TaskPoolProcessor manages asynchronous task processing using Redis-backed queues. It provides a robust worker pool implementation for handling distributed tasks with features like retry mechanisms, timeouts, and concurrent processing.
The processor uses asynq for task queue management and requires Redis for persistence and message broker capabilities.
func NewTaskPoolProcessor ¶
func NewTaskPoolProcessor(opts ...Option) (*TaskPoolProcessor, error)
NewTaskPoolProcessor creates a new TaskPoolProcessor with the provided options. It initializes the Redis connection, worker pool, and task processing configuration.
Parameters:
- opts: variadic list of Option functions to configure the processor
Returns:
- *TaskPoolProcessor: configured processor instance
- error: if initialization fails
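Example usage (a sketch; the Redis address, concurrency factor, and retry count are illustrative values):

```go
addr := "localhost:6379"
factor := 10
maxRetry := 3

processor, err := NewTaskPoolProcessor(
	WithRedisAddress(&addr),
	WithConcurrencyFactor(&factor),
	WithMaxRetry(&maxRetry),
	WithLogger(zap.NewNop()),
)
if err != nil {
	log.Fatalf("failed to initialize task pool processor: %v", err)
}
_ = processor // ready to create and enqueue tasks
```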
func (*TaskPoolProcessor) EnqueueTask ¶
func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
EnqueueTask enqueues a task to be processed by the worker pool with a specified delay. It returns the task information and any error encountered during enqueuing.
Example usage:
```go
delay := 5 * time.Minute
taskInfo, err := pool.EnqueueTask(ctx, task, &delay)
if err != nil {
	log.Printf("failed to enqueue task: %v", err)
	return err
}
```
func (*TaskPoolProcessor) NewTask ¶
func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)
NewTask creates a new task with the specified parameters and options. It marshals the taskPayload into JSON and sets up the task with retry and timeout configurations.
Parameters:
- taskId: unique identifier for the task
- taskType: type/category of the task (e.g., "email:send", "image:resize")
- scheduledTime: optional time when the task should be executed
- taskPayload: data required for task execution (will be marshaled to JSON)
Example usage:
```go
type EmailPayload struct {
	UserID    string
	Template  string
	Variables map[string]string
}

payload := EmailPayload{
	UserID:    "123",
	Template:  "welcome_email",
	Variables: map[string]string{"name": "John"},
}

task, err := pool.NewTask("task-123", "email:send", nil, payload)
```
func (*TaskPoolProcessor) ProcessTask ¶
func (t *TaskPoolProcessor) ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error
ProcessTask unmarshals the task payload and executes the task using the provided executor. It returns an error if either the unmarshal operation fails or the task execution fails.
Parameters:
- ctx: context for task execution
- task: the task to be processed
- f: executor implementing the Execute method
- payload: pointer to a struct where the task payload will be unmarshaled
Example usage:
```go
type EmailExecutor struct{}

func (e *EmailExecutor) Execute(ctx context.Context, payload any) error {
	emailPayload, ok := payload.(*EmailPayload)
	if !ok {
		return fmt.Errorf("invalid payload type")
	}
	// Process the email task using emailPayload.
	_ = emailPayload
	return nil
}

var payload EmailPayload
err := pool.ProcessTask(ctx, task, &EmailExecutor{}, &payload)
```
func (*TaskPoolProcessor) Validate ¶
func (p *TaskPoolProcessor) Validate() error
type TaskStatus ¶
type TaskStatus string
const (
	TaskStatusWaiting   TaskStatus = "task:waiting"
	TaskStatusRunning   TaskStatus = "task:running"
	TaskStatusCompleted TaskStatus = "task:completed"
	TaskStatusFailed    TaskStatus = "task:failed"
)
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a goroutine pool implemented as a channel of worker channels (`workers`) that holds at most `maxWorkers` worker channels.
To use the goroutine pool, you can create a WorkerPool instance and start it:
```go
pool := NewWorkerPool(10)
pool.Start()
```
func NewWorkerPool ¶
func NewWorkerPool(maxWorkers int) *WorkerPool
NewWorkerPool creates a new WorkerPool with the given number of workers
func (*WorkerPool) ExecuteTask ¶
func (w *WorkerPool) ExecuteTask(task func())
ExecuteTask takes a function `task` as an argument. It creates a new channel `taskChan`, sends it on the pool's `workers` channel, and then sends `task` on `taskChan`. Use this method to submit a new task to the worker pool for execution.
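Example usage (a sketch; the queued closure is illustrative):

```go
pool := NewWorkerPool(10)
pool.Start()

// Submit a task; it runs on one of the pool's worker goroutines.
pool.ExecuteTask(func() {
	fmt.Println("processing item")
})
```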
func (*WorkerPool) Start ¶
func (w *WorkerPool) Start()
Start launches the pool's worker goroutines; each one listens for tasks on its worker channel and executes them.
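To make the mechanism concrete, the following is a minimal, self-contained sketch of the channel-of-channels pattern described above; it is not the package's actual implementation, and all names in it are illustrative.

```go
type sketchPool struct {
	workers    chan chan func()
	maxWorkers int
}

func (p *sketchPool) start() {
	p.workers = make(chan chan func(), p.maxWorkers)
	for i := 0; i < p.maxWorkers; i++ {
		go func() {
			// Each worker repeatedly receives a task channel, then the task on it.
			for taskChan := range p.workers {
				task := <-taskChan
				task()
			}
		}()
	}
}

func (p *sketchPool) executeTask(task func()) {
	// Hand a dedicated task channel to the pool, then deliver the task on it.
	taskChan := make(chan func(), 1)
	p.workers <- taskChan
	taskChan <- task
}
```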