Documentation ¶
Index ¶
- Variables
- type AckOptions
- type BackoffFunc
- type BulkDequeuer
- type BulkEnqueuer
- type BulkJobFinder
- type ContextHandleFunc
- type DequeueFunc
- type DequeueMiddleware
- type DequeueOptions
- type Dequeuer
- type EnqueueFunc
- type EnqueueMiddleware
- type EnqueueOptions
- type Enqueuer
- type ExternalBulkEnqueuer
- type ExternalEnqueuer
- type FindOptions
- type HandleFunc
- type HandleMiddleware
- type InvalidJobPayloadError
- type Job
- type JobOptions
- type Metrics
- type MetricsExporter
- type OnceJobOptions
- type Queue
- type QueueMetrics
- type QueueMetricsOptions
- type RedisQueue
- type Worker
- func (w *Worker) ExportMetrics() (*Metrics, error)
- func (w *Worker) Register(queueID string, h HandleFunc, opt *JobOptions) error
- func (w *Worker) RegisterWithContext(queueID string, h ContextHandleFunc, opt *JobOptions) error
- func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFunc, opt *OnceJobOptions) error
- func (w *Worker) Start()
- func (w *Worker) Stop()
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)
options validation errors
var (
	ErrMaxExecutionTime = errors.New("work: max execution time should be > 0")
	ErrNumGoroutines    = errors.New("work: number of goroutines should be > 0")
	ErrIdleWait         = errors.New("work: idle wait should be > 0")
)
options validation errors
var (
	// ErrDoNotRetry is returned if the job should not be retried;
	// this may be because the job is unrecoverable, or because
	// the handler has already rescheduled it.
	ErrDoNotRetry = errors.New("work: do not retry")

	// ErrUnrecoverable is returned if the error is unrecoverable.
	// The job will be discarded.
	ErrUnrecoverable = fmt.Errorf("work: permanent error: %w", ErrDoNotRetry)
)
retry errors
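Because ErrUnrecoverable wraps ErrDoNotRetry with %w, a single errors.Is check against ErrDoNotRetry matches both values. The sketch below redeclares the two variables locally so that it runs without importing the package; shouldRetry and errTimeout are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two documented variables, redeclared so the
// example runs without importing the package.
var (
	ErrDoNotRetry    = errors.New("work: do not retry")
	ErrUnrecoverable = fmt.Errorf("work: permanent error: %w", ErrDoNotRetry)
)

// errTimeout is a hypothetical transient error.
var errTimeout = errors.New("timeout")

// shouldRetry is a hypothetical helper: it reports whether a handler
// error leaves the job eligible for another attempt.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, ErrDoNotRetry)
}

func main() {
	// A handler may add its own context on top of ErrUnrecoverable;
	// errors.Is still finds ErrDoNotRetry through both layers of %w.
	wrapped := fmt.Errorf("charge card: %w", ErrUnrecoverable)

	fmt.Println("timeout:      ", shouldRetry(errTimeout))
	fmt.Println("do not retry: ", shouldRetry(ErrDoNotRetry))
	fmt.Println("unrecoverable:", shouldRetry(wrapped))
}
```

A handler that wants a job dropped can therefore return either value, optionally wrapped with extra context.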
var (
	// ErrEmptyQueue is returned if Dequeue() is called on an empty queue.
	ErrEmptyQueue = errors.New("work: no job is found")
)
Functions ¶
This section is empty.
Types ¶
type AckOptions ¶
AckOptions specifies how a job is deleted from a queue.
func (*AckOptions) Validate ¶
func (opt *AckOptions) Validate() error
Validate validates AckOptions.
type BackoffFunc ¶ added in v0.1.12
type BackoffFunc func(*Job, *DequeueOptions) time.Duration
BackoffFunc computes how long to wait, measured from now, before the job is retried.
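As an illustration, a capped exponential backoff can be written against this signature. The sketch below uses minimal local stand-ins for Job and DequeueOptions; baseDelay, maxDelay, and exponentialBackoff are assumptions made for the example, not values or functions provided by the package.

```go
package main

import (
	"fmt"
	"time"
)

// Minimal local stand-ins for the documented types.
type Job struct{ Retries int64 }
type DequeueOptions struct{}
type BackoffFunc func(*Job, *DequeueOptions) time.Duration

const (
	baseDelay = 2 * time.Second // assumed starting delay
	maxDelay  = 5 * time.Minute // assumed upper bound
)

// exponentialBackoff doubles the delay once per recorded retry and
// never returns more than maxDelay.
func exponentialBackoff(job *Job, _ *DequeueOptions) time.Duration {
	d := baseDelay
	for i := int64(0); i < job.Retries; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

// Compile-time check that the function matches the BackoffFunc signature.
var _ BackoffFunc = exponentialBackoff

func main() {
	for _, n := range []int64{0, 1, 3, 20} {
		fmt.Println(n, exponentialBackoff(&Job{Retries: n}, nil))
	}
}
```

Capping inside the loop keeps the duration from overflowing when Retries is large.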
type BulkDequeuer ¶
type BulkDequeuer interface {
	BulkDequeue(int64, *DequeueOptions) ([]*Job, error)
	BulkAck([]*Job, *AckOptions) error
}
BulkDequeuer dequeues jobs in a batch.
type BulkEnqueuer ¶
type BulkEnqueuer interface {
	BulkEnqueue([]*Job, *EnqueueOptions) error
}
BulkEnqueuer enqueues jobs in a batch.
type BulkJobFinder ¶ added in v0.1.4
type BulkJobFinder interface {
	BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
}
BulkJobFinder finds jobs by ID. It allows third-party tools to get job status, or to modify a job by re-enqueueing it. The entry for a job is nil if that job is no longer in the queue, so the returned job list always has the same length as jobIDs.
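Since the result is positional, callers need to check each entry for nil before using it. The following sketch shows one way to collect the IDs that are no longer queued. It relies on local stand-in types; mapFinder and missingIDs are hypothetical and exist only for this example.

```go
package main

import "fmt"

// Minimal local stand-ins for the documented types.
type Job struct{ ID string }
type FindOptions struct{ Namespace string }
type BulkJobFinder interface {
	BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
}

// mapFinder is a toy in-memory finder used only for this example.
type mapFinder map[string]*Job

func (m mapFinder) BulkFind(jobIDs []string, _ *FindOptions) ([]*Job, error) {
	jobs := make([]*Job, len(jobIDs))
	for i, id := range jobIDs {
		jobs[i] = m[id] // a missing key leaves a nil entry
	}
	return jobs, nil
}

// missingIDs is a hypothetical helper that returns the IDs whose
// jobs are no longer in the queue.
func missingIDs(f BulkJobFinder, ids []string, opt *FindOptions) ([]string, error) {
	jobs, err := f.BulkFind(ids, opt)
	if err != nil {
		return nil, err
	}
	if len(jobs) != len(ids) {
		return nil, fmt.Errorf("got %d jobs for %d ids", len(jobs), len(ids))
	}
	var missing []string
	for i, job := range jobs {
		if job == nil {
			missing = append(missing, ids[i])
		}
	}
	return missing, nil
}

func main() {
	finder := mapFinder{"a": {ID: "a"}}
	missing, err := missingIDs(finder, []string{"a", "b"}, &FindOptions{Namespace: "ns1"})
	fmt.Println(missing, err)
}
```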
type ContextHandleFunc ¶ added in v0.1.4
type ContextHandleFunc func(context.Context, *Job, *DequeueOptions) error
ContextHandleFunc runs a job.
type DequeueFunc ¶
type DequeueFunc func(*DequeueOptions) (*Job, error)
DequeueFunc generates a job.
type DequeueMiddleware ¶
type DequeueMiddleware func(DequeueFunc) DequeueFunc
DequeueMiddleware modifies DequeueFunc behavior.
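A middleware of this shape takes the next DequeueFunc and returns a wrapped one. As a rough sketch, the example below counts how often the wrapped function reports an empty queue. The types are local stand-ins, and countEmpty and emptyDequeue are hypothetical names that are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal local stand-ins for the documented types and error.
type Job struct{ ID string }
type DequeueOptions struct{ Namespace, QueueID string }
type DequeueFunc func(*DequeueOptions) (*Job, error)
type DequeueMiddleware func(DequeueFunc) DequeueFunc

var ErrEmptyQueue = errors.New("work: no job is found")

// countEmpty is a hypothetical middleware that counts how often the
// wrapped DequeueFunc reports an empty queue.
func countEmpty(counter *int) DequeueMiddleware {
	return func(next DequeueFunc) DequeueFunc {
		return func(opt *DequeueOptions) (*Job, error) {
			job, err := next(opt)
			if errors.Is(err, ErrEmptyQueue) {
				*counter++
			}
			return job, err
		}
	}
}

// emptyDequeue stands in for a real queue that has no jobs.
func emptyDequeue(*DequeueOptions) (*Job, error) { return nil, ErrEmptyQueue }

func main() {
	var empties int
	dequeue := countEmpty(&empties)(emptyDequeue)
	opt := &DequeueOptions{Namespace: "ns1", QueueID: "emails"}
	dequeue(opt)
	dequeue(opt)
	fmt.Println("empty polls:", empties)
}
```

The middleware passes the job and error through unchanged, so it can be combined with other middleware in any order.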
type DequeueOptions ¶
type DequeueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
	// At is the current time of the dequeuer.
	// Any job that is scheduled before this can be executed.
	At time.Time
	// After the job is dequeued, no other dequeuer can see this job for a while.
	// InvisibleSec controls how long this period is.
	InvisibleSec int64
}
DequeueOptions specifies how a job is dequeued.
func (*DequeueOptions) Validate ¶
func (opt *DequeueOptions) Validate() error
Validate validates DequeueOptions.
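The validation errors listed under Variables line up with the four DequeueOptions fields. The sketch below shows what such a check might look like. It is an assumption based only on the error messages, not the package's actual implementation, and the order of the checks is a guess. validate and newOptions are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented validation errors.
var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)

// Local stand-in mirroring the documented struct.
type DequeueOptions struct {
	Namespace    string
	QueueID      string
	At           time.Time
	InvisibleSec int64
}

// validate is a hypothetical check derived from the error messages;
// the real Validate method may differ.
func validate(opt *DequeueOptions) error {
	switch {
	case opt.Namespace == "":
		return ErrEmptyNamespace
	case opt.QueueID == "":
		return ErrEmptyQueueID
	case opt.At.IsZero():
		return ErrAt
	case opt.InvisibleSec < 0:
		return ErrInvisibleSec
	}
	return nil
}

// newOptions returns options that pass every check above.
func newOptions() *DequeueOptions {
	return &DequeueOptions{Namespace: "ns1", QueueID: "emails", At: time.Now(), InvisibleSec: 60}
}

func main() {
	opt := newOptions()
	fmt.Println(validate(opt))
	opt.InvisibleSec = -1
	fmt.Println(validate(opt))
}
```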
type Dequeuer ¶
type Dequeuer interface {
	Dequeue(*DequeueOptions) (*Job, error)
	Ack(*Job, *AckOptions) error
}
Dequeuer dequeues a job. If a job is processed successfully, call Ack() to delete the job.
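The dequeue, handle, ack sequence can be sketched as follows. The example uses local stand-in types and a toy in-memory queue. sliceQueue, processOne, and errBoom are hypothetical, and AckOptions is left empty because its fields are not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal local stand-ins for the documented types and error.
type Job struct{ ID string }
type DequeueOptions struct{}
type AckOptions struct{} // fields omitted in this sketch
type Dequeuer interface {
	Dequeue(*DequeueOptions) (*Job, error)
	Ack(*Job, *AckOptions) error
}

var ErrEmptyQueue = errors.New("work: no job is found")
var errBoom = errors.New("boom") // hypothetical handler failure

// sliceQueue is a toy in-memory Dequeuer used only for this example.
type sliceQueue struct{ jobs []*Job }

func (q *sliceQueue) Dequeue(*DequeueOptions) (*Job, error) {
	if len(q.jobs) == 0 {
		return nil, ErrEmptyQueue
	}
	return q.jobs[0], nil
}

func (q *sliceQueue) Ack(job *Job, _ *AckOptions) error {
	for i, j := range q.jobs {
		if j.ID == job.ID {
			q.jobs = append(q.jobs[:i], q.jobs[i+1:]...)
			break
		}
	}
	return nil
}

// processOne dequeues one job and acks it only if handle succeeds,
// so a failed job stays in the queue.
func processOne(q Dequeuer, handle func(*Job) error) error {
	job, err := q.Dequeue(&DequeueOptions{})
	if err != nil {
		return err
	}
	if err := handle(job); err != nil {
		return err
	}
	return q.Ack(job, &AckOptions{})
}

func main() {
	q := &sliceQueue{jobs: []*Job{{ID: "job-1"}}}
	fmt.Println(processOne(q, func(*Job) error { return errBoom }), len(q.jobs))
	fmt.Println(processOne(q, func(*Job) error { return nil }), len(q.jobs))
}
```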
type EnqueueFunc ¶
type EnqueueFunc func(*Job, *EnqueueOptions) error
EnqueueFunc takes in a job for processing.
type EnqueueMiddleware ¶
type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc
EnqueueMiddleware modifies EnqueueFunc behavior.
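An EnqueueMiddleware can inspect or reject a job before it reaches the underlying EnqueueFunc. As a sketch under local stand-in types, the example below refuses oversized payloads. limitPayload and errPayloadTooLarge are hypothetical and not provided by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal local stand-ins for the documented types.
type Job struct {
	ID      string
	Payload []byte
}
type EnqueueOptions struct{ Namespace, QueueID string }
type EnqueueFunc func(*Job, *EnqueueOptions) error
type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc

// errPayloadTooLarge is a hypothetical error for this example.
var errPayloadTooLarge = errors.New("payload too large")

// limitPayload is a hypothetical middleware that rejects jobs whose
// payload is longer than max bytes and forwards everything else.
func limitPayload(max int) EnqueueMiddleware {
	return func(next EnqueueFunc) EnqueueFunc {
		return func(job *Job, opt *EnqueueOptions) error {
			if len(job.Payload) > max {
				return errPayloadTooLarge
			}
			return next(job, opt)
		}
	}
}

func main() {
	var accepted []string
	enqueue := limitPayload(4)(func(job *Job, _ *EnqueueOptions) error {
		accepted = append(accepted, job.ID)
		return nil
	})
	opt := &EnqueueOptions{Namespace: "ns1", QueueID: "emails"}
	fmt.Println(enqueue(&Job{ID: "small", Payload: []byte("ok")}, opt))
	fmt.Println(enqueue(&Job{ID: "big", Payload: []byte("too long")}, opt))
	fmt.Println(accepted)
}
```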
type EnqueueOptions ¶
type EnqueueOptions struct {
	// Namespace is the namespace of a queue.
	Namespace string
	// QueueID is the id of a queue.
	QueueID string
}
EnqueueOptions specifies how a job is enqueued.
func (*EnqueueOptions) Validate ¶
func (opt *EnqueueOptions) Validate() error
Validate validates EnqueueOptions.
type Enqueuer ¶
type Enqueuer interface {
	Enqueue(*Job, *EnqueueOptions) error
}
Enqueuer enqueues a job.
type ExternalBulkEnqueuer ¶ added in v0.1.10
type ExternalBulkEnqueuer interface {
	ExternalBulkEnqueue([]*Job, *EnqueueOptions) error
}
ExternalBulkEnqueuer enqueues jobs in a batch using another queue protocol. A queue adaptor that implements this interface can publish jobs directly to other types of queue systems.
type ExternalEnqueuer ¶ added in v0.1.10
type ExternalEnqueuer interface {
	ExternalEnqueue(*Job, *EnqueueOptions) error
}
ExternalEnqueuer enqueues a job using another queue protocol. A queue adaptor that implements this interface can publish jobs directly to other types of queue systems.
type FindOptions ¶ added in v0.1.4
type FindOptions struct {
	Namespace string
}
FindOptions specifies how a job is looked up in a queue.
func (*FindOptions) Validate ¶ added in v0.1.4
func (opt *FindOptions) Validate() error
Validate validates FindOptions.
type HandleMiddleware ¶
type HandleMiddleware func(HandleFunc) HandleFunc
HandleMiddleware modifies HandleFunc behavior.
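One common use of a handler middleware is converting panics into ordinary errors. The sketch below assumes that HandleFunc has the signature func(*Job, *DequeueOptions) error, which is not shown in this section. The types are local stand-ins and recoverPanic is a hypothetical name.

```go
package main

import "fmt"

// Minimal local stand-ins. The HandleFunc signature is an assumption.
type Job struct{ ID string }
type DequeueOptions struct{}
type HandleFunc func(*Job, *DequeueOptions) error
type HandleMiddleware func(HandleFunc) HandleFunc

// recoverPanic is a hypothetical middleware that turns a panic in the
// wrapped handler into a returned error.
func recoverPanic(next HandleFunc) HandleFunc {
	return func(job *Job, opt *DequeueOptions) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("handler panic: %v", r)
			}
		}()
		return next(job, opt)
	}
}

// Compile-time check that recoverPanic has the middleware shape.
var _ HandleMiddleware = recoverPanic

func main() {
	h := recoverPanic(func(*Job, *DequeueOptions) error {
		panic("boom")
	})
	fmt.Println(h(&Job{ID: "job-1"}, &DequeueOptions{}))
}
```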
type InvalidJobPayloadError ¶ added in v0.1.3
type InvalidJobPayloadError struct {
	Err error
}
InvalidJobPayloadError wraps a JSON or msgpack decoding error.
func (*InvalidJobPayloadError) Error ¶ added in v0.1.3
func (e *InvalidJobPayloadError) Error() string
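Because the type is a pointer-receiver error, callers can detect it with errors.As even after further wrapping. The sketch below uses a local stand-in for the type. The message format of Error and the decode and isPayloadError helpers are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local stand-in for the documented type.
type InvalidJobPayloadError struct {
	Err error
}

// The message format here is an assumption.
func (e *InvalidJobPayloadError) Error() string {
	return "invalid job payload: " + e.Err.Error()
}

// decode is a hypothetical helper that wraps a JSON decoding failure
// in InvalidJobPayloadError and then adds handler context with %w.
func decode(payload []byte, v interface{}) error {
	if err := json.Unmarshal(payload, v); err != nil {
		return fmt.Errorf("handle job: %w", &InvalidJobPayloadError{Err: err})
	}
	return nil
}

// isPayloadError reports whether err contains an InvalidJobPayloadError.
func isPayloadError(err error) bool {
	var perr *InvalidJobPayloadError
	return errors.As(err, &perr)
}

func main() {
	var v map[string]int
	fmt.Println(isPayloadError(decode([]byte(`{"n": 1}`), &v)))
	fmt.Println(isPayloadError(decode([]byte(`{`), &v)))
}
```

A payload that fails to decode will usually fail again on retry, so a handler may choose to treat this case as non-retryable.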
type Job ¶
type Job struct {
	// ID is the unique id of a job.
	ID string `msgpack:"id"`
	// CreatedAt is set to the time when NewJob() is called.
	CreatedAt time.Time `msgpack:"created_at"`
	// UpdatedAt is when the job is last executed.
	// UpdatedAt is set to the time when NewJob() is called initially.
	UpdatedAt time.Time `msgpack:"updated_at"`
	// EnqueuedAt is when the job will be executed next.
	// EnqueuedAt is set to the time when NewJob() is called initially.
	EnqueuedAt time.Time `msgpack:"enqueued_at"`

	// Payload is raw bytes.
	Payload []byte `msgpack:"payload"`

	// If the job previously fails, Retries will be incremented.
	Retries int64 `msgpack:"retries"`
	// If the job previously fails, LastError will be populated with error string.
	LastError string `msgpack:"last_error"`
}
Job is a single unit of work.
func (*Job) MarshalJSONPayload ¶
MarshalJSONPayload encodes a variable into the JSON payload.
func (*Job) MarshalPayload ¶
MarshalPayload encodes a variable into the msgpack payload.
func (*Job) UnmarshalJSONPayload ¶
UnmarshalJSONPayload decodes the JSON payload into a variable.
func (*Job) UnmarshalPayload ¶
UnmarshalPayload decodes the msgpack payload into a variable.
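The JSON pair can be pictured as a round trip through the Payload bytes. The method signatures are not shown in this section, so the sketch below assumes they take an interface{} argument and return an error. Job is a local stand-in with only the Payload field, and emailArgs is a hypothetical argument type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-in with only the field this example needs.
type Job struct {
	Payload []byte
}

// MarshalJSONPayload (assumed signature) stores v as JSON in Payload.
func (j *Job) MarshalJSONPayload(v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	j.Payload = b
	return nil
}

// UnmarshalJSONPayload (assumed signature) decodes Payload into v.
func (j *Job) UnmarshalJSONPayload(v interface{}) error {
	return json.Unmarshal(j.Payload, v)
}

// emailArgs is a hypothetical job argument type.
type emailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

func main() {
	job := &Job{}
	if err := job.MarshalJSONPayload(emailArgs{To: "a@example.com", Subject: "hi"}); err != nil {
		panic(err)
	}
	fmt.Println(string(job.Payload))

	var args emailArgs
	if err := job.UnmarshalJSONPayload(&args); err != nil {
		panic(err)
	}
	fmt.Println(args.To, args.Subject)
}
```

The producer and the handler need to agree on the encoding: a payload written with the JSON method should be read with the JSON method, and likewise for msgpack.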
type JobOptions ¶
type JobOptions struct {
	MaxExecutionTime  time.Duration
	IdleWait          time.Duration
	NumGoroutines     int64
	Backoff           BackoffFunc
	DequeueMiddleware []DequeueMiddleware
	HandleMiddleware  []HandleMiddleware
}
JobOptions specifies how a job is executed.
func (*JobOptions) Validate ¶
func (opt *JobOptions) Validate() error
Validate validates JobOptions.
type Metrics ¶
type Metrics struct {
	Queue []*QueueMetrics
}
Metrics wraps metrics reported by MetricsExporter.
type MetricsExporter ¶
type MetricsExporter interface {
	GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error)
}
MetricsExporter can be implemented by Queue to report metrics.
type OnceJobOptions ¶ added in v0.1.12
type OnceJobOptions struct {
	MaxExecutionTime  time.Duration
	Backoff           BackoffFunc
	DequeueMiddleware []DequeueMiddleware
	HandleMiddleware  []HandleMiddleware
}
OnceJobOptions specifies how a job is executed.
func (*OnceJobOptions) Validate ¶ added in v0.1.12
func (opt *OnceJobOptions) Validate() error
Validate validates OnceJobOptions.
type QueueMetrics ¶
type QueueMetrics struct {
	Namespace string
	QueueID   string
	// Total number of jobs that can be executed right now.
	ReadyTotal int64
	// Total number of jobs that are scheduled to run in future.
	ScheduledTotal int64
	// Processing delay from oldest ready job
	Latency time.Duration
}
QueueMetrics contains metrics from a queue.
type QueueMetricsOptions ¶
QueueMetricsOptions specifies how to fetch queue metrics.
func (*QueueMetricsOptions) Validate ¶
func (opt *QueueMetricsOptions) Validate() error
Validate validates QueueMetricsOptions.
type RedisQueue ¶ added in v0.2.0
type RedisQueue interface {
	Queue
	BulkEnqueuer
	BulkDequeuer
	BulkJobFinder
	MetricsExporter
}
RedisQueue implements Queue with additional capabilities.
func NewRedisQueue ¶
func NewRedisQueue(client redis.UniversalClient) RedisQueue
NewRedisQueue creates a new queue stored in redis.
type Worker ¶
type Worker struct {
	// contains filtered or unexported fields
}
Worker runs jobs.
func (*Worker) ExportMetrics ¶
ExportMetrics dumps queue stats if the queue implements MetricsExporter.
func (*Worker) Register ¶
func (w *Worker) Register(queueID string, h HandleFunc, opt *JobOptions) error
Register adds a handler for a queue. queueID and namespace should be the same as the ones used to enqueue.
func (*Worker) RegisterWithContext ¶ added in v0.1.4
func (w *Worker) RegisterWithContext(queueID string, h ContextHandleFunc, opt *JobOptions) error
RegisterWithContext adds a handler for a queue with context.Context. queueID and namespace should be the same as the ones used to enqueue. The context is created with context.WithTimeout set from MaxExecutionTime.
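A handler registered this way should watch ctx.Done() so that it stops once MaxExecutionTime has elapsed. The sketch below imitates that setup with local stand-in types. runWithTimeout, slowHandler, timedOut, and the step constant are hypothetical and only approximate how a worker might invoke the handler.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Minimal local stand-ins for the documented types.
type Job struct{ ID string }
type DequeueOptions struct{}
type ContextHandleFunc func(context.Context, *Job, *DequeueOptions) error

const step = 10 * time.Millisecond // assumed duration of one unit of work

// slowHandler simulates work in fixed steps and stops as soon as the
// context is cancelled or its deadline passes.
func slowHandler(steps int) ContextHandleFunc {
	return func(ctx context.Context, _ *Job, _ *DequeueOptions) error {
		for i := 0; i < steps; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(step):
			}
		}
		return nil
	}
}

// runWithTimeout calls h with a context that expires after
// maxExecutionTime, similar to what the description above states.
func runWithTimeout(h ContextHandleFunc, maxExecutionTime time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), maxExecutionTime)
	defer cancel()
	return h(ctx, &Job{ID: "job-1"}, &DequeueOptions{})
}

// timedOut reports whether err was caused by the deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(runWithTimeout(slowHandler(2), 100*step))
	fmt.Println(runWithTimeout(slowHandler(100), 3*step))
}
```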
func (*Worker) RunOnce ¶ added in v0.1.12
func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFunc, opt *OnceJobOptions) error
RunOnce simply runs one job from a queue. The context is created with context.WithTimeout set from MaxExecutionTime.
This is useful with Kubernetes, where a pod is created directly to run a single job.
type WorkerOptions ¶
WorkerOptions is used to create a worker.