Documentation ¶
Index ¶
- Variables
- type IProcessor
- type Option
- func WithConcurrencyFactorOpt(concurrencyFactor *int) Option
- func WithInstrumentationClientOpt(instrumentationClient *instrumentation.Client) Option
- func WithLoggerOpt(logger *zap.Logger) Option
- func WithRedisAddressOpt(redisAddress string) Option
- func WithTaskHandlerOpt(taskHandler taskhandler.ITaskHandler) Option
- type ProcessingInterval
- type TaskProcessor
- func (tp *TaskProcessor) Close() error
- func (tp *TaskProcessor) EnqueueRecurringTask(ctx context.Context, task *asynq.Task, interval ProcessingInterval, ...) (*string, error)
- func (tp *TaskProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
- func (tp *TaskProcessor) Start() error
- func (tp *TaskProcessor) Validate() error
- type TaskProcessorHandler
Constants ¶
This section is empty.
Variables ¶
var (
	// `ErrClientNotSet` is an error that is returned when the `client` property of the `TaskProcessor`
	// struct is not set.
	ErrClientNotSet = errors.New("client not set")
	// `ErrWorkerNotSet` is an error that is returned when the `worker` property of the `TaskProcessor`
	// struct is not set.
	ErrWorkerNotSet = errors.New("worker not set")
	// `ErrLoggerNotSet` is an error that is returned when the `logger` property of the `TaskProcessor`
	// struct is not set.
	ErrLoggerNotSet = errors.New("logger not set")
	// `ErrInstrumentationClientNotSet` is an error that is returned when the `instrumentationClient`
	// property of the `TaskProcessor` struct is not set.
	ErrInstrumentationClientNotSet = errors.New("instrumentation client not set")
	// `ErrConcurrencyFactorNotSet` is an error that is returned when the `concurrencyFactor`
	// property of the `TaskProcessor` struct is not set.
	ErrConcurrencyFactorNotSet = errors.New("concurrently factor not set")
	// `ErrTaskNotSet` is an error that is returned when the `task` argument of the `EnqueueTask` method
	// is not set.
	ErrTaskNotSet = errors.New("task not set")
	// `ErrTaskHandlerNotSet` is an error that is returned when the `taskHandler` property of the
	// `TaskProcessor` struct is not set.
	ErrTaskHandlerNotSet = errors.New("task handler not set")
)
Functions ¶
This section is empty.
Types ¶
type IProcessor ¶
type IProcessor interface {
	// `Start` starts the worker that processes tasks from the Asynq task queue, as well as the
	// scheduler process. It takes no arguments and returns an error if there is a problem
	// starting the worker.
	Start() error
	// `Validate` checks that all the required properties of the `TaskProcessor` struct have been
	// set. It returns an error if any required property is not set, indicating that the
	// `TaskProcessor` is not ready to start processing tasks. This method is typically called
	// before starting the worker to ensure that all the necessary dependencies are in place.
	Validate() error
	// `EnqueueTask` enqueues a task in the Asynq task queue. The `ctx` argument is a context that
	// can be used to cancel the call or set a timeout. The `task` argument is a pointer to an
	// `asynq.Task`, which carries information about the task to be executed, such as the task
	// type and payload. The variadic `opts` argument can be used to set additional options for
	// the task, such as a delay or a custom queue name. The method returns the resulting
	// `*asynq.TaskInfo`, or an error if there is a problem enqueueing the task.
	EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
}
IProcessor is an interface that defines the methods that must be implemented by a task processor
type Option ¶
type Option func(*TaskProcessor)
Option is a function that is used to set properties of the TaskProcessor struct
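The `Option` type follows the functional options pattern. The sketch below illustrates that pattern with a hypothetical stand-in struct; `processor`, `option`, `withRedisAddress`, `withConcurrencyFactor`, and `newProcessor` are illustrative names, not part of this package, and the real `TaskProcessor` fields are unexported and may differ.

```go
package main

import "fmt"

// processor is a hypothetical stand-in for TaskProcessor; the real struct's
// fields are unexported and not shown in this documentation.
type processor struct {
	redisAddress      string
	concurrencyFactor *int
}

// option mirrors the shape of Option: a function that mutates the struct.
type option func(*processor)

// withRedisAddress is an illustrative counterpart to WithRedisAddressOpt.
func withRedisAddress(addr string) option {
	return func(p *processor) { p.redisAddress = addr }
}

// withConcurrencyFactor is an illustrative counterpart to WithConcurrencyFactorOpt.
func withConcurrencyFactor(n *int) option {
	return func(p *processor) { p.concurrencyFactor = n }
}

// newProcessor applies each option, in order, to a zero-valued processor.
func newProcessor(opts ...option) *processor {
	p := &processor{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	n := 10
	p := newProcessor(withRedisAddress("localhost:6379"), withConcurrencyFactor(&n))
	fmt.Println(p.redisAddress, *p.concurrencyFactor)
}
```

Because each option only touches the field it owns, callers can pass any subset in any order, and a later validation step can report whichever dependency is still missing.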
func WithInstrumentationClientOpt ¶
func WithInstrumentationClientOpt(instrumentationClient *instrumentation.Client) Option
func WithLoggerOpt ¶
func WithLoggerOpt(logger *zap.Logger) Option
func WithRedisAddressOpt ¶
func WithRedisAddressOpt(redisAddress string) Option
func WithTaskHandlerOpt ¶
func WithTaskHandlerOpt(taskHandler taskhandler.ITaskHandler) Option
type ProcessingInterval ¶
type ProcessingInterval string
ProcessingInterval represents the various processing intervals for a recurring task
const (
	EveryYear          ProcessingInterval = "@yearly"
	EveryMonth         ProcessingInterval = "@monthly"
	EveryWeek          ProcessingInterval = "@weekly"
	EveryDayAtMidnight ProcessingInterval = "@midnight"
	EveryDay           ProcessingInterval = "@daily"
	Every24Hours       ProcessingInterval = "@every 24h"
	Every12Hours       ProcessingInterval = "@every 12h"
	Every6Hours        ProcessingInterval = "@every 6h"
	Every3Hours        ProcessingInterval = "@every 3h"
	EveryHour          ProcessingInterval = "@every 1h"
	Every30Minutes     ProcessingInterval = "@every 30m"
	Every15Minutes     ProcessingInterval = "@every 15m"
	Every10Minutes     ProcessingInterval = "@every 10m"
	Every5Minutes      ProcessingInterval = "@every 5m"
	Every3Minutes      ProcessingInterval = "@every 3m"
	Every1Minutes      ProcessingInterval = "@every 1m"
	Every30Seconds     ProcessingInterval = "@every 30s"
)
func (ProcessingInterval) String ¶
func (p ProcessingInterval) String() string
String returns the string representation of a `ProcessingInterval` value, for example `"@every 5m"` for `Every5Minutes`.
type TaskProcessor ¶
type TaskProcessor struct {
// contains filtered or unexported fields
}
func NewTaskProcessor ¶
func NewTaskProcessor(opts ...Option) (*TaskProcessor, error)
NewTaskProcessor creates a new TaskProcessor instance.
```go
concurrency := 10
tp, err := NewTaskProcessor(
	WithInstrumentationClientOpt(ic),
	WithLoggerOpt(logger),
	WithRedisAddressOpt("localhost:6379"),
	WithConcurrencyFactorOpt(&concurrency),
	WithTaskHandlerOpt(handler),
)
if err != nil {
	log.Fatal(err)
}
defer tp.Close()

// start the worker asynchronously
go func() {
	if err := tp.Start(); err != nil {
		log.Fatal(err)
	}
}()
```
func (*TaskProcessor) Close ¶
func (tp *TaskProcessor) Close() error
Close closes the task processor.
```go
tp, err := NewTaskProcessor(opts...)
if err != nil {
	return err
}
defer tp.Close()

// start the worker asynchronously in another goroutine
go func() {
	_ = tp.Start() // error handling omitted for brevity
}()
```
func (*TaskProcessor) EnqueueRecurringTask ¶
func (tp *TaskProcessor) EnqueueRecurringTask(ctx context.Context, task *asynq.Task, interval ProcessingInterval, opts ...asynq.Option) (*string, error)
EnqueueRecurringTask enqueues a task that recurs at the given interval. It takes a context, the task to enqueue, the interval at which the task repeats, and optional `asynq.Option` values. It returns a pointer to the entry ID of the recurring task, or an error.
func (*TaskProcessor) EnqueueTask ¶
func (tp *TaskProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
EnqueueTask implements IProcessor.
func (*TaskProcessor) Start ¶
func (tp *TaskProcessor) Start() error
Start starts the task processor worker as well as the scheduler.
```go
tp, err := NewTaskProcessor(opts...)
if err != nil {
	return err
}

// start the worker asynchronously in another goroutine
go tp.Start()
```
func (*TaskProcessor) Validate ¶
func (tp *TaskProcessor) Validate() error
Validate validates the task processor