worker

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// StatusDone indicates the Job is successfully finished.
	StatusDone = "DONE"

	// StatusPanic indicates there was a panic during job-execution.
	// This is a terminal status and will NOT be retried.
	StatusPanic = "PANIC"

	// StatusFailed indicates job failed to succeed even after retries.
	// This is a terminal status and will NOT be retried.
	StatusFailed = "FAILED"

	// StatusPending indicates at-least 1 attempt is still pending.
	StatusPending = "PENDING"
)

Variables

View Source
var (
	ErrJobExists   = errors.New("job with id exists")
	ErrInvalidJob  = errors.New("job is not valid")
	ErrKindExists  = errors.New("handler for given kind exists")
	ErrUnknownKind = errors.New("job kind is invalid")
)

Functions

This section is empty.

Types

type DequeueFn

type DequeueFn func(ctx context.Context, j Job) (*Job, error)

DequeueFn is invoked by the JobQueue for ready jobs. It is responsible for handling a ready job and returning the updated version after the attempt.

type Job

type Job struct {
	// Specification of the job.
	ID      string    `json:"id"`
	Kind    string    `json:"kind"`
	RunAt   time.Time `json:"run_at"`
	Payload []byte    `json:"payload"`

	// Internal metadata.
	Status    string    `json:"status"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`

	// Execution information.
	Result        []byte    `json:"result,omitempty"`
	AttemptsDone  int64     `json:"attempts_done"`
	LastAttemptAt time.Time `json:"last_attempt_at,omitempty"`
	LastError     string    `json:"last_error,omitempty"`
}

Job represents the specification for async processing and also maintains the progress so far.

func (*Job) Attempt

func (j *Job) Attempt(ctx context.Context, now time.Time, fn JobFn)

Attempt attempts to safely invoke `fn` for this job. Handles success, failure and panic scenarios and updates the job with result in-place.

func (*Job) Sanitise

func (j *Job) Sanitise() error

type JobFn

type JobFn func(ctx context.Context, job Job) ([]byte, error)

JobFn is invoked by the Worker for ready jobs. If it returns no error, job will be marked with StatusDone. If it returns RetryableError, the job will remain in StatusPending and will be enqueued for retry. If it returns any other error, job will be marked as StatusFailed. In case if a panic occurs, job will be marked as StatusPanic.

type JobQueue

type JobQueue interface {
	// Enqueue all jobs. Enqueue must ensure all-or-nothing behaviour.
	// Jobs with zero-value or historical value for ReadyAt must be
	// executed immediately.
	Enqueue(ctx context.Context, jobs ...Job) error

	// Dequeue one job having one of the given kinds and invoke `fn`.
	// The job should be 'locked' until `fn` returns. Refer DequeueFn.
	Dequeue(ctx context.Context, kinds []string, fn DequeueFn) error
}

JobQueue represents a special queue that holds jobs and releases them via Dequeue() only after their RunAt time.

type Option

type Option func(w *Worker) error

func WithJobKind

func WithJobKind(kind string, fn JobFn) Option

func WithLogger

func WithLogger(l *zap.Logger) Option

func WithRunConfig

func WithRunConfig(workers int, pollInterval time.Duration) Option

type RetryableError

type RetryableError struct {
	Cause      error
	RetryAfter time.Duration
}

RetryableError can be returned by a JobFn to instruct the worker to attempt retry after time specified by the RetryAfter field. RetryAfter can have min of 5 seconds.

func (*RetryableError) Error

func (re *RetryableError) Error() string

func (RetryableError) WithCause

func (re RetryableError) WithCause(err error) *RetryableError

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker provides asynchronous job processing using a job-queue.

func New

func New(queue JobQueue, opts ...Option) (*Worker, error)

func (*Worker) Enqueue

func (w *Worker) Enqueue(ctx context.Context, jobs ...Job) error

Enqueue enqueues all jobs for processing.

func (*Worker) Register

func (w *Worker) Register(kind string, h JobFn) error

Register registers a job-kind and the function that should be invoked for handling it. Returns ErrKindExists if the kind already registered.

func (*Worker) Run

func (w *Worker) Run(baseCtx context.Context) error

Run starts the worker threads that dequeue and process ready jobs. Run blocks until all workers exit or context is cancelled. Context cancellation will do graceful shutdown of the worker threads.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL