Documentation
¶
Index ¶
- Constants
- Variables
- func DeadJobManagementHandler(mgr DeadJobManager) http.Handler
- type BackoffFunc
- type BackoffStrategy
- type ConstBackoff
- type DeadJobManager
- type ExponentialBackoff
- type Job
- type JobExecutorFunc
- type JobFunc
- type JobHandler
- type JobOptions
- type JobProcessor
- type JobSpec
- type JobStatus
- type JobTypeStats
- type LinearBackoff
- type LinearModBackoff
- type Option
- type RetryableError
- type Worker
Constants ¶
const ( StatusUnknown = "" StatusDone = "done" StatusDead = "dead" )
Variables ¶
var ( DefaultMaxAttempts = 3 DefaultTimeout = 5 * time.Second DefaultBackoffStrategy BackoffStrategy = DefaultExponentialBackoff )
Default run options
var ( ErrTypeExists = errors.New("handler for given job type exists") ErrUnknownType = errors.New("job type is invalid") ErrJobExists = errors.New("job with id exists") ErrNoJob = errors.New("no job found") )
var DefaultExponentialBackoff = &ExponentialBackoff{ Multiplier: 1.6, InitialDelay: 1 * time.Second, MaxDelay: 900 * time.Second, Jitter: 0.2, }
var ErrInvalidJob = errors.New("job is not valid")
var ErrInvalidJobHandler = errors.New("job handler is not valid")
Functions ¶
func DeadJobManagementHandler ¶
func DeadJobManagementHandler(mgr DeadJobManager) http.Handler
DeadJobManagementHandler returns a http handler with endpoints for dead job management:
- /dead-jobs: JSON/HTML response with content negotiation. Response is paginated.
- /resurrect-jobs: Move specified dead jobs to jobs_queue table.
- /clear-jobs: Remove specified dead jobs from dead_jobs table.
Types ¶
type BackoffFunc ¶
BackoffFunc is a adapter to use ordinary function as retry BackoffStrategy
type BackoffStrategy ¶
type BackoffStrategy interface { // Backoff returns how much duration to wait for a given attempt Backoff(attempt int) time.Duration }
BackoffStrategy defines the different kind of Backoff strategy for retry. eg. Linear, Const, Exponential ..etc
type ConstBackoff ¶
type DeadJobManager ¶
type ExponentialBackoff ¶
type ExponentialBackoff struct { // InitialDelay is multiplied by Multiplier after each attempt Multiplier float64 // InitialDelay is the initial duration for retrial, i.e. backoff duration for 1st retry attempt InitialDelay time.Duration // Backoff duration will be capped at MaxDelay. MaxDelay time.Duration // Amount of jitter applied after each iteration. // This can be used to randomize duration after each attempt Jitter float64 }
ExponentialBackoff implements exponential backoff. It is capped at MaxDelay.
type Job ¶
type Job struct { // Specification of the job. ID ulid.ULID `json:"id"` JobSpec // Internal metadata. CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` // Execution information. AttemptsDone int `json:"attempts_done"` Status JobStatus `json:"-"` LastAttemptAt time.Time `json:"last_attempt_at,omitempty"` LastError string `json:"last_error,omitempty"` }
Job represents the specification for async processing and also maintains the progress so far.
type JobExecutorFunc ¶
JobExecutorFunc is invoked by JobProcessor for ready jobs. It is responsible for handling a ready job and returning the updated job execution result after the attempt.
type JobFunc ¶
JobFunc is invoked by the Worker when a job is ready. If it returns RetryableError, the Worker may retry the job execution with an appropriate backoff. If it returns any other error or if it panics, the job will be marked as a dead job.
type JobHandler ¶
type JobHandler struct { Handle JobFunc JobOpts JobOptions }
JobHandler is used to execute a job by the Worker when ready. The Handle function executes the given job with additional control via JobOpts.
func (*JobHandler) Sanitize ¶
func (j *JobHandler) Sanitize() error
Sanitize sanitizes the job handler and sets defaults for unspecified job options. Returns ErrInvalidJobHandler if the Handle function is not set.
type JobOptions ¶
type JobOptions struct { MaxAttempts int Timeout time.Duration BackoffStrategy }
JobOptions control the retry strategy and the job execution timeout.
type JobProcessor ¶
type JobProcessor interface { // Enqueue all jobs. Enqueue must ensure all-or-nothing behavior. // Jobs with zero-value or historical value for ReadyAt must be executed // immediately. Enqueue(ctx context.Context, jobs ...Job) error // Process dequeues one job from the data store and invokes `fn`. The job // should be 'locked' until `fn` returns. Refer JobExecutorFunc. // Process is also responsible for clearing the job or marking the job as // dead or setting up the retry for the job depending on the job result. Process(ctx context.Context, types []string, fn JobExecutorFunc) error // Stats returns the job statistics by job type. It includes the number of // active and dead jobs. Stats(ctx context.Context) ([]JobTypeStats, error) }
JobProcessor represents a special job store or queue that holds jobs and processes them via Process() only after the jobs are ready.
type JobSpec ¶
type JobSpec struct { Type string `json:"type"` Payload []byte `json:"args"` RunAt time.Time `json:"run_at"` }
JobSpec is the specification for async processing.
type JobTypeStats ¶
type JobTypeStats struct { Type string `json:"type"` Active int `json:"active"` Dead int `json:"dead"` }
JobTypeStats is the statistics for the job type with number of active and dead jobs.
type LinearBackoff ¶
type LinearBackoff struct { // backoff will start at InitialDelay, i.e. backoff duration for 1st retry attempt InitialDelay time.Duration // Backoff duration will be capped at MaxDelay duration. MaxDelay time.Duration }
LinearBackoff will backoff linearly. It will start at InitialDelay, capping at MaxDelay.
type LinearModBackoff ¶
LinearModBackoff will backoff linearly. It will start at InitialDelay. Backoff duration will oscillate linearly between [0,MaxDelay] forming a sawtooth pattern. refer https://www.researchgate.net/figure/Types-of-back-off-algorithms-constant-linear-linear-modulus-exponential-and_fig1_224440820
type Option ¶
func WithActivePollPercent ¶
func WithJobHandler ¶
func WithJobHandler(typ string, h JobHandler) Option
func WithLogger ¶
type RetryableError ¶
type RetryableError struct {
Cause error
}
RetryableError can be returned by a JobFunc to instruct the worker to attempt retry. Returning a retryable error does not guarantee that the job will be retried since the job would have a limit on number of retries.
func (*RetryableError) Error ¶
func (re *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (re *RetryableError) Unwrap() error
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker provides asynchronous job processing using a job processor.
func New ¶
func New(processor JobProcessor, opts ...Option) (*Worker, error)
New returns an instance of Worker initialized with defaults. By default, the Worker uses a noop logger with run config of 1 worker, 1s poll interval and 0 jitter.