Documentation ¶
Overview ¶
Package amboy provides basic infrastructure for running and describing tasks and task workflows with, potentially, minimal overhead and additional complexity.
Overview and Motivation ¶
Amboy works with 4 basic logical objects: jobs, or descriptions of tasks; runners, which are responsible for executing tasks; queues, which represent pipelines and offline workflows of tasks (i.e. not real time, processes that run outside of the primary execution path of a program); and dependencies, which represent relationships between jobs.
The inspiration for amboy was to provide a unified way to define and run jobs that feels equally "native" in distributed applications and distributed web applications, and that moves easily between different architectures.
While amboy users will generally write their own Job and dependency implementations, amboy itself provides several example Queue implementations, as well as several generic examples and prototypes of Job and dependency.Manager objects.
Generally speaking, you should be able to use the included amboy components to provide the queue and runner components, in conjunction with custom and generic job and dependency variations.
Consider the following example:
```go
q := queue.SimpleQueue(12) // pass the number of worker threads
j := job.NewShellJob("make compile")

if err := q.Put(ctx, j); err != nil {
	// handle error case
}

// the queue starts a SimpleRunner object and creates required channels.
if err := q.Start(ctx); err != nil {
	// handle error case
}

amboy.Wait(ctx, q) // waits for all tasks to finish.

// waits for all tasks to finish and releases all resources.
if err := q.Close(ctx); err != nil {
	// handle error case
}
```
Waiting for Jobs to Complete ¶
The amboy package provides a number of generic functions that, using the Queue.Stats() method, block until all jobs are complete. They provide different semantics, which may be useful in different circumstances. All of these functions wait until the total number of jobs submitted to the queue is equal to the number of completed jobs; as a result, they do not prevent other goroutines from adding jobs to the queue after waiting has begun.
Additionally, a set of functions (WaitJob and WaitJobInterval) allows callers to wait for a specific job to complete.
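The polling loop these functions share can be sketched with the standard library. This is a minimal sketch, not amboy's implementation: `waitInterval`, `waitFor`, and `pollsUntilDone` are hypothetical names, and the `check` callback stands in for calling `Queue.Stats()` and `QueueStats.IsComplete()` on a real queue.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitInterval polls check once per interval until it reports true
// (returns true) or ctx is canceled first (returns false). In amboy the
// check would wrap q.Stats(ctx).IsComplete(); here it is a hypothetical callback.
func waitInterval(ctx context.Context, check func(context.Context) bool, interval time.Duration) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		if check(ctx) {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}

// waitFor bounds waitInterval with an overall timeout.
func waitFor(check func(context.Context) bool, interval, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitInterval(ctx, check, interval)
}

// pollsUntilDone returns a check that first reports true on its nth call,
// standing in for a queue whose jobs finish over time.
func pollsUntilDone(n int) func(context.Context) bool {
	calls := 0
	return func(context.Context) bool {
		calls++
		return calls >= n
	}
}

func main() {
	fmt.Println(waitFor(pollsUntilDone(3), time.Millisecond, time.Second))                 // true
	fmt.Println(waitFor(pollsUntilDone(1_000_000), time.Millisecond, 10*time.Millisecond)) // false
}
```

Checking before the first tick means an already-finished queue returns immediately, while the ticker bounds how often Stats() is called on a busy queue.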
Index ¶
- Constants
- func EnqueueUniqueJob(ctx context.Context, queue Queue, job Job) error
- func ExtractErrors(ctx context.Context, catcher *erc.Collector, q Queue)
- func IntervalGroupQueueOperation(ctx context.Context, qg QueueGroup, interval time.Duration, startAt time.Time, ...)
- func IntervalQueueOperation(ctx context.Context, q Queue, interval time.Duration, startAt time.Time, ...)
- func IsDispatchable(stat JobStatusInfo, lockTimeout time.Duration) bool
- func IsDuplicateJobError(err error) bool
- func IsJobNotDefinedError(err error) bool
- func MakeDuplicateJobError(err error) error
- func MakeJobNotDefinedError(queueID, jobID string) error
- func NewDuplicateJobError(msg string) error
- func NewDuplicateJobErrorf(msg string, args ...interface{}) error
- func NewJobNotDefinedError(q Queue, id string) error
- func PopulateQueue(ctx context.Context, q Queue, jobs <-chan Job) error
- func ResolveErrors(ctx context.Context, q Queue) error
- func RetrieveErrors(ctx context.Context, errs chan<- error, q Queue)
- func RunJob(ctx context.Context, job Job) error
- func Wait(ctx context.Context, q Queue) bool
- func WaitInterval(ctx context.Context, q Queue, interval time.Duration) bool
- func WaitIntervalNum(ctx context.Context, q Queue, interval time.Duration, num int) bool
- func WaitJob(ctx context.Context, j Job, q Queue) bool
- func WaitJobInterval(ctx context.Context, j Job, q Queue, interval time.Duration) bool
- type AbortableRunner
- type DeletableJobQueue
- type GroupQueueOperation
- type Job
- type JobStatusInfo
- type JobTimeInfo
- type JobType
- type Queue
- type QueueGroup
- type QueueInfo
- type QueueOperation
- type QueueOperationConfig
- type QueueReport
- type QueueStats
- func (s *QueueStats) Annotate(key string, value interface{})
- func (s QueueStats) IsComplete() bool
- func (s QueueStats) Loggable() bool
- func (s QueueStats) Priority() level.Priority
- func (s QueueStats) Raw() interface{}
- func (QueueStats) Schema() string
- func (*QueueStats) SetOption(opts ...message.Option)
- func (s *QueueStats) SetPriority(l level.Priority)
- func (s QueueStats) String() string
- func (*QueueStats) Structured() bool
- type Runner
Constants ¶
const LockTimeout = 10 * time.Minute
LockTimeout describes the default period of time that a queue will respect a stale lock from another queue before beginning work on a job.
Variables ¶
This section is empty.
Functions ¶
func EnqueueUniqueJob ¶
EnqueueUniqueJob is a generic wrapper for adding jobs to queues (using the Put() method) that ignores duplicate job errors.
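A minimal sketch of the ignore-duplicates pattern, assuming a hypothetical sentinel `errDuplicateJob` and a `putFunc` that stands in for `Queue.Put` (keyed by job ID for brevity). In amboy itself, detection goes through `IsDuplicateJobError`; the sentinel and `errors.Is` check here are an illustrative substitute.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errDuplicateJob is a hypothetical sentinel standing in for amboy's
// duplicate job error.
var errDuplicateJob = errors.New("duplicate job")

// putFunc is a hypothetical stand-in for Queue.Put, keyed by job ID.
type putFunc func(ctx context.Context, id string) error

// enqueueUnique calls put and treats a duplicate job as success;
// any other error is returned unchanged.
func enqueueUnique(ctx context.Context, put putFunc, id string) error {
	if err := put(ctx, id); err != nil && !errors.Is(err, errDuplicateJob) {
		return err
	}
	return nil
}

// memoryPut returns a put function backed by an in-memory set of IDs.
func memoryPut() putFunc {
	seen := map[string]bool{}
	return func(_ context.Context, id string) error {
		if seen[id] {
			return fmt.Errorf("job %q: %w", id, errDuplicateJob)
		}
		seen[id] = true
		return nil
	}
}

func main() {
	ctx := context.Background()
	put := memoryPut()
	fmt.Println(put(ctx, "build"))                // <nil>
	fmt.Println(put(ctx, "build") != nil)         // true: a plain put reports the duplicate
	fmt.Println(enqueueUnique(ctx, put, "build")) // <nil>: the duplicate is ignored
}
```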
func ExtractErrors ¶
ExtractErrors adds any errors in the completed jobs of the queue to the specified catcher.
func IntervalGroupQueueOperation ¶
func IntervalGroupQueueOperation(ctx context.Context, qg QueueGroup, interval time.Duration, startAt time.Time, conf QueueOperationConfig, ops ...GroupQueueOperation)
IntervalGroupQueueOperation schedules jobs on a queue group with similar semantics as IntervalQueueOperation.
Operations will continue to run as long as the context is not canceled. If you do not pass any GroupQueueOperation items to this function, it panics.
func IntervalQueueOperation ¶
func IntervalQueueOperation(ctx context.Context, q Queue, interval time.Duration, startAt time.Time, conf QueueOperationConfig, op QueueOperation)
IntervalQueueOperation runs a queue scheduling operation on a regular interval, starting at a specific time. Use this function to schedule jobs every hour, or for similar use cases.
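The scheduling shape can be sketched as follows, assuming a simplified operation type `func(context.Context) error` rather than amboy's `QueueOperation`, and ignoring the `QueueOperationConfig` options (jitter, thresholds, logging). `intervalOperation` and `countRuns` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// intervalOperation waits until startAt, then runs op once per interval
// until ctx is canceled. Errors are collected instead of stopping the loop.
func intervalOperation(ctx context.Context, interval time.Duration, startAt time.Time, op func(context.Context) error) []error {
	var errs []error

	// Block until the start time; a start time in the past fires immediately.
	timer := time.NewTimer(time.Until(startAt))
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return errs
	case <-timer.C:
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := op(ctx); err != nil {
			errs = append(errs, err)
		}
		select {
		case <-ctx.Done():
			return errs
		case <-ticker.C:
		}
	}
}

// countRuns runs a no-op operation for the given window and counts the calls.
func countRuns(interval, window time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), window)
	defer cancel()

	runs := 0
	intervalOperation(ctx, interval, time.Now(), func(context.Context) error {
		runs++
		return nil
	})
	return runs
}

func main() {
	fmt.Println(countRuns(10*time.Millisecond, 55*time.Millisecond))
}
```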
func IsDispatchable ¶
func IsDispatchable(stat JobStatusInfo, lockTimeout time.Duration) bool
IsDispatchable reports whether a job can be dispatched (i.e. it is neither in progress nor complete) or, if it is in progress, whether its lock has expired.
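A sketch of the rule as described, using a hypothetical `statusInfo` type that copies three `JobStatusInfo` fields. It assumes lock age is measured from `ModificationTime`; amboy's own check may weigh these fields differently.

```go
package main

import (
	"fmt"
	"time"
)

// statusInfo copies a subset of amboy.JobStatusInfo (hypothetical type).
type statusInfo struct {
	Completed        bool
	InProgress       bool
	ModificationTime time.Time
}

// isDispatchable: completed jobs never dispatch, idle jobs always do, and
// in-progress jobs dispatch only once their lock is older than lockTimeout.
func isDispatchable(s statusInfo, lockTimeout time.Duration) bool {
	switch {
	case s.Completed:
		return false
	case !s.InProgress:
		return true
	default:
		return time.Since(s.ModificationTime) > lockTimeout
	}
}

func main() {
	const lockTimeout = 10 * time.Minute // same value as amboy's LockTimeout
	fresh := statusInfo{InProgress: true, ModificationTime: time.Now()}
	stale := statusInfo{InProgress: true, ModificationTime: time.Now().Add(-time.Hour)}

	fmt.Println(isDispatchable(statusInfo{}, lockTimeout))                // true
	fmt.Println(isDispatchable(statusInfo{Completed: true}, lockTimeout)) // false
	fmt.Println(isDispatchable(fresh, lockTimeout))                       // false
	fmt.Println(isDispatchable(stale, lockTimeout))                       // true
}
```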
func IsDuplicateJobError ¶
IsDuplicateJobError tests an error object to see if it is a duplicate job error.
func IsJobNotDefinedError ¶
IsJobNotDefinedError returns true if the error is a job-not-defined error, and false otherwise.
func MakeDuplicateJobError ¶
MakeDuplicateJobError constructs a duplicate job error from an existing error of any type, for use by queue implementations.
func MakeJobNotDefinedError ¶
MakeJobNotDefinedError provides a less strongly typed constructor for a job-not-defined error, taking the queue's ID as a string rather than a Queue.
func NewDuplicateJobError ¶
NewDuplicateJobError creates a new error object to represent a duplicate job error, for use by queue implementations.
func NewDuplicateJobErrorf ¶
NewDuplicateJobErrorf creates a new error object to represent a duplicate job error with a formatted message, for use by queue implementations.
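These constructors fit a common Go pattern: a dedicated error type that callers detect with `errors.As`, so detection still works after the error has been wrapped. The sketch below assumes that pattern; `duplicateJobError` and the lowercase helper names are hypothetical mirrors, not amboy's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// duplicateJobError is a hypothetical stand-in for amboy's duplicate job error type.
type duplicateJobError struct{ msg string }

func (e *duplicateJobError) Error() string { return e.msg }

// newDuplicateJobErrorf builds a duplicate job error from a format string.
func newDuplicateJobErrorf(format string, args ...interface{}) error {
	return &duplicateJobError{msg: fmt.Sprintf(format, args...)}
}

// makeDuplicateJobError converts an arbitrary error into a duplicate job error.
func makeDuplicateJobError(err error) error {
	if err == nil {
		return nil
	}
	return &duplicateJobError{msg: err.Error()}
}

// isDuplicateJobError reports whether err, or anything it wraps, is a duplicate job error.
func isDuplicateJobError(err error) bool {
	var target *duplicateJobError
	return errors.As(err, &target)
}

func main() {
	err := newDuplicateJobErrorf("job %q already exists", "build-1")
	wrapped := fmt.Errorf("put failed: %w", err)

	fmt.Println(isDuplicateJobError(wrapped))                                       // true
	fmt.Println(isDuplicateJobError(errors.New("disk full")))                       // false
	fmt.Println(isDuplicateJobError(makeDuplicateJobError(errors.New("conflict")))) // true
}
```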
func NewJobNotDefinedError ¶
NewJobNotDefinedError produces an error that is detectable with IsJobNotDefinedError, and can be produced by Get methods of queues.
func PopulateQueue ¶
PopulateQueue adds jobs from a channel to a queue and returns an error with the aggregated results of these operations.
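A sketch of draining a channel with continue-on-error aggregation, assuming Go 1.20+ for `errors.Join`. The `put` callback stands in for `Queue.Put`, jobs are reduced to string IDs, and stopping on context cancellation is an assumption; `populate` and `populateIDs` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// populate puts every job ID received from jobs, continuing past failures,
// and returns all errors joined together (nil if every put succeeded).
func populate(ctx context.Context, put func(context.Context, string) error, jobs <-chan string) error {
	var errs []error
	for {
		select {
		case <-ctx.Done():
			return errors.Join(append(errs, ctx.Err())...)
		case id, ok := <-jobs:
			if !ok {
				return errors.Join(errs...)
			}
			if err := put(ctx, id); err != nil {
				errs = append(errs, err)
			}
		}
	}
}

// populateIDs feeds ids through a closed channel, failing the put for any
// ID equal to bad, and reports how many puts succeeded.
func populateIDs(ids []string, bad string) (int, error) {
	jobs := make(chan string, len(ids))
	for _, id := range ids {
		jobs <- id
	}
	close(jobs)

	added := 0
	err := populate(context.Background(), func(_ context.Context, id string) error {
		if id == bad {
			return fmt.Errorf("cannot add job %q", id)
		}
		added++
		return nil
	}, jobs)
	return added, err
}

func main() {
	n, err := populateIDs([]string{"a", "bad", "c"}, "bad")
	fmt.Println(n, err) // 2 cannot add job "bad"
}
```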
func ResolveErrors ¶
ResolveErrors takes a queue object, iterates over the results, and returns a single aggregated error for the queue's jobs. The completeness of this operation depends on the queue implementation's Results() method.
func RetrieveErrors ¶
RetrieveErrors adds any errors in the completed jobs of the queue to the specified error channel.
func RunJob ¶
RunJob executes a single job directly, without a queue, with similar semantics as it would execute in a queue: MaxTime is respected, and it uses similar logging as is present in the queue, with errors propagated functionally.
func Wait ¶
Wait takes a queue and blocks until all tasks are completed or the context is canceled. This operation runs in a tight loop, which means that Wait returns *as soon* as possible after all tasks are complete. However, frequent repeated calls to Stats() may contend with resources needed for dispatching jobs or marking them complete.
func WaitInterval ¶
WaitInterval provides the Wait operation and accepts a context for cancellation while also waiting for an interval between stats calls. The return value reports if the operation was canceled or if all tasks are complete.
func WaitIntervalNum ¶
WaitIntervalNum waits for a certain number of jobs to complete, with the same semantics as WaitInterval.
func WaitJob ¶
WaitJob blocks until the job, based on its ID, is marked complete in the queue, or the context is canceled. The return value is false if the job does not exist (or is removed) and true when the job completes.
func WaitJobInterval ¶
WaitJobInterval takes a job and queue object and waits for the job to be marked complete. The interval parameter controls how long the operation waits between checks, and can be used to limit the impact of waiting on a busy queue. The operation returns false if the job is not registered in the queue, and true when the job completes.
Types ¶
type AbortableRunner ¶
```go
type AbortableRunner interface {
	Runner

	IsRunning(string) bool
	RunningJobs() []string
	Abort(context.Context, string) error
	AbortAll(context.Context)
}
```
AbortableRunner provides a superset of the Runner interface but allows callers to abort jobs by ID.
Implementations should abort jobs in such a way that they do not mark the job as canceled, and instead have the effect of "completing" the job.
type DeletableJobQueue ¶
DeletableJobQueue describes an optional feature superset of a queue that allows some jobs to be deleted from the underlying storage by passing that job's ID.
Queue implementations may decide how to handle deletion of jobs if they implement it at all. In most cases the deletion of an in-progress job will result in undefined behavior.
type GroupQueueOperation ¶
```go
type GroupQueueOperation struct {
	Operation QueueOperation
	Queue     string
	Check     func(context.Context) bool
}
```
GroupQueueOperation describes a single queue population operation for a group queue.
type Job ¶
```go
type Job interface {
	// Provides a unique identifier for a job. Queues may error if
	// two jobs have the same ID.
	ID() string

	// The primary execution method for the job. Should toggle the
	// completed state for the job.
	Run(context.Context)

	// Returns a JobType value that Queue implementations can use
	// to de-serialize tasks.
	Type() JobType

	// Provides access to the job's dependency information, and
	// allows queues to override a dependency (e.g. in a force
	// build state, or as part of serializing dependency objects
	// with jobs.)
	Dependency() dependency.Manager
	SetDependency(dependency.Manager)

	// Provides access to the JobStatusInfo object for the job,
	// which reports the current state.
	Status() JobStatusInfo
	SetStatus(JobStatusInfo)

	// TimeInfo reports the start/end time of jobs, as well as
	// providing for a "wait until" functionality that queues can
	// use to schedule jobs in the future. The update method only
	// updates non-zero fields.
	TimeInfo() JobTimeInfo
	UpdateTimeInfo(JobTimeInfo)

	// Provides access to the job's priority value, which some
	// queues may use to order job dispatching. Most Jobs
	// implement these values by composing the
	// amboy/priority.Value type.
	Priority() int
	SetPriority(int)

	// AddError allows another actor to annotate the job with an
	// error.
	AddError(error)
	// Error returns an error object if the job encountered an
	// error. Typically, if the job has not run, this is nil.
	Error() error

	// Lock and Unlock are responsible for handling the locking
	// behavior for the job. Lock is responsible for setting the
	// owner (its argument), incrementing the modification count
	// and marking the job in progress, and returning an error if
	// another worker has access to the job. Unlock is responsible
	// for unsetting the owner and marking the job as
	// not-in-progress, and should be a no-op if the job does not
	// belong to the owner. In general the owner should be the value
	// of queue.ID()
	Lock(owner string, lockTimeout time.Duration) error
	Unlock(owner string, lockTimeout time.Duration)

	// Scopes provide a more configurable form of exclusion
	// between jobs.
	Scopes() []string
	SetScopes([]string)
}
```
Job describes a unit of work. Implementations of Job are the content of the Queue. The amboy/job package contains several general purpose and example implementations. Jobs are responsible, primarily via their Dependency property, for determining whether they need to run and which Jobs they depend on. Actual use of the dependency system is the responsibility of the Queue implementation.
In most cases, applications only need to implement the Run() method; all additional functionality is provided by the job.Base type, which can be embedded anonymously in implementations of Job.
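The embedding pattern can be sketched as follows. `base` is a hypothetical stand-in for `job.Base` with only a few bookkeeping methods, and `miniJob` is a trimmed subset of `Job`; neither reflects the real `job.Base` API. The point is that the concrete type writes only `Run` and inherits everything else through the anonymous embedded field.

```go
package main

import (
	"context"
	"fmt"
)

// base is a hypothetical stand-in for job.Base: shared bookkeeping so
// concrete jobs only need to implement Run.
type base struct {
	id        string
	completed bool
	errs      []error
}

func (b *base) ID() string      { return b.id }
func (b *base) MarkComplete()   { b.completed = true }
func (b *base) Completed() bool { return b.completed }

func (b *base) AddError(err error) {
	if err != nil {
		b.errs = append(b.errs, err)
	}
}

// miniJob is a trimmed, hypothetical subset of amboy.Job.
type miniJob interface {
	ID() string
	Run(context.Context)
	Completed() bool
}

// greetJob embeds *base anonymously and adds only Run.
type greetJob struct {
	*base
	name string
	out  string
}

func (j *greetJob) Run(ctx context.Context) {
	defer j.MarkComplete()
	if err := ctx.Err(); err != nil {
		j.AddError(err)
		return
	}
	j.out = "hello, " + j.name
}

// compile-time check that the embedded methods satisfy the interface
var _ miniJob = (*greetJob)(nil)

func newGreetJob(name string) *greetJob {
	return &greetJob{base: &base{id: "greet-" + name}, name: name}
}

// runGreet creates a greetJob and runs it to completion.
func runGreet(name string) *greetJob {
	j := newGreetJob(name)
	j.Run(context.Background())
	return j
}

func main() {
	j := runGreet("amboy")
	fmt.Println(j.ID(), j.Completed(), j.out) // greet-amboy true hello, amboy
}
```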
type JobStatusInfo ¶
```go
type JobStatusInfo struct {
	ID                string    `bson:"id,omitempty" json:"id,omitempty" yaml:"id,omitempty" db:"id"`
	Owner             string    `bson:"owner" json:"owner" yaml:"owner" db:"owner"`
	Completed         bool      `bson:"completed" json:"completed" yaml:"completed" db:"completed" `
	InProgress        bool      `bson:"in_prog" json:"in_progress" yaml:"in_progress" db:"in_progress"`
	Canceled          bool      `bson:"canceled" json:"canceled" yaml:"canceled" db:"canceled"`
	ModificationTime  time.Time `bson:"mod_ts" json:"mod_time" yaml:"mod_time" db:"updated_at"`
	ModificationCount int       `bson:"mod_count" json:"mod_count" yaml:"mod_count" db:"mod_count"`
	ErrorCount        int       `bson:"err_count" json:"err_count" yaml:"err_count" db:"err_count"`
	Errors            []string  `bson:"errors,omitempty" json:"errors,omitempty" yaml:"errors,omitempty" db:"_"`
}
```
JobStatusInfo contains information about the current status of a job and is reported by the Status and set by the SetStatus methods in the Job interface.
type JobTimeInfo ¶
```go
type JobTimeInfo struct {
	ID         string        `bson:"id,omitempty" json:"id,omitempty" yaml:"id,omitempty" db:"id"`
	Created    time.Time     `bson:"created,omitempty" json:"created,omitempty" yaml:"created,omitempty" db:"created_at"`
	Start      time.Time     `bson:"start,omitempty" json:"start,omitempty" yaml:"start,omitempty" db:"started_at"`
	End        time.Time     `bson:"end,omitempty" json:"end,omitempty" yaml:"end,omitempty" db:"ended_at"`
	WaitUntil  time.Time     `bson:"wait_until" json:"wait_until,omitempty" yaml:"wait_until,omitempty" db:"wait_until"`
	DispatchBy time.Time     `bson:"dispatch_by" json:"dispatch_by,omitempty" yaml:"dispatch_by,omitempty" db:"dispatch_by"`
	MaxTime    time.Duration `bson:"max_time" json:"max_time,omitempty" yaml:"max_time,omitempty" db:"max_time"`
}
```
JobTimeInfo stores timing information for a job and is used by both the Runner and Job implementations to track how long jobs take to execute.
Additionally, the Queue implementations __may__ use WaitUntil to defer the execution of a job, until WaitUntil refers to a time in the past.
If a deadline (DispatchBy) is specified and the queue implementation supports it, the queue may drop the job if the deadline is in the past when the job would be dispatched.
func (JobTimeInfo) Duration ¶
func (j JobTimeInfo) Duration() time.Duration
Duration is a convenience function to return a duration for a job.
func (JobTimeInfo) IsDispatchable ¶
func (j JobTimeInfo) IsDispatchable() bool
IsDispatchable determines if the job should be dispatched based on the value of WaitUntil.
func (JobTimeInfo) IsStale ¶
func (j JobTimeInfo) IsStale() bool
IsStale determines if the job is too old to be dispatched, and if so, queues may remove or drop the job entirely.
func (JobTimeInfo) Validate ¶
func (j JobTimeInfo) Validate() error
Validate ensures that the structure has reasonable values set.
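A sketch of plausible semantics for these three helpers on a hypothetical `timeInfo` type that copies four `JobTimeInfo` fields. It assumes `Duration` is `End - Start`, that a zero `WaitUntil` is always dispatchable, and that a zero `DispatchBy` means there is no deadline.

```go
package main

import (
	"fmt"
	"time"
)

// timeInfo copies a subset of amboy.JobTimeInfo (hypothetical type).
type timeInfo struct {
	Start, End, WaitUntil, DispatchBy time.Time
}

// Duration assumes a job's duration is End minus Start.
func (t timeInfo) Duration() time.Duration { return t.End.Sub(t.Start) }

// IsDispatchable: no WaitUntil, or a WaitUntil already in the past.
func (t timeInfo) IsDispatchable() bool {
	return t.WaitUntil.IsZero() || time.Now().After(t.WaitUntil)
}

// IsStale: a DispatchBy deadline is set and has already passed.
func (t timeInfo) IsStale() bool {
	return !t.DispatchBy.IsZero() && time.Now().After(t.DispatchBy)
}

func main() {
	now := time.Now()
	deferred := timeInfo{WaitUntil: now.Add(time.Hour)}
	expired := timeInfo{DispatchBy: now.Add(-time.Minute)}
	ran := timeInfo{Start: now, End: now.Add(90 * time.Second)}

	fmt.Println(deferred.IsDispatchable(), expired.IsStale(), ran.Duration()) // false true 1m30s
}
```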
type JobType ¶
```go
type JobType struct {
	Name    string `json:"name" bson:"name" yaml:"name"`
	Version int    `json:"version" bson:"version" yaml:"version"`
}
```
JobType contains information about the type of a job, which queues can use to serialize objects. All Job implementations must store and produce instances of this type that identify the type and implementation version.
type Queue ¶
```go
type Queue interface {
	// Used to add a job to the queue. Should only error if the
	// Queue cannot accept jobs or if the job already exists in the
	// queue.
	Put(context.Context, Job) error

	// Returns a unique identifier for the instance of the queue.
	ID() string

	// Given a job id, get that job. The error return value
	// indicates whether the named job had been registered by the
	// Queue.
	Get(context.Context, string) (Job, error)

	// Returns the next job in the queue. These calls are
	// blocking, but may be interrupted with a canceled
	// context. Next must return a non-nil job if error is nil.
	Next(context.Context) (Job, error)

	// Info returns information related to management of the Queue.
	Info() QueueInfo

	// Used to mark a Job complete and remove it from the pending
	// work of the queue.
	Complete(context.Context, Job) error

	// Saves the state of a current job to the underlying storage,
	// generally in support of locking and incremental
	// persistence. Should error if the job does not exist (use
	// Put) or if the queue does not have ownership of the job.
	Save(context.Context, Job) error

	// Channel that produces all Job objects.
	Jobs(context.Context) <-chan Job

	// Returns an object that contains statistics about the
	// current state of the Queue.
	Stats(context.Context) QueueStats

	// Getter for the Runner implementation embedded in the Queue
	// instance.
	Runner() Runner

	// Setter for the Runner implementation embedded in the Queue
	// instance. Permits runtime substitution of interfaces, but
	// implementations are not expected to permit users to change
	// runner implementations after starting the Queue.
	SetRunner(Runner) error

	// Begins the execution of the job Queue, using the embedded
	// Runner.
	Start(context.Context) error

	// Close will terminate the runner (e.g. Runner().Close())
	// and also release any queue specific resources.
	Close(context.Context) error
}
```
Queue describes a very simple Job queue interface that allows users to define Job objects, add them to a worker queue and execute tasks from that queue. Queue implementations may run locally or as part of a distributed application, with multiple workers and submitter Queue instances, which can support different job dispatching and organization properties.
type QueueGroup ¶
```go
type QueueGroup interface {
	// Get a queue with the given index. Most implementations will
	// create a queue if it doesn't exist in a Get operation.
	Get(context.Context, string) (Queue, error)

	// Put a queue at the given index. Because most
	// implementations create new queues in their Get operation,
	// Put is only needed for groups of queues that don't rely on
	// the default queue construction in Get, and need not be used
	// directly.
	Put(context.Context, string, Queue) error

	// Start all queues with pending work and, for QueueGroup
	// implementations that have them, the background workers that
	// prune idle queues or create queues with pending work (for
	// queues with distributed backends.)
	Start(context.Context) error

	// Prune old queues. Most queue implementations automatically
	// prune queues that are inactive based on some TTL. For most
	// implementations, users will never need to call this
	// directly.
	Prune(context.Context) error

	// Close the queues.
	Close(context.Context) error

	// Len returns the number of active queues managed in the
	// group.
	Len() int

	// Queues returns all currently registered and running queues.
	Queues(context.Context) []string
}
```
QueueGroup describes a group of queues. Each queue is indexed by a string. Users can use these queues if there are many different types of work or if the types of work are only knowable at runtime.
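A minimal sketch of the get-or-create indexing described above, with hypothetical `group` and `localQueue` types. Contexts, errors, `Start`, `Prune`, and `Close` are omitted; a mutex guards the map because a group is typically shared across goroutines.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// localQueue is a hypothetical placeholder for an amboy.Queue.
type localQueue struct{ id string }

// group sketches the string-indexed, get-or-create behavior of a QueueGroup.
type group struct {
	mu     sync.Mutex
	queues map[string]*localQueue
}

func newGroup() *group { return &group{queues: map[string]*localQueue{}} }

// Get returns the queue for id, creating it on first use.
func (g *group) Get(id string) *localQueue {
	g.mu.Lock()
	defer g.mu.Unlock()
	q, ok := g.queues[id]
	if !ok {
		q = &localQueue{id: id}
		g.queues[id] = q
	}
	return q
}

// Len reports how many queues the group manages.
func (g *group) Len() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.queues)
}

// Queues returns the registered queue IDs in sorted order.
func (g *group) Queues() []string {
	g.mu.Lock()
	defer g.mu.Unlock()
	ids := make([]string, 0, len(g.queues))
	for id := range g.queues {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	g := newGroup()
	g.Get("email")
	g.Get("reports")
	g.Get("email") // reuses the existing queue
	fmt.Println(g.Len(), g.Queues()) // 2 [email reports]
}
```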
type QueueOperation ¶
QueueOperation is a named function type for use with the IntervalQueueOperation function. Typically these functions add jobs to a queue, but they can also perform periodic maintenance (e.g. removing stale jobs or removing stuck jobs in a dependency queue.)
func GroupQueueOperationFactory ¶
func GroupQueueOperationFactory(first QueueOperation, ops ...QueueOperation) QueueOperation
GroupQueueOperationFactory produces a QueueOperation that aggregates and runs one or more QueueOperations. The QueueOperation has continue-on-error semantics, and returns an error if any of the QueueOperations fail, but attempts to run all specified QueueOperations before propagating errors.
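A sketch of the continue-on-error composition, assuming a simplified hypothetical `queueOp` type (`func(context.Context) error`) in place of `QueueOperation`, and Go 1.20+ for `errors.Join`. `groupOps`, `countingOp`, and `errBoom` are illustrative names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// queueOp is a simplified, hypothetical stand-in for amboy.QueueOperation.
type queueOp func(context.Context) error

// groupOps returns an operation that runs every op, even after failures,
// and reports all failures joined into one error.
func groupOps(first queueOp, ops ...queueOp) queueOp {
	all := append([]queueOp{first}, ops...)
	return func(ctx context.Context) error {
		var errs []error
		for _, op := range all {
			if err := op(ctx); err != nil {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}
}

var errBoom = errors.New("boom")

// countingOp returns an operation that increments *n and returns err.
func countingOp(n *int, err error) queueOp {
	return func(context.Context) error {
		*n++
		return err
	}
}

func main() {
	ran := 0
	op := groupOps(countingOp(&ran, nil), countingOp(&ran, errBoom), countingOp(&ran, nil))
	err := op(context.Background())
	fmt.Println(ran, err) // 3 boom
}
```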
func ScheduleJobFactory ¶
func ScheduleJobFactory(op func() Job) QueueOperation
ScheduleJobFactory produces a QueueOperation that calls a single function which returns a Job and puts that job into the queue.
func ScheduleJobsFromGeneratorFactory ¶
func ScheduleJobsFromGeneratorFactory(op func() <-chan Job) QueueOperation
ScheduleJobsFromGeneratorFactory produces a queue operation that calls a single generator function, which returns a channel of Jobs, and puts those jobs into the queue. The QueueOperation attempts to add all jobs from the channel and returns an error if the Queue.Put operation failed for any of them (i.e. continue-on-error semantics). The error returned aggregates all errors encountered.
func ScheduleManyJobsFactory ¶
func ScheduleManyJobsFactory(op func() []Job) QueueOperation
ScheduleManyJobsFactory produces a queue operation that calls a single function which returns a slice of jobs and puts those jobs into the queue. The QueueOperation attempts to add all jobs in the slice and returns an error if the Queue.Put operation failed for any of them (i.e. continue-on-error semantics). The error returned aggregates all errors encountered.
type QueueOperationConfig ¶
```go
type QueueOperationConfig struct {
	ContinueOnError             bool          `bson:"continue_on_error" json:"continue_on_error" yaml:"continue_on_error"`
	LogErrors                   bool          `bson:"log_errors" json:"log_errors" yaml:"log_errors"`
	DebugLogging                bool          `bson:"debug_logging" json:"debug_logging" yaml:"debug_logging"`
	RespectThreshold            bool          `bson:"respect_threshold" json:"respect_threshold" yaml:"respect_threshold"`
	EnableDuplicateJobReporting bool          `bson:"enable_duplicate_reporting" json:"enable_duplicate_reporting" yaml:"enable_duplicate_reporting"`
	Threshold                   int           `bson:"threshold" json:"threshold" yaml:"threshold"`
	Jitter                      time.Duration `bson:"jitter" json:"jitter" yaml:"jitter"`
}
```
QueueOperationConfig describes the behavior of the periodic interval schedulers.
The threshold, if RespectThreshold is set, causes the periodic scheduler to no-op if there are more than that many pending jobs.
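The threshold check reduces to a single predicate. This sketch assumes the pending count comes from `QueueStats.Pending`; `opConfig` (two copied fields) and `shouldRun` are hypothetical.

```go
package main

import "fmt"

// opConfig copies two fields of amboy.QueueOperationConfig (hypothetical type).
type opConfig struct {
	RespectThreshold bool
	Threshold        int
}

// shouldRun reports whether a periodic scheduling pass should proceed given
// the number of pending jobs; it skips only when more than Threshold are pending.
func shouldRun(conf opConfig, pending int) bool {
	return !conf.RespectThreshold || pending <= conf.Threshold
}

func main() {
	conf := opConfig{RespectThreshold: true, Threshold: 100}
	fmt.Println(shouldRun(conf, 50), shouldRun(conf, 101))  // true false
	fmt.Println(shouldRun(opConfig{Threshold: 100}, 1_000)) // true: threshold not respected
}
```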
type QueueReport ¶
```go
type QueueReport struct {
	Completed  []string `json:"completed"`
	InProgress []string `json:"in_progress"`
	Pending    []string `json:"pending"`
}
```
QueueReport holds the ids of all tasks in a queue by state.
type QueueStats ¶
```go
type QueueStats struct {
	Running   int            `bson:"running" json:"running" yaml:"running"`
	Completed int            `bson:"completed" json:"completed" yaml:"completed"`
	Pending   int            `bson:"pending" json:"pending" yaml:"pending"`
	Blocked   int            `bson:"blocked" json:"blocked" yaml:"blocked"`
	Total     int            `bson:"total" json:"total" yaml:"total"`
	Context   message.Fields `bson:"context,omitempty" json:"context,omitempty" yaml:"context,omitempty"`
	// contains filtered or unexported fields
}
```
QueueStats is a simple structure that the Stats() method in the Queue interface returns and tracks the state of the queue, and provides a common format for different Queue implementations to report on their state.
QueueStats implements grip's message.Composer interface when passed as a pointer.
func (*QueueStats) Annotate ¶
func (s *QueueStats) Annotate(key string, value interface{})
Annotate is part of the grip/message.Composer interface and allows the logging infrastructure to inject content and context into log messages.
func (QueueStats) IsComplete ¶
func (s QueueStats) IsComplete() bool
IsComplete returns true when the total number of tasks is equal to the number completed, or when the number of completed and blocked tasks is greater than or equal to the total. The Wait* functions use this method to determine when a queue has completed all actionable work.
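The completion rule as stated, sketched on a hypothetical `stats` type that copies three `QueueStats` counters. The `Total == Completed` case is the `Blocked == 0` instance of the second condition, so one comparison covers both.

```go
package main

import "fmt"

// stats copies the QueueStats counters used by the rule (hypothetical type).
type stats struct {
	Completed, Blocked, Total int
}

// isComplete treats blocked jobs as finished for the purpose of waiting:
// completed plus blocked must reach the total.
func (s stats) isComplete() bool {
	return s.Completed+s.Blocked >= s.Total
}

func main() {
	fmt.Println(stats{Completed: 5, Total: 5}.isComplete())             // true
	fmt.Println(stats{Completed: 3, Blocked: 2, Total: 5}.isComplete()) // true: the rest are blocked
	fmt.Println(stats{Completed: 3, Total: 5}.isComplete())             // false
}
```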
func (QueueStats) Loggable ¶
func (s QueueStats) Loggable() bool
Loggable is part of the grip/message.Composer interface and only returns true if the queue has at least one job.
func (QueueStats) Priority ¶
func (s QueueStats) Priority() level.Priority
Priority is part of the grip/message.Composer interface and returns the priority of the message.
func (QueueStats) Raw ¶
func (s QueueStats) Raw() interface{}
Raw is part of the grip/message.Composer interface and simply returns the QueueStats object.
func (QueueStats) Schema ¶
func (QueueStats) Schema() string
Schema defines a version schema for the messages.
func (*QueueStats) SetOption ¶
func (*QueueStats) SetOption(opts ...message.Option)
SetOption is a part of the grip/message.Composer interface but is a noop for QueueStats.
func (*QueueStats) SetPriority ¶
func (s *QueueStats) SetPriority(l level.Priority)
SetPriority is part of the grip/message.Composer interface and allows the caller to configure the priority of the message.
func (QueueStats) String ¶
func (s QueueStats) String() string
String prints a long form report of the queue for human consumption.
func (*QueueStats) Structured ¶
func (*QueueStats) Structured() bool
Structured indicates that the message type is structured and can be handled using structured logging methods.
type Runner ¶
```go
type Runner interface {
	// Reports if the pool has started.
	Started() bool

	// Provides a method to change or set the pointer to the
	// enclosing Queue object after instance creation. Runner
	// implementations may not be able to change their Queue
	// association after starting.
	SetQueue(Queue) error

	// Prepares the runner implementation to begin doing work, if
	// any is required (e.g. starting workers.) Typically called
	// by the enclosing Queue object's Start() method.
	Start(context.Context) error

	// Terminates all in progress work and waits for processes to
	// return.
	Close(context.Context)
}
```
Runner describes a simple worker interface for executing jobs in the context of a Queue. Used by queue implementations to run tasks. Generally Queue implementations will spawn a runner as part of their constructor or Start() methods, but client code can inject alternate Runner implementations, as required.
Directories ¶
Path | Synopsis |
---|---|
dependency | Package dependency contains the Manager interface, along with several implementations for different kinds of dependency checks. |
job | Package job provides tools and generic implementations of jobs for amboy Queues. |
logqueue | Package logqueue is a set of implementations to support amboy.Queue backed grip/send.Senders for asynchronous and (generally) non-blocking log message delivery. |
management | Package management provides increased observability and control of the state of amboy queues. |
pool | Package pool provides specific implementations of the amboy.Runner interface that serve as the worker pools for tasks in work queues. |
queue | Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations. |
registry | Package registry contains infrastructure to support the persistence of Job definitions. |
x/amzsqs | Module |
x/cli | Module |
x/mdbq | Module |
x/pgq | Module |
x/rest | Module |