Documentation ¶
Index ¶
- Constants
- Variables
- func Discard(msg string) error
- func ExpectedError(code string, msg string) error
- func Halt(code string, msg string) error
- type BasicFetch
- type Context
- type Ctx
- type FailPayload
- type Fetcher
- type KnownError
- type Lease
- type Manager
- type MiddlewareChain
- type MiddlewareFunc
- type Reservation
Constants ¶
const (
	// Jobs will be reserved for 30 minutes by default.
	// You can customize this per-job with the reserve_for attribute
	// in the job payload.
	DefaultTimeout = 30 * 60

	// Save dead jobs for 180 days, after that they will be purged
	DeadTTL = 180 * 24 * time.Hour
)
Variables ¶
var (
	JobReservationExpired = &FailPayload{
		ErrorType:    "ReservationExpired",
		ErrorMessage: "Faktory job reservation expired",
	}
)
Functions ¶
func Discard ¶
Middleware can use this to restart the fetch process. Useful if the job fetched from Redis is invalid and should be discarded rather than returned to the worker.
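The restart behavior described above can be sketched as a loop that skips discarded jobs and keeps fetching. This is a simplified model, not the package's implementation: errDiscard, fetchValid, and the slice-backed queue are hypothetical stand-ins for the error returned by Discard and for the real fetch path.

```go
package main

import (
	"errors"
	"fmt"
)

// errDiscard is a hypothetical sentinel standing in for a Discard error.
var errDiscard = errors.New("discard")

// fetchValid walks the queue in order. A job rejected with errDiscard is
// dropped and the fetch continues with the next job; any other error aborts.
// An empty string with a nil error means the queue was drained.
func fetchValid(queue []string, validate func(string) error) (string, error) {
	for _, job := range queue {
		err := validate(job)
		if errors.Is(err, errDiscard) {
			continue // drop the invalid job and try the next one
		}
		if err != nil {
			return "", err
		}
		return job, nil
	}
	return "", nil
}

func main() {
	// Hypothetical validation rule: an empty payload is invalid.
	validate := func(job string) error {
		if job == "" {
			return errDiscard
		}
		return nil
	}
	job, err := fetchValid([]string{"", "job-1"}, validate)
	fmt.Println(job, err)
}
```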
func ExpectedError ¶
func Halt ¶
Returning a Halt error in a middleware will stop the middleware execution chain. The server will return the Halt to the client. You can use "ERR" for the code to signal an unexpected error or use a well-defined code for an error case which the client might be interested in, e.g. "NOTUNIQUE".
Types ¶
type BasicFetch ¶
type BasicFetch struct {
// contains filtered or unexported fields
}
type Ctx ¶
func (Ctx) Reservation ¶
func (c Ctx) Reservation() *Reservation
type FailPayload ¶
type Fetcher ¶
func BasicFetcher ¶
type KnownError ¶
A KnownError is one that returns a specific error code to the client such that it can be handled explicitly. For example, the unique job feature will return a NOTUNIQUE error when the client tries to push() a job that already exists in Faktory.
Unexpected errors will always use "ERR" as their code, for instance any malformed data, network errors, IO errors, etc. Clients are expected to raise an exception for any ERR response.
type Lease ¶
var (
	// We can't pass a nil across the Fetcher interface boundary so we'll
	// use this sentinel value to mean nil.
	Nothing Lease = &simpleLease{}
)
type Manager ¶
type Manager interface {
	Push(job *client.Job) error

	PauseQueue(qName string) error
	ResumeQueue(qName string) error
	RemoveQueue(qName string) error

	// Dispatch operations:
	//
	//  - Basic dequeue
	//    - Connection sends FETCH q1, q2
	//    - Job moved from Queue into Working
	//  - Scheduled
	//    - Job Pushed into Queue
	//    - Job moved from Queue into Working
	//  - Failure
	//    - Job Pushed into Retries
	//  - Push
	//    - Job Pushed into Queue
	//  - Ack
	//    - Job removed from Working
	//
	// How are jobs passed to waiting workers?
	//
	// Socket sends "FETCH q1, q2, q3"
	// Connection pops each queue:
	//   store.GetQueue("q1").Pop()
	// and returns if it gets any non-nil data.
	//
	// If all nil, the connection registers itself, blocking for a job.
	Fetch(ctx context.Context, wid string, queues ...string) (*client.Job, error)

	Acknowledge(jid string) (*client.Job, error)

	Fail(fail *FailPayload) error

	// Allows arbitrary extension of a job's current reservation
	// This is a no-op if you set the time before the current
	// reservation expiry.
	ExtendReservation(jid string, until time.Time) error

	WorkingCount() int

	ReapExpiredJobs(when time.Time) (int64, error)

	// Purge deletes all dead jobs
	Purge(when time.Time) (int64, error)

	// EnqueueScheduledJobs enqueues scheduled jobs
	EnqueueScheduledJobs(when time.Time) (int64, error)

	// RetryJobs enqueues failed jobs
	RetryJobs(when time.Time) (int64, error)

	BusyCount(wid string) int

	AddMiddleware(fntype string, fn MiddlewareFunc)

	KV() storage.KV
	Redis() *redis.Client
	SetFetcher(f Fetcher)
}
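The queue-popping order described in the Fetch comment can be modeled in a few lines. This is only a sketch under simplifying assumptions: queue, pop, and fetch are hypothetical in-memory stand-ins, jobs are plain strings, and the blocking step for the all-empty case is left out.

```go
package main

import "fmt"

// queue is a hypothetical in-memory stand-in for a stored queue.
type queue struct{ jobs []string }

// pop removes and returns the oldest job, or reports false when empty.
func (q *queue) pop() (string, bool) {
	if len(q.jobs) == 0 {
		return "", false
	}
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

// fetch pops each named queue in the order given and returns the first job
// it finds. Where this returns false, the real manager would instead block
// waiting for a job.
func fetch(store map[string]*queue, names ...string) (string, bool) {
	for _, name := range names {
		q, ok := store[name]
		if !ok {
			continue
		}
		if job, ok := q.pop(); ok {
			return job, true
		}
	}
	return "", false
}

func main() {
	store := map[string]*queue{
		"q1": {},
		"q2": {jobs: []string{"job-a"}},
	}
	job, ok := fetch(store, "q1", "q2")
	fmt.Println(job, ok) // job-a true
}
```

Listing queues in priority order means an earlier queue is always drained before a later one is consulted.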
func NewManager ¶
type MiddlewareChain ¶
type MiddlewareChain []MiddlewareFunc
type MiddlewareFunc ¶
type Reservation ¶
type Reservation struct {
	Job    *client.Job `json:"job"`
	Since  string      `json:"reserved_at"`
	Expiry string      `json:"expires_at"`
	Wid    string      `json:"wid"`
	// contains filtered or unexported fields
}
func (*Reservation) ExpiresAt ¶
func (res *Reservation) ExpiresAt() time.Time
func (*Reservation) ReservedAt ¶
func (res *Reservation) ReservedAt() time.Time