Documentation ¶
Overview ¶
Package pgxjob provides a job runner using PostgreSQL.
Index ¶
- Constants
- func LogFinalJobRuns(worker *Worker, job *Job, startTime, endTime time.Time, err error) bool
- type AcquireConnFunc
- type DB
- type ErrorFilter
- type ErrorWithRetry
- type FilterErrorFunc
- type Job
- type JobGroup
- type JobSchedule
- type JobType
- type JobTypeConfig
- type RetryLinearBackoffErrorFilter
- type RunJobFunc
- type Scheduler
- func (m *Scheduler) Schedule(ctx context.Context, db DB, jobTypeName string, jobParams any, ...) error
- func (m *Scheduler) ScheduleNow(ctx context.Context, db DB, jobTypeName string, jobParams any) error
- func (m *Scheduler) StartWorker(config *WorkerConfig) (*Worker, error)
- func (s *Scheduler) WithContext(ctx context.Context) context.Context
- type SchedulerConfig
- type Worker
- type WorkerConfig
Constants ¶
const PGNotifyChannel = "pgxjob_job_available"
Functions ¶
Types ¶
type AcquireConnFunc ¶
AcquireConnFunc is a function that acquires a database connection for exclusive use. It returns a release function that will be called when the connection is no longer needed.
func AcquireConnFuncFromPool ¶
func AcquireConnFuncFromPool(pool *pgxpool.Pool) AcquireConnFunc
AcquireConnFuncFromPool returns an AcquireConnFunc that acquires connections from the given *pgxpool.Pool.
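The acquire/release contract described above can be illustrated with a standard-library-only sketch. The names conn, pool, acquireFunc, and acquireFromPool are hypothetical stand-ins chosen for this example; they are not pgxjob or pgxpool types, and the shape of acquireFunc is an assumption rather than the actual AcquireConnFunc signature.

```go
package main

import (
	"context"
	"fmt"
)

// conn is a hypothetical stand-in for a database connection.
type conn struct{ id int }

// acquireFunc hands out a connection for exclusive use together with a
// release function. Its shape is assumed for illustration only.
type acquireFunc func(ctx context.Context) (c *conn, release func(), err error)

// pool is a toy fixed-size pool backed by a buffered channel.
type pool struct{ free chan *conn }

func newPool(size int) *pool {
	p := &pool{free: make(chan *conn, size)}
	for i := 0; i < size; i++ {
		p.free <- &conn{id: i}
	}
	return p
}

// acquireFromPool returns an acquireFunc that takes connections from p and
// gives them back when release is called.
func acquireFromPool(p *pool) acquireFunc {
	return func(ctx context.Context) (*conn, func(), error) {
		select {
		case c := <-p.free:
			return c, func() { p.free <- c }, nil
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		}
	}
}

// root is the base context used by this example.
var root = context.Background()

func main() {
	p := newPool(1)
	acquire := acquireFromPool(p)

	c, release, err := acquire(root)
	if err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	fmt.Println("acquired connection", c.id, "idle:", len(p.free))
	release()
	fmt.Println("released, idle:", len(p.free))
}
```

Returning the release function alongside the connection keeps the caller from needing to know where the connection came from.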
type DB ¶
type DB interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	SendBatch(ctx context.Context, b *pgx.Batch) (br pgx.BatchResults)
}
DB is the type pgxjob uses to interact with the database when it does not specifically need a *pgx.Conn.
type ErrorFilter ¶
type ErrorWithRetry ¶
func (*ErrorWithRetry) Error ¶
func (e *ErrorWithRetry) Error() string
func (*ErrorWithRetry) Unwrap ¶
func (e *ErrorWithRetry) Unwrap() error
type FilterErrorFunc ¶
func (FilterErrorFunc) FilterError ¶
func (f FilterErrorFunc) FilterError(job *Job, jobErr error) error
type JobSchedule ¶
type JobSchedule struct {
	// GroupName is the name of the group to use when enqueuing the job. If not set the job type's default group is used.
	GroupName string

	// RunAt is the time to run the job. If not set the job is scheduled to run immediately.
	RunAt time.Time
}
JobSchedule is a schedule for a job.
type JobType ¶
type JobType struct {
	// ID is the ID of the job type. It is set automatically.
	ID int32

	// Name is the name of the job type.
	Name string

	// DefaultGroup is the default group to use when enqueuing jobs of this type.
	DefaultGroup *JobGroup

	// RunJob is the function that will be called when a job of this type is run.
	RunJob func(ctx context.Context, job *Job) error
}
JobType is a type of job.
type JobTypeConfig ¶
type JobTypeConfig struct {
	// Name is the name of the job type. It must be set and unique.
	Name string

	// DefaultGroupName is the name of the default group to use when enqueuing jobs of this type. If not set "default" is
	// used.
	DefaultGroupName string

	// RunJob is the function that will be called when a job of this type is run. It must be set.
	RunJob RunJobFunc
}
type RetryLinearBackoffErrorFilter ¶
type RetryLinearBackoffErrorFilter struct {
	// MaxRetries is the maximum number of times to retry.
	MaxRetries int32

	// BaseDelay is the amount of time to wait before the first retry. The wait time will increase by BaseDelay for each
	// retry.
	BaseDelay time.Duration
}
RetryLinearBackoffErrorFilter is an ErrorFilter that returns an ErrorWithRetry if the job should be retried. It uses a linear backoff to determine when to schedule the retries.
func (*RetryLinearBackoffErrorFilter) FilterError ¶
func (f *RetryLinearBackoffErrorFilter) FilterError(job *Job, jobErr error) error
FilterError returns an ErrorWithRetry if the job should be retried. If the error is already an ErrorWithRetry then it is returned unmodified. If the job should not be retried then the original error is returned.
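The linear backoff rule can be sketched with the standard library alone. This is an illustration of the documented behavior, not pgxjob's implementation: retryError and linearBackoff are hypothetical stand-ins, and the failedAttempts argument (1 after the first failure) is an assumption about how the attempt count reaches the filter.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryError is a hypothetical stand-in for ErrorWithRetry. It wraps the
// original error and records how long to wait before the next attempt.
type retryError struct {
	Err   error
	Delay time.Duration
}

func (e *retryError) Error() string { return e.Err.Error() }
func (e *retryError) Unwrap() error { return e.Err }

// linearBackoff mirrors the two documented fields of
// RetryLinearBackoffErrorFilter.
type linearBackoff struct {
	MaxRetries int32
	BaseDelay  time.Duration
}

// filter decides whether jobErr should be retried. failedAttempts is the
// number of failures so far, including this one (assumed input).
func (f linearBackoff) filter(failedAttempts int32, jobErr error) error {
	var re *retryError
	if errors.As(jobErr, &re) {
		return jobErr // already a retry error: returned unmodified
	}
	if failedAttempts > f.MaxRetries {
		return jobErr // retries exhausted: original error is returned
	}
	// The first retry waits BaseDelay, the second 2*BaseDelay, and so on.
	return &retryError{Err: jobErr, Delay: time.Duration(failedAttempts) * f.BaseDelay}
}

// errBoom is an example job failure.
var errBoom = errors.New("boom")

func main() {
	f := linearBackoff{MaxRetries: 3, BaseDelay: 5 * time.Second}
	for attempt := int32(1); attempt <= 4; attempt++ {
		err := f.filter(attempt, errBoom)
		var re *retryError
		if errors.As(err, &re) {
			fmt.Printf("failure %d: retry in %s\n", attempt, re.Delay)
		} else {
			fmt.Printf("failure %d: give up: %v\n", attempt, err)
		}
	}
}
```

With MaxRetries of 3 and BaseDelay of 5 seconds, the sketch schedules retries after 5s, 10s, and 15s, then returns the original error on the fourth failure.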
type RunJobFunc ¶
func FilterError ¶
func FilterError(runJob RunJobFunc, errorFilter ErrorFilter) RunJobFunc
FilterError returns a RunJobFunc that calls runJob. If runJob returns an error, that error is passed to errorFilter and the filter's result is returned. An ErrorFilter is typically used to decide whether the job should be retried.
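The wrapping pattern is an ordinary function decorator. The sketch below uses simplified, hypothetical types (runFunc, errorFilter, filterFunc) that drop the context and job arguments of the real RunJobFunc and ErrorFilter so that it stays self-contained; it is not the library's code.

```go
package main

import (
	"errors"
	"fmt"
)

// runFunc is a simplified stand-in for RunJobFunc (no context or job).
type runFunc func() error

// errorFilter is a simplified stand-in for ErrorFilter.
type errorFilter interface {
	FilterError(jobErr error) error
}

// filterFunc adapts a plain function to errorFilter, in the spirit of
// FilterErrorFunc.
type filterFunc func(jobErr error) error

func (f filterFunc) FilterError(jobErr error) error { return f(jobErr) }

// withFilter returns a runFunc that calls run and, only when run fails,
// passes the error through filter.
func withFilter(run runFunc, filter errorFilter) runFunc {
	return func() error {
		if err := run(); err != nil {
			return filter.FilterError(err)
		}
		return nil
	}
}

var errTransient = errors.New("transient failure")

// failing and succeeding are example job bodies.
func failing() error    { return errTransient }
func succeeding() error { return nil }

// tagRetry is an example filter that annotates the error while keeping the
// original reachable through errors.Is.
var tagRetry = filterFunc(func(err error) error {
	return fmt.Errorf("will retry: %w", err)
})

func main() {
	err := withFilter(failing, tagRetry)()
	fmt.Println(err)
	fmt.Println(errors.Is(err, errTransient))
	fmt.Println(withFilter(succeeding, tagRetry)())
}
```

Because the filter only sees errors, a successful run passes through untouched.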
func UnmarshalParams ¶
UnmarshalParams returns a JobType.RunJob function that unmarshals job.Params into a T and calls fn.
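A generic adapter of this kind can be sketched as follows. The types job and runFunc are hypothetical simplifications (no context argument), emailParams is an invented example payload, and the use of JSON as the parameter encoding is an assumption made for the illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a hypothetical stand-in for Job that holds raw parameters.
type job struct{ Params []byte }

// runFunc is a simplified stand-in for RunJobFunc (no context).
type runFunc func(j *job) error

// unmarshalParams returns a runFunc that decodes j.Params into a T and
// passes the typed value to fn. JSON encoding is assumed here.
func unmarshalParams[T any](fn func(j *job, params T) error) runFunc {
	return func(j *job) error {
		var params T
		if err := json.Unmarshal(j.Params, &params); err != nil {
			return fmt.Errorf("unmarshal job params: %w", err)
		}
		return fn(j, params)
	}
}

// emailParams is an example parameter type.
type emailParams struct {
	To string `json:"to"`
}

func main() {
	run := unmarshalParams(func(j *job, p emailParams) error {
		fmt.Println("sending email to", p.To)
		return nil
	})
	if err := run(&job{Params: []byte(`{"to":"a@example.com"}`)}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The type parameter is inferred from fn, so the job body works with a typed value and never touches the raw bytes.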
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is used to schedule jobs and start workers.
var DefaultContextScheduler *Scheduler
DefaultContextScheduler is the default scheduler. It is returned by Ctx when no scheduler is set in the context. It must be set before use.
func Ctx ¶
Ctx returns the *Scheduler attached to ctx. If ctx does not have a *Scheduler attached then it returns DefaultContextScheduler.
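The lookup-with-fallback behavior of Ctx and WithContext follows a common context-value pattern. The sketch below shows that pattern with hypothetical names (scheduler, ctxKey, defaultScheduler, fromContext); it does not reproduce pgxjob's internals.

```go
package main

import (
	"context"
	"fmt"
)

// scheduler is a hypothetical stand-in for Scheduler.
type scheduler struct{ name string }

// ctxKey is an unexported key type, so values stored under it cannot
// collide with keys from other packages.
type ctxKey struct{}

// defaultScheduler plays the role of DefaultContextScheduler. It must be
// set before fromContext is relied on.
var defaultScheduler *scheduler

// withContext returns a copy of ctx that carries s.
func (s *scheduler) withContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, ctxKey{}, s)
}

// fromContext returns the scheduler attached to ctx, or defaultScheduler
// when none is attached.
func fromContext(ctx context.Context) *scheduler {
	if s, ok := ctx.Value(ctxKey{}).(*scheduler); ok {
		return s
	}
	return defaultScheduler
}

// root is the base context used by this example.
var root = context.Background()

func main() {
	defaultScheduler = &scheduler{name: "default"}
	custom := &scheduler{name: "custom"}

	fmt.Println(fromContext(root).name)                     // prints "default"
	fmt.Println(fromContext(custom.withContext(root)).name) // prints "custom"
}
```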
func NewScheduler ¶
func NewScheduler(config *SchedulerConfig) (*Scheduler, error)
NewScheduler returns a new Scheduler.
func (*Scheduler) Schedule ¶
func (m *Scheduler) Schedule(ctx context.Context, db DB, jobTypeName string, jobParams any, schedule JobSchedule) error
Schedule schedules a job to be run according to schedule.
func (*Scheduler) ScheduleNow ¶
func (m *Scheduler) ScheduleNow(ctx context.Context, db DB, jobTypeName string, jobParams any) error
ScheduleNow schedules a job to be run immediately.
func (*Scheduler) StartWorker ¶
func (m *Scheduler) StartWorker(config *WorkerConfig) (*Worker, error)
StartWorker starts a worker. The *Worker is returned immediately, but the startup process is run in the background. This is to avoid blocking or returning an error if the database is temporarily unavailable. Use StartupComplete if it is necessary to wait for the worker to be ready.
type SchedulerConfig ¶
type SchedulerConfig struct {
	// AcquireConn is used to get a connection to the database. It must be set.
	AcquireConn AcquireConnFunc

	// JobGroups is a list of job groups that can be used by the scheduler. The job group "default" is always available.
	JobGroups []string

	// JobTypes is a list of job types that can be used by the scheduler. It must be set.
	JobTypes []*JobTypeConfig

	// HandleError is a function that is called when an error occurs that cannot be handled or returned. For example, a
	// network outage may cause a worker to be unable to fetch a job or record the outcome of an execution. The worker
	// should not be stopped because of this. Instead, it should try again later when the network may have been restored.
	// These types of errors are passed to HandleError. If not set errors are logged to stderr.
	HandleError func(err error)
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) HandleNotification ¶
func (w *Worker) HandleNotification(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error
HandleNotification implements the pgxlisten.Handler interface. This allows a Worker to be registered as a handler on a pgxlisten.Listener. When it receives a notification for the worker's job group it calls Signal.
func (*Worker) ID ¶
ID gets the worker's ID. This is only valid after the worker startup has completed. This is guaranteed while processing a job, but not immediately after StartWorker has returned. Use StartupComplete to wait for the worker to start.
func (*Worker) Shutdown ¶
Shutdown stops the worker. It waits for all jobs to finish before returning. Cancel ctx to force shutdown without waiting for jobs to finish or for the worker to clean up.
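The "wait for jobs, unless ctx is cancelled first" behavior can be sketched with a sync.WaitGroup and a select. The worker type, its methods, and shutdownWithin are hypothetical; the real Shutdown signature is not shown in this documentation, so the shape used here is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in that only tracks in-flight jobs.
type worker struct{ jobs sync.WaitGroup }

// runJobs starts n simulated jobs that each take d to finish.
func (w *worker) runJobs(n int, d time.Duration) {
	for i := 0; i < n; i++ {
		w.jobs.Add(1)
		go func() {
			defer w.jobs.Done()
			time.Sleep(d)
		}()
	}
}

// shutdown waits for all jobs to finish, or returns early with ctx's
// error if ctx is cancelled first.
func (w *worker) shutdown(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		w.jobs.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdownWithin gives shutdown at most limit before forcing it.
func shutdownWithin(w *worker, limit time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	return w.shutdown(ctx)
}

func main() {
	quick := &worker{}
	quick.runJobs(3, 20*time.Millisecond)
	fmt.Println(shutdownWithin(quick, time.Second)) // jobs finish first

	slow := &worker{}
	slow.runJobs(1, time.Second)
	fmt.Println(shutdownWithin(slow, 20*time.Millisecond)) // deadline hits first
}
```

A caller would typically pass a context with a deadline so that a stuck job cannot block process exit indefinitely.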
func (*Worker) Signal ¶
func (w *Worker) Signal()
Signal causes the worker to wake up and process requests. It is safe to call this from multiple goroutines. It does not block.
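One common way to get a wake-up call that never blocks and is safe from many goroutines is a non-blocking send on a channel with capacity 1. The sketch below shows that technique with a hypothetical worker type; whether pgxjob implements Signal this way is not stated in this documentation.

```go
package main

import "fmt"

// worker is a hypothetical stand-in holding only a wake-up channel.
type worker struct{ wake chan struct{} }

func newWorker() *worker {
	return &worker{wake: make(chan struct{}, 1)}
}

// signal requests a wake-up without blocking. If a wake-up is already
// pending the call is a no-op, so bursts of signals coalesce into one.
func (w *worker) signal() {
	select {
	case w.wake <- struct{}{}:
	default:
	}
}

// pending reports the number of queued wake-ups (0 or 1).
func (w *worker) pending() int { return len(w.wake) }

func main() {
	w := newWorker()
	for i := 0; i < 5; i++ {
		w.signal()
	}
	fmt.Println(w.pending()) // prints 1

	<-w.wake // the worker loop consumes the wake-up
	fmt.Println(w.pending()) // prints 0
}
```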
func (*Worker) StartupComplete ¶
func (w *Worker) StartupComplete() <-chan struct{}
StartupComplete returns a channel that is closed when the worker's startup is complete.
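A channel that is closed on completion can be combined with a timeout when the caller needs the worker to be ready before continuing. The sketch below simulates background startup with a hypothetical worker type, startWorker function, and waitReady helper; none of these are pgxjob APIs.

```go
package main

import (
	"fmt"
	"time"
)

// worker is a hypothetical stand-in whose ID is assigned during startup.
type worker struct {
	id      int
	started chan struct{}
}

// startWorker returns immediately and finishes startup in the background.
func startWorker(startupDelay time.Duration) *worker {
	w := &worker{started: make(chan struct{})}
	go func() {
		time.Sleep(startupDelay) // simulated database round trips
		w.id = 42
		close(w.started) // publishes w.id to anyone waiting on the channel
	}()
	return w
}

// startupComplete returns a channel that is closed when startup is done.
func (w *worker) startupComplete() <-chan struct{} { return w.started }

// waitReady blocks until startup completes or timeout elapses.
func waitReady(w *worker, timeout time.Duration) bool {
	select {
	case <-w.startupComplete():
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	w := startWorker(10 * time.Millisecond)
	if waitReady(w, time.Second) {
		fmt.Println("worker ready, id", w.id)
	} else {
		fmt.Println("worker not ready yet")
	}
}
```

Reading the ID only after the channel is closed mirrors the documented rule that ID is valid only once startup has completed.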
type WorkerConfig ¶
type WorkerConfig struct {
	// GroupName is the group to work. If empty, "default" is used.
	GroupName string

	// MaxConcurrentJobs is the maximum number of jobs to work concurrently. If not set 10 is used.
	MaxConcurrentJobs int

	// MaxPrefetchedJobs is the maximum number of prefetched jobs (i.e. jobs that are fetched from the database and
	// locked, but not yet being worked). If not set 1000 is used.
	MaxPrefetchedJobs int

	// PollInterval is the interval between polling for new jobs. If not set 10 seconds is used.
	PollInterval time.Duration

	// MaxBufferedJobResults is the maximum number of job results that can be buffered before the job results must be
	// flushed to the database. If not set 100 is used.
	MaxBufferedJobResults int

	// MaxBufferedJobResultAge is the maximum age of a buffered job result before the job results must be flushed to the
	// database. If not set 1 second is used.
	MaxBufferedJobResultAge time.Duration

	// ShouldLogJobRun is called for every job run. If it returns true then the run is logged to the pgxjob_job_runs
	// table. If it returns false it is not. If not set all job runs are logged.
	ShouldLogJobRun func(worker *Worker, job *Job, startTime, endTime time.Time, err error) bool

	// contains filtered or unexported fields
}
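The documented defaults can be summarized in a small sketch. The workerConfig type below copies only the value fields of WorkerConfig, and withDefaults is a hypothetical helper that treats a zero value as "not set"; that interpretation is an assumption, and the library applies its defaults internally.

```go
package main

import (
	"fmt"
	"time"
)

// workerConfig is a hypothetical mirror of WorkerConfig's value fields.
type workerConfig struct {
	GroupName               string
	MaxConcurrentJobs       int
	MaxPrefetchedJobs       int
	PollInterval            time.Duration
	MaxBufferedJobResults   int
	MaxBufferedJobResultAge time.Duration
}

// withDefaults fills every zero-valued field with its documented default.
func withDefaults(c workerConfig) workerConfig {
	if c.GroupName == "" {
		c.GroupName = "default"
	}
	if c.MaxConcurrentJobs == 0 {
		c.MaxConcurrentJobs = 10
	}
	if c.MaxPrefetchedJobs == 0 {
		c.MaxPrefetchedJobs = 1000
	}
	if c.PollInterval == 0 {
		c.PollInterval = 10 * time.Second
	}
	if c.MaxBufferedJobResults == 0 {
		c.MaxBufferedJobResults = 100
	}
	if c.MaxBufferedJobResultAge == 0 {
		c.MaxBufferedJobResultAge = time.Second
	}
	return c
}

func main() {
	// Only the concurrency limit is overridden; everything else defaults.
	c := withDefaults(workerConfig{MaxConcurrentJobs: 4})
	fmt.Printf("%+v\n", c)
}
```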