Documentation ¶
Index ¶
- Constants
- Variables
- type BackoffCalculator
- type Client
- func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error)
- func (c *Client) DeleteAllDeadJobs() error
- func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error
- func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error
- func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error
- func (c *Client) Queues() ([]*Queue, error)
- func (c *Client) RetryAllDeadJobs() error
- func (c *Client) RetryDeadJob(diedAt int64, jobID string) error
- func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error)
- func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error)
- func (c *Client) WorkerObservations() ([]*WorkerObservation, error)
- func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error)
- type ClientOption
- type DeadJob
- type Enqueuer
- func (e *Enqueuer) Enqueue(jobName string, args Q) (*Job, error)
- func (e *Enqueuer) EnqueueContext(ctx context.Context, jobName string, args Q) (*Job, error)
- func (e *Enqueuer) EnqueueContextIn(ctx context.Context, jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
- func (e *Enqueuer) EnqueueContextUnique(ctx context.Context, jobName string, args Q) (*Job, error)
- func (e *Enqueuer) EnqueueContextUniqueIn(ctx context.Context, jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
- func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)
- func (e *Enqueuer) EnqueueUnique(jobName string, args Q) (*Job, error)
- func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
- type GenericHandler (deprecated)
- type GenericMiddlewareHandler (deprecated)
- type Job
- type JobContextHandler
- type JobContextMiddleware
- type JobHandler
- type JobMiddleware
- type JobOptions
- type NextMiddlewareFunc
- type Pool
- type Q
- type Queue
- type ReapResult
- type ReaperHook
- type RetryJob
- type ScheduledJob
- type StdLogger (deprecated)
- type StructuredLogger
- type WatchdogStat
- type WorkerObservation
- type WorkerPool
- func (wp *WorkerPool) Drain()
- func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool
- func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool
- func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool
- func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Stop()
- func (wp *WorkerPool) WatchdogStats() []WatchdogStat
- type WorkerPoolHeartbeat
- type WorkerPoolOption
Constants ¶
const WatchdogFailCheckingTimeout = 60 * time.Second
WatchdogFailCheckingTimeout is the default checking timeout after which a task is marked as failed.
Variables ¶
var ErrNotDeleted = fmt.Errorf("nothing deleted")
ErrNotDeleted is returned by functions that delete jobs to indicate that although the redis commands were successful, no object was actually deleted by those commands.
var ErrNotRetried = fmt.Errorf("nothing retried")
ErrNotRetried is returned by functions that retry jobs to indicate that although the redis commands were successful, no object was actually retried by those commands.
Functions ¶
This section is empty.
Types ¶
type BackoffCalculator ¶ added in v0.5.1
You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.
The builtin backoff calculator provides an exponentially increasing wait function.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements all of the functionality of the web UI. It can be used to inspect the status of a running cluster and retry dead jobs.
func NewClient ¶
func NewClient(namespace string, pool Pool, opts ...ClientOption) *Client
NewClient creates a new Client with the specified redis namespace and connection pool.
func (*Client) DeadJobs ¶
DeadJobs returns a list of DeadJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of dead jobs is also returned.
func (*Client) DeleteAllDeadJobs ¶
DeleteAllDeadJobs deletes all dead jobs.
func (*Client) DeleteDeadJob ¶
DeleteDeadJob deletes a dead job from Redis.
func (*Client) DeleteRetryJob ¶
DeleteRetryJob deletes a job in the retry queue.
func (*Client) DeleteScheduledJob ¶
DeleteScheduledJob deletes a job in the scheduled queue.
func (*Client) RetryAllDeadJobs ¶
RetryAllDeadJobs requeues all dead jobs. In other words, it puts them all back on the normal work queue for workers to pull from and process.
func (*Client) RetryDeadJob ¶
RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.
func (*Client) RetryJobs ¶
RetryJobs returns a list of RetryJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of retry jobs is also returned.
func (*Client) ScheduledJobs ¶
func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error)
ScheduledJobs returns a list of ScheduledJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of scheduled jobs is also returned.
func (*Client) WorkerObservations ¶
func (c *Client) WorkerObservations() ([]*WorkerObservation, error)
WorkerObservations returns all of the WorkerObservation's it finds for all worker pools' workers.
func (*Client) WorkerPoolHeartbeats ¶
func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error)
WorkerPoolHeartbeats queries Redis and returns all WorkerPoolHeartbeat's it finds (even for those worker pools which don't have a current heartbeat).
type ClientOption ¶ added in v1.9.0
type ClientOption func(*Client)
ClientOption is an optional configuration option for Client.
func WithClientLogger ¶ added in v1.9.0
func WithClientLogger(l StructuredLogger) ClientOption
WithClientLogger registers a logger for the Client.
type Enqueuer ¶
type Enqueuer struct {
	Namespace string // eg, "myapp-work"
	Pool      Pool
	// contains filtered or unexported fields
}
Enqueuer can enqueue jobs.
func NewEnqueuer ¶
NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.
func (*Enqueuer) Enqueue ¶
Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args are needed. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})
func (*Enqueuer) EnqueueContext ¶ added in v1.4.0
EnqueueContext will enqueue the specified job name and arguments, propagating the provided context. The args param can be nil if no args are needed. Example: e.EnqueueContext(ctx, "send_email", work.Q{"addr": "test@example.com"})
func (*Enqueuer) EnqueueContextIn ¶ added in v1.4.0
func (e *Enqueuer) EnqueueContextIn(ctx context.Context, jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
EnqueueContextIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (*Enqueuer) EnqueueContextUnique ¶ added in v1.4.0
EnqueueContextUnique does the same as EnqueueUnique with context propagation.
func (*Enqueuer) EnqueueContextUniqueIn ¶ added in v1.4.0
func (e *Enqueuer) EnqueueContextUniqueIn(ctx context.Context, jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
EnqueueContextUniqueIn does the same as EnqueueUniqueIn with context propagation.
func (*Enqueuer) EnqueueIn ¶
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error)
EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (*Enqueuer) EnqueueUnique ¶
EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUnique returns the job if it was enqueued and nil if it wasn't.
func (*Enqueuer) EnqueueUniqueIn ¶
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args Q) (*ScheduledJob, error)
EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
type GenericHandler (deprecated)
type GenericHandler func(*Job) error
Deprecated: use JobHandler instead. GenericHandler is a job handler without any custom context.
type GenericMiddlewareHandler (deprecated)
type GenericMiddlewareHandler func(*Job, NextMiddlewareFunc) error
Deprecated: use JobMiddleware instead. GenericMiddlewareHandler is a middleware without any custom context.
type Job ¶
type Job struct {
	// Inputs when making a new job
	Name       string                 `json:"name,omitempty"`
	ID         string                 `json:"id"`
	EnqueuedAt int64                  `json:"t"`
	Args       map[string]interface{} `json:"args"`
	Unique     bool                   `json:"unique,omitempty"`

	// Inputs when retrying
	Fails    int64  `json:"fails,omitempty"` // number of times this job has failed
	LastErr  string `json:"err,omitempty"`
	FailedAt int64  `json:"failed_at,omitempty"`

	// StartingDeadline is used to skip periodic jobs that are no longer relevant.
	StartingDeadline int64 `json:"d,omitempty"`

	// TraceContext contains the OpenTelemetry trace context to propagate the context.
	TraceContext map[string]string `json:"trace,omitempty"`

	// contains filtered or unexported fields
}
Job represents a job.
func (*Job) ArgBool ¶
ArgBool returns j.Args[key] typed to a bool. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().
func (*Job) ArgError ¶
ArgError returns the last error generated when extracting typed params. Returns nil if extracting the args went fine.
func (*Job) ArgFloat64 ¶
ArgFloat64 returns j.Args[key] typed to a float64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().
func (*Job) ArgInt64 ¶
ArgInt64 returns j.Args[key] typed to an int64. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().
func (*Job) ArgString ¶
ArgString returns j.Args[key] typed to a string. If the key is missing or of the wrong type, it sets an argument error on the job. This function is meant to be used in the body of a job handling function while extracting arguments, followed by a single call to j.ArgError().
func (*Job) Checkin ¶
Checkin will update the status of the executing job to the specified message. This message is visible within the web UI. This is useful for indicating progress on very long-running jobs. For instance, on a job that has to process a million records over the course of an hour, the job could call Checkin with the current record number every 10k records.
type JobContextHandler ¶ added in v1.4.0
Job handler types.
type JobContextMiddleware ¶ added in v1.4.0
type JobContextMiddleware = func(context.Context, *Job, JobContextHandler) error
Job middleware types.
type JobMiddleware ¶ added in v1.4.0
type JobMiddleware = func(*Job, NextMiddlewareFunc) error
Job middleware types.
type JobOptions ¶
type JobOptions struct {
	Priority       uint              // Priority from 1 to 10000
	MaxFails       uint              // 1: send straight to dead (unless SkipDead)
	SkipDead       bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
	MaxConcurrency uint              // Max number of jobs to keep in flight (default is 0, meaning no max)
	Backoff        BackoffCalculator // If not set, uses the default backoff algorithm
}
JobOptions can be passed to JobWithOptions.
type NextMiddlewareFunc ¶
type NextMiddlewareFunc func() error
NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.
type Q ¶
type Q map[string]interface{}
Q is a shortcut to easily specify arguments for jobs when enqueueing them. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true})
type Queue ¶
type Queue struct {
	JobName string `json:"job_name"`
	Count   int64  `json:"count"`
	Latency int64  `json:"latency"`
}
Queue represents a queue that holds jobs with the same name. It indicates their name, count, and latency (in seconds). Latency is a measurement of how long ago the next job to be processed was enqueued.
type ReapResult ¶ added in v1.9.0
type ReapResult struct {
	// Err is any errors during the reaper cycle.
	Err error

	// NoPoolHeartBeatJobs is a collection of job names that have been adjusted
	// due to outdated worker pool heartbeats.
	NoPoolHeartBeatJobs []string

	// UnknownPoolJobs is a set of job names that have been adjusted because the
	// worker pools working on them are not part of the overall set of worker pools.
	UnknownPoolJobs []string

	// DanglingLockJobs is a set of job names that have been adjusted due to
	// inconsistency in their "lock" and "lock_info" keys.
	DanglingLockJobs []string
}
ReapResult describes the adjustments made during a single reaper cycle.
type ReaperHook ¶ added in v1.9.0
type ReaperHook func() (afterHook func(ReapResult))
ReaperHook can be used to monitor the reaper's actions.
type ScheduledJob ¶
ScheduledJob represents a job in the scheduled queue.
type StdLogger (deprecated) ¶ added in v1.1.0
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
Deprecated: use StructuredLogger. StdLogger is used to log error messages.
Deprecated: provide a logger using the WithLogger option. Logger is the instance of a StdLogger interface that Worker writes connection management events to. By default it is set to discard all log messages via io.Discard, but you can set it to redirect wherever you want.
type StructuredLogger ¶ added in v1.9.0
type StructuredLogger interface {
	Error(msg string, args ...any)
	ErrorContext(ctx context.Context, msg string, args ...any)
	Warn(msg string, args ...any)
	WarnContext(ctx context.Context, msg string, args ...any)
	Info(msg string, args ...any)
	InfoContext(ctx context.Context, msg string, args ...any)
	Debug(msg string, args ...any)
	DebugContext(ctx context.Context, msg string, args ...any)
}
type WatchdogStat ¶ added in v1.11.0
The WatchdogStat struct represents statistics for a periodic job, including its name and run counters.
type WorkerObservation ¶
type WorkerObservation struct {
	WorkerID string `json:"worker_id"`
	IsBusy   bool   `json:"is_busy"`

	// If IsBusy:
	JobName   string `json:"job_name"`
	JobID     string `json:"job_id"`
	StartedAt int64  `json:"started_at"`
	ArgsJSON  string `json:"args_json"`
	Checkin   string `json:"checkin"`
	CheckinAt int64  `json:"checkin_at"`
}
WorkerObservation represents the latest observation taken from a worker. The observation indicates whether the worker is busy processing a job, and if so, information about that job.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool represents a pool of workers. It forms the primary API of gocraft/work: you can attach jobs and middleware to a pool, and start and stop it. Based on its concurrency setting, a pool will spin up N worker goroutines.
func NewWorkerPool ¶
func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool Pool, opts ...WorkerPoolOption) *WorkerPool
NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
func (*WorkerPool) Drain ¶
func (wp *WorkerPool) Drain()
Drain drains all jobs in the queue before returning. Note that if jobs are added faster than they can be processed, this function will not return.
func (*WorkerPool) Job ¶
func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool
Job registers the job name to the specified handler fn. Thereafter, when workers pull jobs from the name queue, they'll be processed by the specified handler function. fn can take one of these forms:
func(context.Context, *Job) error
func(*Job) error
(*ContextType).func(context.Context, *Job) error
(*ContextType).func(*Job) error
ContextType matches the type of ctx specified when creating a pool.
func (*WorkerPool) JobWithOptions ¶
func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool
JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you to specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.
func (*WorkerPool) Middleware ¶
func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool
Middleware appends the specified function to the middleware chain. The fn can take one of these forms:
func(context.Context, *Job, JobContextHandler) error
func(*Job, NextMiddlewareFunc) error
(*ContextType).func(context.Context, *Job, JobContextHandler) error
(*ContextType).func(*Job, NextMiddlewareFunc) error
ContextType matches the type of ctx specified when creating a pool.
func (*WorkerPool) PeriodicallyEnqueue ¶
func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool
PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec. The spec format is based on github.com/robfig/cron/v3, which is a relatively standard cron format. Note that the first value can be seconds! If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start()
Start starts the workers and associated processes.
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
Stop stops the workers and associated processes.
func (*WorkerPool) WatchdogStats ¶ added in v1.11.0
func (wp *WorkerPool) WatchdogStats() []WatchdogStat
WatchdogStats returns the collected statistics for the pool's periodic jobs.
type WorkerPoolHeartbeat ¶
type WorkerPoolHeartbeat struct {
	WorkerPoolID string   `json:"worker_pool_id"`
	StartedAt    int64    `json:"started_at"`
	HeartbeatAt  int64    `json:"heartbeat_at"`
	JobNames     []string `json:"job_names"`
	Concurrency  uint     `json:"concurrency"`
	Host         string   `json:"host"`
	Pid          int      `json:"pid"`
	WorkerIDs    []string `json:"worker_ids"`
}
WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPools write a heartbeat every 5 seconds so we know they're alive; the heartbeat includes config information.
type WorkerPoolOption ¶ added in v1.6.0
type WorkerPoolOption func(wp *WorkerPool)
WorkerPoolOption is an optional option for WorkerPool.
func WithLogger ¶ added in v1.9.0
func WithLogger(l StructuredLogger) WorkerPoolOption
WithLogger registers a logger for the WorkerPool.
func WithReapPeriod ¶ added in v1.6.0
func WithReapPeriod(p time.Duration) WorkerPoolOption
WithReapPeriod defines the reaper running cycle period.
func WithReaperHook ¶ added in v1.9.0
func WithReaperHook(h ReaperHook) WorkerPoolOption
WithReaperHook registers a hook to monitor the reaper's actions.
func WithWatchdogFailCheckingTimeout ¶ added in v1.11.0
func WithWatchdogFailCheckingTimeout(p time.Duration) WorkerPoolOption
WithWatchdogFailCheckingTimeout defines the watchdog checking timeout after which a task is marked as failed (default WatchdogFailCheckingTimeout).