qg

Version: v5.0.4
Published: Dec 13, 2024 License: MIT Imports: 16 Imported by: 0

README

qg

Ruby Que implementation in Go. This library is almost a fork of que-go, the great work of bgentry.

Why created

First of all, Que and its Go port que-go are great libraries. If you already use PostgreSQL as your main RDBMS, they let a small to mid-scale application run asynchronous tasks and jobs without adding another moving part. However, while developing my Go application with que-go, I found some functionality that que-go doesn't provide. The following is a list of features I'm going to add to qg.

  • database/sql-compatible enqueue functions, so that many other database libraries can work with it.
  • Transactions can be injected into a Job, which makes WorkFunc tests much easier.
  • Customizable Job.Delete() and Job.Error() for more flexibility.
  • A synchronous execution option in Client.Enqueue and Client.EnqueueInTx for easier development.
  • A better logger interface, so that developers can plug in whatever logger they want.

This library is still under heavy development, and its APIs may change significantly.

Test

docker compose up -d
make db
make table
make test

Great Resources

Documentation

Constants

This section is empty.

Variables

var ErrAgain = errors.New("maximum number of LockJob attempts reached")

ErrAgain is returned by LockJob if a job could not be retrieved from the queue after several attempts because of concurrently running transactions. This error should not be returned unless the queue is under extremely heavy concurrency.
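Since ErrAgain signals contention rather than a broken queue, a caller may choose to retry. The following is a minimal sketch of one such retry loop, not something the library provides. To run without the module or a database, it uses a local errAgain in place of qg's ErrAgain and a hypothetical flakyLock function in place of Client.LockJob.

```go
package main

import (
	"errors"
	"fmt"
)

// errAgain is a local stand-in for the package's ErrAgain so this sketch
// compiles on its own; real code would compare against the package variable.
var errAgain = errors.New("maximum number of LockJob attempts reached")

// lockWithRetry calls lock until it returns something other than errAgain,
// giving up after maxTries calls (maxTries must be at least 1).
// It also reports how many calls were made.
func lockWithRetry(lock func() (string, error), maxTries int) (job string, tries int, err error) {
	for tries = 1; tries <= maxTries; tries++ {
		job, err = lock()
		if !errors.Is(err, errAgain) {
			return job, tries, err
		}
	}
	return "", maxTries, err
}

// flakyLock is a hypothetical replacement for Client.LockJob: it fails with
// errAgain for the first `failures` calls and then succeeds.
func flakyLock(failures int) func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		if calls <= failures {
			return "", errAgain
		}
		return "job-1", nil
	}
}

func main() {
	job, tries, err := lockWithRetry(flakyLock(2), 5)
	fmt.Println(job, tries, err) // prints "job-1 3 <nil>"
}
```

Any other error is returned immediately, so only the contention case is retried.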

var ErrMissingType = errors.New("job type must be specified")

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

Functions

func GetConnector

func GetConnector(host string, port int, username string, password string, database string) (driver.Connector, error)

func GetConnectorFromConnStr

func GetConnectorFromConnStr(connStr string, opts ...stdlib.OptionOpenDB) (driver.Connector, error)

func GetConnectorFromURL

func GetConnectorFromURL(u *url.URL) (driver.Connector, error)

func PrepareStatements

func PrepareStatements(ctx context.Context, conn *pgx.Conn) error

PrepareStatements prepares statements on the given connection.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Que client that can add jobs to the queue and remove jobs from the queue.

func MustNewClient

func MustNewClient(pool *sql.DB) *Client

MustNewClient creates a new Client. Panic if the initialization fails.

func NewClient

func NewClient(pool *sql.DB) (*Client, error)

NewClient creates a new Client.

func (*Client) Close

func (c *Client) Close()

Close releases all resources associated with the client.

func (*Client) Enqueue

func (c *Client) Enqueue(j *Job) error

Enqueue adds a job to the queue.

func (*Client) EnqueueInTx

func (c *Client) EnqueueInTx(j *Job, tx *sql.Tx) error

EnqueueInTx adds a job to the queue within the scope of the transaction tx. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
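Because the caller owns the transaction, every code path after EnqueueInTx has to end in Commit or Rollback. The sketch below shows one way to centralize that decision. The names txFinisher, finishTx, fakeTx, and errEnqueueFailed are hypothetical and exist only for this illustration. *sql.Tx has Commit and Rollback methods with these signatures, so it satisfies txFinisher.

```go
package main

import (
	"errors"
	"fmt"
)

// txFinisher is the part of *sql.Tx this sketch relies on; *sql.Tx
// satisfies it, so real code can pass the transaction directly.
type txFinisher interface {
	Commit() error
	Rollback() error
}

// finishTx commits tx when workErr is nil and rolls it back otherwise.
// workErr stands for the combined result of the caller's own statements
// and the EnqueueInTx call made on the same transaction.
func finishTx(tx txFinisher, workErr error) error {
	if workErr != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", workErr, rbErr)
		}
		return workErr
	}
	return tx.Commit()
}

// fakeTx records which method was called so the sketch runs without a database.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// errEnqueueFailed is a hypothetical error standing in for a failed enqueue.
var errEnqueueFailed = errors.New("enqueue failed")

func main() {
	ok := &fakeTx{}
	fmt.Println(finishTx(ok, nil), ok.committed, ok.rolledBack)

	bad := &fakeTx{}
	fmt.Println(finishTx(bad, errEnqueueFailed), bad.committed, bad.rolledBack)
}
```

With this shape, the enqueued job and the other changes in the transaction become visible together or not at all.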

func (*Client) LockJob

func (c *Client) LockJob(queue string) (*Job, error)

LockJob is the same as LockJobContext, but without a context.Context.

func (*Client) LockJobContext

func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)

LockJobContext attempts to retrieve a Job from the database in the specified queue. If a job is found, a session-level Postgres advisory lock is created for the Job's ID. If no job is found, nil is returned instead of an error.

Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock.

After the Job has been worked, you must call Delete() or Error() on it to record the outcome, and then Done() to remove the lock and return the database connection to the pool.
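The lifecycle of a locked job can be sketched as follows: record the outcome with Delete or Error, and always call Done. The lockedJob interface copies the method signatures that this documentation lists for *Job, so a real *Job could be passed to settle. The names settle, fakeJob, and errWork are hypothetical and let the sketch run without PostgreSQL.

```go
package main

import (
	"errors"
	"fmt"
)

// lockedJob lists the *Job methods used here, with the signatures shown
// in this documentation.
type lockedJob interface {
	Delete() error
	Error(msg string) error
	Done()
}

// settle records the outcome of a worked job and always calls Done, so the
// advisory lock is released and the connection goes back to the pool.
func settle(j lockedJob, workErr error) error {
	defer j.Done()
	if workErr != nil {
		return j.Error(workErr.Error())
	}
	return j.Delete()
}

// fakeJob records the calls made on it; it replaces a job returned by
// LockJob in this sketch.
type fakeJob struct{ calls []string }

func (f *fakeJob) Delete() error          { f.calls = append(f.calls, "Delete"); return nil }
func (f *fakeJob) Error(msg string) error { f.calls = append(f.calls, "Error"); return nil }
func (f *fakeJob) Done()                  { f.calls = append(f.calls, "Done") }

// errWork is a hypothetical failure returned by the job's work.
var errWork = errors.New("work failed")

func main() {
	ok, failed := &fakeJob{}, &fakeJob{}
	_ = settle(ok, nil)
	_ = settle(failed, errWork)
	fmt.Println(ok.calls, failed.calls) // prints "[Delete Done] [Error Done]"
}
```

In real code, check for a nil job before calling settle, since LockJob returns nil when the queue is empty.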

func (*Client) Stats

func (c *Client) Stats() (results []JobStats, err error)

Stats retrieves the statistics of all queues.

type ConnWrapper

type ConnWrapper interface {
	WrappedConn() driver.Conn
}

ConnWrapper is implemented by types that wrap a driver.Conn.

type Conner

type Conner interface {
	Queryer
	Begin() (*sql.Tx, error)
	Close() error
}

Conner is the interface for a database connection.

type Job

type Job struct {
	// ID is the unique database ID of the Job. It is ignored on job creation.
	ID int64

	// Queue is the name of the queue. It defaults to the empty queue "".
	Queue string

	// Priority is the priority of the Job. The default priority is 100, and a
	// lower number means a higher priority. A priority of 5 would be very
	// important.
	Priority int16

	// RunAt is the time that this job should be executed. It defaults to now(),
	// meaning the job will execute immediately. Set it to a value in the future
	// to delay a job's execution.
	RunAt time.Time

	// Type corresponds to the Ruby job_class. If you are interoperating with
	// Ruby, you should pick suitable Ruby class names (such as MyJob).
	Type string

	// Args must be the bytes of a valid JSON string
	Args []byte

	// ErrorCount is the number of times this job has attempted to run, but
	// failed with an error. It is ignored on job creation.
	ErrorCount int32

	// LastError is the error message or stack trace from the last time the job
	// failed. It is ignored on job creation.
	LastError sql.NullString
	// contains filtered or unexported fields
}

Job is a single unit of work for Que to perform.
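As a rough illustration of filling in these fields, the sketch below builds a delayed job whose Args are produced by json.Marshal. The Job type here is a local copy of a few of the exported fields, so the example compiles without the module. The "SendWelcome" job type, the welcomeArgs payload, and the newWelcomeJob helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Job copies some exported fields of the struct documented above; it is a
// local stand-in, not the package's type.
type Job struct {
	Queue    string
	Priority int16
	RunAt    time.Time
	Type     string
	Args     []byte
}

// welcomeArgs is a hypothetical payload for a hypothetical "SendWelcome" job.
type welcomeArgs struct {
	UserID int `json:"user_id"`
}

// newWelcomeJob marshals the payload to JSON and schedules the job to run
// after delay.
func newWelcomeJob(userID int, delay time.Duration) (*Job, error) {
	args, err := json.Marshal(welcomeArgs{UserID: userID})
	if err != nil {
		return nil, err
	}
	return &Job{
		Type:     "SendWelcome",
		Priority: 50, // a lower number than the default 100, so higher priority
		RunAt:    time.Now().Add(delay),
		Args:     args,
	}, nil
}

func main() {
	j, err := newWelcomeJob(42, 10*time.Minute)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", j.Type, j.Args) // prints: SendWelcome {"user_id":42}
}
```

Marshaling a typed struct, rather than building the JSON by hand, keeps Args valid JSON as the field comment requires.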

func TestInjectJobConn

func TestInjectJobConn(j *Job, conn *sql.Conn) *Job

TestInjectJobConn injects a *sql.Conn into a Job.

func TestInjectJobTx

func TestInjectJobTx(j *Job, tx Txer) *Job

TestInjectJobTx injects tx into a Job.

func (*Job) Conn deprecated

func (j *Job) Conn() *sql.Conn

Conn returns the Job's underlying *sql.Conn.

Deprecated: This is an internal method. DON'T USE IT.

func (*Job) Delete

func (j *Job) Delete() error

Delete is the same as DeleteContext, but without a context.Context.

func (*Job) DeleteContext

func (j *Job) DeleteContext(ctx context.Context) error

DeleteContext marks this job as complete by deleting it from the database.

You must also later call Done() to return this job's database connection to the pool.

func (*Job) Done

func (j *Job) Done()

Done is the same as DoneContext, but without a context.Context.

func (*Job) DoneContext

func (j *Job) DoneContext(ctx context.Context)

DoneContext releases the Postgres advisory lock on the job and returns the database connection to the pool.

func (*Job) Error

func (j *Job) Error(msg string) error

Error is the same as ErrorContext, but without a context.Context.

func (*Job) ErrorContext

func (j *Job) ErrorContext(ctx context.Context, msg string) error

ErrorContext marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.

You must also later call Done() to return this job's database connection to the pool.

func (*Job) Tx

func (j *Job) Tx() Txer

Tx returns the transaction associated with the Job.

type JobStats

type JobStats struct {
	Queue             string
	Type              string
	Count             int
	CountWorking      int
	CountErrored      int
	HighestErrorCount int
	OldestRunAt       time.Time
}

JobStats stores the statistics for one combination of queue and job type.
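Since each JobStats value describes one queue and job type pair, per-queue figures have to be aggregated by the caller. A minimal sketch, assuming a slice like the one Client.Stats returns. JobStats is copied locally so the example compiles on its own, and erroredByQueue and the sample data are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// JobStats is a local copy of the struct documented above.
type JobStats struct {
	Queue             string
	Type              string
	Count             int
	CountWorking      int
	CountErrored      int
	HighestErrorCount int
	OldestRunAt       time.Time
}

// erroredByQueue sums CountErrored over all job types of each queue.
func erroredByQueue(stats []JobStats) map[string]int {
	out := make(map[string]int)
	for _, s := range stats {
		out[s.Queue] += s.CountErrored
	}
	return out
}

func main() {
	// Made-up sample data in place of a real Client.Stats() result.
	stats := []JobStats{
		{Queue: "", Type: "SendWelcome", Count: 10, CountErrored: 2},
		{Queue: "", Type: "Resize", Count: 4, CountErrored: 1},
		{Queue: "mail", Type: "Digest", Count: 7},
	}
	for queue, n := range erroredByQueue(stats) {
		fmt.Printf("queue %q: %d errored jobs\n", queue, n)
	}
}
```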

type Queryer

type Queryer interface {
	Exec(string, ...interface{}) (sql.Result, error)
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	Query(string, ...interface{}) (*sql.Rows, error)
	QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
	QueryRow(string, ...interface{}) *sql.Row
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

Queryer is the interface for executing queries.

type Txer

type Txer interface {
	Queryer
	Commit() error
	Rollback() error
}

Txer is the interface for a transaction.

type WorkFunc

type WorkFunc func(j *Job) error

WorkFunc is a function that performs a Job. If an error is returned, the job is re-enqueued with exponential backoff.
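The text does not give the backoff schedule. que-go, from which this library is derived, delays a failed job by errorCount^4 + 3 seconds. Whether qg keeps exactly this formula is an assumption, so treat the sketch below as an illustration of how quickly such a schedule grows, not as a statement of this library's behavior. The function name retryDelaySeconds is hypothetical.

```go
package main

import "fmt"

// retryDelaySeconds returns errorCount^4 + 3, the delay in seconds that
// que-go applies before a failed job becomes runnable again.
// Assumption: qg inherits this schedule from que-go.
func retryDelaySeconds(errorCount int) int {
	d := 1
	for i := 0; i < 4; i++ {
		d *= errorCount
	}
	return d + 3
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Printf("failure %d: retry in %ds\n", n, retryDelaySeconds(n))
	}
}
```

Under this formula the first five retries wait 4, 19, 84, 259, and 628 seconds.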

type WorkMap

type WorkMap map[string]WorkFunc

WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
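To make the lookup concrete, here is a small sketch of dispatching a job through a WorkMap. Job, WorkFunc, and WorkMap are redeclared locally with the shapes shown in this documentation so the example runs without the module. The dispatch helper and the "SendWelcome" job type are hypothetical, and the real Worker performs its own lookup internally.

```go
package main

import "fmt"

// Local stand-ins with the shapes documented above.
type Job struct {
	Type string
	Args []byte
}

type WorkFunc func(j *Job) error

type WorkMap map[string]WorkFunc

// dispatch looks up the WorkFunc registered for j.Type and runs it.
// An unregistered type is reported as an error.
func dispatch(m WorkMap, j *Job) error {
	fn, ok := m[j.Type]
	if !ok {
		return fmt.Errorf("unknown job type: %q", j.Type)
	}
	return fn(j)
}

func main() {
	wm := WorkMap{
		"SendWelcome": func(j *Job) error {
			fmt.Printf("working %s with args %s\n", j.Type, j.Args)
			return nil
		},
	}
	fmt.Println(dispatch(wm, &Job{Type: "SendWelcome", Args: []byte(`{"user_id":42}`)}))
	fmt.Println(dispatch(wm, &Job{Type: "Missing"}))
}
```

The map keys must match the Type set at enqueue time exactly.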

type Worker

type Worker struct {
	// Interval is the amount of time that this Worker should sleep before trying
	// to find another Job.
	Interval time.Duration

	// Queue is the name of the queue to pull Jobs off of. The default value, "",
	// is usable and is the default for both que-go and the ruby que library.
	Queue string
	// contains filtered or unexported fields
}

Worker is a single worker that pulls jobs off the specified Queue. If no Job is found, the Worker sleeps for Interval before trying again.

func NewWorker

func NewWorker(c *Client, m WorkMap) *Worker

NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.

Workers default to an Interval of 5 seconds, which can be overridden by setting the environment variable QUE_WAKE_INTERVAL. The default Queue is the nameless queue "", which can be overridden by setting QUE_QUEUE. Either of these settings can be changed on the returned Worker before it is started with Work().
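The text does not say which format QUE_WAKE_INTERVAL expects. que-go reads it as a whole number of seconds and keeps the default when the value cannot be parsed. Assuming qg inherits that behavior, the parsing amounts to something like the sketch below. The helper name wakeInterval is hypothetical, and the check that rejects non-positive values is an addition of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// wakeInterval interprets raw as a whole number of seconds and falls back
// to def when raw is empty, not a number, or not positive.
// Assumption: this mirrors how que-go treats QUE_WAKE_INTERVAL.
func wakeInterval(raw string, def time.Duration) time.Duration {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return def
	}
	return time.Duration(n) * time.Second
}

func main() {
	fmt.Println(wakeInterval("", 5*time.Second))   // prints "5s"
	fmt.Println(wakeInterval("30", 5*time.Second)) // prints "30s"
}
```

If the environment is not a convenient place for this setting, the Interval field can be assigned directly on the Worker returned by NewWorker.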

func (*Worker) Shutdown

func (w *Worker) Shutdown()

Shutdown tells the worker to finish processing its current job and then stop. There is currently no timeout for in-progress jobs. This function blocks until the Worker has stopped working. It should only be called on an active Worker.

func (*Worker) Work

func (w *Worker) Work()

Work pulls jobs off the Worker's Queue at its Interval. This function only returns after Shutdown() is called, so it should be run in its own goroutine.
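The interplay of Work and Shutdown can be sketched as follows. The runner interface lists the two Worker methods with the signatures shown in this documentation, so a real *Worker could be passed to runUntil. The names runUntil and fakeWorker are hypothetical, and fakeWorker only imitates the documented blocking behavior so the sketch runs on its own.

```go
package main

import "fmt"

// runner lists the Worker methods used here.
type runner interface {
	Work()
	Shutdown()
}

// runUntil starts r.Work in its own goroutine, waits for stop to be closed,
// then calls Shutdown and waits for Work to return.
func runUntil(r runner, stop <-chan struct{}) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		r.Work()
	}()
	<-stop
	r.Shutdown()
	<-done
}

// fakeWorker blocks in Work until Shutdown is called.
type fakeWorker struct {
	quit     chan struct{}
	returned bool
}

func (w *fakeWorker) Work()     { <-w.quit; w.returned = true }
func (w *fakeWorker) Shutdown() { close(w.quit) }

func main() {
	w := &fakeWorker{quit: make(chan struct{})}
	stop := make(chan struct{})
	close(stop) // request shutdown immediately for this demo
	runUntil(w, stop)
	fmt.Println("worker stopped:", w.returned) // prints "worker stopped: true"
}
```

In a real program, stop would typically be closed by a signal handler long after startup, which also respects the note that Shutdown should only be called on an active Worker.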

func (*Worker) WorkOne

func (w *Worker) WorkOne() (didWork bool)

WorkOne works on a single job and reports whether it did any work.

type WorkerPool

type WorkerPool struct {
	WorkMap  WorkMap
	Interval time.Duration
	Queue    string
	// contains filtered or unexported fields
}

WorkerPool is a pool of Workers, each working jobs from the queue Queue at the specified Interval using the WorkMap.

func NewWorkerPool

func NewWorkerPool(c *Client, wm WorkMap, count int) *WorkerPool

NewWorkerPool creates a new WorkerPool with count workers using the Client c.

func (*WorkerPool) Shutdown

func (w *WorkerPool) Shutdown()

Shutdown sends a Shutdown signal to each of the Workers in the WorkerPool and waits for them all to finish shutting down.

func (*WorkerPool) Start

func (w *WorkerPool) Start()

Start starts all of the Workers in the WorkerPool.
