Documentation ¶
Index ¶
- type Builder
- func (b *Builder[P, S]) Build() *Client[P, S]
- func (b *Builder[P, S]) Client(client *http.Client) *Builder[P, S]
- func (b *Builder[P, S]) MaxBytes(maxBytes int64) *Builder[P, S]
- func (b *Builder[P, S]) MaxRetries(maxRetries uint) *Builder[P, S]
- func (b *Builder[P, S]) OnRetry(fn errorsext.OnRetryFn[error]) *Builder[P, S]
- func (b *Builder[P, S]) PollBackoff(bo backoff.Exponential) *Builder[P, S]
- func (b *Builder[P, S]) RetryBackoff(bo backoff.Exponential) *Builder[P, S]
- type Client
- func (c *Client[P, S]) Complete(ctx context.Context, queue, jobID string, runID uuid.UUID) error
- func (c *Client[P, S]) Delete(ctx context.Context, queue, jobID string) error
- func (c *Client[P, S]) Enqueue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) (err error)
- func (c *Client[P, S]) Exists(ctx context.Context, queue, jobID string) (bool, error)
- func (c *Client[P, S]) Get(ctx context.Context, queue, jobID string) (Option[job.Existing[P, S]], error)
- func (c *Client[P, S]) Heartbeat(ctx context.Context, queue, jobID string, runID uuid.UUID, state Option[S]) (err error)
- func (c *Client[P, S]) Next(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)
- func (c *Client[P, S]) Poll(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)
- func (c *Client[P, S]) Poller(queue string, runner Runner[P, S]) *PollBuilder[P, S, Runner[P, S]]
- func (c *Client[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, queue, jobID string, ...) (err error)
- type JobHelper
- func (h JobHelper[P, S]) Complete(ctx context.Context) error
- func (h JobHelper[P, S]) Delete(ctx context.Context) error
- func (h JobHelper[P, S]) Exists(ctx context.Context) (bool, error)
- func (h JobHelper[P, S]) Heartbeat(ctx context.Context, state Option[S]) error
- func (h JobHelper[P, S]) InnerClient() *Client[P, S]
- func (h JobHelper[P, S]) Job() job.Existing[P, S]
- func (h JobHelper[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) error
- type PollBuilder
- type Poller
- type Runner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Builder ¶
type Builder[P, S any] struct { // contains filtered or unexported fields }
Builder is used to build a Relay Client for use.
func NewBuilder ¶
NewBuilder creates a new Builder for use with sane defaults.
func (*Builder[P, S]) MaxBytes ¶
MaxBytes sets the maximum number of bytes to accept for an HTTP response.
see bytesext.Bytes for easily working with bytes.
func (*Builder[P, S]) MaxRetries ¶
MaxRetries sets the maximum number of retries to attempt before giving up.
func (*Builder[P, S]) OnRetry ¶
OnRetry sets an optional OnRetryFn to call when requests are being retried.
When set this function will be called from within the default OnRetryFn within this client which handles retry backoff.
func (*Builder[P, S]) PollBackoff ¶
PollBackoff sets the backoff used when polling for Jobs.
func (*Builder[P, S]) RetryBackoff ¶
RetryBackoff sets the backoff used when retrying failed requests.
type Client ¶
type Client[P, S any] struct { // contains filtered or unexported fields }
Client is the Relay low-level http client for interacting with the Relay HTTP API.
func (*Client[P, S]) Complete ¶
Complete deletes an in-flight `Existing` job.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - The `Existing` job doesn't exist.
func (*Client[P, S]) Delete ¶
Delete deletes an `Existing` job.
Errors ¶
Will return `Err` on: - an unrecoverable network error.
func (*Client[P, S]) Enqueue ¶
func (c *Client[P, S]) Enqueue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) (err error)
Enqueue enqueues a batch of one or more `New` jobs for processing using the provided `EnqueueMode`.
Errors ¶
Will return `Err` on an unrecoverable network error.
func (*Client[P, S]) Exists ¶
Exists returns if a `Existing` job exists.
Errors ¶
Will return `Err` on an unrecoverable network error.\
func (*Client[P, S]) Get ¶
func (c *Client[P, S]) Get(ctx context.Context, queue, jobID string) (Option[job.Existing[P, S]], error)
Get attempts to return an `Existing` job.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - if the `Job` doesn't exist.
func (*Client[P, S]) Heartbeat ¶
func (c *Client[P, S]) Heartbeat(ctx context.Context, queue, jobID string, runID uuid.UUID, state Option[S]) (err error)
Heartbeat sends a heartbeat request to an in-flight `Existing` job indicating it is still processing, resetting the timeout. Optionally you can update the `Existing` jobs state during the same request.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - if the `Existing` job doesn't exist.
func (*Client[P, S]) Next ¶
func (c *Client[P, S]) Next(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)
Next attempts to retrieve the next `Existing` job(s) for processing.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - no `Existing` jobs currently exists.
func (*Client[P, S]) Poll ¶
func (c *Client[P, S]) Poll(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)
Poll polls the Relay server until a `Job` becomes available.
Errors ¶
Will return `Err` on an unrecoverable network error.
func (*Client[P, S]) Poller ¶
func (c *Client[P, S]) Poller(queue string, runner Runner[P, S]) *PollBuilder[P, S, Runner[P, S]]
Poller Creates a new poller that will handle asynchronously polling and distributing `Existing` jobs to be processed by calling the supplied `Runner`.
func (*Client[P, S]) Requeue ¶
func (c *Client[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, queue, jobID string, runID uuid.UUID, jobs []job.New[P, S]) (err error)
Requeue re-enqueues an existing in-flight `Existing` job to be run again or spawn a new set of jobs atomically.
The `Existing` jobs queue, id and `run_id` must match an existing in-flight Job. This is primarily used to schedule a new/the next run of a singleton job. This provides the ability for self-perpetuating scheduled jobs in an atomic manner.
Reschedule also allows you to change the jobs `queue` and `id` during the reschedule. This is allowed to facilitate advancing a job through a distributed pipeline/state machine atomically if that is more appropriate than advancing using the jobs state alone.
The mode will be used to determine the behaviour if a conflicting record already exists, just like when enqueuing jobs.
If the `Existing` job no longer exists or is not in-flight, this will return without error and will not enqueue any jobs.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - if one of the `Existing` jobs exists when mode is unique.
type JobHelper ¶
type JobHelper[P, S any] struct { // contains filtered or unexported fields }
JobHelper is used to provide a `Existing` job and Relay client instance to the `Runner` for processing.
It contains helper functions allowing abstraction away complexity of interacting with Relay.
func (JobHelper[P, S]) Complete ¶
Complete completes/deletes this in-flight `Existing` job.
Panics ¶
If the `Existing` job doesn't have a `run_id` set.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - The `Existing` job doesn't exist.
func (JobHelper[P, S]) Delete ¶
Delete removes/delete the `Existing` job.
Errors ¶
Will return `Err` on: - an unrecoverable network error.
func (JobHelper[P, S]) Exists ¶
Exists returns if the `Existing` job still exists.
Errors ¶
Will return `Err` on an unrecoverable network error.
func (JobHelper[P, S]) Heartbeat ¶
Heartbeat sends a heartbeat request to this in-flight `Existing` job indicating it is still processing, resetting the timeout. Optionally you can update the `Existing` jobs state during the same request.
Panics ¶
If the `Existing` job doesn't have a `run_id` set.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - if the `Existing` job doesn't exist.
func (JobHelper[P, S]) InnerClient ¶
InnerClient returns a reference to the inner Relay client instance for interacting with Relay when the needing to do things the helper functions for the inner job don't apply to such as spawning one-off jobs not related to the existing running job in any way.
It is rare to need the inner client and helper functions should be preferred in most cases.
func (JobHelper[P, S]) Requeue ¶
func (h JobHelper[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) error
Requeue re-queues this in-flight `Existing` job to be run again or spawn a new set of jobs atomically.
The `Existing` jobs queue, id and `run_id` must match an existing in-flight Job. This is primarily used to schedule a new/the next run of a singleton job. This provides the ability for self-perpetuating scheduled jobs in an atomic manner.
Reschedule also allows you to change the jobs `queue` and `id` during the reschedule. This is allowed to facilitate advancing a job through a distributed pipeline/state machine atomically if that is more appropriate than advancing using the jobs state alone.
The mode will be used to determine the behaviour if a conflicting record already exists, just like when enqueuing jobs.
If the `Existing` job no longer exists or is not in-flight, this will return without error and will not enqueue any jobs.
Panics ¶
If the `Existing` job doesn't have a `run_id` set.
Errors ¶
Will return `Err` on: - an unrecoverable network error. - if one of the `Existing` jobs exists when mode is unique.
type PollBuilder ¶
PollBuilder is used to configure and build Poller for use.
func (*PollBuilder[P, S, R]) Build ¶
func (b *PollBuilder[P, S, R]) Build() *Poller[P, S, R]
Build creates a new `Poller` using the Builders configuration.
func (*PollBuilder[P, S, R]) NumWorkers ¶
func (b *PollBuilder[P, S, R]) NumWorkers(n int) *PollBuilder[P, S, R]
NumWorkers sets the number of backend async workers indicating the maximum number of in-flight `Job`s.
Default is number of logical cores on the machine.