Documentation ¶
Overview ¶
Package jobloop contains the Job interface that abstracts over several types of worker loops. The package provides basic implementations of these worker loops, including basic instrumentation.
Functions ¶
func DefaultJitter ¶
DefaultJitter returns a random duration within +/- 10% of the requested value. See explanation on type Jitter for when this is useful.
func NoJitter ¶
NoJitter returns the input value unchanged.
This can be used in place of DefaultJitter to ensure deterministic behavior in tests.
func ProcessMany ¶
ProcessMany finds and executes a given number of tasks. If fewer tasks than requested are available to be executed, `sql.ErrNoRows` is returned. If any error is encountered, processing stops early.
This is a free function rather than a method on Job because Go does not support member functions on interfaces.
Types ¶
type CronJob ¶
    type CronJob struct {
        Metadata JobMetadata
        Interval time.Duration

        // By default, the job will wait out a full Interval before running for the first time.
        // If an earlier first run is desired, InitialDelay can be set to a non-zero value that is smaller than Interval.
        InitialDelay time.Duration

        // A function that will be executed by this job once per Interval.
        //
        // The provided label set will have been prefilled with the labels from
        // Metadata.CounterLabels and all label values set to "early-db-access".
        // The implementation is expected to substitute the actual label values as
        // soon as they become known.
        Task func(context.Context, prometheus.Labels) error
    }
CronJob is a job loop that executes in a set interval.
func (*CronJob) Setup ¶
func (j *CronJob) Setup(registerer prometheus.Registerer) Job
Setup builds the Job interface for this job and registers the counter metric. At runtime, `nil` can be given to use the default registry. In tests, a test-local prometheus.Registry instance should be used instead.
type Jitter ¶
Jitter is a strategy for randomizing task recurrence intervals.
When a background job performs a certain task for each object on a specific interval, it is usually desirable to not schedule the next task to take place after exactly that interval.
For example, consider a blob storage service with a background job to check the validity of each individual blob every 24 hours. If a lot of blobs are uploaded at once, adhering to the exact 24-hour interval will cause high load in the system every day at the same time.
To counteract this, we recommend that the calculation of a followup task deadline use jitter like this:
    // instead of this...
    blob.NextValidationAt = now.Add(24 * time.Hour)
    // ...do this
    blob.NextValidationAt = now.Add(jobloop.DefaultJitter(24 * time.Hour))
type Job ¶
    type Job interface {
        // ProcessOne finds and executes exactly one task, aborting early if `ctx` expires.
        // If no task is available to be executed, `sql.ErrNoRows` is returned.
        // The runtime behavior of the job can be configured through Option arguments.
        ProcessOne(ctx context.Context, opts ...Option) error

        // Run blocks the current goroutine and executes tasks until `ctx` expires.
        // The runtime behavior of the job can be configured through Option arguments.
        Run(ctx context.Context, opts ...Option)
    }
Job describes a loop that executes instances of a specific type of task.
type JobMetadata ¶
    type JobMetadata struct {
        // A readable name or short description for this job. This will be used in
        // log messages.
        ReadableName string

        // Whether it is safe to have multiple tasks running in parallel. If set to
        // false, the job will never select a new task before the previous task has
        // been fully processed, thus avoiding any concurrent processing of tasks.
        ConcurrencySafe bool

        // Metadata for the counter metric that will be emitted by the job.
        CounterOpts prometheus.CounterOpts

        // The labels of the counter metric. Besides the application-specific labels
        // listed here, the counter metric will always have the label "task_outcome"
        // with the possible values "success" and "failure". This label will be
        // filled by the job implementation.
        CounterLabels []string
        // contains filtered or unexported fields
    }
JobMetadata contains metadata and common configuration for a job. Types that implement the Job interface will usually be holding one of these.
type Option ¶
type Option func(*jobConfig)
Option is a configuration option for a Job. Currently, the number of goroutines (see NumGoroutines) and prefilled counter label values (see WithLabel) can be configured, and more options could be added in the future.
This type is an implementation of the Functional Options pattern, see e.g. <https://github.com/tmrts/go-patterns/blob/master/idiom/functional-options.md>
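As a rough illustration of the Functional Options pattern in this setting, the sketch below uses hypothetical names (`config`, `option`, `withGoroutines`, `newConfig`) in place of the package's unexported `jobConfig` and its real option constructors. Only the general shape, a function that mutates a config struct, is taken from the Option declaration above.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the unexported jobConfig type.
type config struct {
	numGoroutines uint
}

// option mirrors the documented shape of Option: a function that mutates
// the config.
type option func(*config)

// withGoroutines is a hypothetical option constructor, analogous to
// NumGoroutines.
func withGoroutines(n uint) option {
	return func(c *config) { c.numGoroutines = n }
}

// newConfig starts from defaults and applies each option in order.
func newConfig(opts ...option) config {
	c := config{numGoroutines: 1} // documented default: no concurrency
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().numGoroutines)                  // prints "1"
	fmt.Println(newConfig(withGoroutines(4)).numGoroutines) // prints "4"
}
```

The benefit of this design is that new options can be added later without changing the signatures of ProcessOne and Run.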
func NumGoroutines ¶
NumGoroutines is an option for a Job that allows the Job to use multiple goroutines, up to the specified number. The default value is 1, meaning that no concurrency will be employed.
This option is always ignored during ProcessOne(), because a single task does not require concurrency on the level of the job runtime.
func WithLabel ¶
WithLabel is an option for a Job that prefills one of the CounterLabels declared in the job's metadata before each task. This is useful for running multiple instances of a job in parallel while reusing the JobMetadata, task callbacks, and Prometheus metrics. Task callbacks can inspect the overridden label value to discover which particular instance of the job they belong to.
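The prefilling described above can be pictured as follows. This is a sketch under assumptions: `prefillLabels` is a hypothetical helper, plain maps stand in for `prometheus.Labels`, and silently ignoring overrides for undeclared labels is a choice made here for brevity, not documented behavior.

```go
package main

import "fmt"

// prefillLabels is a hypothetical helper. Every declared label starts out
// as "early-db-access"; overrides for declared labels then replace that
// placeholder. Overrides for undeclared labels are ignored in this sketch.
func prefillLabels(names []string, overrides map[string]string) map[string]string {
	labels := make(map[string]string, len(names))
	for _, name := range names {
		labels[name] = "early-db-access"
	}
	for name, value := range overrides {
		if _, declared := labels[name]; declared {
			labels[name] = value
		}
	}
	return labels
}

func main() {
	labels := prefillLabels(
		[]string{"event_type", "region"},
		map[string]string{"region": "eu-1"},
	)
	fmt.Println(labels["event_type"], labels["region"]) // prints "early-db-access eu-1"
}
```

A task callback that receives such a label set could read the overridden value (here the hypothetical "region" label) to find out which instance of the job it is running in.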
type ProducerConsumerJob ¶
    type ProducerConsumerJob[T any] struct {
        Metadata JobMetadata

        // A function that will be polled periodically to discover the next task
        // within this job. If there are currently no tasks waiting to be executed,
        // this function shall return `sql.ErrNoRows` to instruct the job to slow
        // down its polling.
        //
        // The provided label set will have been prefilled with the labels from
        // Metadata.CounterLabels and all label values set to "early-db-access". The
        // implementation is expected to substitute the actual label values as soon
        // as they become known.
        DiscoverTask func(context.Context, prometheus.Labels) (T, error)

        // A function that will be used to process a task that has been discovered
        // within this job.
        //
        // The provided label set will have been prefilled with the labels from
        // Metadata.CounterLabels and all label values set to "early-db-access". The
        // implementation is expected to substitute the actual label values as soon
        // as they become known.
        ProcessTask func(context.Context, T, prometheus.Labels) error
    }
ProducerConsumerJob describes a job that has one goroutine (the "producer") selecting tasks from an external source and one or more goroutines (the "consumers") executing the tasks that have been selected.
Usually, the external source for the tasks is a database table, from which one row is selected per task. In this scenario, row-level locking should be used to ensure that multiple processes working on the same database table do not interfere with each other. If row-level locking cannot be used, the ConcurrencySafe field must be set to false to avoid data races.
This type is parametrized over the type T (Task) which contains all data that is relevant to a single task, i.e. one single execution of the job.
A package that implements job loops will usually provide a public API to spawn Job objects, and hide the task type as well as the task callbacks within the package, like this:
    func (e *MyExecutor) EventTranslationJob(registerer prometheus.Registerer) jobloop.Job {
        return (&jobloop.ProducerConsumerJob[*eventTranslateTask]{ // task type is private
            Metadata: jobloop.JobMetadata{
                ReadableName:    "event translation",
                ConcurrencySafe: true,
                CounterOpts:     prometheus.CounterOpts{Name: "myservice_event_translations"},
                CounterLabels:   []string{"event_type"},
            },
            DiscoverTask: e.findNextEventToTranslate, // function is private
            ProcessTask:  e.translateEvent,           // function is private
        }).Setup(registerer)
    }
func (*ProducerConsumerJob[T]) Setup ¶
func (j *ProducerConsumerJob[T]) Setup(registerer prometheus.Registerer) Job
Setup builds the Job interface for this job and registers the counter metric. At runtime, `nil` can be given to use the default registry. In tests, a test-local prometheus.Registry instance should be used instead.
type TxGuardedJob ¶
    type TxGuardedJob[Tx sqlext.Rollbacker, P any] struct {
        Metadata JobMetadata

        // A function that begins a new DB transaction. Usually set to `db.Begin`.
        BeginTx func() (Tx, error)

        // A function that will be polled periodically (once per transaction) to
        // discover the next row to work on. If there are currently no rows waiting
        // to be processed, this function shall return `sql.ErrNoRows` to instruct
        // the job to slow down its polling.
        //
        // The provided label set will have been prefilled with the labels from
        // Metadata.CounterLabels and all label values set to "early-db-access". The
        // implementation is expected to substitute the actual label values as soon
        // as they become known.
        DiscoverRow func(context.Context, Tx, prometheus.Labels) (P, error)

        // A function that will be called once for each discovered row to process it.
        //
        // The provided label set will have been prefilled with the labels from
        // Metadata.CounterLabels and all label values set to "early-db-access". The
        // implementation is expected to substitute the actual label values as soon
        // as they become known.
        ProcessRow func(context.Context, Tx, P, prometheus.Labels) error
    }
TxGuardedJob is a specialization of ProducerConsumerJob, where each task corresponds to one row of a database table that needs to be worked on. Rows must be selected in a concurrency-safe way (most commonly through the "SELECT ... FOR UPDATE SKIP LOCKED" mechanism). The job implementation ensures that the entirety of each task runs within a single SQL transaction, and manages the lifecycle of that transaction.
This type works in the same way as ProducerConsumerJob, except that it offers a different set of callbacks. The type arguments are:
- Tx (Transaction), the type for a DB transaction (the job implementation will call Rollback on this in case of errors)
- P (Payload), the payload for one individual task (e.g. the ORM object corresponding to the selected row)
Just like ProducerConsumerJob, the type arguments are often private types, and the job type as well as the callbacks are hidden within the defining package, like this:
    func (e *MyExecutor) EventTranslationJob(registerer prometheus.Registerer) jobloop.Job {
        return (&jobloop.TxGuardedJob[*sql.Tx, dbmodel.Event]{
            Metadata: jobloop.JobMetadata{
                ReadableName:    "event translation",
                ConcurrencySafe: true,
                CounterOpts:     prometheus.CounterOpts{Name: "myservice_event_translations"},
                CounterLabels:   []string{"event_type"},
            },
            BeginTx:     e.DB.Begin,
            DiscoverRow: e.findNextEventToTranslate, // function is private
            ProcessRow:  e.translateEvent,           // function is private
        }).Setup(registerer)
    }
func (*TxGuardedJob[Tx, P]) Setup ¶
func (j *TxGuardedJob[Tx, P]) Setup(registerer prometheus.Registerer) Job
Setup builds the Job interface for this job and registers the counter metric. At runtime, `nil` can be given to use the default registry. In tests, a test-local prometheus.Registry instance should be used instead.