workerpool

package
v1.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 19, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor interface {
	Execute(ctx context.Context, payload any) error
}

The `Executor` interface represents an executor for a task. It specifies a single method, `Execute`, which takes a context and a payload and returns an error. It defines the behavior of the function that runs when the worker pool processes a task. Because the `TaskPoolProcessor` depends only on this interface, it is decoupled from any specific task executor and works with any type that satisfies `Executor`.

type IJobPool

type IJobPool interface {
	// NewTask creates a new task. It takes a `taskId` string, a `taskType` string, a pointer to a
	// `scheduledTime` of type `time.Time`, and a `taskPayload`, and returns a pointer to an
	// `asynq.Task` instance and an error. The returned `asynq.Task` can then be used to enqueue the
	// task for processing by the worker pool.
	NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)
	// ProcessTask processes the given task asynchronously. It takes a context, an `asynq.Task`
	// instance, an `Executor`, and a payload. The work performed depends on the specific task being
	// processed, but it typically involves some kind of computation or I/O operation. The method
	// returns an error if there was a problem processing the task.
	ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error
	// EnqueueTask enqueues a task for processing by the worker pool. It takes a context, a pointer to
	// an `asynq.Task` instance, and a pointer to a `time.Duration` giving the delay before the task
	// should be processed. It returns a pointer to an `asynq.TaskInfo` instance and an error. The
	// `asynq.TaskInfo` instance contains information about the enqueued task, such as its ID and its
	// scheduled time. This method is typically used to add a task to the worker pool for processing
	// at a later time.
	EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)
}

The `IJobPool` interface defines the methods that any type must implement to act as a job pool for the `TaskPoolProcessor`: creating new tasks, processing tasks, and enqueuing tasks for processing. Because the `TaskPoolProcessor` depends only on this interface, it is decoupled from any specific job pool implementation and works with any type that satisfies `IJobPool`.

type Option

type Option func(processor *TaskPoolProcessor)

`Option` is a functional option used to configure a `TaskPoolProcessor`. It is a function that takes a pointer to a `TaskPoolProcessor` instance and modifies its fields. This pattern is commonly used in Go to provide flexible and extensible APIs.
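The functional option pattern described above can be sketched in isolation as follows. This is a minimal illustration, not this package's implementation: `processor`, `option`, `withConcurrency`, `withMaxRetry`, `newProcessor`, and the default values are all hypothetical stand-ins. The pointer parameters mirror the style of the `With...` functions below, and treating a nil pointer as "keep the default" is an assumption made for the example.

```go
package main

import "fmt"

// processor is a hypothetical stand-in for TaskPoolProcessor.
type processor struct {
	concurrency int
	maxRetry    int
}

// option mutates a processor, in the same spirit as the Option type above.
type option func(p *processor)

// withConcurrency returns an option that sets concurrency when n is non-nil.
func withConcurrency(n *int) option {
	return func(p *processor) {
		if n != nil {
			p.concurrency = *n
		}
	}
}

// withMaxRetry returns an option that sets maxRetry when n is non-nil.
func withMaxRetry(n *int) option {
	return func(p *processor) {
		if n != nil {
			p.maxRetry = *n
		}
	}
}

// newProcessor starts from (assumed) defaults and applies each option in order.
func newProcessor(opts ...option) *processor {
	p := &processor{concurrency: 1, maxRetry: 3}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	workers := 8
	p := newProcessor(withConcurrency(&workers), withMaxRetry(nil))
	fmt.Println(p.concurrency, p.maxRetry)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.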

func WithConcurrencyFactor

func WithConcurrencyFactor(factor *int) Option

func WithJobPoolClient

func WithJobPoolClient(client *asynq.Client) Option

WithJobPoolClient takes an asynq client and returns an Option that applies it to the TaskPoolProcessor.

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger returns an Option that sets the given logger on the TaskPoolProcessor.

func WithMaxRetry

func WithMaxRetry(maxRetry *int) Option

WithMaxRetry returns an Option that sets the maximum number of retry attempts for a given task.

func WithRedisAddress

func WithRedisAddress(addr *string) Option

func WithRedisClient

func WithRedisClient(client *redis.Client) Option

WithRedisClient returns an Option that makes the TaskPoolProcessor use the given Redis client.

func WithRedisPassword

func WithRedisPassword(password *string) Option

func WithRedisUsername

func WithRedisUsername(username *string) Option

func WithTaskResultTTL

func WithTaskResultTTL(ttl *time.Duration) Option

WithTaskResultTTL returns an Option that sets the given task result TTL on the TaskPoolProcessor.

func WithTaskTimeout

func WithTaskTimeout(timeout *time.Duration) Option

type TaskPoolProcessor

type TaskPoolProcessor struct {
	RedisAddress      *string
	RedisUserName     *string
	RedisPassword     *string
	ConcurrencyFactor *int
	// contains filtered or unexported fields
}

The `TaskPoolProcessor` struct represents a worker pool processor. Alongside the exported Redis settings and concurrency factor shown above, it contains fields for an `asynq.Client` instance, a Redis client instance, a logger instance, a task result time-to-live duration, and an `asynq.Server` instance representing the worker. This struct is used to manage the worker pool and process tasks asynchronously.

func NewTaskPoolProcessor

func NewTaskPoolProcessor(opts ...Option) (*TaskPoolProcessor, error)

func (*TaskPoolProcessor) EnqueueTask

func (t *TaskPoolProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, delay *time.Duration) (*asynq.TaskInfo, error)

EnqueueTask implements IJobPool.

func (*TaskPoolProcessor) NewTask

func (t *TaskPoolProcessor) NewTask(taskId, taskType string, scheduledTime *time.Time, taskPayload any) (*asynq.Task, error)

NewTask implements IJobPool.

func (*TaskPoolProcessor) ProcessTask

func (t *TaskPoolProcessor) ProcessTask(ctx context.Context, task *asynq.Task, f Executor, payload any) error

func (*TaskPoolProcessor) Run

func (p *TaskPoolProcessor) Run(handler asynq.Handler) error

func (*TaskPoolProcessor) Validate

func (p *TaskPoolProcessor) Validate() error

type TaskStatus

type TaskStatus string
const (
	TaskStatusWaiting   TaskStatus = "task:waiting"
	TaskStatusRunning   TaskStatus = "task:running"
	TaskStatusCompleted TaskStatus = "task:completed"
	TaskStatusFailed    TaskStatus = "task:failed"
)

type TaskType

type TaskType string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a goroutine pool. It contains a channel of channels (`workers`) that can hold at most `maxWorkers` worker channels.

To use the goroutine pool, you can create a WorkerPool instance and start it:

```go
	pool := NewWorkerPool(10)
	pool.Start()
```

func NewWorkerPool

func NewWorkerPool(maxWorkers int) *WorkerPool

NewWorkerPool creates a new WorkerPool with the given number of workers.

func (*WorkerPool) ExecuteTask

func (w *WorkerPool) ExecuteTask(task func())

ExecuteTask takes a function `task` as an argument. It creates a new channel `taskChan` and sends it to the `workers` channel of the `WorkerPool`. Then it sends the `task` function to the `taskChan` channel. This method is used to add a new task to the worker pool for execution.

func (*WorkerPool) Start

func (w *WorkerPool) Start()

Start creates a number of goroutines that listen for tasks on their worker channel and execute them.
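The channel-of-channels handoff described for `ExecuteTask` and `Start` can be sketched as below. This is a hypothetical reconstruction from the descriptions above, not the package's source: the lowercase names `pool`, `newPool`, `start`, `executeTask`, and the `sumSquares` helper are assumptions made so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for WorkerPool.
type pool struct {
	workers    chan chan func() // channel of per-task channels
	maxWorkers int
}

// newPool allocates the workers channel with room for maxWorkers task channels.
func newPool(maxWorkers int) *pool {
	return &pool{workers: make(chan chan func(), maxWorkers), maxWorkers: maxWorkers}
}

// start launches maxWorkers goroutines. Each one receives a task channel,
// then receives the task sent on that channel, and runs it.
func (p *pool) start() {
	for i := 0; i < p.maxWorkers; i++ {
		go func() {
			for taskChan := range p.workers {
				task := <-taskChan
				task()
			}
		}()
	}
}

// executeTask creates a task channel, hands it to the pool, and then sends
// the task on it. The second send blocks until a worker has picked it up.
func (p *pool) executeTask(task func()) {
	taskChan := make(chan func())
	p.workers <- taskChan
	taskChan <- task
}

// sumSquares submits n tasks to the pool and waits for all of them to finish.
func sumSquares(p *pool, n int) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for i := 1; i <= n; i++ {
		i := i // capture the loop variable per iteration
		wg.Add(1)
		p.executeTask(func() {
			defer wg.Done()
			mu.Lock()
			total += i * i
			mu.Unlock()
		})
	}
	wg.Wait()
	return total
}

func main() {
	p := newPool(4)
	p.start()
	fmt.Println(sumSquares(p, 10)) // prints 385
}
```

In this sketch `executeTask` returns once a worker has accepted the task, not once the task has finished, so callers that need completion use their own synchronization, such as the `sync.WaitGroup` shown here.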
