Documentation ¶
Index ¶
- Variables
- func Cancelled(cx Connection, job *Job) (err error)
- func Error(cx Connection, job *Job, userError error) (err error)
- func IsCancelled(cx Connection, job *Job) (b bool, err error)
- func Reap(cx Connection, queues []Queue) (_ int64, err error)
- func Success(cx Connection, job *Job) (err error)
- func WithKeepAlive(n time.Duration) func(*JobDescription)
- func WithMaxRetries(n int) func(*JobDescription)
- func WithParameters(params []byte) func(*JobDescription)
- func WithPriority(p int) func(*JobDescription)
- func WithRetention(dur time.Duration) func(*JobDescription)
- func WithTypeName(names []string) func(*DequeueFilters)
- type Connection
- type DequeueFilters
- type Handler
- type HandlerFunc
- type Job
- func AttachLogger(be LogBackend, job *Job) *Job
- func AttachPinger(cx Connection, job *Job) *Job
- func AttachResultWriter(cx Connection, job *Job) *Job
- func Dequeue(db *sql.DB, queues []Queue, filterFuncs ...func(*DequeueFilters)) (_ *Job, err error)
- func Enqueue(cx Connection, queue Queue, desc *JobDescription) (_ *Job, err error)
- type JobDescription
- type JobState
- type LogBackend
- type LogBackendAdapter
- type LogLevel
- type Logger
- func (log *Logger) Debug(msg string)
- func (log *Logger) Debugf(msg string, args ...interface{})
- func (log *Logger) Error(msg string)
- func (log *Logger) Errorf(msg string, args ...interface{})
- func (log *Logger) Info(msg string)
- func (log *Logger) Infof(msg string, args ...interface{})
- func (log *Logger) Warn(msg string)
- func (log *Logger) Warnf(msg string, args ...interface{})
- type Pinger
- type Queue
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrJobStateMismatch is returned if an optimistic locking failure occurs when transitioning
	// a job between states. It usually indicates that another process has updated the state of
	// the job after we have read it from the store.
	ErrJobStateMismatch = errors.New("job state mismatch")
)
var (
	// ErrSkipRetry must be used by the job handler routine to signal
	// that the job must not be retried (even when there are attempts left)
	ErrSkipRetry = errors.New("sqlq: skip retry")
)
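As a rough illustration of how a handler might use ErrSkipRetry, the sketch below defines a local stand-in for the variable so that it compiles on its own. The validate and shouldRetry helpers are hypothetical. It also assumes the runtime detects the sentinel with errors.Is, which this page does not confirm. If in doubt, return ErrSkipRetry itself rather than a wrapped error.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkipRetry is a local stand-in for sqlq.ErrSkipRetry.
var ErrSkipRetry = errors.New("sqlq: skip retry")

// errTransient is a hypothetical error that is worth retrying.
var errTransient = errors.New("timeout")

// validate is a hypothetical handler step. Malformed input will not get
// better on a second attempt, so the error wraps ErrSkipRetry.
func validate(params []byte) error {
	if len(params) == 0 {
		return fmt.Errorf("empty parameters: %w", ErrSkipRetry)
	}
	return nil
}

// shouldRetry reports whether a failed job may be attempted again.
// Assumption: the sentinel is matched with errors.Is, and a job has
// attempts left while attempt < maxRetries.
func shouldRetry(err error, attempt, maxRetries int) bool {
	if err == nil || errors.Is(err, ErrSkipRetry) {
		return false
	}
	return attempt < maxRetries
}

func main() {
	fmt.Println(shouldRetry(validate(nil), 0, 3)) // skip-retry error
	fmt.Println(shouldRetry(errTransient, 0, 3))  // transient error, attempts left
	fmt.Println(shouldRetry(errTransient, 3, 3))  // transient error, attempts exhausted
}
```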
Functions ¶
func Cancelled ¶
func Cancelled(cx Connection, job *Job) (err error)
Cancelled transitions the job to CANCELLED state and marks it as completed.
func Error ¶
func Error(cx Connection, job *Job, userError error) (err error)
Error transitions the job to ERROR state and marks it as completed. If the error is retryable (and there are still attempts left), it instead bumps the attempt count and transitions the job to PENDING so that a worker can pick it up again.
func IsCancelled ¶
func IsCancelled(cx Connection, job *Job) (b bool, err error)
IsCancelled checks whether the job is currently in a cancelling state and returns true if so.
func Reap ¶
func Reap(cx Connection, queues []Queue) (_ int64, err error)
Reap reaps any zombie jobs in the given queues, that is, jobs whose state is 'running' but which haven't pinged in a while. It moves any job with remaining attempts back to the queue and moves all others to the errored state.
func Success ¶
func Success(cx Connection, job *Job) (err error)
Success transitions the job to SUCCESS state and marks it as completed.
func WithKeepAlive ¶
func WithKeepAlive(n time.Duration) func(*JobDescription)
WithKeepAlive sets the keepalive ping duration for the job.
func WithMaxRetries ¶
func WithMaxRetries(n int) func(*JobDescription)
WithMaxRetries sets the maximum retry limit for the job.
func WithParameters ¶
func WithParameters(params []byte) func(*JobDescription)
WithParameters is used to pass additional parameters / arguments to the job. Note that params must be a JSON-encoded value.
func WithPriority ¶
func WithPriority(p int) func(*JobDescription)
WithPriority sets the job's priority.
func WithRetention ¶
func WithRetention(dur time.Duration) func(*JobDescription)
WithRetention sets the retention policy for a job. A completed job will be cleaned up after its retention policy expires.
func WithTypeName ¶
func WithTypeName(names []string) func(*DequeueFilters)
Types ¶
type Connection ¶
type Connection interface {
	// QueryContext executes a query that returns rows.
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	// ExecContext executes a query that doesn't return rows.
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
}
Connection is a utility abstraction over sql connection / pool / transaction objects. This is so that the semantics of the operation (should we roll back on error?) are defined by the caller of the routine and not decided by the routine itself.
This allows the user to run Enqueue() (and other) operations in, say, a common transaction with the rest of the business logic. It ensures that the job is only queued if the business logic finishes successfully and commits; otherwise the Enqueue() is rolled back as well.
type DequeueFilters ¶
type DequeueFilters struct {
// contains filtered or unexported fields
}
DequeueFilters are a set of optional filters that can be used with Dequeue()
type Handler ¶
type Handler interface {
	// Process implements the job's logic to process the given job.
	//
	// It should return nil if the processing of the job is successful.
	//
	// If Process() returns an error or panics, the job is marked as failed.
	Process(ctx context.Context, job *Job) error
}
Handler is the user-provided implementation of the job's business logic.
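The contract that both errors and panics count as failure can be sketched with a small wrapper. This is only an illustration of the idea, not the package's runtime: Job and Handler are local stand-ins, and processFunc, safeProcess, run and the three example handlers are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job and Handler are local stand-ins for the documented types.
type Job struct{ TypeName string }

type Handler interface {
	Process(ctx context.Context, job *Job) error
}

// processFunc is a hypothetical adapter that lets a plain function act
// as a Handler.
type processFunc func(ctx context.Context, job *Job) error

func (f processFunc) Process(ctx context.Context, job *Job) error { return f(ctx, job) }

// safeProcess runs the handler and converts a panic into an error, so
// that both failure modes can be treated the same way by the caller.
func safeProcess(ctx context.Context, h Handler, job *Job) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return h.Process(ctx, job)
}

// Three example handlers: success, returned error, and panic.
var (
	okHandler        Handler = processFunc(func(context.Context, *Job) error { return nil })
	failingHandler   Handler = processFunc(func(context.Context, *Job) error { return errors.New("upstream unavailable") })
	panickingHandler Handler = processFunc(func(context.Context, *Job) error { panic("nil map write") })
)

// run executes a handler against a throwaway job.
func run(h Handler) error {
	return safeProcess(context.Background(), h, &Job{TypeName: "demo"})
}

func main() {
	fmt.Println(run(okHandler))
	fmt.Println(run(failingHandler))
	fmt.Println(run(panickingHandler))
}
```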
type HandlerFunc ¶
type Job ¶
type Job struct {
	ID            uuid.UUID     `db:"id"`
	Queue         Queue         `db:"queue"`
	TypeName      string        `db:"typename"`
	Priority      int           `db:"priority"`
	Status        JobState      `db:"status"`
	Parameters    []byte        `db:"parameters"`
	Result        []byte        `db:"result"`
	CreatedAt     time.Time     `db:"created_at"`
	StartedAt     sql.NullTime  `db:"started_at"`
	CompletedAt   sql.NullTime  `db:"completed_at"`
	RunAfter      time.Duration `db:"run_after"`
	RetentionTTL  time.Duration `db:"retention_ttl"`
	LastQueuedAt  sql.NullTime  `db:"last_queued_at"`
	KeepAlive     time.Duration `db:"keepalive_interval"`
	LastKeepAlive sql.NullTime  `db:"last_keepalive"`
	MaxRetries    int           `db:"max_retries"`
	Attempt       int           `db:"attempt"`
	// contains filtered or unexported fields
}
Job represents an instance of a task / job in a queue.
func AttachLogger ¶
func AttachLogger(be LogBackend, job *Job) *Job
AttachLogger attaches a new Logger to the given job, that logs to the provided backend.
func AttachPinger ¶
func AttachPinger(cx Connection, job *Job) *Job
AttachPinger attaches a new Pinger to the given job.
func AttachResultWriter ¶
func AttachResultWriter(cx Connection, job *Job) *Job
AttachResultWriter attaches a result writer to the given job, using the provided Connection as the backend to write to.
func Dequeue ¶
func Dequeue(db *sql.DB, queues []Queue, filterFuncs ...func(*DequeueFilters)) (_ *Job, err error)
Dequeue dequeues a single job from one of the provided queues.
It takes into account several factors, such as a queue's concurrency and priority settings as well as a job's priority. It ensures that executing this job would not violate the queue's concurrency setting.
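The selection rule described above can be approximated in memory. This sketch is not the package's query. It assumes that larger numbers mean higher priority, that queue priority is compared before job priority, and that a queue is eligible while its running count is below its concurrency limit. The queue and job structs and the pick function are hypothetical.

```go
package main

import "fmt"

// queue and job are hypothetical, trimmed-down stand-ins.
type queue struct {
	Name        string
	Priority    int
	Concurrency int // maximum number of jobs running at once
	Running     int // jobs currently running
}

type job struct {
	ID       string
	Queue    string
	Priority int
}

// pick returns the pending job to run next, or false if no job is
// eligible. Jobs in queues that are already at their concurrency limit
// are never selected.
func pick(queues []queue, pending []job) (job, bool) {
	eligible := make(map[string]queue)
	for _, q := range queues {
		if q.Running < q.Concurrency {
			eligible[q.Name] = q
		}
	}
	var best job
	var bestQ queue
	found := false
	for _, j := range pending {
		q, ok := eligible[j.Queue]
		if !ok {
			continue
		}
		better := !found ||
			q.Priority > bestQ.Priority ||
			(q.Priority == bestQ.Priority && j.Priority > best.Priority)
		if better {
			best, bestQ, found = j, q, true
		}
	}
	return best, found
}

// Example data: "emails" is full, so only "reports" jobs can be picked.
var demoQueues = []queue{
	{Name: "emails", Priority: 1, Concurrency: 2, Running: 2},
	{Name: "reports", Priority: 1, Concurrency: 1, Running: 0},
}

var demoPending = []job{
	{ID: "a", Queue: "emails", Priority: 9},
	{ID: "b", Queue: "reports", Priority: 1},
	{ID: "c", Queue: "reports", Priority: 5},
}

func main() {
	j, ok := pick(demoQueues, demoPending)
	fmt.Println(j.ID, ok)
}
```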
func Enqueue ¶
func Enqueue(cx Connection, queue Queue, desc *JobDescription) (_ *Job, err error)
Enqueue enqueues a new job in the given queue based on provided description.
The queue is created if it doesn't already exist. By default, a job is created in 'pending' state. It returns the newly created job instance.
func (*Job) Logger ¶
Logger returns an instance of sqlq.Logger service that manages user-emitted logs for this job. A logger is only available when running in the context of a runtime. Trying to call Logger() in any other context would cause a panic().
func (*Job) Pinger ¶
Pinger returns an instance of sqlq.Pinger service that sends keepalive pings for the job. A pinger is only available when running in the context of a runtime. Trying to call Pinger() in any other context would cause a panic().
func (*Job) ResultWriter ¶
func (job *Job) ResultWriter() io.WriteCloser
ResultWriter returns an io.WriteCloser that can be used to save the result of a job. The user must close the writer to actually save the data. A result writer is only available when running in the context of a runtime. Trying to call ResultWriter() in any other context would cause a panic().
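The "nothing is saved until Close" behaviour can be illustrated with a small buffering writer. This is a sketch of the pattern only: resultWriter, newResultWriter and the stored variable (standing in for the job's result column) are hypothetical, and the package's writer may be implemented differently.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// resultWriter is a hypothetical io.WriteCloser that buffers writes in
// memory and hands the full payload to save only when Close is called.
type resultWriter struct {
	buf  bytes.Buffer
	save func([]byte) error
}

func (w *resultWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }

func (w *resultWriter) Close() error { return w.save(w.buf.Bytes()) }

// stored stands in for the job's result column.
var stored []byte

// newResultWriter returns a writer whose Close copies the buffered
// bytes into stored.
func newResultWriter() io.WriteCloser {
	return &resultWriter{save: func(b []byte) error {
		stored = append([]byte(nil), b...)
		return nil
	}}
}

func main() {
	w := newResultWriter()
	fmt.Fprint(w, `{"rows":`)
	fmt.Fprint(w, `42}`)
	fmt.Println(len(stored)) // still empty: Close has not been called
	if err := w.Close(); err != nil {
		fmt.Println("close failed:", err)
	}
	fmt.Println(string(stored))
}
```

In a handler, deferring Close right after obtaining the writer is a common way to make sure the result is flushed on every return path, as long as the error from Close is still checked where it matters.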
func (*Job) SendKeepAlive ¶
SendKeepAlive is a utility method that sends periodic keep-alive pings, every d duration.
This function starts an infinite for-loop that periodically sends a ping using job.Pinger(). Call this function in a background goroutine, like:
go job.SendKeepAlive(ctx, job.KeepAlive)
To stop sending pings, cancel the context and the function will return.
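The loop described above is the usual ticker-plus-context pattern. The sketch below reproduces it with a hypothetical sendKeepAlive function that takes the ping as a callback. The real method calls job.Pinger() instead, and its exact signature is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// sendKeepAlive calls ping every d until ctx is cancelled.
// d must be positive, because time.NewTicker panics otherwise.
func sendKeepAlive(ctx context.Context, d time.Duration, ping func()) {
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			ping()
		}
	}
}

// runFor starts the loop in a background goroutine, cancels it after
// total has elapsed, waits for it to return, and reports the ping count.
func runFor(d, total time.Duration) int64 {
	var pings int64
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		sendKeepAlive(ctx, d, func() { atomic.AddInt64(&pings, 1) })
	}()
	time.Sleep(total)
	cancel()
	<-done // the loop has returned, so no further pings can arrive
	return atomic.LoadInt64(&pings)
}

// demoPings runs the loop briefly with a short interval.
func demoPings() int64 {
	return runFor(5*time.Millisecond, 60*time.Millisecond)
}

func main() {
	fmt.Println(demoPings() > 0)
}
```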
type JobDescription ¶
type JobDescription struct {
// contains filtered or unexported fields
}
JobDescription describes a job to be enqueued. Note that it is just a set of options that closely resembles a job, and not an actual instance of a job. It is used by Enqueue() to create a new Job.
func NewJobDesc ¶
func NewJobDesc(typeName string, opts ...func(*JobDescription)) *JobDescription
NewJobDesc creates a new JobDescription for a job with given typename and with the provided opts.
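NewJobDesc and the With... helpers follow the functional options pattern. Because JobDescription's fields are unexported, the sketch below rebuilds the pattern with hypothetical lower-case names (jobDescription, newJobDesc, withPriority, and so on) rather than calling the package.

```go
package main

import (
	"fmt"
	"time"
)

// jobDescription is a hypothetical stand-in: the real type keeps its
// fields unexported, so these field names are assumptions.
type jobDescription struct {
	typeName   string
	priority   int
	maxRetries int
	keepAlive  time.Duration
}

// Each option returns a function that mutates the description.
func withPriority(p int) func(*jobDescription) {
	return func(d *jobDescription) { d.priority = p }
}

func withMaxRetries(n int) func(*jobDescription) {
	return func(d *jobDescription) { d.maxRetries = n }
}

func withKeepAlive(n time.Duration) func(*jobDescription) {
	return func(d *jobDescription) { d.keepAlive = n }
}

// newJobDesc starts from zero values and applies the options in order,
// so a later option overrides an earlier one that sets the same field.
func newJobDesc(typeName string, opts ...func(*jobDescription)) *jobDescription {
	desc := &jobDescription{typeName: typeName}
	for _, opt := range opts {
		opt(desc)
	}
	return desc
}

func main() {
	desc := newJobDesc("send_email",
		withPriority(5),
		withMaxRetries(3),
		withKeepAlive(30*time.Second),
	)
	fmt.Println(desc.typeName, desc.priority, desc.maxRetries, desc.keepAlive)
}
```

The benefit of this shape is that new settings can be added later as additional option functions without changing the constructor's signature.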
type JobState ¶
type JobState uint
JobState represents the status a job is in and serves as the basis of a job's state machine.
                       +-----------+
    Enqueue()----------|  PENDING  |----------------+
                       +-----------+                |
                             |                      |
                             |                      |
                       +-----|-----+            +---|----+
                       |  RUNNING  |------------| Retry? |
                       +-----------+            +--------+
                             |                      |
                             |                      |
                       +-----|-----+            +---|-----+
                       |  SUCCESS  |            | ERRORED |
                       +-----------+            +---------+
A job starts in the PENDING state when it is Enqueue()'d. A worker then picks it up and transitions it to RUNNING. If the job finishes without any error, it is moved to SUCCESS. If an error occurs and the runtime determines that the job can be retried, it moves the job back to the PENDING state; otherwise it moves it to the ERRORED state.
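The transitions in the diagram can be written down as a small state machine. The constant names and the start and finish functions are hypothetical, since this page does not list the package's JobState values. The sketch covers only the four states drawn in the diagram.

```go
package main

import (
	"errors"
	"fmt"
)

// jobState and its values are hypothetical stand-ins for JobState.
type jobState uint

const (
	pending jobState = iota
	running
	success
	errored
)

var errInvalidTransition = errors.New("invalid state transition")

// start is the PENDING -> RUNNING edge, taken when a worker picks up a job.
func start(s jobState) (jobState, error) {
	if s != pending {
		return s, errInvalidTransition
	}
	return running, nil
}

// finish resolves a RUNNING job. failed says whether the handler
// reported an error, and canRetry is the outcome of the "Retry?" decision.
func finish(s jobState, failed, canRetry bool) (jobState, error) {
	switch {
	case s != running:
		return s, errInvalidTransition
	case !failed:
		return success, nil
	case canRetry:
		return pending, nil
	default:
		return errored, nil
	}
}

func main() {
	s, _ := start(pending)
	s, _ = finish(s, true, true) // failed but retryable: back to pending
	fmt.Println(s == pending)
	s, _ = start(s)
	s, _ = finish(s, false, false) // second attempt succeeds
	fmt.Println(s == success)
}
```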
type LogBackend ¶
type LogBackend interface {
	// Write writes the message at given level for the given job
	Write(job *Job, level LogLevel, msg string) (int, error)
}
LogBackend service provides the backend / sink implementation where the log messages are displayed and / or persisted.
type LogBackendAdapter ¶
LogBackendAdapter is a utility type that allows complying functions to be used as log backends.
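The definition of LogBackendAdapter is not shown on this page. Assuming it follows the same function-adapter pattern as http.HandlerFunc, the idea looks roughly like the sketch below, where Job, LogLevel and LogBackend are local stand-ins and backendFunc, lines and memoryBackend are hypothetical names.

```go
package main

import "fmt"

// Job, LogLevel and LogBackend are local stand-ins for the documented types.
type Job struct{ ID string }

type LogLevel int

type LogBackend interface {
	Write(job *Job, level LogLevel, msg string) (int, error)
}

// backendFunc is a hypothetical adapter: any function with the same
// signature as LogBackend.Write becomes a LogBackend.
type backendFunc func(job *Job, level LogLevel, msg string) (int, error)

func (f backendFunc) Write(job *Job, level LogLevel, msg string) (int, error) {
	return f(job, level, msg)
}

// lines collects what the example backend receives.
var lines []string

// memoryBackend is a plain function wrapped so that it satisfies LogBackend.
var memoryBackend LogBackend = backendFunc(func(job *Job, level LogLevel, msg string) (int, error) {
	lines = append(lines, fmt.Sprintf("[%s] level=%d %s", job.ID, level, msg))
	return len(msg), nil
})

func main() {
	n, err := memoryBackend.Write(&Job{ID: "job-1"}, 1, "started")
	fmt.Println(n, err, lines[0])
}
```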
type LogLevel ¶
type LogLevel int
LogLevel defines the logging levels used when sending log messages
type Logger ¶
type Logger struct {
// contains filtered or unexported fields
}
Logger is used to log messages from a job's handler to a given backend
func NewLogger ¶
func NewLogger(job *Job, be LogBackend) *Logger
NewLogger returns a new Logger for the given Job, which writes logs to the provided backend.