Documentation ¶
Overview ¶
Package queue provides several implementations of the amboy.Queue and amboy.RemoteQueue interfaces capable of processing amboy.Job implementations. It also provides implementations of ancillary queue components, such as the ScopeManager for supporting scopes and the amboy.RetryHandler for supporting retryable queues.
Index ¶
- func NewLocalLimitedSize(workers, capacity int) amboy.Queue
- func NewLocalLimitedSizeSerializable(workers, capacity int) (amboy.RetryableQueue, error)
- func NewLocalLimitedSizeSerializableWithOptions(opts LocalLimitedSizeSerializableOptions) (amboy.RetryableQueue, error)
- func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)
- func NewMongoDBQueue(ctx context.Context, opts MongoDBQueueOptions) (amboy.RetryableQueue, error)
- func NewMongoDBSingleQueueGroup(ctx context.Context, opts MongoDBQueueGroupOptions) (amboy.QueueGroup, error)
- type BasicRetryHandler
- func (rh *BasicRetryHandler) Close(ctx context.Context)
- func (rh *BasicRetryHandler) Put(ctx context.Context, j amboy.Job) error
- func (rh *BasicRetryHandler) SetQueue(q amboy.RetryableQueue) error
- func (rh *BasicRetryHandler) Start(ctx context.Context) error
- func (rh *BasicRetryHandler) Started() bool
- type Dispatcher
- type GroupCache
- type LocalLimitedSizeSerializableOptions
- type LocalQueueGroupOptions
- type LocalQueueOptions
- type MongoDBOptions
- type MongoDBQueueGroupOptions
- type MongoDBQueueOptions
- type PreferredIndexOptions
- type RegexpMongoDBQueueOptions
- type RetryableQueueOptions
- type ScopeManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewLocalLimitedSize ¶
NewLocalLimitedSize constructs a queue instance with the specified number of workers and maximum capacity.
func NewLocalLimitedSizeSerializable ¶
func NewLocalLimitedSizeSerializable(workers, capacity int) (amboy.RetryableQueue, error)
NewLocalLimitedSizeSerializable constructs a local limited-size retryable queue instance with the specified number of workers and maximum capacity.
func NewLocalLimitedSizeSerializableWithOptions ¶
func NewLocalLimitedSizeSerializableWithOptions(opts LocalLimitedSizeSerializableOptions) (amboy.RetryableQueue, error)
NewLocalLimitedSizeSerializableWithOptions constructs a local limited-size retryable queue instance with the given options.
func NewLocalQueueGroup ¶
func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)
NewLocalQueueGroup constructs a new local queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.
func NewMongoDBQueue ¶
func NewMongoDBQueue(ctx context.Context, opts MongoDBQueueOptions) (amboy.RetryableQueue, error)
NewMongoDBQueue builds a new queue that persists jobs to a MongoDB instance. These queues allow workers running in multiple processes to service shared workloads in multiple processes.
func NewMongoDBSingleQueueGroup ¶
func NewMongoDBSingleQueueGroup(ctx context.Context, opts MongoDBQueueGroupOptions) (amboy.QueueGroup, error)
NewMongoDBSingleQueueGroup constructs a new remote queue group. If the TTL is 0, the queues will not be TTLed except when the client explicitly calls Prune.
The MongoDB single remote queue group multiplexes all queues into a single collection.
Types ¶
type BasicRetryHandler ¶
type BasicRetryHandler struct {
// contains filtered or unexported fields
}
BasicRetryHandler implements the amboy.RetryHandler interface. It provides a simple component that can be attached to an amboy.RetryableQueue to support automatically retrying jobs.
func NewBasicRetryHandler ¶
func NewBasicRetryHandler(q amboy.RetryableQueue, opts amboy.RetryHandlerOptions) (*BasicRetryHandler, error)
NewBasicRetryHandler initializes and returns an BasicRetryHandler that can be used as an amboy.RetryHandler implementation.
func (*BasicRetryHandler) Close ¶
func (rh *BasicRetryHandler) Close(ctx context.Context)
Close finishes processing the remaining retrying jobs and cleans up all resources.
func (*BasicRetryHandler) Put ¶
Put adds a new job to be retried. If it is at maximum capacity, it will block until either there is capacity or the context is done. If it has unlimited capacity, it will be added without blocking.
func (*BasicRetryHandler) SetQueue ¶
func (rh *BasicRetryHandler) SetQueue(q amboy.RetryableQueue) error
SetQueue provides a mechanism to swap out amboy.RetryableQueue implementations before it has started processing retryable jobs.
func (*BasicRetryHandler) Start ¶
func (rh *BasicRetryHandler) Start(ctx context.Context) error
Start initiates processing of jobs that need to retry.
func (*BasicRetryHandler) Started ¶
func (rh *BasicRetryHandler) Started() bool
Started returns whether or not the BasicRetryHandler has started processing retryable jobs or not.
type Dispatcher ¶
type Dispatcher interface { // Dispatch allows a single worker to take exclusive ownership of the job // when preparing to run it and during its execution. If this succeeds, // implementations should not allow any other worker to take ownership of // the job unless the job is stranded in progress. Dispatch(context.Context, amboy.Job) error // Release releases the worker's exclusive ownership of the job. Release(context.Context, amboy.Job) // Complete relinquishes the worker's exclusive ownership of the job. It may // optionally update metadata indicating that the job is finished. Complete(context.Context, amboy.Job) // Close cleans up all resources used by the Dispatcher and releases all // actively-dispatched jobs. Jobs should not be dispatchable after this is // called. Close(context.Context) error }
Dispatcher provides a common mechanism shared between queue implementations to handle job locking to prevent multiple workers from running the same job.
func NewDispatcher ¶
func NewDispatcher(q amboy.Queue) Dispatcher
NewDispatcher constructs a default dispatching implementation.
type GroupCache ¶
type GroupCache interface { Set(string, amboy.Queue, time.Duration) error Get(string) amboy.Queue Remove(context.Context, string) error Prune(context.Context) error Close(context.Context) error Names() []string Len() int }
GroupCache provides a common mechanism for managing collections of queues, for use in specific group cache situations
func NewCacheWithCleanupHook ¶
func NewCacheWithCleanupHook(ttl time.Duration, hook func(ctx context.Context, id string) error) GroupCache
NewCacheWithCleanupHook defines a cache but allows implementations to add additional cleanup logic to the prune and Close operations.
func NewGroupCache ¶
func NewGroupCache(ttl time.Duration) GroupCache
NewGroupCache produces a GroupCache implementation that supports a default TTL setting, and supports cloning and closing operations.
type LocalLimitedSizeSerializableOptions ¶
type LocalLimitedSizeSerializableOptions struct { NumWorkers int Capacity int Retryable RetryableQueueOptions }
LocalLimitedSizeSerializableOptions provides options to configure and initialize a limited-size local amboy.RetryableQueue.
func (*LocalLimitedSizeSerializableOptions) Validate ¶
func (opts *LocalLimitedSizeSerializableOptions) Validate() error
Validate checks that the options are valid.
type LocalQueueGroupOptions ¶
type LocalQueueGroupOptions struct { DefaultQueue LocalQueueOptions TTL time.Duration }
LocalQueueGroupOptions describe options passed to NewLocalQueueGroup.
func (*LocalQueueGroupOptions) Validate ¶
func (o *LocalQueueGroupOptions) Validate() error
type LocalQueueOptions ¶
LocalQueueOptions represent options to construct a local queue.
func (*LocalQueueOptions) BuildQueue ¶
func (*LocalQueueOptions) Validate ¶
func (o *LocalQueueOptions) Validate() error
type MongoDBOptions ¶
type MongoDBOptions struct { // Client is the MongoDB client used to connect a MongoDB queue driver to // its MongoDB-backed storage if no MongoDB URI is specified. Either Client // or URI must be set. Client *mongo.Client // URI is used to connect to MongoDB if no client is specified. Either // Client or URI must be set. URI string // DB is the name of the database in which the driver operates. DB string // Collection is the collection name in which the driver manages jobs. Collection string // GroupName is the namespace for the jobs in the queue managed by this // driver when the collection is shared by multiple queues. This is used to // ensure that jobs within each queue are isolated from each other by // namespace. GroupName string // UseGroups determines if the jobs in this collection could be in different // queues. If true, the driver will ensure that jobs are isolated between // the different queues using GroupName, and GroupName must be set. UseGroups bool // CheckWaitUntil determines if jobs that have not met their wait until time // yet should be filtered from consideration for dispatch. If true, any job // whose wait until constraint has not been reached yet will be filtered. CheckWaitUntil bool // CheckDispatchBy determines if jobs that have already exceeded their // dispatch by deadline should be filtered from consideration for dispatch. // If true, any job whose dispatch by deadline has been passed will be // filtered. CheckDispatchBy bool // SkipQueueIndexBuilds determines if indexes required for regular queue // operations should be built before using the driver. SkipQueueIndexBuilds bool // SkipReportingIndexBuilds determines if indexes related to reporting job // state should be built before using the driver. SkipReportingIndexBuilds bool // PreferredIndexes allows users to explicitly use a particular index to // fulfill driver queries. PreferredIndexes PreferredIndexOptions // Format is the internal format used to store jobs in the DB. The default // value is amboy.BSON. Format amboy.Format // WaitInterval is the duration that the driver will wait in between checks // for the next available job when no job is currently available to // dispatch. WaitInterval time.Duration // TTL sets the number of seconds for a TTL index on the "info.created" // field. If set to zero, the TTL index will not be created and // and documents may live forever in the database. TTL time.Duration // LockTimeout determines how long the queue will wait for a job that's // already been dispatched to finish without receiving a lock ping // indicating liveliness. If the lock timeout has been exceeded without a // lock ping, it will be considered stale and will be re-dispatched. If set, // this overrides the default job lock timeout. LockTimeout time.Duration // SampleSize is the maximum number of jobs per set of jobs checked. // If it samples from the available jobs, the order of next jobs are randomized. // By default, the driver does not sample from the next available jobs. SampleSize int }
MongoDBOptions represents options for creating a MongoDB driver to communicate MongoDB-specific settings about the driver's behavior and operation.
func DefaultMongoDBOptions ¶
func DefaultMongoDBOptions() MongoDBOptions
DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database.
func (*MongoDBOptions) Validate ¶
func (opts *MongoDBOptions) Validate() error
Validate validates that the required options are given and sets fields that are unspecified and have a default value.
type MongoDBQueueGroupOptions ¶
type MongoDBQueueGroupOptions struct { // DefaultQueue represents default options for queues in the queue group. // These can be optionally overridden at the individual queue level. DefaultQueue MongoDBQueueOptions // RegexpQueue represents options for queues by IDs that match a regular // expression. These take precedence over the DefaultQueue options but have // lower precedence than the PerQueue options. RegexpQueue []RegexpMongoDBQueueOptions // PerQueue represent options for specific queues by ID. These take // precedence over the DefaultQueue and RegexpQueue options. PerQueue map[string]MongoDBQueueOptions // PruneFrequency is how often inactive queues are checked to see if they // can be pruned. PruneFrequency time.Duration // BackgroundCreateFrequency is how often active queues can have their TTLs // periodically refreshed in the background. A queue is active as long as it // either still has jobs to complete or the most recently completed job // finished within the TTL. This is useful in case a queue still has jobs to // process but a user does not explicitly access the queue - if the goal is // to ensure a queue is never pruned when it still has jobs to complete, // this should be set to a value lower than the TTL. BackgroundCreateFrequency time.Duration // TTL determines how long a queue is considered active without performing // useful work for being accessed by a user. After the TTL has elapsed, the // queue is allowed to be pruned. TTL time.Duration }
MongoDBQueueGroupOptions describe options to create a queue group backed by MongoDB.
type MongoDBQueueOptions ¶
type MongoDBQueueOptions struct { // DB represents options for the MongoDB driver. DB *MongoDBOptions // NumWorkers is the default number of workers the queue should use. This // has lower precedence than WorkerPoolSize. NumWorkers *int // WorkerPoolSize returns the number of workers the queue should use. If // set, this takes precedence over NumWorkers. WorkerPoolSize func(string) int // Abortable indicates whether executing jobs can be aborted. Abortable *bool // Retryable represents options to retry jobs after they complete. Retryable *RetryableQueueOptions }
MongoDBQueueOptions represent options to create a queue that stores jobs in a persistence layer to support distributed systems of workers.
func (*MongoDBQueueOptions) BuildQueue ¶
BuildQueue constructs a MongoDB-backed remote queue from the queue options.
func (*MongoDBQueueOptions) Validate ¶
func (o *MongoDBQueueOptions) Validate() error
Validate checks that the given queue options are valid.
type PreferredIndexOptions ¶
type PreferredIndexOptions struct { // NextJob determines the index pattern that will be used for requesting the // next job in the queue. // For the queue group, the driver will implicitly include the group, so it // does not need to be included in the index pattern. NextJob bson.D }
PreferredIndexOptions provide options to explicitly set the index for use in specific scenarios. If an index is not explicitly given, the index will be picked automatically.
type RegexpMongoDBQueueOptions ¶
type RegexpMongoDBQueueOptions struct { // Regexp is the regular expression to match against the queue ID. Regexp regexp.Regexp // Options are the queue options to apply to matching queues. Options MongoDBQueueOptions }
RegexpMongoDBQueueOptions represents a mapping from a regular expression to match named queues in a queue group to options for those queues.
type RetryableQueueOptions ¶
type RetryableQueueOptions struct { // RetryHandler are options to configure how retryable jobs are handled. RetryHandler amboy.RetryHandlerOptions // StaleRetryingMonitorInterval is how often a queue periodically checks for // stale retrying jobs. StaleRetryingMonitorInterval time.Duration }
RetryableQueueOptions represent common options to configure an amboy.RetryableQueue.
func (*RetryableQueueOptions) Validate ¶
func (opts *RetryableQueueOptions) Validate() error
type ScopeManager ¶
type ScopeManager interface { Acquire(owner string, scopes []string) error Release(owner string, scopes []string) error ReleaseAndAcquire(ownerToRelease string, scopesToRelease []string, ownerToAcquire string, scopesToAcquire []string) error }
ScopeManager provides a service to queue implementations to support additional locking semantics for queues that cannot push that into their backing storage.
func NewLocalScopeManager ¶
func NewLocalScopeManager() ScopeManager
NewLocalScopeManager constructs a ScopeManager implementation suitable for use in most local (in memory) queue implementations.