taskprocessor

package
v1.8.0
Warning

This package is not in the latest version of its module.

Published: Oct 19, 2023 License: MIT Imports: 8 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// `ErrClientNotSet` is an error that is returned when the `client` property of the `TaskProcessor`
	// struct is not set.
	ErrClientNotSet = errors.New("client not set")
	// `ErrWorkerNotSet` is an error that is returned when the `worker` property of the `TaskProcessor`
	// struct is not set.
	ErrWorkerNotSet = errors.New("worker not set")
	// `ErrLoggerNotSet` is an error that is returned when the `logger` property of the `TaskProcessor`
	// struct is not set.
	ErrLoggerNotSet = errors.New("logger not set")
	// `ErrInstrumentationClientNotSet` is an error that is returned when the `instrumentationClient`
	// property of the `TaskProcessor` struct is not set.
	ErrInstrumentationClientNotSet = errors.New("instrumentation client not set")
	// `ErrConcurrencyFactorNotSet` is an error that is returned when the `concurrencyFactor`
	// property of the `TaskProcessor` struct is not set.
	ErrConcurrencyFactorNotSet = errors.New("concurrently factor not set")
	// `ErrTaskNotSet` is an error that is returned when the `task` argument of the `EnqueueTask` method
	// is not set.
	ErrTaskNotSet = errors.New("task not set")
	// `ErrTaskHandlerNotSet` is an error that is returned when the `taskHandler` property of the
	// `TaskProcessor` struct is not set.
	ErrTaskHandlerNotSet = errors.New("task handler not set")
)

Functions

This section is empty.

Types

type IProcessor

type IProcessor interface {
	// `Start() error` is a method of the `TaskProcessor` struct that starts the worker
	// to process tasks from the Asynq task queue, as well as the scheduler process. It takes no
	// arguments; each task is handled by the task handler configured on the processor.
	// The `Start` method returns an error if there is a problem starting the worker.
	Start() error
	// `Validate()` is a method of the `TaskProcessor` struct that checks if all the required properties of
	// the struct have been set. It returns an error if any of the required properties are not set,
	// indicating that the `TaskProcessor` is not ready to start processing tasks. This method is typically
	// called before starting the worker to ensure that all the necessary dependencies are in place.
	Validate() error
	// `EnqueueTask` is a method of the `TaskProcessor` struct that is used to enqueue a task in the Asynq
	// task queue. It takes a `context.Context`, an `*asynq.Task`, and an optional variadic argument of
	// `asynq.Option` as arguments. The `ctx` argument is a context that can be used to cancel the call or
	// set a timeout. The `task` argument is a pointer to an instance of the `asynq.Task` struct, which
	// contains information about the task to be executed, such as the task type and payload.
	// The `opts` argument is a variadic argument of `asynq.Option`, which can be used to set additional
	// options for the task, such as a delay or a custom queue name. The method returns the
	// `*asynq.TaskInfo` describing the enqueued task, or an error if there is a problem enqueueing
	// the task in the Asynq task queue.
	EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
}

IProcessor is an interface that defines the methods a task processor must implement.

type Option

type Option func(*TaskProcessor)

Option is a function that sets properties of the TaskProcessor struct.

func WithConcurrencyFactorOpt

func WithConcurrencyFactorOpt(concurrencyFactor *int) Option

func WithInstrumentationClientOpt

func WithInstrumentationClientOpt(instrumentationClient *instrumentation.Client) Option

func WithLoggerOpt

func WithLoggerOpt(logger *zap.Logger) Option

func WithRedisAddressOpt

func WithRedisAddressOpt(redisAddress string) Option

func WithTaskHandlerOpt

func WithTaskHandlerOpt(taskHandler taskhandler.ITaskHandler) Option

type ProcessingInterval

type ProcessingInterval string

ProcessingInterval represents the various processing intervals for a recurring task

const (
	EveryYear          ProcessingInterval = "@yearly"
	EveryMonth         ProcessingInterval = "@monthly"
	EveryWeek          ProcessingInterval = "@weekly"
	EveryDayAtMidnight ProcessingInterval = "@midnight"
	EveryDay           ProcessingInterval = "@daily"
	Every24Hours       ProcessingInterval = "@every 24h"
	Every12Hours       ProcessingInterval = "@every 12h"
	Every6Hours        ProcessingInterval = "@every 6h"
	Every3Hours        ProcessingInterval = "@every 3h"
	EveryHour          ProcessingInterval = "@every 1h"
	Every30Minutes     ProcessingInterval = "@every 30m"
	Every15Minutes     ProcessingInterval = "@every 15m"
	Every10Minutes     ProcessingInterval = "@every 10m"
	Every5Minutes      ProcessingInterval = "@every 5m"
	Every3Minutes      ProcessingInterval = "@every 3m"
	Every1Minutes      ProcessingInterval = "@every 1m"
	Every30Seconds     ProcessingInterval = "@every 30s"
)

func (ProcessingInterval) String

func (p ProcessingInterval) String() string

The `String()` method converts a `ProcessingInterval` value to its string representation, for example `"@every 30s"` for `Every30Seconds`.

type TaskProcessor

type TaskProcessor struct {
	// contains filtered or unexported fields
}

func NewTaskProcessor

func NewTaskProcessor(opts ...Option) (*TaskProcessor, error)

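NewTaskProcessor creates a new TaskProcessor instance.

```go
// WithConcurrencyFactorOpt takes a *int, so the value needs an addressable variable.
concurrency := 10

tp, err := NewTaskProcessor(
	WithInstrumentationClientOpt(ic),
	WithLoggerOpt(logger),
	WithRedisAddressOpt("localhost:6379"),
	WithConcurrencyFactorOpt(&concurrency),
	// handler is assumed to implement taskhandler.ITaskHandler
	WithTaskHandlerOpt(handler),
)
if err != nil {
	log.Fatal(err)
}
defer tp.Close()

// start the worker asynchronously
go func() {
	if err := tp.Start(); err != nil {
		log.Fatal(err)
	}
}()
```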

func (*TaskProcessor) Close

func (tp *TaskProcessor) Close() error

Close closes the task processor.

```go
tp, err := NewTaskProcessor(opts...)
if err != nil {
	return err
}
defer tp.Close()

// start the worker asynchronously in another goroutine
go func() {
	if err := tp.Start(); err != nil {
		log.Println(err)
	}
}()
```

func (*TaskProcessor) EnqueueRecurringTask

func (tp *TaskProcessor) EnqueueRecurringTask(ctx context.Context, task *asynq.Task, interval ProcessingInterval, opts ...asynq.Option) (*string, error)

The `EnqueueRecurringTask` method enqueues a recurring task that repeats at the specified interval. It takes the context, the task to be enqueued, the interval at which the task should repeat, and optional `asynq.Option` values for the task. It returns a pointer to a string holding the entry ID of the recurring task, or an error if the task could not be registered.

func (*TaskProcessor) EnqueueTask

func (tp *TaskProcessor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)

EnqueueTask implements IProcessor.

func (*TaskProcessor) Start

func (tp *TaskProcessor) Start() error

Start starts the task processor worker as well as the scheduler.

```go
tp, err := NewTaskProcessor(opts...)
if err != nil {
	return err
}

// start the worker asynchronously in another goroutine
go tp.Start()
```

func (*TaskProcessor) Validate

func (tp *TaskProcessor) Validate() error

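Validate checks that all required properties of the task processor have been set. It returns an error if any required property is missing, indicating that the processor is not ready to start processing tasks.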

type TaskProcessorHandler

type TaskProcessorHandler func(context.Context, *asynq.Task) error

`TaskProcessorHandler` is a function type for a function that processes a task: it receives a `context.Context` and an `*asynq.Task` and returns an error. This function is used by the `TaskProcessor` to process tasks from the Asynq task queue.
