README

gue


Gue is a Golang queue on top of PostgreSQL that uses transaction-level locks.

Originally this project was a fork of bgentry/que-go, but because of backward-compatibility breaking changes and the original library author being unresponsive to PRs, the fork was turned into a standalone project. Version 2 breaks internal backward-compatibility with the original project: the DB table and all the internal logic (queries, algorithms) are completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v5

Additionally, you need to apply the DB migration.
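
The migration SQL can be applied with any migration tool or directly at startup. Below is a minimal sketch of the latter, assuming the migration is available as a local SQL file; the migrations/schema.sql path is an assumption, use whatever file ships with the gue release you install:

package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// hypothetical path - point it at the migration file you actually use
	schema, err := os.ReadFile("migrations/schema.sql")
	if err != nil {
		log.Fatal(err)
	}

	// note: a file with multiple SQL statements may require the simple query
	// protocol or a dedicated migration tool instead of a single Exec call
	if _, err := pool.Exec(context.Background(), string(schema)); err != nil {
		log.Fatal(err)
	}
}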

Usage Example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"golang.org/x/sync/errgroup"

	"github.com/vgarvardt/gue/v5"
	"github.com/vgarvardt/gue/v5/adapter/pgxv5"
)

const (
	printerQueue   = "name_printer"
	jobTypePrinter = "PrintName"
)

type printNameArgs struct {
	Name string
}

func main() {
	printName := func(ctx context.Context, j *gue.Job) error {
		var args printNameArgs
		if err := json.Unmarshal(j.Args, &args); err != nil {
			return err
		}
		fmt.Printf("Hello %s!\n", args.Name)
		return nil
	}

	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv5.NewConnPool(pgxPool)

	gc, err := gue.NewClient(poolAdapter)
	if err != nil {
		log.Fatal(err)
	}
	wm := gue.WorkMap{
		jobTypePrinter: printName,
	}

	finishedJobsLog := func(ctx context.Context, j *gue.Job, err error) {
		if err != nil {
			return
		}

		j.Tx().Exec(
			ctx,
			"INSERT INTO finished_jobs_log (queue, type, run_at) VALUES ($1, $2, now())",
			j.Queue,
			j.Type,
		)
	}

	// create a pool w/ 2 workers
	workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue(printerQueue), gue.WithPoolHooksJobDone(finishedJobsLog))
	if err != nil {
		log.Fatal(err)
	}

	ctx, shutdown := context.WithCancel(context.Background())

	// work jobs in goroutine
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		err := workers.Run(gctx)
		if err != nil {
			// In a real-world application, use a better way to shut down the
			// application on an unrecoverable error, e.g. fx.Shutdowner from the
			// go.uber.org/fx module.
			log.Fatal(err)
		}
		return err
	})

	args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	j := &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	j = &gue.Job{
		Type:  jobTypePrinter,
		Queue: printerQueue,
		RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	time.Sleep(30 * time.Second) // wait for a while

	// send shutdown signal to worker
	shutdown()
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

PostgreSQL drivers

The package supports several PostgreSQL drivers through an internal adapter interface. Adapters for the following drivers are currently implemented:

pgx/v5
package main

import (
  "context"
  "log"
  "os"

  "github.com/jackc/pgx/v5/pgxpool"

  "github.com/vgarvardt/gue/v5"
  "github.com/vgarvardt/gue/v5/adapter/pgxv5"
)

func main() {
  pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }

  pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
  if err != nil {
    log.Fatal(err)
  }
  defer pgxPool.Close()

  poolAdapter := pgxv5.NewConnPool(pgxPool)

  gc, err := gue.NewClient(poolAdapter)
  ...
}
pgx/v4
package main

import (
  "context"
  "log"
  "os"

  "github.com/jackc/pgx/v4/pgxpool"

  "github.com/vgarvardt/gue/v5"
  "github.com/vgarvardt/gue/v5/adapter/pgxv4"
)

func main() {
  pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }

  pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
  if err != nil {
    log.Fatal(err)
  }
  defer pgxPool.Close()

  poolAdapter := pgxv4.NewConnPool(pgxPool)

  gc, err := gue.NewClient(poolAdapter)
  ...
}
lib/pq
package main

import (
  "database/sql"
  "log"
  "os"

  _ "github.com/lib/pq" // register postgres driver

  "github.com/vgarvardt/gue/v5"
  "github.com/vgarvardt/gue/v5/adapter/libpq"
)

func main() {
  db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  poolAdapter := libpq.NewConnPool(db)

  gc, err := gue.NewClient(poolAdapter)
  ...
}

Logging

The package supports several logging libraries through an internal adapter interface. Adapters for the following loggers are currently implemented:

  • NoOp (adapter.NoOpLogger) - default adapter that does nothing, so it is basically /dev/null logger
  • Stdlib log - adapter that uses log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses go.uber.org/zap logger for logs output. Instantiate it with adapter/zap.New(...).
  • Olivier Poitrey's zerolog - adapter that uses github.com/rs/zerolog logger for logs output. Instantiate it with adapter/zerolog.New(...).
  • Stdlib slog - adapter that uses log/slog logger for logs output. Instantiate it with adapter/slog.New(...).
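
For example, wiring the stdlib adapter into both the client and the worker pool might look like the sketch below (it imports github.com/vgarvardt/gue/v5/adapter; poolAdapter and wm come from the examples above, and the exact NewStdLogger arguments are an assumption, check the adapter package docs):

// poolAdapter and wm are set up as in the usage example above
logger := adapter.NewStdLogger() // exact arguments are an assumption - see the adapter package docs

gc, err := gue.NewClient(poolAdapter, gue.WithClientLogger(logger))
if err != nil {
	log.Fatal(err)
}

workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolLogger(logger))
if err != nil {
	log.Fatal(err)
}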

Documentation

Overview

Package gue implements Golang queues on top of PostgreSQL. It uses transaction-level locks for concurrent work.

PostgreSQL drivers

The package supports several PostgreSQL drivers through an internal adapter interface. Adapters for the following drivers are currently implemented:

  • github.com/jackc/pgx/v4
  • github.com/jackc/pgx/v5
  • github.com/lib/pq

Usage

Here is a complete example showing worker setup for pgx/v5 and two jobs enqueued, one with a delay:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"golang.org/x/sync/errgroup"

	"github.com/vgarvardt/gue/v5"
	"github.com/vgarvardt/gue/v5/adapter/pgxv5"
)

type printNameArgs struct {
	Name string
}

func main() {
	printName := func(ctx context.Context, j *gue.Job) error {
		var args printNameArgs
		if err := json.Unmarshal(j.Args, &args); err != nil {
			return err
		}
		fmt.Printf("Hello %s!\n", args.Name)
		return nil
	}

	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv5.NewConnPool(pgxPool)

	gc, err := gue.NewClient(poolAdapter)
	if err != nil {
		log.Fatal(err)
	}

	wm := gue.WorkMap{
		"PrintName": printName,
	}

	// create a pool with 2 workers
	workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))
	if err != nil {
		log.Fatal(err)
	}

	ctx, shutdown := context.WithCancel(context.Background())

	// work jobs in goroutine
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		err := workers.Run(gctx)
		if err != nil {
			// In a real-world application, use a better way to shut down the
			// application on an unrecoverable error, e.g. fx.Shutdowner from the
			// go.uber.org/fx module.
			log.Fatal(err)
		}
		return err
	})

	args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	j := &gue.Job{
		Type:  "PrintName",
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	j = &gue.Job{
		Type:  "PrintName",
		RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	time.Sleep(30 * time.Second) // wait for a while

	// send shutdown signal to worker
	shutdown()
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

Constants

const (
	// WorkerIdxUnknown is returned when worker index in the pool is not set for some reason.
	WorkerIdxUnknown = -1
)

Variables

var (
	// DefaultExponentialBackoff is the exponential Backoff implementation with default config applied
	DefaultExponentialBackoff = NewExponentialBackoff(exp.Config{
		BaseDelay:  1.0 * time.Second,
		Multiplier: 1.6,
		Jitter:     0.2,
		MaxDelay:   1.0 * time.Hour,
	})

	// BackoffNever is the Backoff implementation that never returns errored job to the queue for retry,
	// but discards it in case of the error.
	BackoffNever = func(retries int) time.Duration {
		return -1
	}
)
var (
	// ErrJobPanicked is returned when the job failed to be handled because it panicked.
	// The error is normally returned wrapped, so use `errors.Is(err, gue.ErrJobPanicked)` to ensure this is the error
	// you're looking for.
	ErrJobPanicked = errors.New("job panicked")

	// ErrHookJobDonePanicked is returned when a job-done hook panicked during recovery from a panicked job.
	// The error is normally returned wrapped, so use `errors.Is(err, gue.ErrHookJobDonePanicked)` to ensure this is
	// the error you're looking for.
	ErrHookJobDonePanicked = errors.New("hook job done panicked in job panic recovery")
)
var ErrMissingType = errors.New("job type must be specified")

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

Functions

func ErrDiscardJob

func ErrDiscardJob(reason string) error

ErrDiscardJob spawns an error that unconditionally discards a job.

func ErrRescheduleJobAt

func ErrRescheduleJobAt(t time.Time, reason string) error

ErrRescheduleJobAt spawns an error that reschedules a job to run at some predefined time.

func ErrRescheduleJobIn

func ErrRescheduleJobIn(d time.Duration, reason string) error

ErrRescheduleJobIn spawns an error that reschedules a job to run after some predefined duration.

func GetWorkerIdx

func GetWorkerIdx(ctx context.Context) int

GetWorkerIdx gets the index of the worker in the pool from the worker context. Returns WorkerIdxUnknown if the context is not set or the value is not found there.

func RandomStringID

func RandomStringID() string

RandomStringID returns random alphanumeric string that can be used as ID.

func RunLock

func RunLock(ctx context.Context, f func(ctx context.Context) error, mu *sync.Mutex, running *bool, id string) error

RunLock ensures that there is only one instance of the running callback function "f" (worker).

Types

type Backoff

type Backoff func(retries int) time.Duration

Backoff is the interface for the backoff implementation that is used to reschedule errored jobs to a later time. If the Backoff implementation returns a negative duration, the job will be discarded.

func NewConstantBackoff

func NewConstantBackoff(d time.Duration) Backoff

NewConstantBackoff instantiates a new backoff implementation with a constant retry duration that does not depend on the retry number.

func NewExponentialBackoff

func NewExponentialBackoff(cfg exp.Config) Backoff

NewExponentialBackoff instantiates a new exponential Backoff implementation with the given config.
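
For instance, failed jobs can be retried every 10 seconds, or handled by a custom policy, by passing a Backoff via WithClientBackoff; a sketch (poolAdapter as in the driver examples above):

// constant backoff: retry every 10 seconds regardless of the retry count
gc, err := gue.NewClient(poolAdapter, gue.WithClientBackoff(gue.NewConstantBackoff(10*time.Second)))
if err != nil {
	log.Fatal(err)
}

// custom backoff: discard the job after 5 failed attempts, otherwise retry in a minute
customBackoff := gue.Backoff(func(retries int) time.Duration {
	if retries >= 5 {
		return -1 // a negative duration discards the job
	}
	return time.Minute
})
gcCustom, err := gue.NewClient(poolAdapter, gue.WithClientBackoff(customBackoff))
if err != nil {
	log.Fatal(err)
}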

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.

func NewClient

func NewClient(pool adapter.ConnPool, options ...ClientOption) (*Client, error)

NewClient creates a new Client that uses the pgx pool.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, j *Job) error

Enqueue adds a job to the queue.

func (*Client) EnqueueBatch added in v5.2.0

func (c *Client) EnqueueBatch(ctx context.Context, jobs []*Job) error

EnqueueBatch adds a batch of jobs. Operation is atomic, so either all jobs are added, or none.
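
A sketch of an atomic batch enqueue, reusing the client gc from the usage example above:

jobs := []*gue.Job{
	{Type: "PrintName", Queue: "name_printer", Args: []byte(`{"Name":"alice"}`)},
	{Type: "PrintName", Queue: "name_printer", Args: []byte(`{"Name":"bob"}`)},
}

// either both jobs are enqueued or neither is
if err := gc.EnqueueBatch(context.Background(), jobs); err != nil {
	log.Fatal(err)
}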

func (*Client) EnqueueBatchTx added in v5.2.0

func (c *Client) EnqueueBatchTx(ctx context.Context, jobs []*Job, tx adapter.Tx) error

EnqueueBatchTx adds a batch of jobs within the scope of the transaction. This allows you to guarantee that an enqueued batch will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.

func (*Client) EnqueueTx

func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx adapter.Tx) error

EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.

func (*Client) EnqueueWithID added in v5.4.0

func (c *Client) EnqueueWithID(ctx context.Context, j *Job, jobID ulid.ULID) error

EnqueueWithID adds a job to the queue with a specific id

func (*Client) LockJob

func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error)

LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.

This function cares about the priority first to lock top priority jobs first even if there are available ones that should be executed earlier but with the lower priority.

Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.

After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).
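
A sketch of working a single job manually with LockJob (the Worker normally does all of this for you; doTheWork is a hypothetical handler):

func workOneManually(ctx context.Context, gc *gue.Client) error {
	j, err := gc.LockJob(ctx, "name_printer")
	if err != nil {
		return err
	}
	if j == nil {
		return nil // no job available right now
	}
	// always finish the job so the underlying transaction is released
	defer j.Done(ctx)

	if wErr := doTheWork(j); wErr != nil { // doTheWork is a hypothetical handler
		// records the error, increments the error count and reschedules the job
		return j.Error(ctx, wErr)
	}

	// remove the finished job; the deferred Done commits the transaction
	return j.Delete(ctx)
}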

func (*Client) LockJobByID

func (c *Client) LockJobByID(ctx context.Context, id ulid.ULID) (*Job, error)

LockJobByID attempts to retrieve a specific Job from the database. If the job is found, it will be locked on the transactional level, so other workers will be skipping it. If the job is not found, an error will be returned

Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting the job, working it, deleting it, and releasing the lock.

After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).

func (*Client) LockNextScheduledJob

func (c *Client) LockNextScheduledJob(ctx context.Context, queue string) (*Job, error)

LockNextScheduledJob attempts to retrieve the earliest scheduled Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.

This function cares about the scheduled time first to lock earliest to execute jobs first even if there are ones with a higher priority scheduled to a later time but already eligible for execution

Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.

After the Job has been worked, you must call either Job.Done() or Job.Error() on it in order to commit transaction to persist Job changes (remove or update it).

type ClientOption

type ClientOption func(*Client)

ClientOption defines a type that allows setting client properties at build time.

func WithClientBackoff

func WithClientBackoff(backoff Backoff) ClientOption

WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.

func WithClientID

func WithClientID(id string) ClientOption

WithClientID sets client ID for easier identification in logs.

func WithClientLogger

func WithClientLogger(logger adapter.Logger) ClientOption

WithClientLogger sets Logger implementation to client.

func WithClientMeter

func WithClientMeter(meter metric.Meter) ClientOption

WithClientMeter sets metric.Meter instance to the client.

type ErrJobReschedule

type ErrJobReschedule interface {
	// contains filtered or unexported methods
}

ErrJobReschedule interface implementation allows errors to reschedule jobs on an individual basis.

type HookFunc

type HookFunc func(ctx context.Context, j *Job, err error)

HookFunc is a function that may react to Job lifecycle events. All the callbacks are executed synchronously, so be careful with long-running or locking operations. Hooks do not return an error, therefore they cannot and must not be used to affect the Job execution flow, e.g. cancel it - that is the WorkFunc's responsibility. Modifying Job fields and calling any methods that modify its state within hooks may lead to undefined behaviour. Please never do this.

Depending on the event, the err parameter may or may not be set - check the event description for its meaning.
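
A sketch of a purely observational hook attached to every worker in a pool (gc and wm as in the usage example above):

jobDoneLog := gue.HookFunc(func(ctx context.Context, j *gue.Job, err error) {
	// observe only - hooks must not try to alter the job or its execution flow
	if err != nil {
		log.Printf("job %s (%s) finished with error: %v", j.ID, j.Type, err)
		return
	}
	log.Printf("job %s (%s) finished successfully", j.ID, j.Type)
})

workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolHooksJobDone(jobDoneLog))
if err != nil {
	log.Fatal(err)
}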

type Job

type Job struct {
	// ID is the unique database ID of the Job. It is ignored on job creation.
	ID ulid.ULID

	// Queue is the name of the queue. It defaults to the empty queue "".
	Queue string

	// Priority is the priority of the Job. The default priority is 0, and a
	// lower number means a higher priority.
	//
	// The highest priority is JobPriorityHighest, the lowest one is JobPriorityLowest
	Priority JobPriority

	// RunAt is the time that this job should be executed. It defaults to now(),
	// meaning the job will execute immediately. Set it to a value in the future
	// to delay a job's execution.
	RunAt time.Time

	// Type maps job to a worker func.
	Type string

	// Args for the job.
	Args []byte

	// ErrorCount is the number of times this job has attempted to run, but failed with an error.
	// It is ignored on job creation.
	// This field is initialised only when the Job is being retrieved from the DB and is not
	// being updated when the current Job handler errored.
	ErrorCount int32

	// LastError is the error message or stack trace from the last time the job failed. It is ignored on job creation.
	// This field is initialised only when the Job is being retrieved from the DB and is not
	// being updated when the current Job run errored. This field supposed to be used mostly for the debug reasons.
	LastError sql.NullString

	// CreatedAt is the job creation time.
	// This field is initialised only when the Job is being retrieved from the DB and is not
	// being updated when the current Job run errored. This field can be used as a decision parameter in some handlers
	// whether it makes sense to retry the job or it can be dropped.
	CreatedAt time.Time
	// contains filtered or unexported fields
}

Job is a single unit of work for Gue to perform.

func (*Job) Delete

func (j *Job) Delete(ctx context.Context) error

Delete marks this job as complete by deleting it from the database.

You must also later call Done() to return this job's database connection to the pool. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

func (*Job) Done

func (j *Job) Done(ctx context.Context) error

Done commits transaction that marks job as done. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

func (*Job) Error

func (j *Job) Error(ctx context.Context, jErr error) (err error)

Error marks the job as failed and schedules it to be reworked. The error message or backtrace from jErr will be saved on the job. It will also increase the error count.

This call marks job as done and releases (commits) transaction, so calling Done() is not required, although calling it will not cause any issues. If you got the job from the worker - it will take care of cleaning up the job and resources, no need to do this manually in a WorkFunc.

func (*Job) Tx

func (j *Job) Tx() adapter.Tx

Tx returns DB transaction that this job is locked to. You may use it as you please until you call Done(). At that point, this transaction will be committed. This function will return nil if the Job's transaction was closed with Done().

type JobPriority

type JobPriority int16

JobPriority is the wrapper type for Job.Priority

const (
	JobPriorityHighest JobPriority = -32768
	JobPriorityHigh    JobPriority = -16384
	JobPriorityDefault JobPriority = 0
	JobPriorityLow     JobPriority = 16384
	JobPriorityLowest  JobPriority = 32767
)

Some shortcut values for JobPriority that can be any, but chances are high that one of these will be the most used.
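
For example, a job that should be picked up before default-priority jobs (under the default PriorityPollStrategy) can be enqueued like this:

j := &gue.Job{
	Type:     "PrintName",
	Queue:    "name_printer",
	Priority: gue.JobPriorityHigh, // lower number means higher priority
	Args:     []byte(`{"Name":"vgarvardt"}`),
}
if err := gc.Enqueue(context.Background(), j); err != nil {
	log.Fatal(err)
}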

type PollStrategy

type PollStrategy string

PollStrategy determines how the DB is queried for the next job to work on

const (

	// PriorityPollStrategy cares about the priority first to lock top priority jobs first even if there are available
	// ones that should be executed earlier but with lower priority.
	PriorityPollStrategy PollStrategy = "OrderByPriority"
	// RunAtPollStrategy cares about the scheduled time first to lock earliest to execute jobs first even if there
	// are ones with a higher priority scheduled to a later time but already eligible for execution
	RunAtPollStrategy PollStrategy = "OrderByRunAtPriority"
)
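
To work jobs strictly in scheduled-time order rather than by priority, override the poll strategy when building the pool (gc and wm as in the usage example above):

workers, err := gue.NewWorkerPool(
	gc, wm, 2,
	gue.WithPoolQueue("name_printer"),
	gue.WithPoolPollStrategy(gue.RunAtPollStrategy),
)
if err != nil {
	log.Fatal(err)
}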

type WorkFunc

type WorkFunc func(ctx context.Context, j *Job) error

WorkFunc is the handler function that performs the Job. If an error is returned, the Job is either re-enqueued with the given backoff or is discarded based on the worker backoff strategy and returned error.

Modifying Job fields and calling any methods that are modifying its state within the handler may lead to undefined behaviour. Please never do this.

type WorkMap

type WorkMap map[string]WorkFunc

WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a single worker that pulls jobs off the specified queue. If no Job is found, the Worker will sleep for interval seconds.

func NewWorker

func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) (*Worker, error)

NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.

Worker defaults to a poll interval of 5 seconds, which can be overridden by WithWorkerPollInterval option. The default queue is the nameless queue "", which can be overridden by WithWorkerQueue option.
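
A sketch of a single worker with a custom queue, a shorter poll interval and an explicit ID (gc, wm and ctx as in the usage example above):

w, err := gue.NewWorker(
	gc, wm,
	gue.WithWorkerQueue("name_printer"),
	gue.WithWorkerPollInterval(2*time.Second),
	gue.WithWorkerID("name-printer-worker-1"),
)
if err != nil {
	log.Fatal(err)
}

// Run blocks until ctx is cancelled
if err := w.Run(ctx); err != nil {
	log.Fatal(err)
}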

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run pulls jobs off the Worker's queue at its interval. This function does not run in its own goroutine, so it’s possible to wait for completion. Use context cancellation to shut it down.

func (*Worker) WorkOne

func (w *Worker) WorkOne(ctx context.Context) (didWork bool)

WorkOne tries to consume single message from the queue.

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption defines a type that allows setting worker properties at build time.

func WithWorkerGracefulShutdown

func WithWorkerGracefulShutdown(handlerCtx func() context.Context) WorkerOption

WithWorkerGracefulShutdown enables graceful shutdown mode in the worker. When graceful shutdown is enabled, the worker does not propagate the cancelled context to the Job; instead it waits for the currently executed Job to finish and only then shuts down. Use this mode carefully, as the Job handler is no longer aware of the worker context state and dependencies may already be cancelled/closed, so it is up to the job to ensure everything still works. Values of the original context are also not propagated to the handler context when the graceful mode is enabled.

Use "handlerCtx" to set up custom handler context. When set to nil - defaults to context.Background().

func WithWorkerHooksJobDone

func WithWorkerHooksJobDone(hooks ...HookFunc) WorkerOption

WithWorkerHooksJobDone sets hooks that are called when the worker has finished working the job, right before the successfully executed job is removed or the errored-job handler is called to decide whether the Job will be re-queued or discarded. The error field is set when the job was worked with an error.

func WithWorkerHooksJobLocked

func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption

WithWorkerHooksJobLocked sets hooks that are called right after the job was polled from the DB. Depending on the polling results hook will have either error or job set, but not both. If the error field is set - no other lifecycle hooks will be called for the job.

func WithWorkerHooksJobUndone added in v5.6.0

func WithWorkerHooksJobUndone(hooks ...HookFunc) WorkerOption

WithWorkerHooksJobUndone sets hooks that are called when the worker fails to mark the job as done. This is an exceptional situation, most likely caused by a transaction that failed to be committed. Hook implementations MUST NOT rely on the transaction provided by the job as it may already be marked as failed.

func WithWorkerHooksUnknownJobType

func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption

WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type. Error field for this event type is always set since this is an error situation. If this hook is called - no other lifecycle hooks will be called for the job. When the handler for unknown job types is set with WithWorkerUnknownJobWorkFunc - these hooks are never called as the job is handled in the regular way using that handler.

func WithWorkerID

func WithWorkerID(id string) WorkerOption

WithWorkerID sets worker ID for easier identification in logs

func WithWorkerJobTTL added in v5.3.0

func WithWorkerJobTTL(d time.Duration) WorkerOption

WithWorkerJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context, so it is up to the job implementation to handle context cancellation properly.

func WithWorkerLogger

func WithWorkerLogger(logger adapter.Logger) WorkerOption

WithWorkerLogger sets Logger implementation to worker

func WithWorkerMeter

func WithWorkerMeter(meter metric.Meter) WorkerOption

WithWorkerMeter sets metric.Meter instance to the worker.

func WithWorkerPanicStackBufSize

func WithWorkerPanicStackBufSize(size int) WorkerOption

WithWorkerPanicStackBufSize sets the max size of the stacktrace buffer for panicking jobs. The default value is 1024, which is enough for most cases. Be careful setting the buffer size to large values as this may affect overall performance.

func WithWorkerPollInterval

func WithWorkerPollInterval(d time.Duration) WorkerOption

WithWorkerPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.

func WithWorkerPollStrategy

func WithWorkerPollStrategy(s PollStrategy) WorkerOption

WithWorkerPollStrategy overrides default poll strategy with given value

func WithWorkerQueue

func WithWorkerQueue(queue string) WorkerOption

WithWorkerQueue overrides default worker queue name with the given value.

func WithWorkerSpanWorkOneNoJob added in v5.2.1

func WithWorkerSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerOption

WithWorkerSpanWorkOneNoJob enables tracing span generation for every attempt to get a job. When set to true, a span is generated for every DB poll, even when no job was acquired. This may generate a lot of empty spans but may help with debugging, so use it carefully.

func WithWorkerTracer

func WithWorkerTracer(tracer trace.Tracer) WorkerOption

WithWorkerTracer sets trace.Tracer instance to the worker.

func WithWorkerUnknownJobWorkFunc added in v5.6.0

func WithWorkerUnknownJobWorkFunc(wf WorkFunc) WorkerOption

WithWorkerUnknownJobWorkFunc sets the handler for unknown job types. When the handler is set - hooks set with WithWorkerHooksUnknownJobType are never called as the job is handled in the regular way.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of Workers, each working jobs from the queue at the specified interval using the WorkMap.

func NewWorkerPool

func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) (*WorkerPool, error)

NewWorkerPool creates a new WorkerPool with poolSize workers using the Client c.

Each Worker in the pool defaults to a poll interval of 5 seconds, which can be overridden by the WithPoolPollInterval option. The default queue is the nameless queue "", which can be overridden by the WithPoolQueue option.

func (*WorkerPool) Run

func (w *WorkerPool) Run(ctx context.Context) error

Run runs all the Workers in the WorkerPool in their own goroutines. Run blocks until all workers exit. Use context cancellation for shutdown.

func (*WorkerPool) WorkOne

func (w *WorkerPool) WorkOne(ctx context.Context) (didWork bool)

WorkOne tries to consume single message from the queue.

type WorkerPoolOption

type WorkerPoolOption func(pool *WorkerPool)

WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.

func WithPoolGracefulShutdown

func WithPoolGracefulShutdown(handlerCtx func() context.Context) WorkerPoolOption

WithPoolGracefulShutdown enables graceful shutdown mode for all workers in the pool. See WithWorkerGracefulShutdown for details.

func WithPoolHooksJobDone

func WithPoolHooksJobDone(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksJobDone calls WithWorkerHooksJobDone for every worker in the pool.

func WithPoolHooksJobLocked

func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksJobLocked calls WithWorkerHooksJobLocked for every worker in the pool.

func WithPoolHooksJobUndone added in v5.6.0

func WithPoolHooksJobUndone(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksJobUndone calls WithWorkerHooksJobUndone for every worker in the pool.

func WithPoolHooksUnknownJobType

func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption

WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool. When the handler for unknown job types is set with WithPoolUnknownJobWorkFunc - these hooks are never called as the job is handled in the regular way using that handler.

func WithPoolID

func WithPoolID(id string) WorkerPoolOption

WithPoolID sets worker pool ID for easier identification in logs

func WithPoolJobTTL added in v5.3.0

func WithPoolJobTTL(d time.Duration) WorkerPoolOption

WithPoolJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context, so it is up to the job implementation to handle context cancellation properly.

func WithPoolLogger

func WithPoolLogger(logger adapter.Logger) WorkerPoolOption

WithPoolLogger sets Logger implementation to worker pool

func WithPoolMeter

func WithPoolMeter(meter metric.Meter) WorkerPoolOption

WithPoolMeter sets metric.Meter instance to every worker in the pool.

func WithPoolPanicStackBufSize

func WithPoolPanicStackBufSize(size int) WorkerPoolOption

WithPoolPanicStackBufSize sets the max size of the stacktrace buffer for panicking jobs. The default value is 1024, which is enough for most cases. Be careful setting the buffer size to large values as this may affect overall performance.

func WithPoolPollInterval

func WithPoolPollInterval(d time.Duration) WorkerPoolOption

WithPoolPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.

func WithPoolPollStrategy

func WithPoolPollStrategy(s PollStrategy) WorkerPoolOption

WithPoolPollStrategy overrides default poll strategy with given value

func WithPoolQueue

func WithPoolQueue(queue string) WorkerPoolOption

WithPoolQueue overrides default worker queue name with the given value.

func WithPoolSpanWorkOneNoJob added in v5.2.1

func WithPoolSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerPoolOption

WithPoolSpanWorkOneNoJob enables tracing span generation for every attempt to get a job. When set to true, a span is generated for every DB poll, even when no job was acquired. This may generate a lot of empty spans but may help with debugging, so use it carefully.

func WithPoolTracer

func WithPoolTracer(tracer trace.Tracer) WorkerPoolOption

WithPoolTracer sets trace.Tracer instance to every worker in the pool.

func WithPoolUnknownJobWorkFunc added in v5.6.0

func WithPoolUnknownJobWorkFunc(wf WorkFunc) WorkerPoolOption

WithPoolUnknownJobWorkFunc sets the handler for unknown job types. When the handler is set - hooks set with WithPoolHooksUnknownJobType are never called as the job is handled in the regular way.
