Documentation ¶
Index ¶
- Variables
- func Dequeue(db DB, action string, opts ...Option) ([]byte, func() error, error)
- func Enqueue(db DB, action string, payload encoding.TextMarshaler, opts ...Option) (int64, error)
- func Length(db DB, action string) (int, error)
- func MigrateDB(db DB) error
- func RequeueExpired(db DB) (int, error)
- type DB
- type Option
- type Task
Variables ¶
var DefaultMaxTries int = 10
DefaultMaxTries is the default number of failed tries after which a task is permanently failed.
var DefaultTimeout time.Duration = 2 * time.Minute
DefaultTimeout is the default timeout for enqueued tasks. Once it elapses, the task is assumed to have failed and is requeued for retry.
var NoTask error = errors.New("no task")
NoTask is returned when Dequeue is called and no task is available for that action.
Functions ¶
func Dequeue ¶
Dequeue returns the payload of the oldest unclaimed task for action, together with a function to call on successful completion. It acquires an exclusive lock on the task for a TTL. When the queue is empty for that action, Dequeue returns a nil payload and the NoTask error.
func Enqueue ¶
Enqueue creates a durable task record in the queue that survives crashes and restarts.
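Because Enqueue takes an `encoding.TextMarshaler`, a payload type needs a `MarshalText` method. The sketch below shows one way to do that with JSON, plus the matching `UnmarshalText` a task processor could use on the dequeued bytes. `EmailJob`, `plainEmailJob`, and `roundTrip` are hypothetical names, and JSON is just one possible encoding.

```go
package main

import (
	"encoding"
	"encoding/json"
	"fmt"
)

// EmailJob is a hypothetical payload type for an "email" action.
type EmailJob struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// plainEmailJob has the same fields but none of EmailJob's methods, so
// encoding/json does not call back into MarshalText or UnmarshalText.
type plainEmailJob EmailJob

// MarshalText encodes the job as JSON text.
func (j EmailJob) MarshalText() ([]byte, error) {
	return json.Marshal(plainEmailJob(j))
}

// UnmarshalText decodes JSON text produced by MarshalText.
func (j *EmailJob) UnmarshalText(b []byte) error {
	return json.Unmarshal(b, (*plainEmailJob)(j))
}

// Compile-time checks that the type satisfies the stdlib interfaces.
var (
	_ encoding.TextMarshaler   = EmailJob{}
	_ encoding.TextUnmarshaler = (*EmailJob)(nil)
)

// roundTrip marshals a job and decodes it again, as a producer and a
// task processor would do on either side of the queue.
func roundTrip(in EmailJob) (EmailJob, []byte, error) {
	text, err := in.MarshalText()
	if err != nil {
		return EmailJob{}, nil, err
	}
	var out EmailJob
	err = out.UnmarshalText(text)
	return out, text, err
}

func main() {
	out, text, err := roundTrip(EmailJob{To: "a@example.com", Subject: "hi"})
	fmt.Println(string(text), out.To, out.Subject, err)
}
```

The method-free alias type matters: calling `json.Marshal` directly on a type that implements `MarshalText` would make `encoding/json` invoke that method again and recurse.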
func Length ¶
Length returns the queue length for a particular action in a non-blocking read. Length can be used before Dequeue to prevent lock contention.
func RequeueExpired ¶
RequeueExpired will re-add tasks to the queue when their TTL has expired. Note that you do not have to call this function manually unless you want to requeue immediately. Expired tasks are automatically requeued when a subsequent database operation is performed. For very low traffic queues, you may want to call this periodically to limit worst case delay before requeue.
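For the low-traffic case mentioned above, a periodic call can be sketched with a ticker. This is a minimal sketch, not the package's own mechanism: `requeueEvery` and `demo` are hypothetical helpers, and the stub function stands in for a closure over `RequeueExpired(db)` so the example runs without a database.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// requeueEvery calls requeue once per interval until ctx is cancelled and
// returns the total number of tasks reported as requeued.
func requeueEvery(ctx context.Context, interval time.Duration, requeue func() (int, error)) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	total := 0
	for {
		select {
		case <-ctx.Done():
			return total, nil
		case <-ticker.C:
			n, err := requeue()
			if err != nil {
				return total, err
			}
			total += n
		}
	}
}

// demo runs the loop briefly against a stub that pretends one task
// expired on every call.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	stub := func() (int, error) { return 1, nil } // stands in for RequeueExpired(db)
	return requeueEvery(ctx, 10*time.Millisecond, stub)
}

func main() {
	total, err := demo()
	fmt.Println(total > 0, err)
}
```

The interval bounds the worst-case delay before an expired task is requeued, so it can be chosen relative to the task timeout.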
Types ¶
type DB ¶
type DB interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}
DB is the minimal interface to the persistence layer. It works with sqlx, standard database/sql, or other abstraction layers like ~scooter/db.
type Option ¶
type Option func(o *options)
func MaxTries ¶
MaxTries will fail the task after the specified number of tries. Set max tries to -1 to retry indefinitely.
func OnSuccess ¶
OnSuccess adds an additional function to be called after the task is successful. This can be used to queue a dependent task, finish clean up, or persist new state.
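The `Option func(o *options)` declaration follows the functional options pattern. Since the page does not show the fields of `options` or the signatures of MaxTries and OnSuccess, the sketch below uses hypothetical stand-ins (`withMaxTries`, `withOnSuccess`, `resolve`, and the two struct fields) to show how such options are typically applied over defaults.

```go
package main

import "fmt"

// defaultMaxTries mirrors the documented DefaultMaxTries value.
const defaultMaxTries = 10

// options is a guess at the unexported settings struct; the real fields
// are not shown in the documentation.
type options struct {
	maxTries  int
	onSuccess []func() error
}

// Option matches the documented declaration.
type Option func(o *options)

// withMaxTries is a hypothetical stand-in for MaxTries.
func withMaxTries(n int) Option {
	return func(o *options) { o.maxTries = n }
}

// withOnSuccess is a hypothetical stand-in for OnSuccess; it appends a
// callback rather than replacing earlier ones.
func withOnSuccess(fn func() error) Option {
	return func(o *options) { o.onSuccess = append(o.onSuccess, fn) }
}

// resolve starts from the defaults and applies each option in order.
func resolve(opts ...Option) options {
	o := options{maxTries: defaultMaxTries}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(withMaxTries(-1), withOnSuccess(func() error { return nil }))
	fmt.Println(o.maxTries, len(o.onSuccess)) // prints "-1 1"
}
```

Options are applied in order, so a later option overrides an earlier one that sets the same field.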
type Task ¶
type Task struct {
	ID       int64
	Action   string
	Payload  []byte
	Expires  int64
	Nbf      int64
	Tries    int
	MaxTries int `db:"max_tries"`
	Parent   sql.NullInt64
}
Task represents a task for a particular action, with its payload stored as a blob. Task processors are responsible for deserializing payloads for each action type. Task processors that succeed should call `OnSuccess` to remove the task from the queue.