worker

package
v8.4.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 13 Imported by: 14

Documentation

Index

Constants

View Source
const (
	DefaultJobWorkerMaxJobActive  = 32
	DefaultJobWorkerConcurrency   = 4
	DefaultJobWorkerPollInterval  = 100 * time.Millisecond
	DefaultJobWorkerPollThreshold = 0.3
	RequestTimeoutOffset          = 10 * time.Second
	DefaultRequestTimeout         = 10 * time.Second
	DefaultStreamEnabled          = false
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BackoffSupplier added in v8.4.0

type BackoffSupplier interface {
	SupplyRetryDelay(currentRetryDelay time.Duration) time.Duration
}

type ExponentialBackoff added in v8.4.0

type ExponentialBackoff struct {
	// contains filtered or unexported fields
}

func NewExponentialBackoffBuilder added in v8.4.0

func NewExponentialBackoffBuilder() ExponentialBackoff

func (ExponentialBackoff) BackoffFactor added in v8.4.0

func (e ExponentialBackoff) BackoffFactor(backoffFactor float64) ExponentialBackoffBuilder

func (ExponentialBackoff) Build added in v8.4.0

func (ExponentialBackoff) JitterFactor added in v8.4.0

func (e ExponentialBackoff) JitterFactor(jitterFactor float64) ExponentialBackoffBuilder

func (ExponentialBackoff) MaxDelay added in v8.4.0

func (ExponentialBackoff) MinDelay added in v8.4.0

func (ExponentialBackoff) Random added in v8.4.0

func (ExponentialBackoff) SupplyRetryDelay added in v8.4.0

func (e ExponentialBackoff) SupplyRetryDelay(currentRetryDelay time.Duration) time.Duration

type ExponentialBackoffBuilder added in v8.4.0

type ExponentialBackoffBuilder interface {
	MaxDelay(time.Duration) ExponentialBackoffBuilder
	MinDelay(time.Duration) ExponentialBackoffBuilder
	BackoffFactor(float64) ExponentialBackoffBuilder
	JitterFactor(float64) ExponentialBackoffBuilder
	Random(*rand.Rand) ExponentialBackoffBuilder
	Build() BackoffSupplier
}

type JobClient

type JobClient interface {
	NewCompleteJobCommand() commands.CompleteJobCommandStep1
	NewFailJobCommand() commands.FailJobCommandStep1
	NewThrowErrorCommand() commands.ThrowErrorCommandStep1
}

type JobHandler

type JobHandler func(client JobClient, job entities.Job)

type JobWorker

type JobWorker interface {
	// Initiate graceful shutdown and awaits termination
	Close()
	// Await termination of worker
	AwaitClose()
}

type JobWorkerBuilder

type JobWorkerBuilder struct {
	// contains filtered or unexported fields
}

func (*JobWorkerBuilder) BackoffSupplier added in v8.4.0

func (builder *JobWorkerBuilder) BackoffSupplier(backoffSupplier BackoffSupplier) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Concurrency

func (builder *JobWorkerBuilder) Concurrency(concurrency int) JobWorkerBuilderStep3

func (*JobWorkerBuilder) FetchVariables

func (builder *JobWorkerBuilder) FetchVariables(fetchVariables ...string) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Handler

func (builder *JobWorkerBuilder) Handler(handler JobHandler) JobWorkerBuilderStep3

func (*JobWorkerBuilder) JobType

func (builder *JobWorkerBuilder) JobType(jobType string) JobWorkerBuilderStep2

func (*JobWorkerBuilder) MaxJobsActive

func (builder *JobWorkerBuilder) MaxJobsActive(maxJobsActive int) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Metrics

func (builder *JobWorkerBuilder) Metrics(metrics JobWorkerMetrics) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Name

func (builder *JobWorkerBuilder) Name(name string) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Open

func (builder *JobWorkerBuilder) Open() JobWorker

func (*JobWorkerBuilder) PollInterval

func (builder *JobWorkerBuilder) PollInterval(pollInterval time.Duration) JobWorkerBuilderStep3

func (*JobWorkerBuilder) PollThreshold

func (builder *JobWorkerBuilder) PollThreshold(pollThreshold float64) JobWorkerBuilderStep3

func (*JobWorkerBuilder) RequestTimeout

func (builder *JobWorkerBuilder) RequestTimeout(timeout time.Duration) JobWorkerBuilderStep3

func (*JobWorkerBuilder) StreamEnabled added in v8.4.0

func (builder *JobWorkerBuilder) StreamEnabled(streamEnabled bool) JobWorkerBuilderStep3

func (*JobWorkerBuilder) StreamRequestTimeout added in v8.4.0

func (builder *JobWorkerBuilder) StreamRequestTimeout(requestTimeout time.Duration) JobWorkerBuilderStep3

func (*JobWorkerBuilder) Timeout

func (builder *JobWorkerBuilder) Timeout(timeout time.Duration) JobWorkerBuilderStep3

type JobWorkerBuilderStep1

type JobWorkerBuilderStep1 interface {
	// JobType Set the type of jobs to work on
	JobType(string) JobWorkerBuilderStep2
}

func NewJobWorkerBuilder

func NewJobWorkerBuilder(gatewayClient pb.GatewayClient, jobClient JobClient, retryPred func(ctx context.Context, err error) bool) JobWorkerBuilderStep1

NewJobWorkerBuilder should use the same retryPredicate used by the CredentialProvider (ShouldRetry method):

credsProvider, _ := zbc.NewOAuthCredentialsProvider(...)
worker.NewJobWorkerBuilder(..., credsProvider.ShouldRetry)

type JobWorkerBuilderStep2

type JobWorkerBuilderStep2 interface {
	// Handler Set the handler to process jobs. The worker should complete or fail the job. The handler implementation
	// must be thread-safe.
	Handler(JobHandler) JobWorkerBuilderStep3
}

type JobWorkerBuilderStep3

type JobWorkerBuilderStep3 interface {
	// Name Set the name of the worker owner
	Name(string) JobWorkerBuilderStep3
	// Timeout Set the duration no other worker should work on job activated by this worker
	Timeout(time.Duration) JobWorkerBuilderStep3
	// RequestTimeout Set the timeout for the request
	RequestTimeout(time.Duration) JobWorkerBuilderStep3
	// MaxJobsActive Set the maximum number of jobs which will be activated for this worker at the
	// same time.
	MaxJobsActive(int) JobWorkerBuilderStep3
	// Concurrency Set the maximum number of concurrent spawned goroutines to complete jobs
	Concurrency(int) JobWorkerBuilderStep3
	// PollInterval Set the maximal interval between polling for new jobs
	PollInterval(time.Duration) JobWorkerBuilderStep3
	// PollThreshold Set the threshold of buffered activated jobs before polling for new jobs, i.e. threshold * MaxJobsActive(int)
	PollThreshold(float64) JobWorkerBuilderStep3
	// FetchVariables Set list of variable names which should be fetched on job activation
	FetchVariables(...string) JobWorkerBuilderStep3
	// Metrics Set implementation for metrics reporting
	Metrics(metrics JobWorkerMetrics) JobWorkerBuilderStep3
	// BackoffSupplier Set the backoffSupplier to back off polling on errors
	BackoffSupplier(supplier BackoffSupplier) JobWorkerBuilderStep3
	// StreamEnabled Enables the job worker to stream jobs. It will still poll for older jobs, but streaming is favored.
	StreamEnabled(bool) JobWorkerBuilderStep3
	// StreamRequestTimeout If streaming is enabled, this sets the timeout on the underlying job stream. It's useful to set a few hours to load-balance your streams over time.
	StreamRequestTimeout(time.Duration) JobWorkerBuilderStep3
	// Open the job worker and start polling and handling jobs
	Open() JobWorker
}

type JobWorkerMetrics

type JobWorkerMetrics interface {
	// Set the remaining count of scheduled jobs for a specific job
	SetJobsRemainingCount(jobType string, count int)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL