Documentation ¶
Index ¶
- func DefaultLogger() zerolog.Logger
- type ConnProvider
- type Enqueuer
- type Job
- func (j *Job) Args() []interface{}
- func (j *Job) Class() string
- func (j *Job) CreatedAt() time.Time
- func (j *Job) EnqueuedAt() time.Time
- func (j *Job) ID() string
- func (j *Job) Queue() string
- func (j *Job) Retry() bool
- func (j *Job) RetryTimes() int
- func (j *Job) SetArgs(args []interface{}) *Job
- func (j *Job) SetClass(c string) *Job
- func (j *Job) SetCreatedAt(t time.Time) *Job
- func (j *Job) SetID(id string) *Job
- func (j *Job) SetQueue(q string) *Job
- func (j *Job) SetRetry(retry bool) *Job
- func (j *Job) SetRetryTimes(n int) *Job
- type Node
- type OrderedQueueSet
- type QueueSet
- type RandomQueueSet
- type Worker
- type WorkerFunc
Functions ¶
func DefaultLogger ¶
DefaultLogger returns a JSON logger, writing to stdout, with a timestamp and a minimum level of info.
Types ¶
type ConnProvider ¶
type ConnProvider interface {
	// Conn returns a connection, which can come from a shared pool. The caller will call Close on the connection when
	// it is done with it.
	Conn(context.Context) (redis.Conn, error)

	// DialLongPoll returns a new, dedicated connection, with a long read timeout and a normal write timeout. The caller
	// will close the connection.
	DialLongPoll(context.Context) (redis.Conn, error)
}
ConnProvider provides Redis connections, while encapsulating the process of establishing and configuring the connections. It is safe for concurrent use.
The provided Context should only affect the process of establishing a connection. If the context expires afterwards, it should not affect the use of the connection.
type Enqueuer ¶
type Enqueuer struct {
// contains filtered or unexported fields
}
Enqueuer puts jobs in queues.
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
func (*Job) EnqueuedAt ¶
func (*Job) RetryTimes ¶
RetryTimes returns the number of times the job should be retried, or 0 if the default value should be used.
func (*Job) SetRetryTimes ¶
SetRetryTimes configures the number of times the job should be retried. The minimum allowed value is 0 and the maximum 100. Values outside of that range will be ignored.
Calling this function always enables retries, because 0 represents the default value for the number of retries. To disable retries, use SetRetry(false).
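The interaction between SetRetryTimes and SetRetry can be sketched as follows. This is only an illustration of the documented behaviour, not the package's implementation: the job type and its fields are hypothetical stand-ins, and the sketch assumes that an out-of-range call is a complete no-op (the documentation says only that such values are ignored).

```go
package main

import "fmt"

// job is a hypothetical stand-in for the package's Job type. The field names
// are assumptions made for this sketch only.
type job struct {
	retry      bool
	retryTimes int
}

// SetRetryTimes mirrors the documented rules: values outside 0..100 are
// ignored, and any accepted value (including 0) enables retries.
func (j *job) SetRetryTimes(n int) *job {
	if n < 0 || n > 100 {
		return j // assumption: an out-of-range call changes nothing
	}
	j.retry = true
	j.retryTimes = n
	return j
}

// SetRetry switches retries on or off without touching the retry count.
func (j *job) SetRetry(retry bool) *job {
	j.retry = retry
	return j
}

func main() {
	j := &job{}

	// 0 means "use the default number of retries", so retries are enabled.
	j.SetRetryTimes(0)
	fmt.Println("after SetRetryTimes(0):", j.retry, j.retryTimes)

	// Disabling retries requires SetRetry(false).
	j.SetRetry(false)
	fmt.Println("after SetRetry(false):", j.retry, j.retryTimes)

	// An out-of-range value is ignored in this sketch.
	j.SetRetryTimes(250)
	fmt.Println("after SetRetryTimes(250):", j.retry, j.retryTimes)
}
```

Because the setters return the job, the calls can also be chained, as the method signatures in the index suggest.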
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
A Node represents a single server instance that processes any number of queues, using as many Worker instances as are needed.
func NewNode ¶
func NewNode(log zerolog.Logger, cp ConnProvider, longPollTimeout int) *Node
NewNode returns a new instance.
func (*Node) ProcessQueues ¶
ProcessQueues configures the Node to process the given set of queues using the desired number of Worker instances.
You can call ProcessQueues multiple times with different sets of queues and Workers. Do not call it after calling Run.
func (*Node) Run ¶
func (n *Node) Run()
Run starts the process of getting jobs from queues and passing them to Workers. It blocks until the Node is shut down. See Stop for more.
func (*Node) Stop ¶
Stop initiates worker shutdown. Once the shutdown process is complete, the call to Run will return.
Once Stop has been called, no new jobs are taken from the queues.
Workers that are currently processing jobs will be given a grace period and allowed to finish. Once the Context provided to Stop expires, the Context passed to every Worker will be cancelled.
Stop blocks until the shutdown process has completed.
type OrderedQueueSet ¶
type OrderedQueueSet []string
An OrderedQueueSet always returns the queues in the desired order.
func (OrderedQueueSet) GetQueues ¶
func (qs OrderedQueueSet) GetQueues() []string
GetQueues implements QueueSet.
func (OrderedQueueSet) Names ¶
func (qs OrderedQueueSet) Names() []string
Names implements QueueSet.
type QueueSet ¶
type QueueSet interface {
	// GetQueues returns the queues sorted by the desired priority.
	GetQueues() []string

	// Names returns a list of queue names in the same order as they were configured, ignoring the strategy of the set,
	// such as randomization, for example. This is currently used for logging.
	Names() []string
}
A QueueSet implements a strategy for deciding which queues should be checked first by a group of workers. The set will be consulted every time there is a need to get the next job, so it is OK to return a different slice on every call to GetQueues.
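Since GetQueues may return a different slice on every call, a custom strategy only needs to satisfy the two-method interface. The following is a minimal sketch of one such strategy. The rotatingQueueSet type is hypothetical and not part of this package, and the interface is copied locally so that the example compiles on its own. The mutex is a precaution, because the documentation does not say whether a set is consulted from several goroutines at once.

```go
package main

import (
	"fmt"
	"sync"
)

// QueueSet is copied from the interface documented above so that this sketch
// is self-contained.
type QueueSet interface {
	GetQueues() []string
	Names() []string
}

// rotatingQueueSet is a hypothetical strategy: every call to GetQueues starts
// one position further along, so each queue takes a turn at being first.
type rotatingQueueSet struct {
	mu    sync.Mutex
	names []string
	next  int
}

// GetQueues returns the configured queues, rotated by one more position on
// each call.
func (qs *rotatingQueueSet) GetQueues() []string {
	qs.mu.Lock()
	defer qs.mu.Unlock()

	n := len(qs.names)
	if n == 0 {
		return nil
	}
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, qs.names[(qs.next+i)%n])
	}
	qs.next = (qs.next + 1) % n
	return out
}

// Names returns a copy of the queue names in their configured order.
func (qs *rotatingQueueSet) Names() []string {
	qs.mu.Lock()
	defer qs.mu.Unlock()
	return append([]string(nil), qs.names...)
}

// Compile-time check that the sketch satisfies the interface.
var _ QueueSet = (*rotatingQueueSet)(nil)

func main() {
	qs := &rotatingQueueSet{names: []string{"mail", "reports", "cleanup"}}
	for i := 0; i < 4; i++ {
		fmt.Println(qs.GetQueues())
	}
	fmt.Println("configured order:", qs.Names())
}
```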
type RandomQueueSet ¶
type RandomQueueSet struct {
// contains filtered or unexported fields
}
A RandomQueueSet returns the queues in random order. The likelihood of a queue being first is proportional to its relative weight.
func NewRandomQueueSet ¶
func NewRandomQueueSet() *RandomQueueSet
NewRandomQueueSet returns a new instance.
func (*RandomQueueSet) Add ¶
func (qs *RandomQueueSet) Add(q string, weight int)
Add adds a queue with the given relative weight.
Example:
qs.Add("low_priority", 1)
qs.Add("high_priority", 3)
The "low_priority" queue has a 25% chance of being checked first: 1 / (1 + 3).
The "high_priority" queue has a 75% chance of being checked first: 3 / (1 + 3).
func (*RandomQueueSet) GetQueues ¶
func (qs *RandomQueueSet) GetQueues() []string
GetQueues implements QueueSet.
func (*RandomQueueSet) Names ¶
func (qs *RandomQueueSet) Names() []string
Names implements QueueSet.