Documentation ¶
Overview ¶
Package gue implements Golang queues on top of PostgreSQL. It uses transaction-level locks for concurrent work.
PostgreSQL drivers ¶
Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:
- github.com/jackc/pgx/v4
- github.com/jackc/pgx/v5
- github.com/lib/pq
Usage ¶
Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
package main import ( "context" "encoding/json" "fmt" "log" "os" "time" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/sync/errgroup" "github.com/vgarvardt/gue/v5" "github.com/vgarvardt/gue/v5/adapter/pgxv5" ) type printNameArgs struct { Name string } func main() { printName := func(j *gue.Job) error { var args printNameArgs if err := json.Unmarshal(j.Args, &args); err != nil { return err } fmt.Printf("Hello %s!\n", args.Name) return nil } pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL")) if err != nil { log.Fatal(err) } pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg) if err != nil { log.Fatal(err) } defer pgxPool.Close() poolAdapter := pgxv5.NewConnPool(pgxPool) gc, err := gue.NewClient(poolAdapter) if err != nil { log.Fatal(err) } wm := gue.WorkMap{ "PrintName": printName, } // create a pool w/ 2 workers workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer")) if err != nil { log.Fatal(err) } ctx, shutdown := context.WithCancel(context.Background()) // work jobs in goroutine g, gctx := errgroup.WithContext(ctx) g.Go(func() error { err := workers.Run(gctx) if err != nil { // In a real-world applications, use a better way to shut down // application on unrecoverable error. E.g. fx.Shutdowner from // go.uber.org/fx module. log.Fatal(err) } return err }) args, err := json.Marshal(printNameArgs{Name: "vgarvardt"}) if err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } j := &gue.Job{ Type: "PrintName", RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds Args: args, } if err := gc.Enqueue(context.Background(), j); err != nil { log.Fatal(err) } time.Sleep(30 * time.Second) // wait for while // send shutdown signal to worker shutdown() if err := g.Wait(); err != nil { log.Fatal(err) } }
Index ¶
- Constants
- Variables
- func ErrDiscardJob(reason string) error
- func ErrRescheduleJobAt(t time.Time, reason string) error
- func ErrRescheduleJobIn(d time.Duration, reason string) error
- func GetWorkerIdx(ctx context.Context) int
- func RandomStringID() string
- func RunLock(ctx context.Context, f func(ctx context.Context) error, mu *sync.Mutex, ...) error
- type Backoff
- type Client
- func (c *Client) Enqueue(ctx context.Context, j *Job) error
- func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx adapter.Tx) error
- func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error)
- func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error)
- func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error)
- type ClientOption
- type ErrJobReschedule
- type HookFunc
- type Job
- type JobPriority
- type PollStrategy
- type WorkFunc
- type WorkMap
- type Worker
- type WorkerOption
- func WithWorkerGracefulShutdown(handlerCtx func() context.Context) WorkerOption
- func WithWorkerHooksJobDone(hooks ...HookFunc) WorkerOption
- func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption
- func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption
- func WithWorkerID(id string) WorkerOption
- func WithWorkerLogger(logger adapter.Logger) WorkerOption
- func WithWorkerMeter(meter metric.Meter) WorkerOption
- func WithWorkerPanicStackBufSize(size int) WorkerOption
- func WithWorkerPollInterval(d time.Duration) WorkerOption
- func WithWorkerPollStrategy(s PollStrategy) WorkerOption
- func WithWorkerQueue(queue string) WorkerOption
- func WithWorkerTracer(tracer trace.Tracer) WorkerOption
- type WorkerPool
- type WorkerPoolOption
- func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption
- func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
- func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption
- func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
- func WithPoolID(id string) WorkerPoolOption
- func WithPoolLogger(logger adapter.Logger) WorkerPoolOption
- func WithPoolMeter(meter metric.Meter) WorkerPoolOption
- func WithPoolPanicStackBufSize(size int) WorkerPoolOption
- func WithPoolPollInterval(d time.Duration) WorkerPoolOption
- func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption
- func WithPoolQueue(queue string) WorkerPoolOption
- func WithPoolTracer(tracer trace.Tracer) WorkerPoolOption
Constants ¶
const (
// WorkerIdxUnknown is returned when worker index in the pool is not set for some reasons.
WorkerIdxUnknown = -1
)
Variables ¶
var ( // DefaultExponentialBackoff is the exponential Backoff implementation with default config applied DefaultExponentialBackoff = NewExponentialBackoff(exp.Config{ BaseDelay: 1.0 * time.Second, Multiplier: 1.6, Jitter: 0.2, MaxDelay: 1.0 * time.Hour, }) // BackoffNever is the Backoff implementation that never returns errored job to the queue for retry, // but discards it in case of the error. BackoffNever = func(retries int) time.Duration { return -1 } )
var ErrMissingType = errors.New("job type must be specified")
ErrMissingType is returned when you attempt to enqueue a job with no Type specified.
Functions ¶
func ErrDiscardJob ¶
ErrDiscardJob spawns an error that unconditionally discards a job.
func ErrRescheduleJobAt ¶
ErrRescheduleJobAt spawns an error that reschedules a job to run at some predefined time.
func ErrRescheduleJobIn ¶
ErrRescheduleJobIn spawns an error that reschedules a job to run after some predefined duration.
func GetWorkerIdx ¶
GetWorkerIdx gets the index of the worker in the pool from the worker context. Returns WorkerIdxUnknown if the context is not set or the value is not found there.
func RandomStringID ¶
func RandomStringID() string
RandomStringID returns random alphanumeric string that can be used as ID.
Types ¶
type Backoff ¶
Backoff is the interface for backoff implementation that will be used to reschedule errored jobs to a later time. If the Backoff implementation returns negative duration - the job will be discarded.
func NewConstantBackoff ¶
NewConstantBackoff instantiates new backoff implementation with teh constant retry duration that does not depend on the retry.
func NewExponentialBackoff ¶
NewExponentialBackoff instantiates new exponential Backoff implementation with config
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Gue client that can add jobs to the queue and remove jobs from the queue.
func NewClient ¶
func NewClient(pool adapter.ConnPool, options ...ClientOption) (*Client, error)
NewClient creates a new Client that uses the pgx pool.
func (*Client) EnqueueTx ¶
EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.
It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
func (*Client) LockJob ¶
LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.
This function cares about the priority first to lock top priority jobs first even if there are available ones that should be executed earlier but with the lower priority.
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).
func (*Client) LockJobByID ¶
LockJobByID attempts to retrieve a specific Job from the database. If the job is found, it will be locked on the transactional level, so other workers will be skipping it. If the job is not found, an error will be returned
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting the job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).
func (*Client) LockNextScheduledJob ¶
LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.
This function cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution
Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.
After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).
type ClientOption ¶
type ClientOption func(*Client)
ClientOption defines a type that allows to set client properties during the build-time.
func WithClientBackoff ¶
func WithClientBackoff(backoff Backoff) ClientOption
WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.
func WithClientID ¶
func WithClientID(id string) ClientOption
WithClientID sets client ID for easier identification in logs.
func WithClientLogger ¶
func WithClientLogger(logger adapter.Logger) ClientOption
WithClientLogger sets Logger implementation to client.
func WithClientMeter ¶
func WithClientMeter(meter metric.Meter) ClientOption
WithClientMeter sets metric.Meter instance to the client.
type ErrJobReschedule ¶
type ErrJobReschedule interface {
// contains filtered or unexported methods
}
ErrJobReschedule interface implementation allows errors to reschedule jobs in the individual basis.
type HookFunc ¶
HookFunc is a function that may react to a Job lifecycle events. All the callbacks are being executed synchronously, so be careful with the long-running locking operations. Hooks do not return an error, therefore they can not and must not be used to affect the Job execution flow, e.g. cancel it - this is the WorkFunc responsibility. Modifying Job fields and calling any methods that are modifying its state within hooks may lead to undefined behaviour. Please never do this.
Depending on the event err parameter may be empty or not - check the event description for its meaning.
type Job ¶
type Job struct { // ID is the unique database ID of the Job. It is ignored on job creation. ID ulid.ULID // Queue is the name of the queue. It defaults to the empty queue "". Queue string // Priority is the priority of the Job. The default priority is 0, and a // lower number means a higher priority. // // The highest priority is JobPriorityHighest, the lowest one is JobPriorityLowest Priority JobPriority // RunAt is the time that this job should be executed. It defaults to now(), // meaning the job will execute immediately. Set it to a value in the future // to delay a job's execution. RunAt time.Time // Type maps job to a worker func. Type string // Args for the job. Args []byte // ErrorCount is the number of times this job has attempted to run, but failed with an error. // It is ignored on job creation. // This field is initialised only when the Job is being retrieved from the DB and is not // being updated when the current Job handler errored. ErrorCount int32 // LastError is the error message or stack trace from the last time the job failed. It is ignored on job creation. // This field is initialised only when the Job is being retrieved from the DB and is not // being updated when the current Job run errored. This field supposed to be used mostly for the debug reasons. LastError sql.NullString // contains filtered or unexported fields }
Job is a single unit of work for Gue to perform.
func (*Job) Delete ¶
Delete marks this job as complete by deleting it from the database.
You must also later call Done() to return this job's database connection to the pool. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
func (*Job) Done ¶
Done commits transaction that marks job as done. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
func (*Job) Error ¶
Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.
This call marks job as done and releases (commits) transaction, so calling Done() is not required, although calling it will not cause any issues. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.
type JobPriority ¶
type JobPriority int16
JobPriority is the wrapper type for Job.Priority
const ( JobPriorityHighest JobPriority = -32768 JobPriorityHigh JobPriority = -16384 JobPriorityDefault JobPriority = 0 JobPriorityLow JobPriority = 16384 JobPriorityLowest JobPriority = 32767 )
Some shortcut values for JobPriority that can be any, but chances are high that one of these will be the most used.
type PollStrategy ¶
type PollStrategy string
PollStrategy determines how the DB is queried for the next job to work on
const ( // PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available // ones that should be executed earlier but with lower priority. PriorityPollStrategy PollStrategy = "OrderByPriority" // RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there // are ones with a higher priority scheduled to a later time but already eligible for execution RunAtPollStrategy PollStrategy = "OrderByRunAtPriority" )
type WorkFunc ¶
WorkFunc is the handler function that performs the Job. If an error is returned, the Job is either re-enqueued with the given backoff or is discarded based on the worker backoff strategy and returned error.
Modifying Job fields and calling any methods that are modifying its state within the handler may lead to undefined behaviour. Please never do this.
type WorkMap ¶
WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is a single worker that pulls jobs off the specified queue. If no Job is found, the Worker will sleep for interval seconds.
func NewWorker ¶
func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) (*Worker, error)
NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.
Worker defaults to a poll interval of 5 seconds, which can be overridden by WithWorkerPollInterval option. The default queue is the nameless queue "", which can be overridden by WithWorkerQueue option.
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption defines a type that allows to set worker properties during the build-time.
func WithWorkerGracefulShutdown ¶
func WithWorkerGracefulShutdown(handlerCtx func() context.Context) WorkerOption
WithWorkerGracefulShutdown enables graceful shutdown mode in the worker. When graceful shutdown is enabled - worker does not propagate cancel context to Job, as a result worker is waiting for the Job being currently executed and only then shuts down. Use this mode carefully, as Job handler is not aware anymore of the worker context state and dependencies may already be cancelled/closed, so it is up to the job to ensure everything is still working. Values of the original context are not propagated to the handler context as well when the graceful mode is enabled.
Use "handlerCtx" to set up custom handler context. When set to nil - defaults to context.Background().
func WithWorkerHooksJobDone ¶
func WithWorkerHooksJobDone(hooks ...HookFunc) WorkerOption
WithWorkerHooksJobDone sets hooks that are called when worker finished working the job, right before the successfully executed job will be removed or errored job handler will be called to decide if the Job will be re-queued or discarded. Error field is set for the cases when the job was worked with an error.
func WithWorkerHooksJobLocked ¶
func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption
WithWorkerHooksJobLocked sets hooks that are called right after the job was polled from the DB. Depending on the polling results hook will have either error or job set, but not both. If the error field is set - no other lifecycle hooks will be called for the job.
func WithWorkerHooksUnknownJobType ¶
func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption
WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type. Error field for this event type is always set since this is an error situation. If this hook is called - no other lifecycle hooks will be called for the job.
func WithWorkerID ¶
func WithWorkerID(id string) WorkerOption
WithWorkerID sets worker ID for easier identification in logs
func WithWorkerLogger ¶
func WithWorkerLogger(logger adapter.Logger) WorkerOption
WithWorkerLogger sets Logger implementation to worker
func WithWorkerMeter ¶
func WithWorkerMeter(meter metric.Meter) WorkerOption
WithWorkerMeter sets metric.Meter instance to the worker.
func WithWorkerPanicStackBufSize ¶
func WithWorkerPanicStackBufSize(size int) WorkerOption
WithWorkerPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs. Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values as this may affect overall performance.
func WithWorkerPollInterval ¶
func WithWorkerPollInterval(d time.Duration) WorkerOption
WithWorkerPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithWorkerPollStrategy ¶
func WithWorkerPollStrategy(s PollStrategy) WorkerOption
WithWorkerPollStrategy overrides default poll strategy with given value
func WithWorkerQueue ¶
func WithWorkerQueue(queue string) WorkerOption
WithWorkerQueue overrides default worker queue name with the given value.
func WithWorkerTracer ¶
func WithWorkerTracer(tracer trace.Tracer) WorkerOption
WithWorkerTracer sets trace.Tracer instance to the worker.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.
func NewWorkerPool ¶
func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) (*WorkerPool, error)
NewWorkerPool creates a new WorkerPool with count workers using the Client c.
Each Worker in the pool default to a poll interval of 5 seconds, which can be overridden by WithPoolPollInterval option. The default queue is the nameless queue "", which can be overridden by WithPoolQueue option.
type WorkerPoolOption ¶
type WorkerPoolOption func(pool *WorkerPool)
WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.
func WithPoolGracefulShutdown ¶
func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption
WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool. See WithWorkerGracefulShutdown for details.
func WithPoolHooksJobDone ¶
func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.
func WithPoolHooksJobLocked ¶
func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksJobLocked calls WithWorkerHooksJobLocked for every worker in the pool.
func WithPoolHooksUnknownJobType ¶
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption
WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
func WithPoolID ¶
func WithPoolID(id string) WorkerPoolOption
WithPoolID sets worker pool ID for easier identification in logs
func WithPoolLogger ¶
func WithPoolLogger(logger adapter.Logger) WorkerPoolOption
WithPoolLogger sets Logger implementation to worker pool
func WithPoolMeter ¶
func WithPoolMeter(meter metric.Meter) WorkerPoolOption
WithPoolMeter sets metric.Meter instance to every worker in the pool.
func WithPoolPanicStackBufSize ¶
func WithPoolPanicStackBufSize(size int) WorkerPoolOption
WithPoolPanicStackBufSize sets max size for the stacktrace buffer for panicking jobs. Default value is 1024 that is enough for most of the cases. Be careful setting buffer suze to the big values as this may affect overall performance.
func WithPoolPollInterval ¶
func WithPoolPollInterval(d time.Duration) WorkerPoolOption
WithPoolPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollStrategy ¶
func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption
WithPoolPollStrategy overrides default poll strategy with given value
func WithPoolQueue ¶
func WithPoolQueue(queue string) WorkerPoolOption
WithPoolQueue overrides default worker queue name with the given value.
func WithPoolTracer ¶
func WithPoolTracer(tracer trace.Tracer) WorkerPoolOption
WithPoolTracer sets trace.Tracer instance to every worker in the pool.