client

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Builder

type Builder[P, S any] struct {
	// contains filtered or unexported fields
}

Builder is used to build a Relay Client for use.

func NewBuilder

func NewBuilder[P, S any](baseURL string) *Builder[P, S]

NewBuilder creates a new Builder for use with sane defaults.

func (*Builder[P, S]) Build

func (b *Builder[P, S]) Build() *Client[P, S]

Build creates a new Relay Client for use.

func (*Builder[P, S]) Client

func (b *Builder[P, S]) Client(client *http.Client) *Builder[P, S]

Client sets the http.Client to use for all requests.

func (*Builder[P, S]) MaxBytes

func (b *Builder[P, S]) MaxBytes(maxBytes int64) *Builder[P, S]

MaxBytes sets the maximum number of bytes to accept for an HTTP response.

see bytesext.Bytes for easily working with bytes.

func (*Builder[P, S]) MaxRetries

func (b *Builder[P, S]) MaxRetries(maxRetries uint) *Builder[P, S]

MaxRetries sets the maximum number of retries to attempt before giving up.

func (*Builder[P, S]) OnRetry

func (b *Builder[P, S]) OnRetry(fn errorsext.OnRetryFn[error]) *Builder[P, S]

OnRetry sets an optional OnRetryFn to call when requests are being retried.

When set this function will be called from within the default OnRetryFn within this client which handles retry backoff.

func (*Builder[P, S]) PollBackoff

func (b *Builder[P, S]) PollBackoff(bo backoff.Exponential) *Builder[P, S]

PollBackoff sets the backoff used when polling for Jobs.

func (*Builder[P, S]) RetryBackoff

func (b *Builder[P, S]) RetryBackoff(bo backoff.Exponential) *Builder[P, S]

RetryBackoff sets the backoff used when retrying failed requests.

type Client

type Client[P, S any] struct {
	// contains filtered or unexported fields
}

Client is the Relay low-level http client for interacting with the Relay HTTP API.

func (*Client[P, S]) Complete

func (c *Client[P, S]) Complete(ctx context.Context, queue, jobID string, runID uuid.UUID) error

Complete deletes an in-flight `Existing` job.

Errors

Will return `Err` on: - an unrecoverable network error. - The `Existing` job doesn't exist.

func (*Client[P, S]) Delete

func (c *Client[P, S]) Delete(ctx context.Context, queue, jobID string) error

Delete deletes an `Existing` job.

Errors

Will return `Err` on: - an unrecoverable network error.

func (*Client[P, S]) Enqueue

func (c *Client[P, S]) Enqueue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) (err error)

Enqueue enqueues a batch of one or more `New` jobs for processing using the provided `EnqueueMode`.

Errors

Will return `Err` on an unrecoverable network error.

func (*Client[P, S]) Exists

func (c *Client[P, S]) Exists(ctx context.Context, queue, jobID string) (bool, error)

Exists returns if a `Existing` job exists.

Errors

Will return `Err` on an unrecoverable network error.\

func (*Client[P, S]) Get

func (c *Client[P, S]) Get(ctx context.Context, queue, jobID string) (Option[job.Existing[P, S]], error)

Get attempts to return an `Existing` job.

Errors

Will return `Err` on: - an unrecoverable network error. - if the `Job` doesn't exist.

func (*Client[P, S]) Heartbeat

func (c *Client[P, S]) Heartbeat(ctx context.Context, queue, jobID string, runID uuid.UUID, state Option[S]) (err error)

Heartbeat sends a heartbeat request to an in-flight `Existing` job indicating it is still processing, resetting the timeout. Optionally you can update the `Existing` jobs state during the same request.

Errors

Will return `Err` on: - an unrecoverable network error. - if the `Existing` job doesn't exist.

func (*Client[P, S]) Next

func (c *Client[P, S]) Next(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)

Next attempts to retrieve the next `Existing` job(s) for processing.

Errors

Will return `Err` on: - an unrecoverable network error. - no `Existing` jobs currently exists.

func (*Client[P, S]) Poll

func (c *Client[P, S]) Poll(ctx context.Context, queue string, numJobs uint) ([]job.Existing[P, S], error)

Poll polls the Relay server until a `Job` becomes available.

Errors

Will return `Err` on an unrecoverable network error.

func (*Client[P, S]) Poller

func (c *Client[P, S]) Poller(queue string, runner Runner[P, S]) *PollBuilder[P, S, Runner[P, S]]

Poller Creates a new poller that will handle asynchronously polling and distributing `Existing` jobs to be processed by calling the supplied `Runner`.

func (*Client[P, S]) Requeue

func (c *Client[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, queue, jobID string, runID uuid.UUID, jobs []job.New[P, S]) (err error)

Requeue re-enqueues an existing in-flight `Existing` job to be run again or spawn a new set of jobs atomically.

The `Existing` jobs queue, id and `run_id` must match an existing in-flight Job. This is primarily used to schedule a new/the next run of a singleton job. This provides the ability for self-perpetuating scheduled jobs in an atomic manner.

Reschedule also allows you to change the jobs `queue` and `id` during the reschedule. This is allowed to facilitate advancing a job through a distributed pipeline/state machine atomically if that is more appropriate than advancing using the jobs state alone.

The mode will be used to determine the behaviour if a conflicting record already exists, just like when enqueuing jobs.

If the `Existing` job no longer exists or is not in-flight, this will return without error and will not enqueue any jobs.

Errors

Will return `Err` on: - an unrecoverable network error. - if one of the `Existing` jobs exists when mode is unique.

type JobHelper

type JobHelper[P, S any] struct {
	// contains filtered or unexported fields
}

JobHelper is used to provide a `Existing` job and Relay client instance to the `Runner` for processing.

It contains helper functions allowing abstraction away complexity of interacting with Relay.

func (JobHelper[P, S]) Complete

func (h JobHelper[P, S]) Complete(ctx context.Context) error

Complete completes/deletes this in-flight `Existing` job.

Panics

If the `Existing` job doesn't have a `run_id` set.

Errors

Will return `Err` on: - an unrecoverable network error. - The `Existing` job doesn't exist.

func (JobHelper[P, S]) Delete

func (h JobHelper[P, S]) Delete(ctx context.Context) error

Delete removes/delete the `Existing` job.

Errors

Will return `Err` on: - an unrecoverable network error.

func (JobHelper[P, S]) Exists

func (h JobHelper[P, S]) Exists(ctx context.Context) (bool, error)

Exists returns if the `Existing` job still exists.

Errors

Will return `Err` on an unrecoverable network error.

func (JobHelper[P, S]) Heartbeat

func (h JobHelper[P, S]) Heartbeat(ctx context.Context, state Option[S]) error

Heartbeat sends a heartbeat request to this in-flight `Existing` job indicating it is still processing, resetting the timeout. Optionally you can update the `Existing` jobs state during the same request.

Panics

If the `Existing` job doesn't have a `run_id` set.

Errors

Will return `Err` on: - an unrecoverable network error. - if the `Existing` job doesn't exist.

func (JobHelper[P, S]) InnerClient

func (h JobHelper[P, S]) InnerClient() *Client[P, S]

InnerClient returns a reference to the inner Relay client instance for interacting with Relay when the needing to do things the helper functions for the inner job don't apply to such as spawning one-off jobs not related to the existing running job in any way.

It is rare to need the inner client and helper functions should be preferred in most cases.

func (JobHelper[P, S]) Job

func (h JobHelper[P, S]) Job() job.Existing[P, S]

Job returns the `Existing` job to be processed.

func (JobHelper[P, S]) Requeue

func (h JobHelper[P, S]) Requeue(ctx context.Context, mode job.EnqueueMode, jobs []job.New[P, S]) error

Requeue re-queues this in-flight `Existing` job to be run again or spawn a new set of jobs atomically.

The `Existing` jobs queue, id and `run_id` must match an existing in-flight Job. This is primarily used to schedule a new/the next run of a singleton job. This provides the ability for self-perpetuating scheduled jobs in an atomic manner.

Reschedule also allows you to change the jobs `queue` and `id` during the reschedule. This is allowed to facilitate advancing a job through a distributed pipeline/state machine atomically if that is more appropriate than advancing using the jobs state alone.

The mode will be used to determine the behaviour if a conflicting record already exists, just like when enqueuing jobs.

If the `Existing` job no longer exists or is not in-flight, this will return without error and will not enqueue any jobs.

Panics

If the `Existing` job doesn't have a `run_id` set.

Errors

Will return `Err` on: - an unrecoverable network error. - if one of the `Existing` jobs exists when mode is unique.

type PollBuilder

type PollBuilder[P, S any, R Runner[P, S]] struct {
	// contains filtered or unexported fields
}

PollBuilder is used to configure and build Poller for use.

func (*PollBuilder[P, S, R]) Build

func (b *PollBuilder[P, S, R]) Build() *Poller[P, S, R]

Build creates a new `Poller` using the Builders configuration.

func (*PollBuilder[P, S, R]) NumWorkers

func (b *PollBuilder[P, S, R]) NumWorkers(n int) *PollBuilder[P, S, R]

NumWorkers sets the number of backend async workers indicating the maximum number of in-flight `Job`s.

Default is number of logical cores on the machine.

type Poller

type Poller[P, S any, R Runner[P, S]] struct {
	// contains filtered or unexported fields
}

Poller is used to abstract away polling and running multiple `Job`s calling the provided `Runner` method(s).

func (*Poller[P, S, R]) Start

func (p *Poller[P, S, R]) Start(ctx context.Context) (err error)

Start begins polling for jobs and calling provided `Runner`.

type Runner

type Runner[P, S any] interface {
	Run(ctx context.Context, helper JobHelper[P, S])
}

Runner is an interface used by the `Poller` to execute a `Job` after fetching it to be processed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL