pgxjob

package module
v0.0.0-...-3d38343 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package pgxjob provides a job runner using PostgreSQL.

Index

Constants

View Source
const PGNotifyChannel = "pgxjob_job_available"

Variables

This section is empty.

Functions

func LogFinalJobRuns

func LogFinalJobRuns(worker *Worker, job *Job, startTime, endTime time.Time, err error) bool

LogFinalJobRuns is a ShouldLogJobRun function that returns true for the final run of a job. That is, the run was successful or it failed and will not try again.

Types

type AcquireConnFunc

type AcquireConnFunc func(ctx context.Context) (conn *pgxpool.Conn, release func(), err error)

AcquireConnFunc is a function that acquires a database connection for exclusive use. It returns a release function that will be called when the connection is no longer needed.

func AcquireConnFuncFromPool

func AcquireConnFuncFromPool(pool *pgxpool.Pool) AcquireConnFunc

AcquireConnFuncFromPool returns an AcquireConnFunc that acquires connections from the given *pgxpool.Pool.

type DB

type DB interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	SendBatch(ctx context.Context, b *pgx.Batch) (br pgx.BatchResults)
}

DB is the type pgxjob uses to interact with the database when it does not specifically need a *pgx.Conn.

type ErrorFilter

type ErrorFilter interface {
	FilterError(job *Job, jobErr error) error
}

type ErrorWithRetry

type ErrorWithRetry struct {
	Err     error
	RetryAt time.Time
}

func (*ErrorWithRetry) Error

func (e *ErrorWithRetry) Error() string

func (*ErrorWithRetry) Unwrap

func (e *ErrorWithRetry) Unwrap() error

type FilterErrorFunc

type FilterErrorFunc func(job *Job, jobErr error) error

func (FilterErrorFunc) FilterError

func (f FilterErrorFunc) FilterError(job *Job, jobErr error) error

type Job

type Job struct {
	ID         int64
	Group      *JobGroup
	Type       *JobType
	Params     []byte
	InsertedAt time.Time
	RunAt      time.Time
	LastError  string
	ErrorCount int32
	ASAP       bool
}

type JobGroup

type JobGroup struct {
	ID   int32
	Name string
}

type JobSchedule

type JobSchedule struct {
	// GroupName is the name of the group to use when enqueuing the job. If not set the job type's default group is used.
	GroupName string

	// RunAt is the time to run the job. If not set the job is scheduled to run immediately.
	RunAt time.Time
}

JobSchedule is a schedule for a job.

type JobType

type JobType struct {
	// ID is the ID of the job type. It is set automatically.
	ID int32

	// Name is the name of the job type.
	Name string

	// DefaultGroup is the default group to use when enqueuing jobs of this type.
	DefaultGroup *JobGroup

	// RunJob is the function that will be called when a job of this type is run.
	RunJob func(ctx context.Context, job *Job) error
}

JobType is a type of job.

type JobTypeConfig

type JobTypeConfig struct {
	// Name is the name of the job type. It must be set and unique.
	Name string

	// DefaultGroupName is the name of the default group to use when enqueuing jobs of this type. If not set "default" is
	// used.
	DefaultGroupName string

	// RunJob is the function that will be called when a job of this type is run. It must be set.
	RunJob RunJobFunc
}

type RetryLinearBackoffErrorFilter

type RetryLinearBackoffErrorFilter struct {
	// MaxRetries is the maximum number of times to retry.
	MaxRetries int32

	// BaseDelay is the amount of time to wait before the first retry. The wait time will increase by BaseDelay for each
	// retry.
	BaseDelay time.Duration
}

RetryLinearBackoffErrorFilter is an ErrorFilter that returns an ErrorWithRetry if the job should be retried. It uses a linear backoff to determine when to schedule the retries.

func (*RetryLinearBackoffErrorFilter) FilterError

func (f *RetryLinearBackoffErrorFilter) FilterError(job *Job, jobErr error) error

FilterError returns an ErrorWithRetry if the job should be retried. If the error is already an ErrorWithRetry then it is returned unmodified. If the job should not be retried then the original error is returned.

type RunJobFunc

type RunJobFunc func(ctx context.Context, job *Job) error

func FilterError

func FilterError(runJob RunJobFunc, errorFilter ErrorFilter) RunJobFunc

FilterError returns a RunJobFunc that calls runJob. If runJob returns an error then it calls filterError and returns its error. filterError is typically used to determine if the error should be retried or not.

func UnmarshalParams

func UnmarshalParams[T any](fn func(ctx context.Context, job *Job, params T) error) RunJobFunc

UnmarshalParams returns a JobType.RunJob function that unmarshals job.Params into a T and calls fn.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is used to schedule jobs and start workers.

var DefaultContextScheduler *Scheduler

DefaultContextScheduler is the default scheduler. It is returned by Ctx when no scheduler is set in the context. It must be set before use.

func Ctx

func Ctx(ctx context.Context) *Scheduler

Ctx returns the *Scheduler attached to ctx. If ctx does not have a *Scheduler attached then it returns DefaultContextScheduler.

func NewScheduler

func NewScheduler(config *SchedulerConfig) (*Scheduler, error)

NewScheduler returns a new Scheduler.

func (*Scheduler) Schedule

func (m *Scheduler) Schedule(ctx context.Context, db DB, jobTypeName string, jobParams any, schedule JobSchedule) error

Schedule schedules a job to be run according to schedule.

func (*Scheduler) ScheduleNow

func (m *Scheduler) ScheduleNow(ctx context.Context, db DB, jobTypeName string, jobParams any) error

ScheduleNow schedules a job to be run immediately.

func (*Scheduler) StartWorker

func (m *Scheduler) StartWorker(config *WorkerConfig) (*Worker, error)

StartWorker starts a worker. The *Worker is returned immediately, but the startup process is run in the background. This is to avoid blocking or returning an error if the database is temporarily unavailable. Use StartupComplete if it is necessary to wait for the worker to be ready.

func (*Scheduler) WithContext

func (s *Scheduler) WithContext(ctx context.Context) context.Context

WithContext returns a copy of ctx with s attached.

type SchedulerConfig

type SchedulerConfig struct {
	// AcquireConn is used to get a connection to the database. It must be set.
	AcquireConn AcquireConnFunc

	// JobGroups is a lists of job groups that can be used by the scheduler. The job group "default" is always available.
	JobGroups []string

	// JobTypes is a list of job types that can be used by the scheduler. It must be set.
	JobTypes []*JobTypeConfig

	// HandleError is a function that is called when an error occurs that cannot be handled or returned. For example, a
	// network outage may cause a worker to be unable to fetch a job or record the outcome of an execution. The worker
	// should not be stopped because of this. Instead, it should try again later when the network may have been restored.
	// These types of errors are passed to HandleError. If not set errors are logged to stderr.
	HandleError func(err error)
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) HandleNotification

func (w *Worker) HandleNotification(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error

HandleNotification implements the pgxlisten.Handler interface. This allows a Worker to be used as a pgxlisten.Listener. When it receives a notification for the worker's job group it calls Signal.

func (*Worker) ID

func (w *Worker) ID() int32

ID gets the worker's ID. This is only valid after the worker startup has completed. This is guaranteed while processing a job, but not immediately after StartWorker has returned. Use StartupComplete to wait for the worker to start.

func (*Worker) Shutdown

func (w *Worker) Shutdown(ctx context.Context) error

Shutdown stops the worker. It waits for all jobs to finish before returning. Cancel ctx to force shutdown without waiting for jobs to finish or worker to cleanup.

func (*Worker) Signal

func (w *Worker) Signal()

Signal causes the worker to wake up and process requests. It is safe to call this from multiple goroutines. It does not block.

func (*Worker) StartupComplete

func (w *Worker) StartupComplete() <-chan struct{}

StartupComplete returns a channel that is closed when the worker start is complete.

type WorkerConfig

type WorkerConfig struct {
	// GroupName is the group to work. If empty, "default" is used.
	GroupName string

	// MaxConcurrentJobs is the maximum number of jobs to work concurrently. If not set 10 is used.
	MaxConcurrentJobs int

	// MaxPrefetchedJobs is the maximum number of prefetched jobs (i.e. jobs that are fetched from the database and
	// locked, but not yet being worked). If not set 1000 is used.
	MaxPrefetchedJobs int

	// PollInterval is the interval between polling for new jobs. If not set 10 seconds is used.
	PollInterval time.Duration

	// MaxBufferedJobResults is the maximum number of job results that can be buffered before the job results must be
	// flushed to the database. If not set 100 is used.
	MaxBufferedJobResults int

	// MaxBufferedJobResultAge is the maximum age of a buffered job result before the job results must be flushed to the
	// database. If not set 1 second is used.
	MaxBufferedJobResultAge time.Duration

	// ShouldLogJobRun is called for every job run. If it returns true then the run is logged to the pgxjob_job_runs
	// table. If it returns false it is not. If not set all job runs are logged.
	ShouldLogJobRun func(worker *Worker, job *Job, startTime, endTime time.Time, err error) bool
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL