Documentation ¶
Overview ¶
Package job provides a Job and a Run struct suitable for persisting to the repository.
A Job represents work that should be run at a predetermined time and needs to be synchronized across servers to ensure that one and only one instance of a job is running at any given time. The job's name must be unique to the plugin that registered the job, the (plugin_id, name) uniqueness is enforced by the database.
A Run represents a single execution of a job, only a single run can be in a running state for a specific job. The private_id (primary key) is generated by the repository during RunJobs.
Repository ¶
A repository provides methods for creating, updating, retrieving, and deleting Jobs. It also provides methods to run jobs, update progress of runs and complete, fail or interrupt runs. A new repository should be created for each transaction. For example:
var wrapper wrapping.Wrapper ... init wrapper ... // db implements both the reader and writer interfaces. db, _ := db.Open(db.Postgres, url) var repo *job.Repository repo, _ = job.NewRepository(db, db, wrapper) var j *job.Job j, _ = repo.UpsertJob(context.Background(), j, "name", "description") var runs []*Run repo, _ = job.NewRepository(db, db, wrapper) runs, _ = repo.RunJobs(context.Background(), "serverId") ... run job ... var run *Run repo, _ = job.NewRepository(db, db, wrapper) run, _ = repo.UpdateProgress(ctx, run, []string{"TotalCount", "CompletedCount"}) nextJobRun = time.Now().Add(time.Hour) repo, _ = job.NewRepository(db, db, wrapper) run, _ = repo.CompleteRun(ctx, run.PrivateId, job.Completed, nextJobRun)
Index ¶
- type Job
- type Option
- type Repository
- func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, ...) (*Run, error)
- func (r *Repository) FailRun(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)
- func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time.Duration, opt ...Option) ([]*Run, error)
- func (r *Repository) ListJobs(ctx context.Context, opt ...Option) ([]*Job, error)
- func (r *Repository) LookupJob(ctx context.Context, name string, _ ...Option) (*Job, error)
- func (r *Repository) LookupRun(ctx context.Context, runId string, _ ...Option) (*Run, error)
- func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option) ([]*Run, error)
- func (r *Repository) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, _ ...Option) (*Job, error)
- func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)
- func (r *Repository) UpsertJob(ctx context.Context, name, description string, opt ...Option) (*Job, error)
- type Run
- type Status
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
Job represents work that should be run at a predetermined time and needs to be synchronized across servers to ensure that one and only one instance of a job is running at any given time.
func (*Job) SetTableName ¶
SetTableName sets the table name. If the caller attempts to set the name to "" the name will be reset to the default name "job".
type Option ¶
type Option func(*options)
Option - how Options are passed as arguments
func WithControllerId ¶ added in v0.9.0
WithControllerId provides an option to provide the server id to match when calling InterruptRuns
func WithLimit ¶
WithLimit provides an option to provide a limit for ListJobs. Intentionally allowing negative integers. If WithLimit < 0, then unlimited results are returned. If WithLimit == 0, then default limits are used for results.
func WithNextRunIn ¶
WithNextRunIn provides an option to provide the duration until the next run is scheduled. If this option is not provided the NextScheduledRun of the job will default to the current database time, and be available to run immediately.
func WithRunJobsLimit ¶
WithRunJobsLimit provides an option to provide the number of jobs to run. If WithRunJobsLimit == 0, then default run jobs limit is used. If WithRunJobsLimit < 0, then no limit is used.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
A Repository stores and retrieves the persistent types in the job package. It is not safe to use a repository concurrently.
func NewRepository ¶
NewRepository creates a new Repository. The returned repository should only be used for one transaction and it is not safe for concurrent go routines to access it. WithLimit option is used as a repo wide default limit applied to all ListX methods.
func (*Repository) CompleteRun ¶
func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, completed, total int, _ ...Option) (*Run, error)
CompleteRun updates the Run repository entry for the provided runId. It sets the status to 'completed', updates the run's EndTime to the current database time, and sets the completed and total counts. CompleteRun also updates the Job repository entry that is associated with this run, setting the job's NextScheduledRun to the current database time incremented by the nextRunIn parameter.
Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to CompleteRun will return an error with Code errors.InvalidJobRunState. All options are ignored.
func (*Repository) FailRun ¶
func (r *Repository) FailRun(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)
FailRun updates the Run repository entry for the provided runId. It sets the status to 'failed' and updates the run's EndTime to the current database time, and sets the completed and total counts.
Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to FailRun will return an error with Code errors.InvalidJobRunState. All options are ignored.
func (*Repository) InterruptRuns ¶
func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time.Duration, opt ...Option) ([]*Run, error)
InterruptRuns updates the Run repository entries for all job runs that have not been updated for the provided interruptThreshold. It sets the status to 'interrupted' and updates the run's EndTime to the current database time.
Once a run has been persisted with a final run status (completed, failed or interrupted), any future calls to InterruptRuns will return an error with Code errors.InvalidJobRunState. WithControllerId is the only valid option
func (*Repository) ListJobs ¶
ListJobs returns a slice of Jobs.
WithName and WithLimit are the only valid options.
func (*Repository) LookupJob ¶
LookupJob will look up a job in the repository using the job name. If the job is not found, it will return nil, nil.
All options are ignored.
func (*Repository) LookupRun ¶
LookupRun will look up a run in the repository using the runId. If the run is not found, it will return nil, nil.
All options are ignored.
func (*Repository) RunJobs ¶
RunJobs queries the job repository for jobs that need to be run. It creates new entries for each job that needs to be run in the job_run repository, returning a slice of *Run. If there are not jobs to run, an empty slice will be returned with a nil error.
• serverId is required and is the private_id of the server that will run the jobs.
The only valid option is WithRunJobsLimit, if not provided RunJobs will run only 1 job.
func (*Repository) UpdateJobNextRunInAtLeast ¶ added in v0.4.0
func (r *Repository) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, _ ...Option) (*Job, error)
UpdateJobNextRunInAtLeast updates the Job repository entry for the job name, setting the job's NextScheduledRun time to either the current database time incremented by the nextRunInAtLeast parameter or the current NextScheduledRun time value, which ever is sooner.
All options are ignored.
func (*Repository) UpdateProgress ¶
func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error)
UpdateProgress updates the repository entry's completed and total counts for the provided runId.
Once a run has been persisted with a final run status (completed, failed or interrupted), any future UpdateProgress attempts will return an error with Code errors.InvalidJobRunState. All options are ignored.
func (*Repository) UpsertJob ¶ added in v0.7.5
func (r *Repository) UpsertJob(ctx context.Context, name, description string, opt ...Option) (*Job, error)
UpsertJob inserts a job into the repository or updates its current description and returns a new *Job.
• name must be provided and is the name of the job.
• description must be provided and is the user-friendly description of the job.
WithNextRunIn is the only valid options.
type Run ¶
Run represents an instance of a job that is either actively running or has already completed.
func (*Run) SetTableName ¶
SetTableName sets the table name. If the caller attempts to set the name to "" the name will be reset to the default name "job_run".
type Status ¶
type Status string
const ( // Running represents that the job run is actively running on a server Running Status = "running" // Completed represents that the job run has successfully finished Completed Status = "completed" // Failed represent that the job run had an error during execution Failed Status = "failed" // Interrupted represents that the job run was interrupted by a server // other than the server running the job. Interrupted Status = "interrupted" )