Documentation
¶
Index ¶
- Constants
- Variables
- func GetAllJobStates() ([]State, State)
- func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool)
- func UID() (string, error)
- type Config
- type Definition
- type Executor
- type Handler
- type Job
- type Priority
- type Progress
- type ProgressReporter
- type Scheduler
- func (s *Scheduler) AddRecurring(ctx context.Context, jobUID, jobType, cronDef string, maxDur time.Duration) error
- func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error
- func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error)
- func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error)
- func (s *Scheduler) PurgeJobByUID(ctx context.Context, jobUID string) error
- func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (int64, error)
- func (s *Scheduler) Run(ctx context.Context) error
- func (s *Scheduler) RunJob(ctx context.Context, def Definition) error
- func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error
- func (s *Scheduler) WaitJobsDone(ctx context.Context)
- type State
- type StateChange
- type Store
Constants ¶
const ( ProgressMin = 0 ProgressMax = 100 )
const ( PubSubTopicCancelJob = "gitness:job:cancel_job" PubSubTopicStateChange = "gitness:job:state_change" )
Variables ¶
var WireSet = wire.NewSet( ProvideExecutor, ProvideScheduler, )
Functions ¶
func GetAllJobStates ¶
func Sanitize ¶
func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool)
Types ¶
type Config ¶
type Config struct { // InstanceID specifies the ID of the instance. InstanceID string `envconfig:"INSTANCE_ID"` // BackgroundJobsMaxRunning is the maximum number of jobs that can be running at once. BackgroundJobsMaxRunning int `envconfig:"JOBS_MAX_RUNNING" default:"10"` // BackgroundJobsRetentionTime is the duration after which non-recurring, // finished and failed jobs will be purged from the DB. BackgroundJobsRetentionTime time.Duration `envconfig:"JOBS_RETENTION_TIME" default:"120h"` // 5 days }
type Definition ¶
func (*Definition) Validate ¶
func (def *Definition) Validate() error
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor holds a map of Handler objects, one for each registered job type. The Scheduler uses the Executor to start execution of jobs.
func NewExecutor ¶
NewExecutor creates new Executor.
type Handler ¶
type Handler interface {
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}
Handler is a job executor for a specific job type. An implementation should try to honor the context and try to abort the execution as soon as the context is done.
type Job ¶
type Job struct { UID string `db:"job_uid"` Created int64 `db:"job_created"` Updated int64 `db:"job_updated"` Type string `db:"job_type"` Priority Priority `db:"job_priority"` Data string `db:"job_data"` Result string `db:"job_result"` MaxDurationSeconds int `db:"job_max_duration_seconds"` MaxRetries int `db:"job_max_retries"` State State `db:"job_state"` Scheduled int64 `db:"job_scheduled"` TotalExecutions int `db:"job_total_executions"` RunBy string `db:"job_run_by"` RunDeadline int64 `db:"job_run_deadline"` RunProgress int `db:"job_run_progress"` LastExecuted int64 `db:"job_last_executed"` IsRecurring bool `db:"job_is_recurring"` RecurringCron string `db:"job_recurring_cron"` ConsecutiveFailures int `db:"job_consecutive_failures"` LastFailureError string `db:"job_last_failure_error"` GroupID string `db:"job_group_id"` }
type Progress ¶
type Progress struct { State State `json:"state"` Progress int `json:"progress"` Result string `json:"result,omitempty"` Failure string `json:"failure,omitempty"` }
func FailProgress ¶
func FailProgress() Progress
type ProgressReporter ¶
ProgressReporter can be used by a job Handler to report back the execution progress.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler controls execution of background jobs.
func NewScheduler ¶
func ProvideScheduler ¶
func (*Scheduler) AddRecurring ¶
func (*Scheduler) GetJobProgress ¶
func (*Scheduler) GetJobProgressForGroup ¶
func (*Scheduler) PurgeJobByUID ¶
func (*Scheduler) PurgeJobsByGroupID ¶
func (*Scheduler) Run ¶
Run runs the background job scheduler. It's a blocking call. It blocks until the provided context is done.
func (*Scheduler) RunJob ¶
func (s *Scheduler) RunJob(ctx context.Context, def Definition) error
RunJob runs a single job of the type Definition.Type. All parameters a job Handler receives must be inside the Definition.Data string (as JSON or whatever the job Handler can interpret).
func (*Scheduler) RunJobs ¶
RunJobs runs several jobs. It's more efficient than calling RunJob several times because it locks the DB only once.
func (*Scheduler) WaitJobsDone ¶
WaitJobsDone waits until execution of all jobs has finished. It is intended to be used for graceful shutdown, after the Run method has finished.
type State ¶
type State string
State represents state of a background job.
const ( JobStateScheduled State = "scheduled" JobStateRunning State = "running" JobStateFinished State = "finished" JobStateFailed State = "failed" JobStateCanceled State = "canceled" )
State enumeration.
func (State) IsCompleted ¶
type StateChange ¶
type StateChange struct { UID string `json:"uid"` Type string `json:"type"` State State `json:"state"` Progress int `json:"progress"` Result string `json:"result"` Failure string `json:"failure"` }
func DecodeStateChange ¶
func DecodeStateChange(payload []byte) (*StateChange, error)
type Store ¶
type Store interface { // Find fetches a job by its unique identifier. Find(ctx context.Context, uid string) (*Job, error) // ListByGroupID fetches all jobs for a group ID. ListByGroupID(ctx context.Context, groupID string) ([]*Job, error) // DeleteByGroupID deletes all jobs for a group ID. DeleteByGroupID(ctx context.Context, groupID string) (int64, error) // Create is used to create a new job. Create(ctx context.Context, job *Job) error // Upsert will insert the job in the database if the job didn't already exist, // or it will update the existing one but only if its definition has changed. Upsert(ctx context.Context, job *Job) error // UpdateDefinition is used to update a job definition. UpdateDefinition(ctx context.Context, job *Job) error // UpdateExecution is used to update a job before and after execution. UpdateExecution(ctx context.Context, job *Job) error // UpdateProgress is used to update a job's progress data. UpdateProgress(ctx context.Context, job *Job) error // CountRunning returns the number of jobs that are currently being run. CountRunning(ctx context.Context) (int, error) // ListReady returns a list of jobs that are ready for execution. ListReady(ctx context.Context, now time.Time, limit int) ([]*Job, error) // ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*Job, error) // NextScheduledTime returns the scheduled time of the next ready job. NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) // DeleteOld removes non-recurring jobs that have finished execution or have failed. DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) // DeleteByUID deletes a job by its unique identifier. DeleteByUID(ctx context.Context, jobUID string) error }