Documentation ¶
Index ¶
- Variables
- func DefaultRetryPolicy(t Task) time.Duration
- type Client
- func (c *Client) ClaimTask(ctx context.Context) (Task, error)
- func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error
- func (c *Client) DeleteTask(ctx context.Context, t Task) error
- func (c *Client) ReleaseTask(ctx context.Context, t Task) error
- func (c *Client) RetryTask(ctx context.Context, t Task, retryIn time.Duration) error
- func (c *Client) RunTransaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error
- type ErrorHandler
- type Handler
- type Middleware
- type Processor
- type RetryPolicy
- type Task
- type TaskOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrDuplicateTask indicates that an identical task already exists in the queue. // Tasks are considered identical if they have the same fingerprint. ErrDuplicateTask = errors.New("duplicate task") // ErrNoTasks indicates that no tasks are available for processing. ErrNoTasks = errors.New("no tasks available") // ErrSkipRetry indicates that the task should not be retried. ErrSkipRetry = errors.New("skip retry") // ErrTaskTimeout indicates that the task timeout has been exceeded. ErrTaskTimeout = errors.New("task timeout exceeded") )
Functions ¶
func DefaultRetryPolicy ¶
DefaultRetryPolicy uses an exponential base delay with jitter. Approximate examples: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a database-backed queue client.
func (*Client) ClaimTask ¶
ClaimTask claims a task for processing.
Returns ErrNoTasks if no tasks are available.
func (*Client) CreateTask ¶
CreateTask creates the given task.
Expected to run in an existing transaction. Returns ErrDuplicateTask if a task with the same fingerprint already exists.
func (*Client) DeleteTask ¶
DeleteTask deletes the given task.
func (*Client) ReleaseTask ¶
ReleaseTask releases the given task, allowing it to be claimed again.
type ErrorHandler ¶
ErrorHandler handles a task processing error.
type Handler ¶
Handler processes tasks of a specific type.
Should return nil if the processing of a task was successful. If an error is returned, the task will be retried after a delay, until max retries are reached.
Return an ErrSkipRetry error to skip any remaining retries and remove the task from the queue.
type Middleware ¶
Middleware wrap a handler in order to run logic before/after it.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor represents the queue processor.
func NewProcessor ¶
NewProcessor creates a new processor.
func (*Processor) Handle ¶
func (p *Processor) Handle(taskType string, h Handler, ms ...Middleware)
Handle registers the handler for a task type.
func (*Processor) OnError ¶
func (p *Processor) OnError(h ErrorHandler)
OnError registers the given error handler.
func (*Processor) RetryPolicy ¶
func (p *Processor) RetryPolicy(rp RetryPolicy)
RetryPolicy registers the given retry policy.
func (*Processor) Run ¶
Run starts the processor and blocks until a shutdown signal (SIGINT/SIGTERM) is received.
Once the shutdown signal is received, workers stop claiming new tasks. The tasks that are still being processed are then given until shutdownTimeout to complete, after which they are stopped (via canceled context).
func (*Processor) Use ¶
func (p *Processor) Use(m Middleware)
Use registers a middleware that runs for all task types.
type RetryPolicy ¶
RetryPolicy determines the retry delay for a given task.
type Task ¶
type Task struct { ID string `db:"id"` Fingerprint string `db:"fingerprint"` Type string `db:"type"` Payload []byte `db:"payload"` Retries uint8 `db:"retries"` MaxRetries uint8 `db:"max_retries"` TimeoutSeconds int32 `db:"timeout_seconds"` CreatedAt time.Time `db:"created_at"` ScheduledAt time.Time `db:"scheduled_at"` ClaimedAt *time.Time `db:"claimed_at"` }
Task represents a task.
type TaskOption ¶
type TaskOption func(t *Task)
TaskOption represents a task option.
func WithFingerprintData ¶
func WithFingerprintData(data []byte) TaskOption
WithFingerprintData provides the data used to fingerprint the task.
By default, the entire task payload is used.
func WithMaxRetries ¶
func WithMaxRetries(maxRetries uint8) TaskOption
WithMaxRetries allows the task to be retried the given number of times.
Defaults to 10. Use 0 to disallow retries.
func WithScheduledAt ¶
func WithScheduledAt(scheduledAt time.Time) TaskOption
WithScheduledAt schedules the task at the given time.
func WithScheduledIn ¶
func WithScheduledIn(scheduledIn time.Duration) TaskOption
WithScheduledIn schedules the task after the given duration.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) TaskOption
WithTimeout sets the task timeout.
Must be at least 1s. Defaults to 60s.