Documentation
¶
Overview ¶
Package gue is based on the github.com/bgentry/que-go and adds some additional functionality like several PostgreSQL drivers support, slightly more idiomatic Go API (opinionated) and some minor improvements.
PostgreSQL drivers ¶
Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented: - github.com/jackc/pgx v3 - github.com/jackc/pgx v4
Usage ¶
Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
package main import ( "context" "encoding/json" "fmt" "log" "os" "time" "github.com/jackc/pgx/v4/pgxpool" "github.com/vgarvardt/gue" "github.com/vgarvardt/gue/adapter/pgxv4" ) type printNameArgs struct { Name string } func main() { printName := func(j *gue.Job) error { var args printNameArgs if err := json.Unmarshal(j.Args, &args); err != nil { return err } fmt.Printf("Hello %s!\n", args.Name) return nil } pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL")) if err != nil { log.Fatal(err) } pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg) if err != nil { log.Fatal(err) } defer pgxPool.Close() poolAdapter := pgxv4.NewConnPool(pgxPool) gc := gue.NewClient(poolAdapter) wm := gue.WorkMap{ "PrintName": printName, } // create a pool w/ 2 workers workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer")) ctx, shutdown := context.WithCancel(context.Background()) // work jobs in goroutine if err := workers.Start(ctx); err != nil { log.Fatal(err) } args, err := json.Marshal(printNameArgs{Name: "vgarvardt"}) if err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } time.Sleep(30 * time.Second) // wait for while // send shutdown signal to worker shutdown() }
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAgain = errors.New("maximum number of LockJob attempts reached")
ErrAgain returned by LockJob if a job could not be retrieved from the queue after several attempts because of concurrently running transactions. This error should not be returned unless the queue is under extremely heavy concurrency.
var ErrMissingType = errors.New("job type must be specified")
ErrMissingType is returned when you attempt to enqueue a job with no Type specified.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Que client that can add jobs to the queue and remove jobs from the queue.
func (*Client) EnqueueInTx ¶
EnqueueInTx adds a job to the queue within the scope of the transaction tx. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.
It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
func (*Client) LockJob ¶
LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, a session-level Postgres advisory lock is created for the Job's ID. If no job is found, nil will be returned instead of an error.
Because Gue uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock.
After the Job has been worked, you must call either Done() or Error() on it in order to return the database connection to the pool and remove the lock.
type Job ¶
type Job struct { // ID is the unique database ID of the Job. It is ignored on job creation. ID int64 // Queue is the name of the queue. It defaults to the empty queue "". Queue string // Priority is the priority of the Job. The default priority is 100, and a // lower number means a higher priority. A priority of 5 would be very // important. Priority int16 // RunAt is the time that this job should be executed. It defaults to now(), // meaning the job will execute immediately. Set it to a value in the future // to delay a job's execution. RunAt time.Time // Type corresponds to the Ruby job_class. If you are interoperating with // Ruby, you should pick suitable Ruby class names (such as MyJob). Type string // Args must be the bytes of a valid JSON string Args []byte // ErrorCount is the number of times this job has attempted to run, but // failed with an error. It is ignored on job creation. ErrorCount int32 // LastError is the error message or stack trace from the last time the job // failed. It is ignored on job creation. LastError pgtype.Text // contains filtered or unexported fields }
Job is a single unit of work for Gue to perform.
func (*Job) Conn ¶
Conn returns the pgx connection that this job is locked to. You may initiate transactions on this connection or use it as you please until you call Done(). At that point, this conn will be returned to the pool and it is unsafe to keep using it. This function will return nil if the Job's connection has already been released with Done().
func (*Job) Delete ¶
Delete marks this job as complete by deleting it form the database.
You must also later call Done() to return this job's database connection to the pool.
func (*Job) Done ¶
Done releases the Postgres advisory lock on the job and returns the database connection to the pool.
type WorkFunc ¶
WorkFunc is a function that performs a Job. If an error is returned, the job is re-enqueued with exponential backoff.
type WorkMap ¶
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is a single worker that pulls jobs off the specified queue. If no Job is found, the Worker will sleep for interval seconds.
func NewWorker ¶
func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.
Workers default to a poll interval of 5 seconds, which can be overridden by WithPollInterval option. The default queue is the nameless queue "", which can be overridden by WithQueue option.
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption defines a type that allows to set worker properties during the build-time.
func WithID ¶
func WithID(id string) WorkerOption
WithID sets worker ID for easier identification in logs
func WithLogger ¶
func WithLogger(logger adapter.Logger) WorkerOption
WithLogger sets Logger implementation to worker
func WithPollInterval ¶ added in v1.0.2
func WithPollInterval(d time.Duration) WorkerOption
WithPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithQueue ¶
func WithQueue(queue string) WorkerOption
WithQueue overrides default worker queue name with the given value.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of Workers, each working jobs from the queue queue at the specified interval using the WorkMap.
func NewWorkerPool ¶
func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) *WorkerPool
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
Each Worker in the pool default to a poll interval of 5 seconds, which can be overridden by WithPoolPollInterval option. The default queue is the nameless queue "", which can be overridden by WithPoolQueue option.
type WorkerPoolOption ¶
type WorkerPoolOption func(pool *WorkerPool)
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
func WithPoolID ¶
func WithPoolID(id string) WorkerPoolOption
WithPoolID sets worker pool ID for easier identification in logs
func WithPoolLogger ¶
func WithPoolLogger(logger adapter.Logger) WorkerPoolOption
WithPoolLogger sets Logger implementation to worker pool
func WithPoolPollInterval ¶ added in v1.0.2
func WithPoolPollInterval(d time.Duration) WorkerPoolOption
WithPoolPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolQueue ¶
func WithPoolQueue(queue string) WorkerPoolOption
WithPoolQueue overrides default worker queue name with the given value.