workerpool

package
v1.30.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor interface {
	// Execute runs the task-specific logic with the given payload.
	//
	// Parameters:
	//   - ctx: context for execution
	//   - payload: task-specific data needed for execution
	//
	// Returns:
	//   - error: if execution fails
	Execute(ctx context.Context, payload any) error
}

Executor defines the interface for task execution logic. Implementations should contain the actual business logic for processing specific types of tasks.

type IJobPool

type IJobPool interface {
	// NewTask creates a new task with the specified parameters.
	//
	// Parameters:
	//   - taskId: unique identifier for the task
	//   - taskType: classification of the task (e.g., "email", "notification")
	//   - scheduledTime: optional time when the task should be executed
	//   - taskPayload: data required for task execution
	//
	// Returns:
	//   - *asynq.Task: the created task object
	//   - error: if task creation fails
	NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)

	// ProcessTask executes the given task using the provided executor.
	//
	// Parameters:
	//   - ctx: context for task execution
	//   - task: the task to be processed
	//   - f: executor implementing the task logic
	//   - payload: data needed for task execution
	//
	// Returns:
	//   - error: if task processing fails
	ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error

	// EnqueueTask schedules a task for future execution.
	//
	// Parameters:
	//   - ctx: context for task enqueuing
	//   - task: the task to be scheduled
	//   - delay: optional duration to delay task execution
	//
	// Returns:
	//   - *asynq.TaskInfo: information about the enqueued task
	//   - error: if task enqueuing fails
	EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
}

IJobPool defines the interface for task pool operations. Implementations must provide methods for creating, processing, and enqueueing tasks in a distributed task processing system.

type Option

type Option func(processor *TaskPoolProcessor)

`Option` is a functional option pattern used to modify the behavior of the `TaskPoolProcessor` struct. It is a function that takes a pointer to a `TaskPoolProcessor` instance as its argument and modifies its fields. This pattern is commonly used in Go to provide flexible and extensible APIs.

func WithConcurrencyFactor

func WithConcurrencyFactor(factor *int) Option

func WithJobPoolClient

func WithJobPoolClient(client *asynq.Client) Option

This function takes an asynq client as input and returns an option.

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger returns a new TaskPoolProcessor with the given logger

func WithMaxRetry

func WithMaxRetry(maxRetry *int) Option

WithMaxRetry sets the maximum number of retry attempts for a given task

func WithRedisAddress

func WithRedisAddress(addr *string) Option

func WithRedisClient

func WithRedisClient(client *redis.Client) Option

WithRedisClient returns a new TaskPoolProcessor that uses the given Redis connection

func WithRedisPassword

func WithRedisPassword(password *string) Option

func WithRedisUsername

func WithRedisUsername(username *string) Option

func WithTaskResultTTL

func WithTaskResultTTL(ttl *time.Duration) Option

WithTaskResultTTL returns a new TaskPoolProcessor with the given task result

func WithTaskTimeout

func WithTaskTimeout(timeout *time.Duration) Option

type TaskPoolProcessor

type TaskPoolProcessor struct {
	RedisAddress      *string
	RedisUserName     *string
	RedisPassword     *string
	ConcurrencyFactor *int
	// contains filtered or unexported fields
}

TaskPoolProcessor manages asynchronous task processing using Redis-backed queues. It provides a robust worker pool implementation for handling distributed tasks with features like retry mechanisms, timeouts, and concurrent processing.

The processor uses asynq for task queue management and requires Redis for persistence and message broker capabilities.

func NewTaskPoolProcessor

func NewTaskPoolProcessor(opts ...Option) (*TaskPoolProcessor, error)

NewTaskPoolProcessor creates a new TaskPoolProcessor with the provided options. It initializes the Redis connection, worker pool, and task processing configuration.

Parameters:

  • opts: variadic list of Option functions to configure the processor

Returns:

  • *TaskPoolProcessor: configured processor instance
  • error: if initialization fails

func (*TaskPoolProcessor) EnqueueTask

func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)

EnqueueTask enqueues a task to be processed by the worker pool with a specified delay. It returns the task information and any error encountered during enqueuing.

Example usage:

delay := 5 * time.Minute
taskInfo, err := pool.EnqueueTask(ctx, task, &delay)
if err != nil {
    log.Printf("failed to enqueue task: %v", err)
    return err
}

func (*TaskPoolProcessor) NewTask

func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)

NewTask creates a new task with the specified parameters and options. It marshals the taskPayload into JSON and sets up the task with retry and timeout configurations.

Parameters:

  • taskId: unique identifier for the task
  • taskType: type/category of the task (e.g., "email:send", "image:resize")
  • scheduledTime: optional time when the task should be executed
  • taskPayload: data required for task execution (will be marshaled to JSON)

Example usage:

type EmailPayload struct {
    UserID    string
    Template  string
    Variables map[string]string
}

payload := EmailPayload{
    UserID:    "123",
    Template:  "welcome_email",
    Variables: map[string]string{"name": "John"},
}
task, err := pool.NewTask("task-123", "email:send", nil, payload)

func (*TaskPoolProcessor) ProcessTask

func (t *TaskPoolProcessor) ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error

ProcessTask unmarshals the task payload and executes the task using the provided executor. It returns an error if either the unmarshal operation fails or the task execution fails.

Parameters:

  • ctx: context for task execution
  • task: the task to be processed
  • f: executor implementing the Execute method
  • payload: pointer to a struct where the task payload will be unmarshaled

Example usage:

type EmailExecutor struct{}

func (e *EmailExecutor) Execute(ctx context.Context, payload any) error {
    emailPayload, ok := payload.(*EmailPayload)
    if !ok {
        return fmt.Errorf("invalid payload type")
    }
    // Process the email task
    return nil
}

var payload EmailPayload
err := pool.ProcessTask(ctx, task, &EmailExecutor{}, &payload)

func (*TaskPoolProcessor) Run

func (p *TaskPoolProcessor) Run(handler asynq.Handler) error

func (*TaskPoolProcessor) Validate

func (p *TaskPoolProcessor) Validate() error

type TaskStatus

type TaskStatus string
const (
	TaskStatusWaiting   TaskStatus = "task:waiting"
	TaskStatusRunning   TaskStatus = "task:running"
	TaskStatusCompleted TaskStatus = "task:completed"
	TaskStatusFailed    TaskStatus = "task:failed"
)

type TaskType

type TaskType string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

This implementation defines a WorkerPool struct that contains a channel of channels (workers) that can hold a maximum of `max-workers` worker channels

To use the goroutine pool, you can create a WorkerPool instance and start it:

```go
	pool := NewWorkerPool(10)
	pool.Start()
```

func NewWorkerPool

func NewWorkerPool(maxWorkers int) *WorkerPool

NewWorkerPool creates a new WorkerPool with the given number of workers

func (*WorkerPool) ExecuteTask

func (w *WorkerPool) ExecuteTask(task func())

ExecuteTask takes a function `task` as an argument. It creates a new channel `taskChan` and sends it to the `workers` channel of the `WorkerPool`. Then it sends the `task` function to the `taskChan` channel. This method is used to add a new task to the worker pool for execution.

func (*WorkerPool) Start

func (w *WorkerPool) Start()

Start creates a number of goroutines that listen for tasks on their worker channel and executes them

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL