jobs

package
v0.0.0-...-978c921 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2019 License: MIT, CC-BY-4.0, MIT, + 1 more Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxWorkDuration = time.Minute

	EmailQueue = "emails"
)

Variables

View Source
var (
	ErrInvalidJobType        = fmt.Errorf("invalid job type")
	ErrJobCancelled          = fmt.Errorf("job cancelled")
	ErrJobCanceled           = ErrJobCancelled
	ErrJobNotFound           = fmt.Errorf("job not found")
	ErrJobNotClaimed         = fmt.Errorf("job not claimed")
	ErrJobQueueAlreadyExists = fmt.Errorf("job queue already exists")
	ErrJobQueueNotFound      = fmt.Errorf("job queue not found")
)
View Source
var (
	BackoffDuration = 5 * time.Second

	EmailJobType    = JobType("email")
	EmailJobOptions = []JobOption{
		JobOptions.MaxAttempts(3),
		JobOptions.MaxWorkDuration(30 * time.Second),
	}
)

Functions

This section is empty.

Types

type EmailJob

type EmailJob struct {
	AccountID snowflake.Snowflake
	EmailID   string
}

type Job

type Job struct {
	ID                snowflake.Snowflake
	Type              JobType
	Data              json.RawMessage
	Created           time.Time
	Due               time.Time
	MaxWorkDuration   time.Duration
	AttemptsMade      int32
	AttemptsRemaining int32

	*JobClaim
}

func Claim

func Claim(ctx scope.Context, jq JobQueue, handlerID string, pollTime time.Duration, stealChance float64) (*Job, error)

func (*Job) Encode

func (j *Job) Encode() ([]byte, error)

func (*Job) Exec

func (j *Job) Exec(ctx scope.Context, f func(scope.Context) error) error

func (*Job) Payload

func (j *Job) Payload() (interface{}, error)

type JobClaim

type JobClaim struct {
	bytes.Buffer
	JobID         snowflake.Snowflake
	HandlerID     string
	AttemptNumber int32
	Queue         JobQueue
}

func (*JobClaim) Complete

func (jc *JobClaim) Complete(ctx scope.Context) error

func (*JobClaim) Fail

func (jc *JobClaim) Fail(ctx scope.Context, reason string) error

type JobDue

type JobDue time.Time

func (JobDue) Apply

func (t JobDue) Apply(job *Job) error

type JobLog

type JobLog struct {
	AttemptNumber int32
	HandlerID     string
	Success       bool
	FailureReason string
	Log           []byte
}

type JobMaxAttempts

type JobMaxAttempts int32

func (JobMaxAttempts) Apply

func (a JobMaxAttempts) Apply(job *Job) error

type JobMaxWorkDuration

type JobMaxWorkDuration time.Duration

func (JobMaxWorkDuration) Apply

func (d JobMaxWorkDuration) Apply(job *Job) error

type JobOption

type JobOption interface {
	Apply(*Job) error
}

type JobOptionConstructor

type JobOptionConstructor struct{}
var JobOptions JobOptionConstructor

func (JobOptionConstructor) Due

func (JobOptionConstructor) MaxAttempts

func (JobOptionConstructor) MaxAttempts(n int32) JobMaxAttempts

func (JobOptionConstructor) MaxWorkDuration

type JobQueue

type JobQueue interface {
	// Name returns the name of the queue.
	Name() string

	// Add enqueues a new job, as defined by the given type/payload.
	// If any callers waiting in WaitForJob (not just in the local
	// process), at least one should be woken.
	Add(ctx scope.Context, jobType JobType, payload interface{}, options ...JobOption) (
		snowflake.Snowflake, error)

	// AddAndClaim enqueues a new job, atomically marking it as claimed by the
	// caller. Returns the added and claimed job.
	AddAndClaim(
		ctx scope.Context, jobType JobType, payload interface{}, handlerID string, options ...JobOption) (*Job, error)

	// WaitForJob blocks until notification of a new claimable job
	// in the queue. This does not guarantee that a job will be
	// immediately claimable.
	WaitForJob(ctx scope.Context) error

	// TryClaim tries to acquire a currently unclaimed job. If none is
	// available, returns ErrJobNotFound.
	TryClaim(ctx scope.Context, handlerID string) (*Job, error)

	// TrySteal attempts to preempt another handler's claim. Only jobs
	// that have been claimed longer than their MaxWorkDuration setting
	// can be stolen. Only jobs claimed by a different handlerID can
	// be stolen.
	//
	// If no job can be immediately stolen, returns ErrJobNotFound.
	//
	// Stolen jobs are at risk of being completed twice. It's important
	// for handlers to set a completion timeout and self-cancel well
	// within the job's MaxWorkDuration.
	TrySteal(ctx scope.Context, handlerID string) (*Job, error)

	// Cancel removes a job from the queue. If it is currently claimed, then it
	// may still be completed by the handler that claimed it, but no future call
	// to Claim or Steal will return this job.
	Cancel(ctx scope.Context, jobID snowflake.Snowflake) error

	// Complete marks a job as completed. If the job has been stolen
	// by another handler, the queueing service should attempt to
	// cancel the other handler's work in progress, but this cannot
	// be guaranteed.
	Complete(ctx scope.Context, jobID snowflake.Snowflake, handlerID string, attemptNumber int32, log []byte) error

	// Fail marks a job as failed and releases the claim on it.
	// If the job has not been stolen and still has attempts
	// remaining, it will return to the queue and be immediately
	// up for claim again.
	Fail(ctx scope.Context, jobID snowflake.Snowflake, handlerID string, attemptNumber int32, reason string, log []byte) error

	// Stats returns information about the number of jobs in the queue.
	Stats(ctx scope.Context) (JobQueueStats, error)

	// Log returns the output of a given job attempt.
	Log(ctx scope.Context, jobID snowflake.Snowflake, attemptNumber int32) (*JobLog, error)
}

type JobQueueStats

type JobQueueStats struct {
	Waiting int64 // number of jobs waiting to be claimed
	Due     int64 // number of jobs that are due (whether claimed or waiting)
	Claimed int64 // number of jobs currently claimed
}

type JobService

type JobService interface {
	GetQueue(ctx scope.Context, name string) (JobQueue, error)
}

type JobType

type JobType string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL