pgq

Version: v0.0.0-...-098c36e
Published: May 10, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanDatabase

func CleanDatabase(ctx context.Context, db *sqlx.DB, schemaName string) error

CleanDatabase removes all database tables related to the "jobs" table in the specified schema.

func NewGroup

func NewGroup(ctx context.Context, db *sqlx.DB, opts Options, gopts GroupOptions) (amboy.QueueGroup, error)

NewGroup constructs a new SQL-backed group queue and starts the background queue creation and pruning, so that queues started on one process eventually end up on other processes and empty queues eventually release their resources.

Group queues reduce the overall cost of creating new queues, and make it possible to isolate workloads from each other.

The SQL-backed queue is implemented so that single queues and group queues can coexist in the same database and underlying tables, though there are some minor and pathological cases where this may behave unexpectedly. For example, non-group jobs with prefixes could theoretically block a similar group queue job, in a way that could never happen with multiple group queues.

func NewManager

func NewManager(db *sqlx.DB, opts ManagerOptions) management.Manager

NewManager constructs a manager instance that interacts with the job data.

func NewQueue

func NewQueue(ctx context.Context, db *sqlx.DB, opts Options) (amboy.Queue, error)

NewQueue produces a new SQL-database-backed queue. Broadly similar to the MongoDB implementation, this queue is available only in the "unordered" variant (i.e. dependencies are not considered when determining dispatch order), but it can respect settings including scopes, priority, WaitUntil, and DispatchBy.

All job implementations *must* be JSON serializable, and the queue implementation assumes that the dependency Manager (and its edges) are immutable after the job is Put into the queue. Similarly, jobs must treat the Error array in the amboy.JobStatusInfo as append-only.
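The JSON requirement above can be checked with a round trip through encoding/json before a job type is used with the queue. The following is a minimal sketch using only the standard library; `reportJob`, `roundTrip`, and `addError` are hypothetical names for illustration and are not part of this package or of amboy. It shows that exported fields survive encoding, that unexported fields are silently dropped, and what an append-only error list looks like.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reportJob is a hypothetical job payload (not an amboy type).
// Only exported fields are encoded by encoding/json.
type reportJob struct {
	ID       string   `json:"id"`
	Priority int      `json:"priority"`
	Errors   []string `json:"errors"`
	scratch  string   // unexported: dropped during JSON encoding
}

// addError appends to the error list and never rewrites earlier entries,
// which is one way to keep the list append-only.
func (j *reportJob) addError(msg string) {
	j.Errors = append(j.Errors, msg)
}

// roundTrip encodes the job to JSON and decodes it into a fresh value.
func roundTrip(in reportJob) (reportJob, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return reportJob{}, err
	}
	var out reportJob
	if err := json.Unmarshal(data, &out); err != nil {
		return reportJob{}, err
	}
	return out, nil
}

func main() {
	job := reportJob{ID: "report-1", Priority: 3, scratch: "local only"}
	job.addError("first attempt timed out")

	out, err := roundTrip(job)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// The scratch field is empty in the decoded copy.
	fmt.Printf("%+v\n", out)
}
```

Any state a job needs after being stored and reloaded should therefore live in exported, JSON-encodable fields.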

Be aware that, in the current release, this implementation logs warnings if some job metadata values (e.g. priority, error count) exceed the maximum int32, and returns an error if more critical values (e.g. mod count and version) exceed this threshold. Also, because MaxTime is stored internally as an int32 of milliseconds (a maximum of roughly 596 hours) rather than Go's native 64-bit integer of nanoseconds, attempting to set a longer MaxTime results in an error.
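The limit on MaxTime follows from the range of an int32 counted in milliseconds. As a rough sketch, the check below tests whether a time.Duration fits in that range before it is assigned to a job; `fitsInt32Millis`, `hours`, and `maxInt32Millis` are hypothetical helpers written for this example and are not part of the package, whose own validation may differ.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// maxInt32Millis is the longest duration that an int32 count of
// milliseconds can represent.
const maxInt32Millis = time.Duration(math.MaxInt32) * time.Millisecond

// fitsInt32Millis reports whether d, truncated to whole milliseconds,
// fits in an int32. Hypothetical helper, not the package's own check.
func fitsInt32Millis(d time.Duration) bool {
	ms := d.Milliseconds()
	return ms >= math.MinInt32 && ms <= math.MaxInt32
}

// hours converts a whole number of hours to a time.Duration.
func hours(n int) time.Duration {
	return time.Duration(n) * time.Hour
}

func main() {
	fmt.Printf("limit: %.1f hours\n", maxInt32Millis.Hours())
	for _, d := range []time.Duration{hours(500), hours(600)} {
		fmt.Printf("%v fits: %t\n", d, fitsInt32Millis(d))
	}
}
```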

func PrepareDatabase

func PrepareDatabase(ctx context.Context, db *sqlx.DB, schemaName string) error

PrepareDatabase creates the schema, tables, and indexes in the given schema, to support operational separation of database/queue configuration from the construction of queue objects.

Types

type GroupOptions

type GroupOptions struct {
	// Abortable controls if the queue will use an abortable pool
	// implementation. The DefaultWorkers option sets the default number
	// of workers new queues will have if the WorkerPoolSize
	// function is not set.
	Abortable      bool
	DefaultWorkers int

	// WorkerPoolSize determines how many workers will be allocated
	// to each queue, based on the queue ID passed to it.
	WorkerPoolSize func(string) int

	// PruneFrequency is how often Prune runs by default.
	PruneFrequency time.Duration

	// BackgroundCreateFrequency is how often the background queue
	// creation runs, in the case that queues may be created in
	// the background without
	BackgroundCreateFrequency time.Duration

	// The BackgroundOperation values control how often errors from
	// background operations are logged, and at what level. These
	// default to 5 successive errors and the level of "Warning" if unset.
	BackgroundOperationErrorCountThreshold int
	BackgroundOperationErrorLogLevel       level.Priority

	// TTL is how old the oldest task in the queue must be for the collection to be pruned.
	TTL time.Duration
}

GroupOptions controls the behavior of the amboy.QueueGroup implementation.
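The relationship between WorkerPoolSize and DefaultWorkers described in the struct comments can be sketched as follows. This is an illustration of the documented fallback only, not the package's code: `poolSizeByPrefix` and `resolveWorkers` are hypothetical names, and the "batch-" prefix and the worker counts are made-up values.

```go
package main

import (
	"fmt"
	"strings"
)

// poolSizeByPrefix is a hypothetical function with the same signature as
// the WorkerPoolSize field: it picks a worker count from the queue ID.
func poolSizeByPrefix(queueID string) int {
	if strings.HasPrefix(queueID, "batch-") {
		return 8
	}
	return 2
}

// resolveWorkers mirrors the documented fallback: use sizeFn when it is
// set, otherwise fall back to defaultWorkers. Assumed behavior, written
// for illustration.
func resolveWorkers(sizeFn func(string) int, defaultWorkers int, queueID string) int {
	if sizeFn == nil {
		return defaultWorkers
	}
	return sizeFn(queueID)
}

func main() {
	fmt.Println(resolveWorkers(poolSizeByPrefix, 4, "batch-reports"))
	fmt.Println(resolveWorkers(poolSizeByPrefix, 4, "interactive"))
	fmt.Println(resolveWorkers(nil, 4, "interactive"))
}
```

A function of this shape could be assigned to GroupOptions.WorkerPoolSize to give heavier queues more workers while leaving the rest small.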

type ManagerOptions

type ManagerOptions struct {
	SingleGroup bool
	ByGroups    bool
	Options     Options
}

ManagerOptions controls the behavior of the Manager implementation, particularly with regard to group handling. It also contains nested queue Options for additional relevant settings.

func (*ManagerOptions) Validate

func (o *ManagerOptions) Validate() error

type Options

type Options struct {
	SchemaName      string
	Name            string
	GroupName       string
	UseGroups       bool
	Priority        bool
	CheckWaitUntil  bool
	CheckDispatchBy bool
	SkipBootstrap   bool
	PoolSize        int

	// When true, Ordered forces the queue to respect dependencies
	// of jobs. This can slow dispatching considerably,
	// particularly if jobs define cyclic or incomplete dependency chains.
	Ordered bool

	// Number of times the Complete operation should
	// retry. Previously defaulted to 10, and settings of 1 or 2
	// are reasonable.
	CompleteRetries int

	// LockTimeout overrides the default job lock timeout if set.
	WaitInterval time.Duration
	LockTimeout  time.Duration
	Logger       grip.Logger
}

Options describe the

func (*Options) Validate

func (opts *Options) Validate() error

Validate ensures that all options are reasonable, and will override and set default options where possible.
