Documentation ¶
Overview ¶
Package gue implements Golang queue on top of PostgreSQL. It uses transaction-level locks for concurrent work.
PostgreSQL drivers ¶
Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:
- github.com/jackc/pgx v4
- github.com/jackc/pgx v3
- github.com/lib/pq
Usage ¶
Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
package main import ( "context" "encoding/json" "fmt" "log" "os" "time" "github.com/jackc/pgx/v4/pgxpool" "golang.org/x/sync/errgroup" "github.com/vgarvardt/gue/v3" "github.com/vgarvardt/gue/v3/adapter/pgxv4" ) type printNameArgs struct { Name string } func main() { printName := func(j *gue.Job) error { var args printNameArgs if err := json.Unmarshal(j.Args, &args); err != nil { return err } fmt.Printf("Hello %s!\n", args.Name) return nil } pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL")) if err != nil { log.Fatal(err) } pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg) if err != nil { log.Fatal(err) } defer pgxPool.Close() poolAdapter := pgxv4.NewConnPool(pgxPool) gc := gue.NewClient(poolAdapter) wm := gue.WorkMap{ "PrintName": printName, } // create a pool w/ 2 workers workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer")) ctx, shutdown := context.WithCancel(context.Background()) // work jobs in goroutine g, gctx := errgroup.WithContext(ctx) g.Go(func() error { err := workers.Run(gctx) if err != nil { // In a real-world applications, use a better way to shut down // application on unrecoverable error. E.g. fx.Shutdowner from // go.uber.org/fx module. log.Fatal(err) } return err }) args, err := json.Marshal(printNameArgs{Name: "vgarvardt"}) if err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } time.Sleep(30 * time.Second) // wait for while // send shutdown signal to worker shutdown() if err := g.Wait(); err != nil { log.Fatal(err) } }
Index ¶
- Variables
- type Backoff
- type Client
- func (c *Client) Enqueue(ctx context.Context, j *Job) error
- func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx adapter.Tx) error
- func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error)
- func (c *Client) LockJobByID(ctx context.Context, id int64) (*Job, error)
- func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error)
- type ClientOption
- type HookFunc
- type Job
- type PollStrategy
- type WorkFunc
- type WorkMap
- type Worker
- type WorkerOption
- func WithWorkerHooksJobDone(hooks ...HookFunc) WorkerOption
- func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption
- func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption
- func WithWorkerID(id string) WorkerOption
- func WithWorkerLogger(logger adapter.Logger) WorkerOption
- func WithWorkerPollInterval(d time.Duration) WorkerOption
- func WithWorkerPollStrategy(s PollStrategy) WorkerOption
- func WithWorkerQueue(queue string) WorkerOption
- type WorkerPool
- type WorkerPoolOption
- func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
- func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption
- func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
- func WithPoolID(id string) WorkerPoolOption
- func WithPoolLogger(logger adapter.Logger) WorkerPoolOption
- func WithPoolPollInterval(d time.Duration) WorkerPoolOption
- func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption
- func WithPoolQueue(queue string) WorkerPoolOption
Constants ¶
This section is empty.
Variables ¶
var ErrMissingType = errors.New("job type must be specified")
ErrMissingType is returned when you attempt to enqueue a job with no Type specified.
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
Backoff is the interface for backoff implementation that will be used to reschedule errored jobs.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
func NewClient ¶
func NewClient(pool adapter.ConnPool, options ...ClientOption) *Client
NewClient creates a new Client that uses the pgx pool.
func (*Client) EnqueueTx ¶
EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.
It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
func (*Client) LockJob ¶
LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.
This function cares about the priority first to lock top priority jobs first even if there are available ones that should be executed earlier but with the lower priority.
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Done() or Error() on it in order to commit transaction to persist Job changes (remove or update it).
func (*Client) LockJobByID ¶ added in v3.1.0
LockJobByID attempts to retrieve a specific Job from the database. If the job is found, it will be locked on the transactional level, so other workers will be skipping it. If the job is not found, an error will be returned
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting the job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Done() or Error() on it in order to commit transaction to persist Job changes (remove or update it).
func (*Client) LockNextScheduledJob ¶ added in v3.2.0
LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.
This function cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Done() or Error() on it in order to commit transaction to persist Job changes (remove or update it).
type ClientOption ¶
type ClientOption func(*Client)
ClientOption defines a type that allows to set client properties during the build-time.
func WithClientBackoff ¶
func WithClientBackoff(backoff Backoff) ClientOption
WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.
func WithClientID ¶
func WithClientID(id string) ClientOption
WithClientID sets client ID for easier identification in logs
func WithClientLogger ¶
func WithClientLogger(logger adapter.Logger) ClientOption
WithClientLogger sets Logger implementation to client
type HookFunc ¶ added in v3.3.0
HookFunc is a function that may react to a Job lifecycle events. All the callbacks are being executed synchronously, so be careful with the long-running locking operations. Hooks do not return an error, therefore they can not and must not be used to affect the Job execution flow, e.g. cancel it - this is the WorkFunc responsibility. Modifying Job fields and calling any methods that are modifying its state within hooks may lead to undefined behaviour. Please never do this.
Depending on the event err parameter may be empty or not - check the event description for its meaning.
type Job ¶
type Job struct { // ID is the unique database ID of the Job. It is ignored on job creation. ID int64 // Queue is the name of the queue. It defaults to the empty queue "". Queue string // Priority is the priority of the Job. The default priority is 0, and a // lower number means a higher priority. // // The highest priority is -32768, the lowest one is +32767 Priority int16 // RunAt is the time that this job should be executed. It defaults to now(), // meaning the job will execute immediately. Set it to a value in the future // to delay a job's execution. RunAt time.Time // Type maps job to a worker func. Type string // Args must be the bytes of a valid JSON string Args []byte // ErrorCount is the number of times this job has attempted to run, but // failed with an error. It is ignored on job creation. // This field is initialised only when the Job is being retrieved from the DB and is not // being updated when the current Job run errored. ErrorCount int32 // LastError is the error message or stack trace from the last time the job // failed. It is ignored on job creation. // This field is initialised only when the Job is being retrieved from the DB and is not // being updated when the current Job run errored. LastError pgtype.Text // contains filtered or unexported fields }
Job is a single unit of work for Gue to perform.
func (*Job) Delete ¶
Delete marks this job as complete by deleting it form the database.
You must also later call Done() to return this job's database connection to the pool. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
func (*Job) Done ¶
Done commits transaction that marks job as done. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
func (*Job) Error ¶
Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.
This call marks job as done and releases (commits) transaction, so calling Done() is not required, although calling it will not cause any issues. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
type PollStrategy ¶ added in v3.2.0
type PollStrategy string
PollStrategy determines how the DB is queried for the next job to work on
const ( // PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available //ones that should be executed earlier but with lower priority. PriorityPollStrategy PollStrategy = "OrderByPriority" // RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there // are ones with a higher priority scheduled to a later time but already eligible for execution RunAtPollStrategy PollStrategy = "OrderByRunAtPriority" )
type WorkFunc ¶
WorkFunc is a function that performs a Job. If an error is returned, the job is re-enqueued with exponential backoff.
type WorkMap ¶
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is a single worker that pulls jobs off the specified queue. If no Job is found, the Worker will sleep for interval seconds.
func NewWorker ¶
func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.
Worker defaults to a poll interval of 5 seconds, which can be overridden by WithWorkerPollInterval option. The default queue is the nameless queue "", which can be overridden by WithWorkerQueue option.
func (*Worker) Run ¶
Run pulls jobs off the Worker's queue at its interval. This function does not run in its own goroutine, so it’s possible to wait for completion. Use context cancellation to shut it down.
func (*Worker) Start
deprecated
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption defines a type that allows to set worker properties during the build-time.
func WithWorkerHooksJobDone ¶ added in v3.3.0
func WithWorkerHooksJobDone(hooks ...HookFunc) WorkerOption
WithWorkerHooksJobDone sets hooks that are called when worker finished working the job. Error field is set for the cases when the job was worked with an error.
func WithWorkerHooksJobLocked ¶ added in v3.3.0
func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption
WithWorkerHooksJobLocked sets hooks that are called right after the job was polled from the DB. Depending on the polling results hook will have either error or job set, but not both. If the error field is set - no other lifecycle hooks will be called for the job.
func WithWorkerHooksUnknownJobType ¶ added in v3.3.0
func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption
WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type. Error field for this event type is always set since this is an error situation. If this hook is called - no other lifecycle hooks will be called for the job.
func WithWorkerID ¶
func WithWorkerID(id string) WorkerOption
WithWorkerID sets worker ID for easier identification in logs
func WithWorkerLogger ¶
func WithWorkerLogger(logger adapter.Logger) WorkerOption
WithWorkerLogger sets Logger implementation to worker
func WithWorkerPollInterval ¶
func WithWorkerPollInterval(d time.Duration) WorkerOption
WithWorkerPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithWorkerPollStrategy ¶ added in v3.2.0
func WithWorkerPollStrategy(s PollStrategy) WorkerOption
WithWorkerPollStrategy overrides default poll strategy with given value
func WithWorkerQueue ¶
func WithWorkerQueue(queue string) WorkerOption
WithWorkerQueue overrides default worker queue name with the given value.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.
func NewWorkerPool ¶
func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) *WorkerPool
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
Each Worker in the pool default to a poll interval of 5 seconds, which can be overridden by WithPoolPollInterval option. The default queue is the nameless queue "", which can be overridden by WithPoolQueue option.
func (*WorkerPool) Run ¶
func (w *WorkerPool) Run(ctx context.Context) error
Run runs all the Workers in the WorkerPool in own goroutines. Run blocks until all workers exit. Use context cancellation for shutdown.
func (*WorkerPool) Start
deprecated
func (w *WorkerPool) Start(ctx context.Context) error
Start starts all the Workers in the WorkerPool in own goroutines. Use cancel context to shut them down.
Deprecated: use Run instead of Start. Start leaks resources and does not wait for shutdown to complete.
type WorkerPoolOption ¶
type WorkerPoolOption func(pool *WorkerPool)
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
func WithPoolHooksJobDone ¶ added in v3.3.0
func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
func WithPoolHooksJobLocked ¶ added in v3.3.0
func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksJobLocked calls WithWorkerHooksJobLocked for every worker in the pool.
func WithPoolHooksUnknownJobType ¶ added in v3.3.0
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
func WithPoolID ¶
func WithPoolID(id string) WorkerPoolOption
WithPoolID sets worker pool ID for easier identification in logs
func WithPoolLogger ¶
func WithPoolLogger(logger adapter.Logger) WorkerPoolOption
WithPoolLogger sets Logger implementation to worker pool
func WithPoolPollInterval ¶
func WithPoolPollInterval(d time.Duration) WorkerPoolOption
WithPoolPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollStrategy ¶ added in v3.2.0
func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption
WithPoolPollStrategy overrides default poll strategy with given value
func WithPoolQueue ¶
func WithPoolQueue(queue string) WorkerPoolOption
WithPoolQueue overrides default worker queue name with the given value.