Documentation ¶
Index ¶
- func AddJobIndexes(catalog *coal.Catalog, removeAfter time.Duration)
- func Backoff(min, max time.Duration, factor float64, attempt int) time.Duration
- func Cancel(store *coal.Store, id coal.ID, reason string) error
- func Complete(store *coal.Store, id coal.ID, result coal.Map) error
- func Fail(store *coal.Store, id coal.ID, reason string, delay time.Duration) error
- type Blueprint
- type Context
- type Error
- type Job
- type Model
- type Queue
- func (q *Queue) Action(methods []string, cb func(ctx *fire.Context) Blueprint) *fire.Action
- func (q *Queue) Add(task *Task)
- func (q *Queue) Callback(matcher fire.Matcher, cb func(ctx *fire.Context) Blueprint) *fire.Callback
- func (q *Queue) Close()
- func (q *Queue) Enqueue(bp Blueprint) (*Job, error)
- func (q *Queue) Run()
- type Status
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddJobIndexes ¶
AddJobIndexes will add job indexes to the specified catalog. If a duration is specified, completed and cancelled jobs are automatically removed when their finished timestamp falls behind the specified duration.
func Backoff ¶ added in v0.21.0
Backoff is the simple backoff algorithm to calculate the delay of job.
func Cancel ¶ added in v0.21.0
Cancel will cancel the job with the specified id and the specified reason. Only jobs in the "dequeued" state can be cancelled.
Types ¶
type Blueprint ¶ added in v0.22.1
type Blueprint struct { // The task name. Name string // The job label. If given, the job will only be enqueued if no other job is // available with the same name and label. Label string // The job model. If given, data is overridden with the marshaled model. Model Model // The job data. Data coal.Map // The initial delay. If specified the job will not be dequeued until the // specified time has passed. Delay time.Duration // The job period. If given, and a label is present, the job will only // enqueued if no job has been finished in the specified duration. Period time.Duration }
Blueprint describes a job to enqueued.
type Context ¶ added in v0.20.0
type Context struct { // Context is the standard context that is cancelled when the timeout has // been exceeded. context.Context // Model is the model carried by the job. Model Model // Result can be set with a custom result. Result coal.Map // Task is the task that processes this job. // // Usage: Read Only Task *Task // Queue is the queue this job was dequeued from. // // Usage: Read Only Queue *Queue // Store is the store used by the queue. // // Usage: Read Only Store *coal.Store // The tracer used to trace code execution. // // Usage: Read Only Tracer *fire.Tracer }
Context holds and stores contextual data.
type Job ¶
type Job struct { coal.Base `json:"-" bson:",inline" coal:"jobs"` // The name of the job. Name string `json:"name"` // The custom job label. Label string `json:"label"` // The data that has been supplied on creation. Data coal.Map `json:"data"` // The current status of the job. Status Status `json:"status"` // The time when the job was created. Created time.Time `json:"created-at" bson:"created_at"` // The time when the job is available for execution. Available time.Time `json:"available-at" bson:"available_at"` // The time when the job was dequeue the last time. Started *time.Time `json:"started-at" bson:"started_at"` // The time when the last attempt ended (completed, failed or cancelled). Ended *time.Time `json:"ended-at" bson:"ended_at"` // The time when the job was finished (completed or cancelled). Finished *time.Time `json:"finished-at" bson:"finished_at"` // Attempts is incremented with each execution attempt. Attempts int `json:"attempts"` // The result submitted during completion. Result coal.Map `json:"result"` // The last message submitted when the job was failed or cancelled. Reason string `json:"reason"` }
Job is a single job managed by a queue.
type Queue ¶
type Queue struct { // MaxLag defines the maximum amount of lag that should be applied to every // dequeue attempt. // // By default multiple processes compete with each other when getting jobs // from the same queue. An artificial lag prevents multiple simultaneous // dequeue attempts and allows the worker with the smallest lag to dequeue // the job and inform the other processes to prevent another dequeue attempt. // // Default: 100ms. MaxLag time.Duration // DequeueInterval defines the time after a worker might try to dequeue a job // again that has not yet been associated. // // Default: 2s. DequeueInterval time.Duration // contains filtered or unexported fields }
Queue manages the queueing of jobs.
func (*Queue) Action ¶ added in v0.23.0
Action is a factory to create an action that can be used to enqueue jobs.
func (*Queue) Callback ¶
Callback is a factory to create callbacks that can be used to enqueue jobs during request processing.
type Status ¶
type Status string
Status defines the allowed statuses of a job.
const ( // StatusEnqueued is used as the initial status when jobs are created. StatusEnqueued Status = "enqueued" // StatusDequeued is set when a job has been successfully dequeued. StatusDequeued Status = "dequeued" // StatusCompleted is set when a job has been successfully executed. StatusCompleted Status = "completed" // StatusFailed is set when an execution of a job failed. StatusFailed Status = "failed" // StatusCancelled is set when a job has been cancelled. StatusCancelled Status = "cancelled" )
The available job statuses.
type Task ¶
type Task struct { // Name is the unique name of the task. Name string // Model is the model that holds task related data. Model Model // Handler is the callback called with jobs for processing. The handler // should return errors formatted with E to properly indicate the status of // the job. If a task execution is successful the handler may return some // data that is attached to the job. Handler func(*Context) error // Workers defines the number for spawned workers that dequeue and execute // jobs in parallel. // // Default: 2. Workers int // MaxAttempts defines the maximum attempts to complete a task. Zero means // that the jobs is retried forever. The error retry field will take // precedence to this setting and allow retry beyond the configured maximum. // // Default: 0 MaxAttempts int // Interval defines the rate at which the worker will request a job from the // queue. // // Default: 100ms. Interval time.Duration // MinDelay is the minimal time after a failed task is retried. // // Default: 1s. MinDelay time.Duration // MaxDelay is the maximal time after a failed task is retried. // // Default: 10m. MaxDelay time.Duration // DelayFactor defines the exponential increase of the delay after individual // attempts. // // Default: 2. DelayFactor float64 // Timeout is the time after which a task can be dequeued again in case the // worker was not able to set its status. // // Default: 10m. Timeout time.Duration // Lifetime is the time after which the context of a job is cancelled and // the execution should be stopped. Should be several minutes less than // timeout to prevent race conditions. // // Default: 5m. Lifetime time.Duration // Periodically may be set to let the system enqueue a job automatically // every given interval. // // Default: 0. Periodically time.Duration // PeriodicJob is the blueprint of the job that is periodically enqueued. // // Default: Blueprint{Name: Task.Name}. PeriodicJob Blueprint }
Task describes work that is managed using a job queue.