job

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2023 License: MPL-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package job provides a Job and a Run struct suitable for persisting to the repository.

A Job represents work that should be run at a predetermined time and needs to be synchronized across servers to ensure that one and only one instance of a job is running at any given time. The job's name must be unique to the plugin that registered the job, the (plugin_id, name) uniqueness is enforced by the database.

A Run represents a single execution of a job, only a single run can be in a running state for a specific job. The private_id (primary key) is generated by the repository during RunJobs.

Repository

A repository provides methods for creating, updating, retrieving, and deleting Jobs. It also provides methods to run jobs, update progress of runs and complete, fail or interrupt runs. A new repository should be created for each transaction. For example:

var wrapper wrapping.Wrapper
... init wrapper ...

// db implements both the reader and writer interfaces.
db, _ := db.Open(db.Postgres, url)

var repo *job.Repository
repo, _ = job.NewRepository(db, db, wrapper)

var j *job.Job
j, _ = repo.UpsertJob(context.Background(), j, "name", "description")

var runs []*Run
repo, _ = job.NewRepository(db, db, wrapper)
runs, _ = repo.RunJobs(context.Background(), "serverId")

... run job ...

var run *Run
repo, _ = job.NewRepository(db, db, wrapper)
run, _ = repo.UpdateProgress(ctx, run, []string{"TotalCount", "CompletedCount"})

nextJobRun = time.Now().Add(time.Hour)

repo, _ = job.NewRepository(db, db, wrapper)
run, _ = repo.CompleteRun(ctx, run.PrivateId, job.Completed, nextJobRun)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	*store.Job
	// contains filtered or unexported fields
}

Job represents work that should be run at a predetermined time and needs to be synchronized across servers to ensure that one and only one instance of a job is running at any given time.

func (*Job) SetTableName

func (j *Job) SetTableName(n string)

SetTableName sets the table name. If the caller attempts to set the name to "" the name will be reset to the default name "job".

func (*Job) TableName

func (j *Job) TableName() string

TableName returns the table name for the job.

type Option

type Option func(*options)

Option - how Options are passed as arguments

func WithControllerId added in v0.9.0

func WithControllerId(id string) Option

WithControllerId provides an option to provide the server id to match when calling InterruptRuns

func WithLimit

func WithLimit(l int) Option

WithLimit provides an option to provide a limit for ListJobs. Intentionally allowing negative integers. If WithLimit < 0, then unlimited results are returned. If WithLimit == 0, then default limits are used for results.

func WithName

func WithName(n string) Option

WithName provides an option to provide the name to match when calling ListJobs

func WithNextRunIn

func WithNextRunIn(d time.Duration) Option

WithNextRunIn provides an option to provide the duration until the next run is scheduled. If this option is not provided the NextScheduledRun of the job will default to the current database time, and be available to run immediately.

func WithRunJobsLimit

func WithRunJobsLimit(l int) Option

WithRunJobsLimit provides an option to provide the number of jobs to run. If WithRunJobsLimit == 0, then default run jobs limit is used. If WithRunJobsLimit < 0, then no limit is used.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

A Repository stores and retrieves the persistent types in the job package. It is not safe to use a repository concurrently.

func NewRepository

func NewRepository(r db.Reader, w db.Writer, kms *kms.Kms, opt ...Option) (*Repository, error)

NewRepository creates a new Repository. The returned repository should only be used for one transaction and it is not safe for concurrent go routines to access it. WithLimit option is used as a repo wide default limit applied to all ListX methods.

func (*Repository) CompleteRun

func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, completed, total int, _ ...Option) (*Run, error)

CompleteRun updates the Run repository entry for the provided runId. It sets the status to 'completed', updates the run's EndTime to the current database time, and sets the completed and total counts. CompleteRun also updates the Job repository entry that is associated with this run, setting the job's NextScheduledRun to the current database time incremented by the nextRunIn parameter.

Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to CompleteRun will return an error with Code errors.InvalidJobRunState. All options are ignored.

func (*Repository) FailRun

func (r *Repository) FailRun(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)

FailRun updates the Run repository entry for the provided runId. It sets the status to 'failed' and updates the run's EndTime to the current database time, and sets the completed and total counts.

Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to FailRun will return an error with Code errors.InvalidJobRunState. All options are ignored.

func (*Repository) InterruptRuns

func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time.Duration, opt ...Option) ([]*Run, error)

InterruptRuns updates the Run repository entries for all job runs that have not been updated for the provided interruptThreshold. It sets the status to 'interrupted' and updates the run's EndTime to the current database time.

Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to InterruptRuns will return an error with Code errors.InvalidJobRunState. WithControllerId is the only valid option

func (*Repository) ListJobs

func (r *Repository) ListJobs(ctx context.Context, opt ...Option) ([]*Job, error)

ListJobs returns a slice of Jobs.

WithName and WithLimit are the only valid options.

func (*Repository) LookupJob

func (r *Repository) LookupJob(ctx context.Context, name string, _ ...Option) (*Job, error)

LookupJob will look up a job in the repository using the job name. If the job is not found, it will return nil, nil.

All options are ignored.

func (*Repository) LookupRun

func (r *Repository) LookupRun(ctx context.Context, runId string, _ ...Option) (*Run, error)

LookupRun will look up a run in the repository using the runId. If the run is not found, it will return nil, nil.

All options are ignored.

func (*Repository) RunJobs

func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option) ([]*Run, error)

RunJobs queries the job repository for jobs that need to be run. It creates new entries for each job that needs to be run in the job_run repository, returning a slice of *Run. If there are not jobs to run, an empty slice will be returned with a nil error.

• serverId is required and is the private_id of the server that will run the jobs.

The only valid option is WithRunJobsLimit, if not provided RunJobs will run only 1 job.

func (*Repository) UpdateJobNextRunInAtLeast added in v0.4.0

func (r *Repository) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, _ ...Option) (*Job, error)

UpdateJobNextRunInAtLeast updates the Job repository entry for the job name, setting the job's NextScheduledRun time to either the current database time incremented by the nextRunInAtLeast parameter or the current NextScheduledRun time value, which ever is sooner.

All options are ignored.

func (*Repository) UpdateProgress

func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)

UpdateProgress updates the repository entry's completed and total counts for the provided runId.

Once a run has been persisted with a final run status (completed, failed or interrupted), any future UpdateProgress attempts will return an error with Code errors.InvalidJobRunState. All options are ignored.

func (*Repository) UpsertJob added in v0.7.5

func (r *Repository) UpsertJob(ctx context.Context, name, description string, opt ...Option) (*Job, error)

UpsertJob inserts a job into the repository or updates its current description and returns a new *Job.

• name must be provided and is the name of the job.

• description must be provided and is the user-friendly description of the job.

WithNextRunIn is the only valid options.

type Run

type Run struct {
	*store.JobRun
	// contains filtered or unexported fields
}

Run represents an instance of a job that is either actively running or has already completed.

func (*Run) SetTableName

func (j *Run) SetTableName(n string)

SetTableName sets the table name. If the caller attempts to set the name to "" the name will be reset to the default name "job_run".

func (*Run) TableName

func (j *Run) TableName() string

TableName returns the table name for the job run.

type Status

type Status string
const (
	// Running represents that the job run is actively running on a server
	Running Status = "running"

	// Completed represents that the job run has successfully finished
	Completed Status = "completed"

	// Failed represent that the job run had an error during execution
	Failed Status = "failed"

	// Interrupted represents that the job run was interrupted by a server
	// other than the server running the job.
	Interrupted Status = "interrupted"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL