Documentation ¶
Overview ¶
Package amboy provides basic infrastructure for running and describing jobs and job workflows with, potentially, minimal overhead and additional complexity.
Overview and Motivation ¶
Amboy works with 4 basic logical objects: jobs representing work; runners, which are responsible for executing jobs; queues, that represent pipelines and offline workflows of jobs (i.e. not real time, processes that run outside of the primary execution path of a program); and dependencies that represent relationships between jobs.
The inspiration for amboy was to be able to provide a unified way to define and run jobs, that would feel equally "native" for distributed applications and distributed web application, and move easily between different architectures.
While amboy users will generally implement their own Job and dependency implementations, Amboy itself provides several example Queue implementations, as well as several generic examples and prototypes of Job and dependency.Manager objects.
Generally speaking you should be able to use included amboy components to provide the queue and runner components, in conjunction with custom and generic job and dependency variations.
Consider the following example:
queue := queue.NewLocalLimitedSize(12, 50) // pass the number of workers and max capacity job := job.NewShellJob("make compile") err := queue.Put(job) if err != nil { // handle error case } err = queue.Start(ctx) // the queue starts a SimpleRunner object and // creates required channels. if err != nil { // handle error case } defer queue.Close() // Waits for all jobs to finish and releases all resources. amboy.Wait(ctx, queue) // Waits for all jobs to finish.
Waiting for Jobs to Complete ¶
The amboy package proves a number of generic methods that, using the Queue.Stats() method, block until all jobs are complete. They provide different semantics, which may be useful in different circumstances. All of the Wait* functions wait until the total number of jobs submitted to the queue is equal to the number of completed jobs, and as a result these methods don't prevent other threads from adding jobs to the queue after beginning to wait. As a special case, retryable queues will also wait until there are no retrying jobs remaining.
Additionally, there are a set of methods, WaitJob*, that allow callers to wait for a specific job to complete.
Index ¶
- Constants
- func CollateWriteErrors(errs []error) error
- func EnqueueManyUniqueJobs(ctx context.Context, queue Queue, jobs []Job) error
- func EnqueueUniqueJob(ctx context.Context, queue Queue, job Job) error
- func IntervalGroupQueueOperation(ctx context.Context, qg QueueGroup, interval time.Duration, startAt time.Time, ...)
- func IntervalQueueOperation(ctx context.Context, q Queue, interval time.Duration, startAt time.Time, ...)
- func IsDuplicateJobError(err error) bool
- func IsDuplicateJobScopeError(err error) bool
- func IsJobNotFoundError(err error) bool
- func MakeDuplicateJobError(err error) error
- func MakeDuplicateJobScopeError(err error) error
- func NewDuplicateJobError(msg string) error
- func NewDuplicateJobErrorf(msg string, args ...interface{}) error
- func NewDuplicateJobScopeError(msg string) error
- func NewDuplicateJobScopeErrorf(msg string, args ...interface{}) error
- func PopulateQueue(ctx context.Context, q Queue, jobs <-chan Job) error
- func ResolveErrors(ctx context.Context, q Queue) error
- func RunJob(ctx context.Context, job Job) error
- func Wait(ctx context.Context, q Queue) bool
- func WaitInterval(ctx context.Context, q Queue, interval time.Duration) bool
- func WaitIntervalNum(ctx context.Context, q Queue, interval time.Duration, num int) bool
- func WaitJob(ctx context.Context, j Job, q Queue) bool
- func WaitJobInterval(ctx context.Context, j Job, q Queue, interval time.Duration) bool
- func WithRetryableQueue(q Queue, op func(RetryableQueue)) bool
- type AbortableRunner
- type Format
- type GroupQueueOperation
- type Job
- type JobInfo
- type JobNotFoundError
- type JobRetryInfo
- type JobRetryOptions
- type JobStatusInfo
- type JobTimeInfo
- type JobType
- type Queue
- type QueueGroup
- type QueueInfo
- type QueueOperation
- type QueueOperationConfig
- type QueueOptions
- type QueueReport
- type QueueStats
- func (s *QueueStats) Annotate(key string, value interface{}) error
- func (s QueueStats) IsComplete() bool
- func (s QueueStats) Loggable() bool
- func (s QueueStats) Priority() level.Priority
- func (s QueueStats) Raw() interface{}
- func (s *QueueStats) SetPriority(l level.Priority) error
- func (s QueueStats) String() string
- type RetryHandler
- type RetryHandlerOptions
- type RetryableQueue
- type Runner
Constants ¶
const LockTimeout = 10 * time.Minute
LockTimeout describes the default period of time that a queue will respect a stale lock from another queue before beginning work on a job.
Variables ¶
This section is empty.
Functions ¶
func CollateWriteErrors ¶
CollateWriteErrors collates errors into a [writeErrors].
func EnqueueManyUniqueJobs ¶
EnqueueManyUniqueJobs is a generic wrapper for adding jobs to a queue (using the PutMany() method) ignoring duplicate job errors.
func EnqueueUniqueJob ¶
EnqueueUniqueJob is a generic wrapper for adding jobs to queues (using the Put() method), but that ignores duplicate job errors.
func IntervalGroupQueueOperation ¶
func IntervalGroupQueueOperation(ctx context.Context, qg QueueGroup, interval time.Duration, startAt time.Time, conf QueueOperationConfig, ops ...GroupQueueOperation)
IntervalGroupQueueOperation schedules jobs on a queue group with similar semantics as IntervalQueueOperation.
Operations will continue to run as long as the context is not canceled. If you do not pass any GroupQueueOperation items to this function, it panics.
func IntervalQueueOperation ¶
func IntervalQueueOperation(ctx context.Context, q Queue, interval time.Duration, startAt time.Time, conf QueueOperationConfig, op QueueOperation)
IntervalQueueOperation runs a queue scheduling operation on a regular interval, starting at specific time. Use this method to schedule jobs every hour, or similar use-cases.
func IsDuplicateJobError ¶
IsDuplicateJobError checks if an error is due to a duplicate job in the queue.
func IsDuplicateJobScopeError ¶
IsDuplicateJobScopeError checks if an error is due to a duplicate job scope in the queue.
func IsJobNotFoundError ¶
IsJobNotFound checks if an error was due to not being able to find the job in the queue.
func MakeDuplicateJobError ¶
MakeDuplicateJobError constructs a duplicate job error from an existing error of any type, for use by queue implementations.
func MakeDuplicateJobScopeError ¶
MakeDuplicateJobScopeError constructs a duplicate job scope error from an existing error of any type, for use by queue implementations.
func NewDuplicateJobError ¶
NewDuplicateJobError creates a new error to represent a duplicate job error, for use by queue implementations.
func NewDuplicateJobErrorf ¶
NewDuplicateJobErrorf creates a new error to represent a duplicate job error with a formatted message, for use by queue implementations.
func NewDuplicateJobScopeError ¶
NewDuplicateJobScopeError creates a new error object to represent a duplicate job scope error, for use by queue implementations.
func NewDuplicateJobScopeErrorf ¶
NewDuplicateJobScopeErrorf creates a new error object to represent a duplicate job scope error with a formatted message, for use by queue implementations.
func PopulateQueue ¶
PopulateQueue adds Jobs from a channel to a Queue and returns an error with the aggregated results of these operations.
func ResolveErrors ¶
ResolveErrors takes a Queue and iterates over the completed Jobs' results, returning a single aggregated error for all of the Queue's Jobs.
func RunJob ¶
RunJob executes a single job directly, without a Queue, with similar semantics as it would execute in a Queue: MaxTime is respected, and it uses similar logging as is present in the queue, with errors propagated functionally.
func Wait ¶
Wait takes a queue and blocks until all job are completed or the context is canceled. This operation runs in a tight-loop, which means that the Wait will return *as soon* as all jobs are complete. Conversely, it's also possible that frequent repeated calls to Stats() may contend with resources needed for dispatching jobs or marking them complete. Retrying jobs are not considered complete.
func WaitInterval ¶
WaitInterval provides the Wait operation and accepts a context for cancellation while also waiting for an interval between stats calls. The return value reports if the operation was canceled or if all jobs are complete. Retrying jobs are not considered complete.
func WaitIntervalNum ¶
WaitIntervalNum waits for a certain number of jobs to complete. Retrying jobs are not considered complete.
func WaitJob ¶
WaitJob blocks until the job, based on its ID, is marked complete in the queue, or the context is canceled. The return value is false if the job does not exist (or is removed) and true when the job completes. A retrying job is not considered complete.
func WaitJobInterval ¶
WaitJobInterval takes a job and queue object and waits for the job to be marked complete. The interval parameter controls how long the operation waits between checks, and can be used to limit the impact of waiting on a busy queue. The operation returns false if the job is not registered in the queue, and true when the job completes. A retrying job is not considered complete.
func WithRetryableQueue ¶
func WithRetryableQueue(q Queue, op func(RetryableQueue)) bool
WithRetryableQueue is a convenience function to perform an operation if the Queue is a RetryableQueue; otherwise, it is a no-op. Returns whether ot not the queue was a RetryableQueue.
Types ¶
type AbortableRunner ¶
type AbortableRunner interface { Runner IsRunning(string) bool RunningJobs() []string Abort(context.Context, string) error AbortAll(context.Context) error }
AbortableRunner provides a superset of the Runner interface but allows callers to abort jobs by ID.
type Format ¶
type Format int
Format defines a sequence of constants used to distinguish between different serialization formats for job objects used in the amboy.ConvertTo and amboy.ConvertFrom functions, which support the functionality of the Export and Import methods in the job interface.
Supported values of the Format type, which represent different supported serialization methods.
type GroupQueueOperation ¶
type GroupQueueOperation struct { Operation QueueOperation Queue string Check func(context.Context) bool }
GroupQueueOperation describes a single queue population operation for a group queue.
type Job ¶
type Job interface { // Provides a unique identifier for a job. Queues may error if // two jobs have different IDs. ID() string // The primary execution method for the job. Should toggle the // completed state for the job. Run(context.Context) // Type returns a JobType object that describes the kind of job that's being // run. Queue implementations can use to de-serialize jobs. Type() JobType // Provides access to the job's dependency information, and // allows queues to override a dependency (e.g. in a force // build state, or as part of serializing dependency objects // with jobs.) Dependency() dependency.Manager SetDependency(dependency.Manager) // Provides access to the JobStatusInfo object for the job, // which reports the current state. Status() JobStatusInfo SetStatus(JobStatusInfo) // TimeInfo reports the start/end time of jobs, as well as "wait until" and // "dispatch by" options that queues can use to schedule jobs in the future. // UpdateTimeInfo only modifies non-zero fields. TimeInfo() JobTimeInfo UpdateTimeInfo(JobTimeInfo) // SetTimeInfo is like UpdateTimeInfo but overwrites all time info, // including zero fields. SetTimeInfo(JobTimeInfo) // RetryInfo reports information about the job's retry behavior. RetryInfo() JobRetryInfo // UpdateRetryInfo method modifies all set fields from the given options. UpdateRetryInfo(JobRetryOptions) // AddError allows another actor to annotate the job with an // error. AddError(error) // AddRetryableError annotates the job with an error and marks the job as // needing to retry. AddRetryableError(error) // Error returns an error object if the job encountered an // error. Typically if the job has not run, this is nil. Error() error // IsLastAttempt returns whether or not the job is on its final attempt. if // true, the job will not retry. IsLastAttempt() bool // Lock and Unlock are responsible for handling the locking // behavor for the job. Lock is responsible for setting the // owner (its argument), incrementing the modification count // and marking the job in progress, and returning an error if // another worker has access to the job. Unlock is responsible // for unsetting the owner and marking the job as // not-in-progress, and should be a no-op if the job does not // belong to the owner. In general the owner should be the value // of queue.ID() Lock(owner string, lockTimeout time.Duration) error Unlock(owner string, lockTimeout time.Duration) // Scopes provide the ability to configure mutual exclusion for a job in a // queue. The Scopes method returns the current mutual exclusion locks for // the job. // SetScopes configures the mutually exclusive lock(s) that a job in a queue // should acquire. When called, it does not actually take a lock; rather, it // signals the intention to lock within the queue. This is typically called // when first initializing the job before enqueueing it; it is invalid for // end users to call SetScopes after the job has already dispatched. Scopes() []string SetScopes([]string) // EnqueueScopes allows the scope exclusion functionality to be configured // so that exclusion occurs during either job dispatch or enqueue. Scopes // can either be individually selected (with SetEnqueueScopes) or all // selected (with SetEnqueueAllScopes) to be enqueued with their scopes // applied. EnqueueScopes() []string SetEnqueueScopes(scopes ...string) // EnqueueAllScopes is a convenience function to apply the // scope exclusion behavior to all scopes. If set, all scopes will be // applied on enqueue. EnqueueAllScopes() bool SetEnqueueAllScopes(bool) }
Job describes a unit of work. Implementations of Job instances are the content of the Queue. The amboy/job package contains several general purpose and example implementations. Jobs are responsible, primarily via their Dependency property, for determining: if they need to run, and what Jobs they depend on. Actual use of the dependency system is the responsibility of the Queue implementation.
In most cases, applications only need to implement the Run() method, all additional functionality is provided by the job.Base type, which can be embedded anonymously in implementations of the Job.
type JobInfo ¶
type JobInfo struct { ID string Type JobType Status JobStatusInfo Time JobTimeInfo Retry JobRetryInfo }
JobInfo provides a view of information for a Job.
type JobNotFoundError ¶
type JobNotFoundError struct {
// contains filtered or unexported fields
}
JobNotFoundError represents an error indicating that a job could not be found in a queue.
func MakeJobNotFoundError ¶
func MakeJobNotFoundError(err error) *JobNotFoundError
MakeJobNotFoundError constructs an error from an existing one, indicating that a job could not be found in the queue.
func NewJobNotFoundError ¶
func NewJobNotFoundError(msg string) *JobNotFoundError
NewJobNotFoundError creates a new error indicating that a job could not be found in the queue.
func NewJobNotFoundErrorf ¶
func NewJobNotFoundErrorf(msg string, args ...interface{}) *JobNotFoundError
NewJobNotFoundErrorf creates a new error with a formatted message, indicating that a job could not be found in the queue.
func (*JobNotFoundError) Error ¶
func (e *JobNotFoundError) Error() string
Error returns the error message from the job not found error to provide more context as to why the job was not found.
type JobRetryInfo ¶
type JobRetryInfo struct { // Retryable indicates whether the job can use Amboy's built-in retry // mechanism. This should typically be set when first initializing the job; // it is invalid for end users to modify Retryable once the job has already // been dispatched. Retryable bool `bson:"retryable" json:"retryable,omitempty" yaml:"retryable,omitempty"` // NeedsRetry indicates whether the job is supposed to retry when it is // complete. This will only be considered if Retryable is true. NeedsRetry bool `bson:"needs_retry" json:"needs_retry,omitempty" yaml:"needs_retry,omitempty"` // BaseJobID is the job ID of the original job that was retried, ignoring // any additional retry metadata. BaseJobID string `bson:"base_job_id,omitempty" json:"base_job_id,omitempty" yaml:"base_job_id,omitempty"` // CurrentAttempt is the current attempt number. This is zero-indexed // (unless otherwise set on enqueue), so the first time the job attempts to // run, its value is 0. Each subsequent retry increments this value. CurrentAttempt int `bson:"current_attempt" json:"current_attempt,omitempty" yaml:"current_attempt,omitempty"` // MaxAttempts is the maximum number of attempts for a job. This is // 1-indexed since it is a count. For example, if this is set to 3, the job // will be allowed to run 3 times at most. If unset, the default maximum // attempts is 10. MaxAttempts int `bson:"max_attempts,omitempty" json:"max_attempts,omitempty" yaml:"max_attempts,omitempty"` // DispatchBy reflects the amount of time (relative to when the job is // retried) that the retried job has to dispatch for execution. If this // deadline elapses, the job will not run. This is analogous to // (JobTimeInfo).DispatchBy. DispatchBy time.Duration `bson:"dispatch_by,omitempty" json:"dispatch_by,omitempty" yaml:"dispatch_by,omitempty"` // WaitUntil reflects the amount of time (relative to when the job is // retried) that the retried job has to wait before it can be dispatched for // execution. The job will not run until this waiting period elapses. This // is analogous to (JobTimeInfo).WaitUntil. WaitUntil time.Duration `bson:"wait_until,omitempty" json:"wait_until,omitempty" yaml:"wait_until,omitempty"` // Start is the time that the job began retrying. Start time.Time `bson:"start,omitempty" json:"start,omitempty" yaml:"start,omitempty"` // End is the time that the job finished retrying. End time.Time `bson:"end,omitempty" json:"end,omitempty" yaml:"end,omitempty"` }
JobRetryInfo stores configuration and information for a job that can retry. Support for retrying jobs is only supported by RetryableQueues.
func (JobRetryInfo) GetMaxAttempts ¶
func (info JobRetryInfo) GetMaxAttempts() int
GetMaxAttempts returns the maximum number of times a job is allowed to attempt. It defaults the maximum attempts if it's unset.
func (JobRetryInfo) GetRemainingAttempts ¶
func (info JobRetryInfo) GetRemainingAttempts() int
GetRemainingAttempts returns the number of times this job is still allow to attempt, excluding the current attempt.
func (*JobRetryInfo) Options ¶
func (i *JobRetryInfo) Options() JobRetryOptions
Options returns a JobRetryInfo as its equivalent JobRetryOptions. In other words, if the returned result is used with Job.UpdateRetryInfo(), the job will be populated with the same information as this JobRetryInfo.
func (JobRetryInfo) ShouldRetry ¶
func (i JobRetryInfo) ShouldRetry() bool
ShouldRetry returns whether or not the associated job is supposed to retry upon completion.
type JobRetryOptions ¶
type JobRetryOptions struct { Retryable *bool `bson:"-" json:"-" yaml:"-"` NeedsRetry *bool `bson:"-" json:"-" yaml:"-"` CurrentAttempt *int `bson:"-" json:"-" yaml:"-"` MaxAttempts *int `bson:"-" json:"-" yaml:"-"` DispatchBy *time.Duration `bson:"-" json:"-" yaml:"-"` WaitUntil *time.Duration `bson:"-" json:"-" yaml:"-"` Start *time.Time `bson:"-" json:"-" yaml:"-"` End *time.Time `bson:"-" json:"-" yaml:"-"` }
JobRetryOptions represents configuration options for a job that can retry. Their meaning corresponds to the fields in JobRetryInfo, but is more amenable to optional input values.
type JobStatusInfo ¶
type JobStatusInfo struct { Owner string `bson:"owner" json:"owner" yaml:"owner"` Completed bool `bson:"completed" json:"completed" yaml:"completed"` InProgress bool `bson:"in_prog" json:"in_progress" yaml:"in_progress"` ModificationTime time.Time `bson:"mod_ts" json:"mod_time" yaml:"mod_time"` ModificationCount int `bson:"mod_count" json:"mod_count" yaml:"mod_count"` ErrorCount int `bson:"err_count" json:"err_count" yaml:"err_count"` Errors []string `bson:"errors,omitempty" json:"errors,omitempty" yaml:"errors,omitempty"` }
JobStatusInfo contains information about the current status of a job and is reported by the Status and set by the SetStatus methods in the Job interface.
type JobTimeInfo ¶
type JobTimeInfo struct { Created time.Time `bson:"created,omitempty" json:"created,omitempty" yaml:"created,omitempty"` Start time.Time `bson:"start,omitempty" json:"start,omitempty" yaml:"start,omitempty"` End time.Time `bson:"end,omitempty" json:"end,omitempty" yaml:"end,omitempty"` // WaitUntil defers execution of a job until a particular time has elapsed. // Support for this feature in Queue implementations is optional. WaitUntil time.Time `bson:"wait_until,omitempty" json:"wait_until,omitempty" yaml:"wait_until,omitempty"` // DispatchBy is a deadline before which the job must run. Support for this // feature in Queue implementations is optional. Queues that support this // feature may remove the job if the deadline has passed. DispatchBy time.Time `bson:"dispatch_by,omitempty" json:"dispatch_by,omitempty" yaml:"dispatch_by,omitempty"` // MaxTime is the maximum time that the job is allowed to run. If the // runtime exceeds this duration, the Queue should abort the job. MaxTime time.Duration `bson:"max_time,omitempty" json:"max_time,omitempty" yaml:"max_time,omitempty"` }
JobTimeInfo stores timing information for a job and is used by both the Runner and Job implementations to track how long jobs take to execute.
func (JobTimeInfo) Duration ¶
func (j JobTimeInfo) Duration() time.Duration
Duration is a convenience function to return a duration for a job.
func (JobTimeInfo) IsDispatchable ¶
func (j JobTimeInfo) IsDispatchable() bool
IsDispatchable determines if the job should be dispatched based on the value of WaitUntil.
func (JobTimeInfo) IsStale ¶
func (j JobTimeInfo) IsStale() bool
IsStale determines if the job is too old to be dispatched, and if so, queues may skip or dequeue the job.
func (JobTimeInfo) Validate ¶
func (j JobTimeInfo) Validate() error
Validate ensures that the structure has reasonable values set.
type JobType ¶
type JobType struct { Name string `json:"name" bson:"name" yaml:"name"` Version int `json:"version" bson:"version" yaml:"version"` }
JobType contains information about the type of a job, which queues can use to serialize objects. All Job implementations must store and produce instances of this type that identify the type and implementation version.
type Queue ¶
type Queue interface { // ID returns a unique identifier for the instance of the queue. ID() string // Put adds a job to the queue. Put(context.Context, Job) error // PutMany adds jobs to the queue. PutMany(context.Context, []Job) error // Get finds a Job by ID. The boolean return value indicates if the Job was // found or not. Get(context.Context, string) (Job, bool) // Next returns the next available Job to run in the Queue. Next(context.Context) Job // Info returns information related to management of the Queue. Info() QueueInfo // Complete marks a Job as completed executing. Complete(context.Context, Job) error // Save persists the state of a current Job to the underlying storage, // generally in support of locking and incremental persistence. // Implementations should error if the job does not exist in the Queue or if // the Job state within the Queue has been modified to invalidate the // in-memory ownership of the Job. Save(context.Context, Job) error // Results returns a channel that produces completed Job objects. Results(context.Context) <-chan Job // JobInfo returns a channel that produces the information for all Jobs in // the Queue. JobInfo(context.Context) <-chan JobInfo // Stats returns statistics about the current state of the Queue. Stats(context.Context) QueueStats // Runner returns the Runner implementation that is running Jobs for the // Queue. Runner() Runner // SetRunner sets the Runner that is running Jobs for the Queue. This // permits runtime substitution of Runner implementations. However, Queue // implementations are not expected to permit users to change Runner // implementations after starting the Queue. SetRunner(Runner) error // Start begins the execution of Jobs in the Queue. Start(context.Context) error // Close cleans up all resources used by the Queue. Close(context.Context) }
Queue describes a very simple Job queue interface that allows users to define Job objects, add them to a worker queue, and execute jobs from that queue. Queue implementations may run locally or as part of a distributed application, with multiple workers and submitter Queue instances, which can support different job dispatching and organization properties.
type QueueGroup ¶
type QueueGroup interface { // Get a queue with the given identifier, creating a new queue dynamically // if necessary. Implementations should respect QueueOptions if creating a // new queue, but may ignore the options if the queue already exists. Get(context.Context, string, ...QueueOptions) (Queue, error) // Put a queue at the given index. Put(context.Context, string, Queue) error // Prune old queues. Prune(context.Context) error // Close the queues. Close(context.Context) error // Len returns the number of active queues managed in the // group. Len() int // Queues returns all currently registered and running queues Queues(context.Context) []string }
QueueGroup describes a group of queues. Each queue is indexed by a string. Users can use these queues if there are many different types of work or if the types of work are only knowable at runtime.
type QueueOperation ¶
QueueOperation is a named function literal for use in the PeriodicQueueOperation function. Typically these functions add jobs to a queue, or could be used to perform periodic maintenance (e.g. removing stale jobs or removing stuck jobs in a dependency queue.)
func GroupQueueOperationFactory ¶
func GroupQueueOperationFactory(first QueueOperation, ops ...QueueOperation) QueueOperation
GroupQueueOperationFactory produces a QueueOperation that aggregates and runs one or more QueueOperations. The QueueOperation has continue-on-error semantics, and returns an error if any of the QueueOperations fail, but attempts to run all specified QueueOperations before propagating errors.
func ScheduleJobFactory ¶
func ScheduleJobFactory(op func() Job) QueueOperation
ScheduleJobFactory produces a QueueOpertion that calls a single function which returns a Job and puts that job into the queue.
func ScheduleJobsFromGeneratorFactory ¶
func ScheduleJobsFromGeneratorFactory(op func() <-chan Job) QueueOperation
ScheduleJobsFromGeneratorFactory produces a queue operation that calls a single generator function which returns channel of Jobs and puts those jobs into the queue. The QueueOperation attempts to add all jobs in the slice and returns an error if the Queue.Put opertion failed for any (e.g. continue-on-error semantics). The error returned aggregates all errors encountered.
func ScheduleManyJobsFactory ¶
func ScheduleManyJobsFactory(op func() []Job) QueueOperation
ScheduleManyJobsFactory produces a queue operation that calls a single function which returns a slice of jobs and puts those jobs into the queue. The QueueOperation attempts to add all jobs in the slice and returns an error if the Queue.Put opertion failed for any (e.g. continue-on-error semantics). The error returned aggregates all errors encountered.
type QueueOperationConfig ¶
type QueueOperationConfig struct { ContinueOnError bool `bson:"continue_on_error" json:"continue_on_error" yaml:"continue_on_error"` LogErrors bool `bson:"log_errors" json:"log_errors" yaml:"log_errors"` DebugLogging bool `bson:"debug_logging" json:"debug_logging" yaml:"debug_logging"` RespectThreshold bool `bson:"respect_threshold" json:"respect_threshold" yaml:"respect_threshold"` EnableDuplicateJobReporting bool `bson:"enable_duplicate_reporting" json:"enable_duplicate_reporting" yaml:"enable_duplicate_reporting"` Threshold int `bson:"threshold" json:"threshold" yaml:"threshold"` }
QueueOperationConfig describes the behavior of the periodic interval schedulers.
The threshold, if RespectThreshold is set, causes the periodic scheduler to no-op if there are more than that many pending jobs.
type QueueOptions ¶
type QueueOptions interface { // BuildQueue creates a queue from the queue options. BuildQueue(ctx context.Context) (Queue, error) // Validate checks that the queue options are valid. Validate() error }
QueueOptions represent options for an individual queue in a queue group. Options are typically dependent on the particular queue implementation.
type QueueReport ¶
type QueueReport struct { Pending []string `json:"pending"` InProgress []string `json:"in_progress"` Completed []string `json:"completed"` Retrying []string `json:"retrying"` }
QueueReport holds the IDs of Jobs in a Queue based on their current state.
type QueueStats ¶
type QueueStats struct { Running int `bson:"running" json:"running" yaml:"running"` Completed int `bson:"completed" json:"completed" yaml:"completed"` Retrying int `bson:"retrying" json:"retrying" yaml:"retrying"` Pending int `bson:"pending" json:"pending" yaml:"pending"` Blocked int `bson:"blocked" json:"blocked" yaml:"blocked"` Total int `bson:"total" json:"total" yaml:"total"` Context message.Fields `bson:"context,omitempty" json:"context,omitempty" yaml:"context,omitempty"` // contains filtered or unexported fields }
QueueStats is a simple structure that the Stats() method in the Queue interface returns and tracks the state of the queue, and provides a common format for different Queue implementations to report on their state.
Implements grip's message.Composer interface when passed as a pointer.
func (*QueueStats) Annotate ¶
func (s *QueueStats) Annotate(key string, value interface{}) error
Annotate is part of the grip/message.Composer interface and allows the logging infrastructure to inject content and context into log messages.
func (QueueStats) IsComplete ¶
func (s QueueStats) IsComplete() bool
IsComplete returns true when the total number of jobs are equal to the number completed, or if the number of completed and blocked are greater than or equal to total. For retryable queues, if any job is retrying, it is not considered complete. This method is used by the Wait functions to determine when a queue has completed all actionable work.
func (QueueStats) Loggable ¶
func (s QueueStats) Loggable() bool
Loggable is part of the grip/message.Composer interface and only returns true if the queue has at least one job.
func (QueueStats) Priority ¶
func (s QueueStats) Priority() level.Priority
Priority is part of the grip/message.Composer interface and returns the priority of the message.
func (QueueStats) Raw ¶
func (s QueueStats) Raw() interface{}
Raw is part of the grip/message.Composer interface and simply returns the QueueStats object.
func (*QueueStats) SetPriority ¶
func (s *QueueStats) SetPriority(l level.Priority) error
SetPriority is part of the grip/message.Composer interface and allows the caller to configure the piroity of the message.
func (QueueStats) String ¶
func (s QueueStats) String() string
String prints a long form report of the queue for human consumption.
type RetryHandler ¶
type RetryHandler interface { // SetQueue provides a method to change the RetryableQueue where the job // should be retried. Implementations may not be able to change their Queue // association after starting. SetQueue(RetryableQueue) error // Start prepares the RetryHandler to begin processing jobs to retry. Start(context.Context) error // Started reports if the RetryHandler has started. Started() bool // Put adds a job that must be retried into the RetryHandler. Put(context.Context, Job) error // Close aborts all retry work in progress and waits for all work to finish. Close(context.Context) }
RetryHandler provides a means to retry jobs (see JobRetryOptions) within a RetryableQueue.
type RetryHandlerOptions ¶
type RetryHandlerOptions struct { // MaxRetryAttempts is the maximum number of times that the retry handler is // allowed to attempt to retry a job before it gives up. This is referring // to the retry handler's attempts to internally retry the job and is // unrelated to the job's particular max attempt setting. MaxRetryAttempts int // MaxRetryAttempts is the maximum time that the retry handler is allowed to // attempt to retry a job before it gives up. MaxRetryTime time.Duration // RetryBackoff is how long the retry handler will back off after a failed // retry attempt. RetryBackoff time.Duration // NumWorkers is the maximum number of jobs that are allowed to retry in // parallel. NumWorkers int // MaxCapacity is the total number of jobs that the RetryHandler is allowed // to hold in preparation to retry. If MaxCapacity is 0, it will be set to a // default maximum capacity. If MaxCapacity is -1, it will have unlimited // capacity. MaxCapacity int }
RetryHandlerOptions configures the behavior of a RetryHandler.
func (*RetryHandlerOptions) IsUnlimitedMaxCapacity ¶
func (opts *RetryHandlerOptions) IsUnlimitedMaxCapacity() bool
IsUnlimitedMaxCapacity returns whether or not the options specify unlimited capacity.
func (*RetryHandlerOptions) Validate ¶
func (opts *RetryHandlerOptions) Validate() error
Validate checks that all retry handler options are valid.
type RetryableQueue ¶
type RetryableQueue interface { // Queue is identical to the standard queue interface, except: // For retryable jobs, Get will retrieve the latest attempt of a job by ID. // Results will only return completed jobs that are not retrying. Queue // RetryHandler returns the handler for retrying a job in this queue. RetryHandler() RetryHandler // SetRetryHandler permits runtime substitution of RetryHandler // implementations. Queue implementations are not expected to permit users // to change RetryHandler implementations after starting the Queue. SetRetryHandler(RetryHandler) error // GetAttempt returns the retryable job associated with the given ID and // execution attempt. If it cannot find a matching job, it will return // ErrJobNotFound. This will only return retryable jobs. GetAttempt(ctx context.Context, id string, attempt int) (Job, error) // GetAllAttempts returns all execution attempts of a retryable job // associated with the given job ID. If it cannot find a matching job, it // will return ErrJobNotFound.This will only return retryable jobs. GetAllAttempts(ctx context.Context, id string) ([]Job, error) // CompleteRetryingAndPut marks an existing job toComplete in the queue (see // CompleteRetrying) as finished retrying and inserts a new job toPut in the // queue (see Put). Implementations must make this operation atomic. CompleteRetryingAndPut(ctx context.Context, toComplete, toPut Job) error // CompleteRetrying marks a job that is retrying as finished processing, so // that it will no longer retry. CompleteRetrying(ctx context.Context, j Job) error }
RetryableQueue is the same as a Queue but supports additional operations for retryable jobs.
type Runner ¶
type Runner interface { // Reports if the pool has started. Started() bool // Provides a method to change or set the pointer to the // enclosing Queue object after instance creation. Runner // implementations may not be able to change their Queue // association after starting. SetQueue(Queue) error // Prepares the runner implementation to begin doing work, if // any is required (e.g. starting workers.) Typically called // by the enclosing Queue object's Start() method. Start(context.Context) error // Terminates all in progress work and waits for processes to // return. Close(context.Context) }
Runner describes a simple worker interface for executing jobs in the context of a Queue. Used by queue implementations to run jobs. Generally Queue implementations will spawn a runner as part of their constructor or Start() methods, but client code can inject alternate Runner implementations, as required.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
Package dependency contains the Manager interface, along with several implementations for different kinds of dependency checks.
|
Package dependency contains the Manager interface, along with several implementations for different kinds of dependency checks. |
Package job provides tools and generic implementations of jobs for amboy Queues.
|
Package job provides tools and generic implementations of jobs for amboy Queues. |
Package logger is a set of implementations to support amboy.Queue backed grip/send.Senders for asynchronous and (generally) non-blocking log message delivery.
|
Package logger is a set of implementations to support amboy.Queue backed grip/send.Senders for asynchronous and (generally) non-blocking log message delivery. |
Package management provides increased observability and control of the state of amboy queues.
|
Package management provides increased observability and control of the state of amboy queues. |
Package pool provides specific implementations of the amboy.Runner interface that serve as the worker pools for jobs in work queues.
|
Package pool provides specific implementations of the amboy.Runner interface that serve as the worker pools for jobs in work queues. |
Package queue provides several implementations of the amboy.Queue and amboy.RemoteQueue interfaces capable of processing amboy.Job implementations.
|
Package queue provides several implementations of the amboy.Queue and amboy.RemoteQueue interfaces capable of processing amboy.Job implementations. |
Package registry contains infrastructure to support the persistence of Job definitions.
|
Package registry contains infrastructure to support the persistence of Job definitions. |