queue

package
v0.0.0-...-dec677a Latest
Warning

This package is not in the latest version of its module.

Published: Sep 4, 2024 License: Apache-2.0 Imports: 22 Imported by: 32

Documentation

Overview

Package queue provides several implementations of the amboy.Queue and amboy.RemoteQueue interfaces capable of processing amboy.Job implementations. It also provides implementations of ancillary queue components, such as the ScopeManager for supporting scopes and the amboy.RetryHandler for supporting retryable queues.

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLocalLimitedSize

func NewLocalLimitedSize(workers, capacity int) amboy.Queue

NewLocalLimitedSize constructs a queue instance with the specified number of workers and maximum capacity.
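The relationship between the workers and capacity parameters can be pictured with a buffered channel drained by a fixed pool of goroutines. This is only a minimal sketch using the standard library: boundedQueue, put, and close are hypothetical names, not part of the amboy API, and rejecting jobs when the buffer is full is an assumption about how a limited-size queue might behave.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// boundedQueue is a hypothetical illustration of a fixed worker pool with a
// maximum number of pending jobs. It is not the amboy implementation.
type boundedQueue struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// newBoundedQueue starts the given number of workers and allows up to
// capacity jobs to wait for a free worker.
func newBoundedQueue(workers, capacity int) *boundedQueue {
	q := &boundedQueue{jobs: make(chan func(), capacity)}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for job := range q.jobs {
				job()
			}
		}()
	}
	return q
}

// put enqueues a job, or returns an error if the pending buffer is full
// (assumed behavior for this sketch).
func (q *boundedQueue) put(job func()) error {
	select {
	case q.jobs <- job:
		return nil
	default:
		return errors.New("queue is full")
	}
}

// close stops accepting jobs and waits for the workers to drain the buffer.
func (q *boundedQueue) close() {
	close(q.jobs)
	q.wg.Wait()
}

func main() {
	q := newBoundedQueue(2, 4)
	results := make(chan int, 4)
	for i := 0; i < 4; i++ {
		n := i
		if err := q.put(func() { results <- n * n }); err != nil {
			fmt.Println("rejected:", err)
		}
	}
	q.close()
	fmt.Println(len(results)) // prints 4
}
```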

func NewLocalLimitedSizeSerializable

func NewLocalLimitedSizeSerializable(workers, capacity int) (amboy.RetryableQueue, error)

NewLocalLimitedSizeSerializable constructs a local limited-size retryable queue instance with the specified number of workers and maximum capacity.

func NewLocalLimitedSizeSerializableWithOptions

func NewLocalLimitedSizeSerializableWithOptions(opts LocalLimitedSizeSerializableOptions) (amboy.RetryableQueue, error)

NewLocalLimitedSizeSerializableWithOptions constructs a local limited-size retryable queue instance with the given options.

func NewLocalQueueGroup

func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)

NewLocalQueueGroup constructs a new local queue group. If the TTL in the options is 0, queues do not expire automatically and are removed only when the client explicitly calls Prune.

func NewMongoDBQueue

func NewMongoDBQueue(ctx context.Context, opts MongoDBQueueOptions) (amboy.RetryableQueue, error)

NewMongoDBQueue builds a new queue that persists jobs to a MongoDB instance. These queues allow workers running in multiple processes to service a shared workload.

func NewMongoDBSingleQueueGroup

func NewMongoDBSingleQueueGroup(ctx context.Context, opts MongoDBQueueGroupOptions) (amboy.QueueGroup, error)

NewMongoDBSingleQueueGroup constructs a new remote queue group. If the TTL is 0, queues do not expire automatically and are removed only when the client explicitly calls Prune.

The MongoDB single remote queue group multiplexes all queues into a single collection.

Types

type BasicRetryHandler

type BasicRetryHandler struct {
	// contains filtered or unexported fields
}

BasicRetryHandler implements the amboy.RetryHandler interface. It provides a simple component that can be attached to an amboy.RetryableQueue to support automatically retrying jobs.

func NewBasicRetryHandler

func NewBasicRetryHandler(q amboy.RetryableQueue, opts amboy.RetryHandlerOptions) (*BasicRetryHandler, error)

NewBasicRetryHandler initializes and returns a BasicRetryHandler that can be used as an amboy.RetryHandler implementation.

func (*BasicRetryHandler) Close

func (rh *BasicRetryHandler) Close(ctx context.Context)

Close finishes processing the remaining retrying jobs and cleans up all resources.

func (*BasicRetryHandler) Put

func (rh *BasicRetryHandler) Put(ctx context.Context, j amboy.Job) error

Put adds a new job to be retried. If it is at maximum capacity, it will block until either there is capacity or the context is done. If it has unlimited capacity, it will be added without blocking.

func (*BasicRetryHandler) SetQueue

func (rh *BasicRetryHandler) SetQueue(q amboy.RetryableQueue) error

SetQueue provides a mechanism to swap out amboy.RetryableQueue implementations before it has started processing retryable jobs.

func (*BasicRetryHandler) Start

func (rh *BasicRetryHandler) Start(ctx context.Context) error

Start initiates processing of jobs that need to retry.

func (*BasicRetryHandler) Started

func (rh *BasicRetryHandler) Started() bool

Started returns whether the BasicRetryHandler has started processing retryable jobs.

type Dispatcher

type Dispatcher interface {
	// Dispatch allows a single worker to take exclusive ownership of the job
	// when preparing to run it and during its execution. If this succeeds,
	// implementations should not allow any other worker to take ownership of
	// the job unless the job is stranded in progress.
	Dispatch(context.Context, amboy.Job) error
	// Release releases the worker's exclusive ownership of the job.
	Release(context.Context, amboy.Job)
	// Complete relinquishes the worker's exclusive ownership of the job. It may
	// optionally update metadata indicating that the job is finished.
	Complete(context.Context, amboy.Job)
	// Close cleans up all resources used by the Dispatcher and releases all
	// actively-dispatched jobs. Jobs should not be dispatchable after this is
	// called.
	Close(context.Context) error
}

Dispatcher provides a common mechanism shared between queue implementations to handle job locking to prevent multiple workers from running the same job.
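To make the ownership contract concrete, here is a minimal in-memory sketch of exclusive dispatch. It is an illustration under simplifying assumptions: memDispatcher is a hypothetical type, jobs are identified by a plain string ID rather than an amboy.Job, and there is no context, lock pinging, or handling of stranded jobs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// memDispatcher is a hypothetical, simplified illustration of exclusive job
// ownership. It is not the dispatcher returned by NewDispatcher.
type memDispatcher struct {
	mu    sync.Mutex
	owned map[string]bool // job ID -> currently owned by some worker
}

func newMemDispatcher() *memDispatcher {
	return &memDispatcher{owned: map[string]bool{}}
}

// dispatch grants ownership of the job to the caller, or fails if another
// worker already owns it.
func (d *memDispatcher) dispatch(id string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.owned[id] {
		return errors.New("job " + id + " is already dispatched")
	}
	d.owned[id] = true
	return nil
}

// release gives up ownership so that another worker may dispatch the job.
func (d *memDispatcher) release(id string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.owned, id)
}

func main() {
	d := newMemDispatcher()
	fmt.Println(d.dispatch("job-1") == nil) // prints true
	fmt.Println(d.dispatch("job-1") == nil) // prints false
	d.release("job-1")
	fmt.Println(d.dispatch("job-1") == nil) // prints true
}
```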

func NewDispatcher

func NewDispatcher(q amboy.Queue) Dispatcher

NewDispatcher constructs a default dispatching implementation.

type GroupCache

type GroupCache interface {
	Set(string, amboy.Queue, time.Duration) error
	Get(string) amboy.Queue
	Remove(context.Context, string) error
	Prune(context.Context) error
	Close(context.Context) error
	Names() []string
	Len() int
}

GroupCache provides a common mechanism for managing a collection of named queues, for use by queue group implementations.

func NewCacheWithCleanupHook

func NewCacheWithCleanupHook(ttl time.Duration, hook func(ctx context.Context, id string) error) GroupCache

NewCacheWithCleanupHook defines a cache but allows implementations to add additional cleanup logic to the Prune and Close operations.

func NewGroupCache

func NewGroupCache(ttl time.Duration) GroupCache

NewGroupCache produces a GroupCache implementation that supports a default TTL setting, and supports cloning and closing operations.

type LocalLimitedSizeSerializableOptions

type LocalLimitedSizeSerializableOptions struct {
	NumWorkers int
	Capacity   int
	Retryable  RetryableQueueOptions
}

LocalLimitedSizeSerializableOptions provides options to configure and initialize a limited-size local amboy.RetryableQueue.

func (*LocalLimitedSizeSerializableOptions) Validate

func (opts *LocalLimitedSizeSerializableOptions) Validate() error

Validate checks that the options are valid.

type LocalQueueGroupOptions

type LocalQueueGroupOptions struct {
	DefaultQueue LocalQueueOptions
	TTL          time.Duration
}

LocalQueueGroupOptions describes the options passed to NewLocalQueueGroup.

func (*LocalQueueGroupOptions) Validate

func (o *LocalQueueGroupOptions) Validate() error

type LocalQueueOptions

type LocalQueueOptions struct {
	Constructor func(ctx context.Context) (amboy.Queue, error)
}

LocalQueueOptions represents the options used to construct a local queue.

func (*LocalQueueOptions) BuildQueue

func (o *LocalQueueOptions) BuildQueue(ctx context.Context) (amboy.Queue, error)

func (*LocalQueueOptions) Validate

func (o *LocalQueueOptions) Validate() error

type MongoDBOptions

type MongoDBOptions struct {
	// Client is the MongoDB client used to connect a MongoDB queue driver to
	// its MongoDB-backed storage if no MongoDB URI is specified. Either Client
	// or URI must be set.
	Client *mongo.Client
	// URI is used to connect to MongoDB if no client is specified. Either
	// Client or URI must be set.
	URI string
	// DB is the name of the database in which the driver operates.
	DB string
	// Collection is the collection name in which the driver manages jobs.
	Collection string
	// GroupName is the namespace for the jobs in the queue managed by this
	// driver when the collection is shared by multiple queues. This is used to
	// ensure that jobs within each queue are isolated from each other by
	// namespace.
	GroupName string
	// UseGroups determines if the jobs in this collection could be in different
	// queues. If true, the driver will ensure that jobs are isolated between
	// the different queues using GroupName, and GroupName must be set.
	UseGroups bool
	// CheckWaitUntil determines if jobs that have not met their wait until time
	// yet should be filtered from consideration for dispatch. If true, any job
	// whose wait until constraint has not been reached yet will be filtered.
	CheckWaitUntil bool
	// CheckDispatchBy determines if jobs that have already exceeded their
	// dispatch by deadline should be filtered from consideration for dispatch.
	// If true, any job whose dispatch by deadline has been passed will be
	// filtered.
	CheckDispatchBy bool
	// SkipQueueIndexBuilds determines if indexes required for regular queue
	// operations should be built before using the driver.
	SkipQueueIndexBuilds bool
	// SkipReportingIndexBuilds determines if indexes related to reporting job
	// state should be built before using the driver.
	SkipReportingIndexBuilds bool
	// PreferredIndexes allows users to explicitly use a particular index to
	// fulfill driver queries.
	PreferredIndexes PreferredIndexOptions
	// Format is the internal format used to store jobs in the DB. The default
	// value is amboy.BSON.
	Format amboy.Format
	// WaitInterval is the duration that the driver will wait in between checks
	// for the next available job when no job is currently available to
	// dispatch.
	WaitInterval time.Duration
	// TTL sets the number of seconds for a TTL index on the "info.created"
	// field. If set to zero, the TTL index will not be created and
	// documents may live forever in the database.
	TTL time.Duration
	// LockTimeout determines how long the queue will wait for a job that's
	// already been dispatched to finish without receiving a lock ping
	// indicating liveness. If the lock timeout has been exceeded without a
	// lock ping, it will be considered stale and will be re-dispatched. If set,
	// this overrides the default job lock timeout.
	LockTimeout time.Duration
	// SampleSize is the maximum number of jobs per set of jobs checked.
	// If the driver samples from the available jobs, the order of the next jobs is randomized.
	// By default, the driver does not sample from the next available jobs.
	SampleSize int
}

MongoDBOptions represents options for creating a MongoDB driver to communicate MongoDB-specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database.

func (*MongoDBOptions) Validate

func (opts *MongoDBOptions) Validate() error

Validate validates that the required options are given and sets fields that are unspecified and have a default value.
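Two of the constraints stated in the MongoDBOptions field comments (either Client or URI must be set, and GroupName must be set when UseGroups is true) can be sketched as follows. The driverOptions type is a hypothetical, trimmed-down mirror of those fields, with a boolean standing in for a non-nil client; the error messages are invented for illustration, and the real Validate may check more than this.

```go
package main

import (
	"errors"
	"fmt"
)

// driverOptions is a hypothetical mirror of a few MongoDBOptions fields.
type driverOptions struct {
	HasClient bool // stands in for a non-nil *mongo.Client
	URI       string
	UseGroups bool
	GroupName string
}

// validate applies only the constraints spelled out in the field comments
// and reports every violation it finds.
func (o driverOptions) validate() error {
	var errs []error
	if !o.HasClient && o.URI == "" {
		errs = append(errs, errors.New("must specify a client or a URI"))
	}
	if o.UseGroups && o.GroupName == "" {
		errs = append(errs, errors.New("must specify a group name when using groups"))
	}
	// errors.Join returns nil when there are no errors to join.
	return errors.Join(errs...)
}

func main() {
	ok := driverOptions{URI: "mongodb://localhost:27017"}
	fmt.Println(ok.validate() == nil) // prints true

	bad := driverOptions{UseGroups: true}
	fmt.Println(bad.validate() == nil) // prints false
}
```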

type MongoDBQueueGroupOptions

type MongoDBQueueGroupOptions struct {
	// DefaultQueue represents default options for queues in the queue group.
	// These can be optionally overridden at the individual queue level.
	DefaultQueue MongoDBQueueOptions

	// RegexpQueue represents options for queues by IDs that match a regular
	// expression. These take precedence over the DefaultQueue options but have
	// lower precedence than the PerQueue options.
	RegexpQueue []RegexpMongoDBQueueOptions

	// PerQueue represent options for specific queues by ID. These take
	// precedence over the DefaultQueue and RegexpQueue options.
	PerQueue map[string]MongoDBQueueOptions

	// PruneFrequency is how often inactive queues are checked to see if they
	// can be pruned.
	PruneFrequency time.Duration

	// BackgroundCreateFrequency is how often active queues can have their TTLs
	// periodically refreshed in the background. A queue is active as long as it
	// either still has jobs to complete or the most recently completed job
	// finished within the TTL. This is useful in case a queue still has jobs to
	// process but a user does not explicitly access the queue - if the goal is
	// to ensure a queue is never pruned when it still has jobs to complete,
	// this should be set to a value lower than the TTL.
	BackgroundCreateFrequency time.Duration

	// TTL determines how long a queue is considered active without performing
	// useful work or being accessed by a user. After the TTL has elapsed, the
	// queue is allowed to be pruned.
	TTL time.Duration
}

MongoDBQueueGroupOptions describes the options used to create a queue group backed by MongoDB.

type MongoDBQueueOptions

type MongoDBQueueOptions struct {
	// DB represents options for the MongoDB driver.
	DB *MongoDBOptions
	// NumWorkers is the default number of workers the queue should use. This
	// has lower precedence than WorkerPoolSize.
	NumWorkers *int
	// WorkerPoolSize returns the number of workers the queue should use. If
	// set, this takes precedence over NumWorkers.
	WorkerPoolSize func(string) int
	// Abortable indicates whether executing jobs can be aborted.
	Abortable *bool
	// Retryable represents options to retry jobs after they complete.
	Retryable *RetryableQueueOptions
}

MongoDBQueueOptions represents the options used to create a queue that stores jobs in a persistence layer to support distributed systems of workers.
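The field comments state that WorkerPoolSize, when set, takes precedence over NumWorkers. A minimal sketch of that rule is shown below. The workerSettings type and workersFor method are hypothetical, and the fallback of one worker when neither field is set is an assumption made only so the sketch is total.

```go
package main

import "fmt"

// workerSettings is a hypothetical mirror of the two worker-count fields.
type workerSettings struct {
	NumWorkers     *int
	WorkerPoolSize func(string) int
}

// workersFor returns the worker count for a queue ID, preferring the
// WorkerPoolSize callback over the static NumWorkers value.
func (s workerSettings) workersFor(queueID string) int {
	if s.WorkerPoolSize != nil {
		return s.WorkerPoolSize(queueID)
	}
	if s.NumWorkers != nil {
		return *s.NumWorkers
	}
	return 1 // assumed fallback for this sketch only
}

func main() {
	n := 2
	s := workerSettings{NumWorkers: &n}
	fmt.Println(s.workersFor("reports")) // prints 2

	s.WorkerPoolSize = func(string) int { return 8 }
	fmt.Println(s.workersFor("reports")) // prints 8
}
```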

func (*MongoDBQueueOptions) BuildQueue

func (o *MongoDBQueueOptions) BuildQueue(ctx context.Context) (amboy.Queue, error)

BuildQueue constructs a MongoDB-backed remote queue from the queue options.

func (*MongoDBQueueOptions) Validate

func (o *MongoDBQueueOptions) Validate() error

Validate checks that the given queue options are valid.

type PreferredIndexOptions

type PreferredIndexOptions struct {
	// NextJob determines the index pattern that will be used for requesting the
	// next job in the queue.
	// For the queue group, the driver will implicitly include the group, so it
	// does not need to be included in the index pattern.
	NextJob bson.D
}

PreferredIndexOptions provide options to explicitly set the index for use in specific scenarios. If an index is not explicitly given, the index will be picked automatically.

type RegexpMongoDBQueueOptions

type RegexpMongoDBQueueOptions struct {
	// Regexp is the regular expression to match against the queue ID.
	Regexp regexp.Regexp
	// Options are the queue options to apply to matching queues.
	Options MongoDBQueueOptions
}

RegexpMongoDBQueueOptions represents a mapping from a regular expression to match named queues in a queue group to options for those queues.

type RetryableQueueOptions

type RetryableQueueOptions struct {
	// RetryHandler are options to configure how retryable jobs are handled.
	RetryHandler amboy.RetryHandlerOptions
	// StaleRetryingMonitorInterval is how often a queue periodically checks for
	// stale retrying jobs.
	StaleRetryingMonitorInterval time.Duration
}

RetryableQueueOptions represents common options to configure an amboy.RetryableQueue.

func (*RetryableQueueOptions) Validate

func (opts *RetryableQueueOptions) Validate() error

type ScopeManager

type ScopeManager interface {
	Acquire(owner string, scopes []string) error
	Release(owner string, scopes []string) error
	ReleaseAndAcquire(ownerToRelease string, scopesToRelease []string, ownerToAcquire string, scopesToAcquire []string) error
}

ScopeManager provides a service to queue implementations to support additional locking semantics for queues that cannot push those semantics into their backing storage.
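One way to picture the three ScopeManager methods is an in-memory map from scope name to owner, guarded by a mutex. The sketch below is illustrative only: memScopes is a hypothetical type, not the manager returned by NewLocalScopeManager, and the all-or-nothing behavior and error messages are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// memScopes is a hypothetical in-memory scope tracker.
type memScopes struct {
	mu     sync.Mutex
	owners map[string]string // scope -> owner
}

func newMemScopes() *memScopes {
	return &memScopes{owners: map[string]string{}}
}

// releaseAndAcquire releases one set of scopes and acquires another as a
// single step. Nothing changes if any scope is held by a different owner.
func (m *memScopes) releaseAndAcquire(relOwner string, rel []string, acqOwner string, acq []string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	freed := map[string]bool{}
	for _, s := range rel {
		if cur, ok := m.owners[s]; ok && cur != relOwner {
			return fmt.Errorf("scope %q is held by %q, not %q", s, cur, relOwner)
		}
		freed[s] = true
	}
	for _, s := range acq {
		if cur, ok := m.owners[s]; ok && cur != acqOwner && !freed[s] {
			return fmt.Errorf("scope %q is already held by %q", s, cur)
		}
	}
	for _, s := range rel {
		delete(m.owners, s)
	}
	for _, s := range acq {
		m.owners[s] = acqOwner
	}
	return nil
}

// acquire takes all of the scopes for the owner, or none of them.
func (m *memScopes) acquire(owner string, scopes []string) error {
	return m.releaseAndAcquire("", nil, owner, scopes)
}

// release gives up the owner's scopes.
func (m *memScopes) release(owner string, scopes []string) error {
	return m.releaseAndAcquire(owner, scopes, "", nil)
}

func main() {
	m := newMemScopes()
	fmt.Println(m.acquire("job-1", []string{"db", "cache"}) == nil) // prints true
	fmt.Println(m.acquire("job-2", []string{"cache"}) == nil)       // prints false
	err := m.releaseAndAcquire("job-1", []string{"db", "cache"}, "job-2", []string{"cache"})
	fmt.Println(err == nil) // prints true
}
```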

func NewLocalScopeManager

func NewLocalScopeManager() ScopeManager

NewLocalScopeManager constructs a ScopeManager implementation suitable for use in most local (in memory) queue implementations.
