package manager

v1.6.1
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2022 License: AGPL-3.0 Imports: 11 Imported by: 8

Documentation

Constants

const (
	// Jobs will be reserved for 30 minutes by default.
	// You can customize this per-job with the reserve_for attribute
	// in the job payload.
	DefaultTimeout = 30 * 60

	// Save dead jobs for 180 days, after that they will be purged
	DeadTTL = 180 * 24 * time.Hour
)
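
The two constants use different units: DefaultTimeout is a plain number of seconds, while DeadTTL is a time.Duration. The following is a minimal sketch of how a per-job reserve_for value might fall back to the default. The helper effectiveTimeout and its fallback rule (any value of zero or less uses the default) are assumptions made for illustration, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented constants so the sketch is self-contained.
const (
	DefaultTimeout = 30 * 60 // seconds
	DeadTTL        = 180 * 24 * time.Hour
)

// effectiveTimeout is a hypothetical helper. It converts a job's reserve_for
// value (in seconds) to a Duration, falling back to DefaultTimeout when the
// value is zero or negative. The fallback rule is an assumption.
func effectiveTimeout(reserveFor int) time.Duration {
	if reserveFor <= 0 {
		reserveFor = DefaultTimeout
	}
	return time.Duration(reserveFor) * time.Second
}

func main() {
	start := time.Date(2022, time.June, 7, 12, 0, 0, 0, time.UTC)

	// Reservation expiry with the default and with a custom reserve_for.
	fmt.Println(start.Add(effectiveTimeout(0)).Format(time.RFC3339))
	fmt.Println(start.Add(effectiveTimeout(90)).Format(time.RFC3339))

	// Time after which a job that died at start would be eligible for purging.
	fmt.Println(start.Add(DeadTTL).Format(time.RFC3339))
}
```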

Variables

var (
	JobReservationExpired = &FailPayload{
		ErrorType:    "ReservationExpired",
		ErrorMessage: "Faktory job reservation expired",
	}
)

Functions

func Discard

func Discard(msg string) error

Middleware can use this to restart the fetch process. It is useful when the job fetched from Redis is invalid and should be discarded rather than returned to the worker.

func ExpectedError

func ExpectedError(code string, msg string) error

func Halt

func Halt(code string, msg string) error

Returning a Halt error in a middleware will stop the middleware execution chain. The server will return the Halt to the client. You can use "ERR" for the code to signal an unexpected error or use a well-defined code for an error case which the client might be interested in, e.g. "NOTUNIQUE".

Types

type BasicFetch

type BasicFetch struct {
	// contains filtered or unexported fields
}

func (*BasicFetch) Fetch

func (f *BasicFetch) Fetch(ctx context.Context, wid string, queues ...string) (Lease, error)

type Context

type Context interface {
	context.Context

	Job() *client.Job
	Manager() Manager
	Reservation() *Reservation
}

type Ctx

type Ctx struct {
	context.Context
	// contains filtered or unexported fields
}

func (Ctx) Job

func (c Ctx) Job() *client.Job

func (Ctx) Manager

func (c Ctx) Manager() Manager

func (Ctx) Reservation

func (c Ctx) Reservation() *Reservation

type FailPayload

type FailPayload struct {
	Jid          string   `json:"jid"`
	ErrorMessage string   `json:"message"`
	ErrorType    string   `json:"errtype"`
	Backtrace    []string `json:"backtrace"`
}

type Fetcher

type Fetcher interface {
	Fetch(ctx context.Context, wid string, queues ...string) (Lease, error)
}

func BasicFetcher

func BasicFetcher(r *redis.Client) Fetcher

type KnownError

type KnownError interface {
	error
	Code() string
}

A KnownError is one that returns a specific error code to the client such that it can be handled explicitly. For example, the unique job feature will return a NOTUNIQUE error when the client tries to push() a job that already exists in Faktory.

Unexpected errors will always use "ERR" as their code, for instance any malformed data, network errors, IO errors, etc. Clients are expected to raise an exception for any ERR response.
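
The split between known and unexpected errors can be sketched as follows. The codedError type and the responseCode helper are hypothetical and exist only for this example. The KnownError interface is redeclared locally so the program is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// KnownError mirrors the interface documented above.
type KnownError interface {
	error
	Code() string
}

// codedError is a hypothetical KnownError implementation for illustration.
type codedError struct {
	code string
	msg  string
}

func (e *codedError) Error() string { return e.msg }
func (e *codedError) Code() string  { return e.code }

// errPlain stands in for an unexpected failure such as a network error.
var errPlain = errors.New("connection reset")

// responseCode returns the code a server might report for err: the error's
// own code if it (or an error it wraps) is a KnownError, otherwise "ERR".
func responseCode(err error) string {
	var known KnownError
	if errors.As(err, &known) {
		return known.Code()
	}
	return "ERR"
}

func main() {
	dup := &codedError{code: "NOTUNIQUE", msg: "job already exists"}
	wrapped := fmt.Errorf("push failed: %w", dup)

	fmt.Println(responseCode(wrapped))
	fmt.Println(responseCode(errPlain))
}
```

Using errors.As rather than a direct type assertion means the code is still found when the error has been wrapped with %w.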

type Lease

type Lease interface {
	Release() error
	Payload() []byte
	Job() (*client.Job, error)
}

var (
	// We can't pass a nil across the Fetcher interface boundary so we'll
	// use this sentinel value to mean nil.
	Nothing Lease = &simpleLease{}
)
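
The sentinel pattern can be sketched as below: the fetcher always returns a non-nil Lease, and callers compare against the sentinel instead of nil. This is an illustration only. The Lease interface is trimmed (Job() is omitted to avoid depending on the client package), and fetch and hasJob are hypothetical helpers.

```go
package main

import "fmt"

// Lease is a trimmed copy of the documented interface.
type Lease interface {
	Release() error
	Payload() []byte
}

// simpleLease is a stand-in for the package's unexported type.
type simpleLease struct {
	payload []byte
}

func (l *simpleLease) Release() error  { return nil }
func (l *simpleLease) Payload() []byte { return l.payload }

// Nothing is the sentinel meaning "no job available".
var Nothing Lease = &simpleLease{}

// fetch is a hypothetical fetcher over an in-memory slice of payloads.
// It returns Nothing rather than nil when the slice is empty.
func fetch(pending []string) Lease {
	if len(pending) == 0 {
		return Nothing
	}
	return &simpleLease{payload: []byte(pending[0])}
}

// hasJob reports whether a lease carries a real job. Interface values holding
// pointers compare by pointer identity, so only the sentinel itself matches.
func hasJob(l Lease) bool {
	return l != Nothing
}

func main() {
	fmt.Println(hasJob(fetch(nil)))
	fmt.Println(hasJob(fetch([]string{`{"jid":"abc"}`})))
}
```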

type Manager

type Manager interface {
	Push(job *client.Job) error

	PauseQueue(qName string) error
	ResumeQueue(qName string) error
	RemoveQueue(qName string) error

	// Dispatch operations:
	//
	//  - Basic dequeue
	//    - Connection sends FETCH q1, q2
	//    - Job moved from Queue into Working
	//  - Scheduled
	//    - Job Pushed into Queue
	//    - Job moved from Queue into Working
	//  - Failure
	//    - Job Pushed into Retries
	//  - Push
	//    - Job Pushed into Queue
	//  - Ack
	//    - Job removed from Working
	// How are jobs passed to waiting workers?
	//
	// Socket sends "FETCH q1, q2, q3"
	// Connection pops each queue:
	//   store.GetQueue("q1").Pop()
	// and returns if it gets any non-nil data.
	//
	// If all nil, the connection registers itself, blocking for a job.
	Fetch(ctx context.Context, wid string, queues ...string) (*client.Job, error)

	Acknowledge(jid string) (*client.Job, error)

	Fail(fail *FailPayload) error

	// Allows arbitrary extension of a job's current reservation.
	// This is a no-op if the requested time is before the current
	// reservation expiry.
	ExtendReservation(jid string, until time.Time) error

	WorkingCount() int

	ReapExpiredJobs(when time.Time) (int64, error)

	// Purge deletes all dead jobs
	Purge(when time.Time) (int64, error)

	// EnqueueScheduledJobs enqueues scheduled jobs
	EnqueueScheduledJobs(when time.Time) (int64, error)

	// RetryJobs enqueues failed jobs
	RetryJobs(when time.Time) (int64, error)

	BusyCount(wid string) int

	AddMiddleware(fntype string, fn MiddlewareFunc)

	KV() storage.KV
	Redis() *redis.Client
	SetFetcher(f Fetcher)
}
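
The non-blocking part of the fetch flow described in the Fetch comment (pop each queue in the order given and return the first job found) can be sketched with in-memory queues. The queue type, fetchFirst, and demoStore are hypothetical stand-ins, not this package's storage API. The blocking wait that follows when every queue is empty is not modeled.

```go
package main

import "fmt"

// queue is a hypothetical in-memory stand-in for a stored queue.
type queue struct {
	jobs []string
}

// pop removes and returns the oldest job, or reports false if the queue is empty.
func (q *queue) pop() (string, bool) {
	if len(q.jobs) == 0 {
		return "", false
	}
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

// fetchFirst tries each named queue in the order given and returns the first
// job it finds. It reports false if every queue is empty or unknown.
func fetchFirst(store map[string]*queue, names ...string) (string, bool) {
	for _, name := range names {
		q, ok := store[name]
		if !ok {
			continue
		}
		if job, ok := q.pop(); ok {
			return job, true
		}
	}
	return "", false
}

// demoStore builds a small set of queues for the example.
func demoStore() map[string]*queue {
	return map[string]*queue{
		"q1": {},
		"q2": {jobs: []string{"job-a", "job-b"}},
		"q3": {jobs: []string{"job-c"}},
	}
}

func main() {
	store := demoStore()
	// q1 is empty, so the job comes from q2.
	job, ok := fetchFirst(store, "q1", "q2", "q3")
	fmt.Println(job, ok)
}
```

Because queues are tried in argument order, earlier names act as higher priority in this sketch.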

func NewManager

func NewManager(s storage.Store) Manager

type MiddlewareChain

type MiddlewareChain []MiddlewareFunc

type MiddlewareFunc

type MiddlewareFunc func(next func() error, ctx Context) error
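
A MiddlewareFunc receives a next function and decides whether to call it. The sketch below shows one way such a chain could be executed and how returning an error without calling next stops the rest of the chain. The run function, the placeholder Context type, and the trace and reject middleware are all hypothetical. The package's actual runner is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Context is a placeholder for the package's Context interface.
type Context interface{}

type MiddlewareFunc func(next func() error, ctx Context) error

type MiddlewareChain []MiddlewareFunc

// run is a hypothetical runner. It invokes each middleware in order and calls
// final once every middleware has called next.
func run(chain MiddlewareChain, ctx Context, final func() error) error {
	if len(chain) == 0 {
		return final()
	}
	return chain[0](func() error {
		return run(chain[1:], ctx, final)
	}, ctx)
}

// errHalt stands in for the error a middleware returns to stop the chain.
var errHalt = errors.New("NOTUNIQUE job already exists")

// trace records when a middleware is entered and left.
func trace(log *[]string, name string) MiddlewareFunc {
	return func(next func() error, ctx Context) error {
		*log = append(*log, name+":before")
		err := next()
		*log = append(*log, name+":after")
		return err
	}
}

// reject never calls next, so later middleware and final are skipped.
func reject(next func() error, ctx Context) error {
	return errHalt
}

func main() {
	var log []string
	chain := MiddlewareChain{trace(&log, "a"), reject, trace(&log, "b")}
	err := run(chain, nil, func() error {
		log = append(log, "final")
		return nil
	})
	fmt.Println(log, err)
}
```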

type Reservation

type Reservation struct {
	Job    *client.Job `json:"job"`
	Since  string      `json:"reserved_at"`
	Expiry string      `json:"expires_at"`
	Wid    string      `json:"wid"`
	// contains filtered or unexported fields
}

func (*Reservation) ExpiresAt

func (res *Reservation) ExpiresAt() time.Time

func (*Reservation) ReservedAt

func (res *Reservation) ReservedAt() time.Time
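
Since and Expiry are stored as strings, while ReservedAt and ExpiresAt return time.Time values. The sketch below shows how such strings might be parsed and how an extension that does not move the expiry later can be treated as a no-op, as the ExtendReservation comment describes. The RFC 3339 timestamp layout, the trimmed reservation type, and the extendTo helper are assumptions for illustration. This page does not state the actual string format.

```go
package main

import (
	"fmt"
	"time"
)

// reservation is a trimmed, hypothetical copy of the Reservation fields.
type reservation struct {
	Since  string
	Expiry string
}

// parseStamp assumes timestamps use the RFC 3339 layout with optional
// fractional seconds. That layout is an assumption, not documented here.
func parseStamp(s string) (time.Time, error) {
	return time.Parse(time.RFC3339Nano, s)
}

// extendTo moves the expiry to until only if until is later than the current
// expiry. It reports whether the reservation changed.
func extendTo(res *reservation, until string) (bool, error) {
	current, err := parseStamp(res.Expiry)
	if err != nil {
		return false, err
	}
	target, err := parseStamp(until)
	if err != nil {
		return false, err
	}
	if !target.After(current) {
		return false, nil // no-op: the request does not extend the reservation
	}
	res.Expiry = target.UTC().Format(time.RFC3339Nano)
	return true, nil
}

func main() {
	res := &reservation{Since: "2022-06-07T12:00:00Z", Expiry: "2022-06-07T12:30:00Z"}

	changed, err := extendTo(res, "2022-06-07T12:15:00Z")
	fmt.Println(changed, err, res.Expiry)

	changed, err = extendTo(res, "2022-06-07T13:00:00Z")
	fmt.Println(changed, err, res.Expiry)
}
```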
