Documentation
¶
Overview ¶
Package pgq provides an implementation of a Postgres-backed job queue. Safe concurrency is built on top of the SKIP LOCKED functionality introduced in Postgres 9.5. Retries and exponential backoff are supported.
Index ¶
- func PreserveCompletedJobs(worker *Worker)
- type BackoffError
- type Backoffer
- type DB
- type Durations
- type Job
- type JobOption
- type Worker
- func (worker *Worker) Count(queueNames ...string) (int64, error)
- func (worker *Worker) EnqueueJob(queueName string, data []byte, options ...JobOption) (int, error)
- func (worker *Worker) EnqueueJobInTx(tx DB, queueName string, data []byte, options ...JobOption) (int, error)
- func (worker *Worker) PerformNextJob() (attempted bool, outErr error)
- func (worker *Worker) RegisterQueue(queueName string, jobFunc func([]byte) error) error
- func (worker *Worker) Run(pollingOverride *time.Duration) error
- type WorkerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PreserveCompletedJobs ¶
func PreserveCompletedJobs(worker *Worker)
PreserveCompletedJobs sets the runner option to leave job attempts in the pgq_jobs table instead of deleting them when complete.
Types ¶
type BackoffError ¶
type BackoffError struct {
// contains filtered or unexported fields
}
BackoffError is an implementation of the Backoff interface
func (BackoffError) Backoff ¶
func (b BackoffError) Backoff() bool
Backoff implements the Backoffer interface.
func (BackoffError) Error ¶
func (b BackoffError) Error() string
Error implements the error interface.
type Backoffer ¶
Backoffer is a type of error that can also indicate whether a queue should slow down.
type DB ¶
type DB interface { Exec(string, ...interface{}) (sql.Result, error) QueryRow(string, ...interface{}) *sql.Row }
DB is anything with the DB methods on it that we need. (like a DB or a Tx)
type Durations ¶
Durations is our own alias for time.Duration slices, so we can stick Scan and Value methods on them. End users shouldn't have to use this type at all.
type Job ¶
type Job struct { ID int64 `db:"id"` CreatedAt time.Time `db:"created_at"` QueueName string `db:"queue_name"` Data []byte `db:"data"` RunAfter time.Time `db:"run_after"` RetryWaits Durations `db:"retry_waits"` RanAt null.Time `db:"ran_at"` Error null.String `db:"error"` }
Job contains all the info needed to execute a single job attempt.
type JobOption ¶
type JobOption func(*Job)
A JobOption sets an optional parameter on a Job that you're enqueueing.
func RetryWaits ¶
RetryWaits explicitly sets the wait periods for the Job's retries.
type Worker ¶
type Worker struct { StopChan chan bool // contains filtered or unexported fields }
Worker provides methods for putting jobs on a Postgres-backed queue, and performing any jobs that are there.
func NewWorker ¶
func NewWorker(db *sql.DB, options ...WorkerOption) *Worker
NewWorker takes a Postgres DB connection and returns a Worker instance.
func (*Worker) EnqueueJob ¶
EnqueueJob puts a job on the queue. If successful, it returns the Job ID.
func (*Worker) EnqueueJobInTx ¶
func (worker *Worker) EnqueueJobInTx(tx DB, queueName string, data []byte, options ...JobOption) (int, error)
EnqueueJobInTx enqueues a Job, but lets you provide your own sql.Tx or other compatible object with an Exec method. This is useful if your application has other tables in the same database, and you want to only enqueue the job if all the DB operations in the same transaction are successful. All the handling of Begin, Commit, and Rollback calls is up to you.
func (*Worker) PerformNextJob ¶
PerformNextJob performs the next job in the queue. It returns true if it attempted to run a job, or false if there was no job in the queue or some error prevented it from attempting to run the job. It only returns an error if there's some problem talking to Postgres. Errors inside jobs are not bubbled up.
func (*Worker) RegisterQueue ¶
RegisterQueue tells your Worker instance which function should be called for a given job type.
type WorkerOption ¶
type WorkerOption func(*Worker)
A WorkerOption sets an optional parameter on the Worker.
func JobPollingInterval ¶
func JobPollingInterval(d time.Duration) WorkerOption
JobPollingInterval sets the amount of time that the runner will sleep if it has no jobs to do. Default is 10 seconds.
func OnStop ¶
func OnStop(f func()) WorkerOption
OnStop sets an optional callback function that will be called when the runner exits its Run method.
func SetLogger ¶
func SetLogger(l *log.Logger) WorkerOption
SetLogger allows you to set your own logrus logger object for use by the job worker.