worker

package
v0.6.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusUnknown = ""
	StatusDone    = "done"
	StatusDead    = "dead"
)

Variables

View Source
var (
	DefaultMaxAttempts                     = 3
	DefaultTimeout                         = 5 * time.Second
	DefaultBackoffStrategy BackoffStrategy = DefaultExponentialBackoff
)

Default run options

View Source
var (
	ErrTypeExists  = errors.New("handler for given job type exists")
	ErrUnknownType = errors.New("job type is invalid")
	ErrJobExists   = errors.New("job with id exists")
	ErrNoJob       = errors.New("no job found")
)
View Source
var DefaultExponentialBackoff = &ExponentialBackoff{
	Multiplier:   1.6,
	InitialDelay: 1 * time.Second,
	MaxDelay:     900 * time.Second,
	Jitter:       0.2,
}
View Source
var ErrInvalidJob = errors.New("job is not valid")
View Source
var ErrInvalidJobHandler = errors.New("job handler is not valid")

Functions

func DeadJobManagementHandler

func DeadJobManagementHandler(mgr DeadJobManager) http.Handler

DeadJobManagementHandler returns a http handler with endpoints for dead job management:

  • /dead-jobs: JSON/HTML response with content negotiation. Response is paginated.
  • /resurrect-jobs: Move specified dead jobs to jobs_queue table.
  • /clear-jobs: Remove specified dead jobs from dead_jobs table.

Types

type BackoffFunc

type BackoffFunc func(attempt int) time.Duration

BackoffFunc is a adapter to use ordinary function as retry BackoffStrategy

func (BackoffFunc) Backoff

func (s BackoffFunc) Backoff(attempt int) time.Duration

type BackoffStrategy

type BackoffStrategy interface {
	// Backoff returns how much duration to wait for a given attempt
	Backoff(attempt int) time.Duration
}

BackoffStrategy defines the different kind of Backoff strategy for retry. eg. Linear, Const, Exponential ..etc

type ConstBackoff

type ConstBackoff struct {
	// Delay is the time duration to wait before each retry attempt
	Delay time.Duration
}

func (ConstBackoff) Backoff

func (c ConstBackoff) Backoff(int) time.Duration

type DeadJobManager

type DeadJobManager interface {
	DeadJobs(ctx context.Context, size, offset int) ([]Job, error)
	Resurrect(ctx context.Context, jobIDs []string) error
	ClearDeadJobs(ctx context.Context, jobIDs []string) error
}

type ExponentialBackoff

type ExponentialBackoff struct {
	// InitialDelay is multiplied by Multiplier after each attempt
	Multiplier float64
	// InitialDelay is the initial duration for retrial, i.e. backoff duration for 1st retry attempt
	InitialDelay time.Duration
	// Backoff duration will be capped at MaxDelay.
	MaxDelay time.Duration
	// Amount of jitter applied after each iteration.
	// This can be used to randomize duration after each attempt
	Jitter float64
}

ExponentialBackoff implements exponential backoff. It is capped at MaxDelay.

func (*ExponentialBackoff) Backoff

func (b *ExponentialBackoff) Backoff(attempt int) time.Duration

type Job

type Job struct {
	// Specification of the job.
	ID ulid.ULID `json:"id"`
	JobSpec

	// Internal metadata.
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`

	// Execution information.
	AttemptsDone  int       `json:"attempts_done"`
	Status        JobStatus `json:"-"`
	LastAttemptAt time.Time `json:"last_attempt_at,omitempty"`
	LastError     string    `json:"last_error,omitempty"`
}

Job represents the specification for async processing and also maintains the progress so far.

func NewJob

func NewJob(j JobSpec) (Job, error)

NewJob sanitizes the given JobSpec and returns a new instance of Job created with the given job. Returns ErrInvalidJob if the job type is empty.

func (*Job) Attempt

func (j *Job) Attempt(baseCtx context.Context, now time.Time, h JobHandler)

Attempt attempts to safely invoke the handler for this job. Handles success, failure and panic scenarios and updates the job with result in-place.

type JobExecutorFunc

type JobExecutorFunc func(context.Context, Job) Job

JobExecutorFunc is invoked by JobProcessor for ready jobs. It is responsible for handling a ready job and returning the updated job execution result after the attempt.

type JobFunc

type JobFunc func(context.Context, JobSpec) error

JobFunc is invoked by the Worker when a job is ready. If it returns RetryableError, the Worker may retry the job execution with an appropriate backoff. If it returns any other error or if it panics, the job will be marked as a dead job.

type JobHandler

type JobHandler struct {
	Handle  JobFunc
	JobOpts JobOptions
}

JobHandler is used to execute a job by the Worker when ready. The Handle function executes the given job with additional control via JobOpts.

func (*JobHandler) Sanitize

func (j *JobHandler) Sanitize() error

Sanitize sanitizes the job handler and sets defaults for unspecified job options. Returns ErrInvalidJobHandler if the Handle function is not set.

type JobOptions

type JobOptions struct {
	MaxAttempts int
	Timeout     time.Duration
	BackoffStrategy
}

JobOptions control the retry strategy and the job execution timeout.

type JobProcessor

type JobProcessor interface {
	// Enqueue all jobs. Enqueue must ensure all-or-nothing behavior.
	// Jobs with zero-value or historical value for ReadyAt must be executed
	// immediately.
	Enqueue(ctx context.Context, jobs ...Job) error

	// Process dequeues one job from the data store and invokes `fn`. The job
	// should be 'locked' until `fn` returns. Refer JobExecutorFunc.
	// Process is also responsible for clearing the job or marking the job as
	// dead or setting up the retry for the job depending on the job result.
	Process(ctx context.Context, types []string, fn JobExecutorFunc) error

	// Stats returns the job statistics by job type. It includes the number of
	// active and dead jobs.
	Stats(ctx context.Context) ([]JobTypeStats, error)
}

JobProcessor represents a special job store or queue that holds jobs and processes them via Process() only after the jobs are ready.

type JobSpec

type JobSpec struct {
	Type    string    `json:"type"`
	Payload []byte    `json:"args"`
	RunAt   time.Time `json:"run_at"`
}

JobSpec is the specification for async processing.

type JobStatus

type JobStatus string

type JobTypeStats

type JobTypeStats struct {
	Type   string `json:"type"`
	Active int    `json:"active"`
	Dead   int    `json:"dead"`
}

JobTypeStats is the statistics for the job type with number of active and dead jobs.

type LinearBackoff

type LinearBackoff struct {
	// backoff will start at InitialDelay, i.e. backoff duration for 1st retry attempt
	InitialDelay time.Duration
	// Backoff duration will be capped at MaxDelay duration.
	MaxDelay time.Duration
}

LinearBackoff will backoff linearly. It will start at InitialDelay, capping at MaxDelay.

func (LinearBackoff) Backoff

func (l LinearBackoff) Backoff(attempt int) time.Duration

type LinearModBackoff

type LinearModBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
}

LinearModBackoff will backoff linearly. It will start at InitialDelay. Backoff duration will oscillate linearly between [0,MaxDelay] forming a sawtooth pattern. refer https://www.researchgate.net/figure/Types-of-back-off-algorithms-constant-linear-linear-modulus-exponential-and_fig1_224440820

func (LinearModBackoff) Backoff

func (l LinearModBackoff) Backoff(attempt int) time.Duration

type Option

type Option func(w *Worker) error

func WithActivePollPercent

func WithActivePollPercent(pct float64) Option

func WithJobHandler

func WithJobHandler(typ string, h JobHandler) Option

func WithLogger

func WithLogger(l log.Logger) Option

func WithRunConfig

func WithRunConfig(workers int, pollInterval time.Duration) Option

type RetryableError

type RetryableError struct {
	Cause error
}

RetryableError can be returned by a JobFunc to instruct the worker to attempt retry. Returning a retryable error does not guarantee that the job will be retried since the job would have a limit on number of retries.

func (*RetryableError) Error

func (re *RetryableError) Error() string

func (*RetryableError) Unwrap

func (re *RetryableError) Unwrap() error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker provides asynchronous job processing using a job processor.

func New

func New(processor JobProcessor, opts ...Option) (*Worker, error)

New returns an instance of Worker initialized with defaults. By default, the Worker uses a noop logger with run config of 1 worker, 1s poll interval and 0 jitter.

func (*Worker) Enqueue

func (w *Worker) Enqueue(ctx context.Context, jobs ...JobSpec) error

Enqueue enqueues all jobs for processing.

func (*Worker) Register

func (w *Worker) Register(typ string, h JobHandler) error

Register registers a job type and the handler that should be invoked for processing it. Returns ErrTypeExists if the type is already registered.

func (*Worker) Run

func (w *Worker) Run(baseCtx context.Context) error

Run starts the worker threads that dequeue and process ready jobs. Run blocks until all workers exit or context is canceled. Context cancellation will do graceful shutdown of the worker threads.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL