Documentation ¶
Index ¶
- Constants
- Variables
- func Discard(msg string) error
- func ExpectedError(code string, msg string) error
- func Halt(code string, msg string) error
- type BasicFetch
- type Context
- type Ctx
- type FailPayload
- type Fetcher
- type KnownError
- type Lease
- type Manager
- type MiddlewareChain
- type MiddlewareFunc
- type Reservation
Constants ¶
const ( // Jobs will be reserved for 30 minutes by default. // You can customize this per-job with the reserve_for attribute // in the job payload. DefaultTimeout = 30 * 60 // Save dead jobs for 180 days, after that they will be purged DeadTTL = 180 * 24 * time.Hour )
const (
MiddlewareHelperKey helperKey = "_mh"
)
Variables ¶
var (
JobReservationExpired = &FailPayload{
ErrorType: "ReservationExpired",
ErrorMessage: "Faktory job reservation expired",
}
)
Functions ¶
func Discard ¶
Middleware can use this to restart the fetch process. Useful if the job fetched from Redis is invalid and should be discarded rather than returned to the worker.
func ExpectedError ¶
func Halt ¶
Returning a Halt error in a middleware will stop the middleware execution chain. The server will return the Halt to the client. You can use "ERR" for the code to signal an unexpected error or use a well-defined code for an error case which the client might be interested in, e.g. "NOTUNIQUE".
Types ¶
type BasicFetch ¶
type BasicFetch struct {
// contains filtered or unexported fields
}
type Context ¶
type Context interface { Job() *client.Job Reservation() *Reservation Manager() Manager }
type Ctx ¶
type Ctx struct {
// contains filtered or unexported fields
}
func (Ctx) Reservation ¶
func (c Ctx) Reservation() *Reservation
type FailPayload ¶
type Fetcher ¶
func BasicFetcher ¶
func BasicFetcher(r *redis.Client) Fetcher
type KnownError ¶
A KnownError is one that returns a specific error code to the client such that it can be handled explicitly. For example, the unique job feature will return a NOTUNIQUE error when the client tries to push() a job that already exists in Faktory.
Unexpected errors will always use "ERR" as their code, for instance any malformed data, network errors, IO errors, etc. Clients are expected to raise an exception for any ERR response.
type Lease ¶
var ( // We can't pass a nil across the Fetcher interface boundary so we'll // use this sentinel value to mean nil. Nothing Lease = &simpleLease{} )
type Manager ¶
type Manager interface { Push(ctx context.Context, job *client.Job) error PauseQueue(ctx context.Context, qName string) error ResumeQueue(ctx context.Context, qName string) error RemoveQueue(ctx context.Context, qName string) error // Dispatch operations: // // - Basic dequeue // - Connection sends FETCH q1, q2 // - Job moved from Queue into Working // - Scheduled // - Job Pushed into Queue // - Job moved from Queue into Working // - Failure // - Job Pushed into Retries // - Push // - Job Pushed into Queue // - Ack // - Job removed from Working // // How are jobs passed to waiting workers? // // Socket sends "FETCH q1, q2, q3" // Connection pops each queue: // store.GetQueue("q1").Pop() // and returns if it gets any non-nil data. // // If all nil, the connection registers itself, blocking for a job. Fetch(ctx context.Context, wid string, queues ...string) (*client.Job, error) Acknowledge(ctx context.Context, jid string) (*client.Job, error) Fail(ctx context.Context, fail *FailPayload) error // Allows arbitrary extension of a job's current reservation // This is a no-op if you set the time before the current // reservation expiry. ExtendReservation(ctx context.Context, jid string, until time.Time) error WorkingCount() int ReapExpiredJobs(ctx context.Context, when time.Time) (int64, error) // Purge deletes all dead jobs Purge(ctx context.Context, when time.Time) (int64, error) // EnqueueScheduledJobs enqueues scheduled jobs EnqueueScheduledJobs(ctx context.Context, when time.Time) (int64, error) // RetryJobs enqueues failed jobs RetryJobs(ctx context.Context, when time.Time) (int64, error) BusyCount(wid string) int AddMiddleware(fntype string, fn MiddlewareFunc) KV() storage.KV Redis() *redis.Client SetFetcher(f Fetcher) }
func NewManager ¶
type MiddlewareChain ¶
type MiddlewareChain []MiddlewareFunc
type Reservation ¶
type Reservation struct { Job *client.Job `json:"job"` Since string `json:"reserved_at"` Expiry string `json:"expires_at"` Wid string `json:"wid"` // contains filtered or unexported fields }
func (*Reservation) ExpiresAt ¶
func (res *Reservation) ExpiresAt() time.Time
func (*Reservation) ReservedAt ¶
func (res *Reservation) ReservedAt() time.Time