Documentation ¶
Index ¶
- type Executor
- type IJobPool
- type Option
- func WithConcurrencyFactor(factor *int) Option
- func WithJobPoolClient(client *asynq.Client) Option
- func WithLogger(logger *zap.Logger) Option
- func WithMaxRetry(maxRetry *int) Option
- func WithRedisAddress(addr *string) Option
- func WithRedisClient(client *redis.Client) Option
- func WithRedisPassword(password *string) Option
- func WithRedisUsername(username *string) Option
- func WithTaskResultTTL(ttl *time.Duration) Option
- func WithTaskTimeout(timeout *time.Duration) Option
- type TaskPoolProcessor
- func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
- func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)
- func (t *TaskPoolProcessor) ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error
- func (p *TaskPoolProcessor) Run(handler asynq.Handler) error
- func (p *TaskPoolProcessor) Validate() error
- type TaskStatus
- type TaskType
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Executor ¶
`Executor` is the interface for the work performed when the worker pool processes a task. It declares a single method, `Execute`, which takes a context and a payload and returns an error. Because `TaskPoolProcessor` depends only on this interface, it works with any type that satisfies `Executor` and is not tied to a specific task implementation.
type IJobPool ¶
type IJobPool interface {
	// NewTask creates a new task from the given taskId, taskType, scheduledTime,
	// and taskPayload and returns it as an *asynq.Task. The returned task can
	// then be enqueued for processing by the worker pool.
	NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)

	// ProcessTask takes a context, an *asynq.Task, an Executor f, and a payload,
	// and processes the task asynchronously. What processing involves depends on
	// the task, but it is typically some computation or I/O operation. It returns
	// an error if the task could not be processed.
	ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error

	// EnqueueTask enqueues a task for processing by the worker pool. It takes a
	// context, an *asynq.Task, and a pointer to a time.Duration giving the delay
	// before the task should be processed. It returns an *asynq.TaskInfo, which
	// describes the enqueued task (for example its ID and scheduled time), and
	// an error.
	EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
}
`IJobPool` is the set of methods a type must implement to act as a job pool for the `TaskPoolProcessor`: creating tasks, processing tasks, and enqueuing tasks for processing. Because `TaskPoolProcessor` depends only on this interface, it works with any type that satisfies `IJobPool` and is not tied to a specific job pool implementation.
type Option ¶
type Option func(processor *TaskPoolProcessor)
`Option` implements the functional options pattern for configuring a `TaskPoolProcessor`. An `Option` is a function that takes a pointer to a `TaskPoolProcessor` and sets one or more of its fields. The pattern is common in Go for building flexible, extensible APIs.
func WithConcurrencyFactor ¶
func WithJobPoolClient ¶
WithJobPoolClient takes an asynq client and returns an Option that sets it on the TaskPoolProcessor.
func WithLogger ¶
WithLogger returns an Option that sets the logger used by the TaskPoolProcessor.
func WithMaxRetry ¶
WithMaxRetry returns an Option that sets the maximum number of retry attempts for a task.
func WithRedisAddress ¶
func WithRedisClient ¶
WithRedisClient returns an Option that makes the TaskPoolProcessor use the given Redis client.
func WithRedisPassword ¶
func WithRedisUsername ¶
func WithTaskResultTTL ¶
WithTaskResultTTL returns an Option that sets the task result time-to-live (TTL) on the TaskPoolProcessor.
func WithTaskTimeout ¶
type TaskPoolProcessor ¶
type TaskPoolProcessor struct {
	RedisAddress      *string
	RedisUserName     *string
	RedisPassword     *string
	ConcurrencyFactor *int
	// contains filtered or unexported fields
}
`TaskPoolProcessor` is the worker pool processor. In addition to the exported fields above, it holds an `asynq.Client`, a Redis client, a logger, a task result time-to-live duration, and an `asynq.Server` that acts as the worker. It manages the worker pool and processes tasks asynchronously.
func NewTaskPoolProcessor ¶
func NewTaskPoolProcessor(opts ...Option) (*TaskPoolProcessor, error)
func (*TaskPoolProcessor) EnqueueTask ¶
func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
EnqueueTask implements IJobPool.
func (*TaskPoolProcessor) NewTask ¶
func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)
NewTask implements IJobPool.
func (*TaskPoolProcessor) ProcessTask ¶
func (*TaskPoolProcessor) Validate ¶
func (p *TaskPoolProcessor) Validate() error
type TaskStatus ¶
type TaskStatus string
const (
	TaskStatusWaiting   TaskStatus = "task:waiting"
	TaskStatusRunning   TaskStatus = "task:running"
	TaskStatusCompleted TaskStatus = "task:completed"
	TaskStatusFailed    TaskStatus = "task:failed"
)
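As a small illustration of how a typed string constant set like this is commonly consumed, the sketch below redeclares the type and constants locally and adds a hypothetical `isTerminal` helper. Treating completed and failed as final states is an assumption, since the page does not say whether a failed task may be retried.

```go
package main

import "fmt"

// TaskStatus and its constants are copied from the declaration above.
type TaskStatus string

const (
	TaskStatusWaiting   TaskStatus = "task:waiting"
	TaskStatusRunning   TaskStatus = "task:running"
	TaskStatusCompleted TaskStatus = "task:completed"
	TaskStatusFailed    TaskStatus = "task:failed"
)

// isTerminal is a hypothetical helper: it reports whether a status is
// assumed to be final (completed or failed).
func isTerminal(s TaskStatus) bool {
	switch s {
	case TaskStatusCompleted, TaskStatusFailed:
		return true
	default:
		return false
	}
}

func main() {
	statuses := []TaskStatus{
		TaskStatusWaiting,
		TaskStatusRunning,
		TaskStatusCompleted,
		TaskStatusFailed,
	}
	for _, s := range statuses {
		fmt.Printf("%s terminal=%t\n", s, isTerminal(s))
	}
}
```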
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a goroutine pool. The struct contains a channel of channels (`workers`) that can hold at most `maxWorkers` worker channels.
To use the goroutine pool, create a WorkerPool instance and start it:
```go
// Create a pool with 10 workers and start their goroutines.
pool := NewWorkerPool(10)
pool.Start()
```
func NewWorkerPool ¶
func NewWorkerPool(maxWorkers int) *WorkerPool
NewWorkerPool creates a new WorkerPool with the given number of workers.
func (*WorkerPool) ExecuteTask ¶
func (w *WorkerPool) ExecuteTask(task func())
ExecuteTask takes a function `task` as an argument. It creates a new channel `taskChan` and sends it to the `workers` channel of the `WorkerPool`. Then it sends the `task` function to the `taskChan` channel. This method is used to add a new task to the worker pool for execution.
func (*WorkerPool) Start ¶
func (w *WorkerPool) Start()
Start creates a number of goroutines that listen for tasks on their worker channel and execute them.