job

package
v1.0.4-gitspaces-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// ProgressMin and ProgressMax are the lowest (0) and highest (100) progress
// values.
// NOTE(review): presumably these bound the int progress passed to a
// ProgressReporter and stored in Job.RunProgress — confirm against the callers.
const (
	ProgressMin = 0
	ProgressMax = 100
)
View Source
// Pub/sub topic names used by the job package.
const (
	// PubSubTopicCancelJob is the topic name for job cancellation messages.
	PubSubTopicCancelJob   = "gitness:job:cancel_job"
	// PubSubTopicStateChange is the topic name for job state change messages.
	// NOTE(review): payloads are presumably encoded StateChange values
	// (see DecodeStateChange) — confirm against the publisher.
	PubSubTopicStateChange = "gitness:job:state_change"
)

Variables

Functions

func GetAllJobStates

func GetAllJobStates() ([]State, State)

func Sanitize

func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool)

func UID

func UID() (string, error)

UID returns a unique random string of length 16.

Types

type Config

// Config holds the settings of the job package. Every field carries an
// `envconfig` struct tag naming its configuration key; two of them also carry
// a `default` tag.
type Config struct {
	// InstanceID specifies the ID of the instance.
	InstanceID string `envconfig:"INSTANCE_ID"`

	// BackgroundJobsMaxRunning is the maximum number of jobs that can be running at once.
	// Its default tag is "10".
	BackgroundJobsMaxRunning int `envconfig:"JOBS_MAX_RUNNING" default:"10"`

	// BackgroundJobsRetentionTime is the duration after which non-recurring,
	// finished and failed jobs will be purged from the DB.
	BackgroundJobsRetentionTime time.Duration `envconfig:"JOBS_RETENTION_TIME" default:"120h"` // 120h = 5 days

}

type Definition

// Definition describes a job to run. It is the input of Scheduler.RunJob and
// Scheduler.RunJobs, and can be checked with its Validate method.
type Definition struct {
	// UID is the identifier of the job.
	UID        string
	// Type is the job type; RunJob runs a job of this type, and Handlers are
	// registered per job type via Executor.Register.
	Type       string
	// MaxRetries is the retry limit for the job.
	// NOTE(review): exact retry semantics are not visible here — confirm.
	MaxRetries int
	// Timeout is the time limit for the job.
	// NOTE(review): presumably maps to Job.MaxDurationSeconds — confirm.
	Timeout    time.Duration
	// Data carries all parameters the job Handler receives (as JSON or
	// whatever the Handler can interpret).
	Data       string
}

func (*Definition) Validate

func (def *Definition) Validate() error

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor holds a map of Handler objects, one for each registered job type. The Scheduler uses the Executor to start execution of jobs.

func NewExecutor

func NewExecutor(store Store, publisher pubsub.Publisher) *Executor

NewExecutor creates a new Executor.

func ProvideExecutor

func ProvideExecutor(
	store Store,
	pubsubService pubsub.PubSub,
) *Executor

func (*Executor) Register

func (e *Executor) Register(jobType string, exec Handler) error

Register registers a job Handler for the provided job type. This function is not thread-safe. All calls are expected to be made from a single thread during application boot.

type Handler

type Handler interface {
	// Handle executes a job. input is the job's data string, fn can be called
	// to report execution progress, and the outcome is returned as a result
	// string or an error. Implementations should honor ctx and abort as soon
	// as it is done.
	Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}

Handler is a job executor for a specific job type. An implementation should try to honor the context and try to abort the execution as soon as the context is done.

type Job

// Job is the record of a background job, as read and written through the
// Store interface. Every field carries a `db` struct tag with a job_* name
// (presumably the database column name — confirm against the store
// implementation).
//
// NOTE(review): the int64 fields Created, Updated, Scheduled, RunDeadline and
// LastExecuted look like timestamps, but their unit is not visible here —
// confirm before relying on it.
type Job struct {
	UID                 string   `db:"job_uid"`
	Created             int64    `db:"job_created"`
	Updated             int64    `db:"job_updated"`
	Type                string   `db:"job_type"`
	Priority            Priority `db:"job_priority"`
	Data                string   `db:"job_data"`
	Result              string   `db:"job_result"`
	MaxDurationSeconds  int      `db:"job_max_duration_seconds"`
	MaxRetries          int      `db:"job_max_retries"`
	State               State    `db:"job_state"`
	Scheduled           int64    `db:"job_scheduled"`
	TotalExecutions     int      `db:"job_total_executions"`
	RunBy               string   `db:"job_run_by"`
	RunDeadline         int64    `db:"job_run_deadline"`
	RunProgress         int      `db:"job_run_progress"`
	LastExecuted        int64    `db:"job_last_executed"`
	IsRecurring         bool     `db:"job_is_recurring"`
	RecurringCron       string   `db:"job_recurring_cron"`
	ConsecutiveFailures int      `db:"job_consecutive_failures"`
	LastFailureError    string   `db:"job_last_failure_error"`
	GroupID             string   `db:"job_group_id"`
}

type Priority

type Priority int

Priority represents priority of a background job.

const (
	// JobPriorityNormal is the zero value of Priority.
	JobPriorityNormal   Priority = 0
	// JobPriorityElevated is numerically higher than JobPriorityNormal.
	// NOTE(review): presumably elevated jobs are picked for execution first —
	// confirm in the scheduler/store.
	JobPriorityElevated Priority = 1
)

JobPriority enumeration.

type Progress

// Progress is a JSON-tagged report of a job's state and progress, as returned
// by Scheduler.GetJobProgress and Scheduler.GetJobProgressForGroup.
type Progress struct {
	State    State  `json:"state"`
	Progress int    `json:"progress"`
	// Result and Failure use omitempty, so they are left out of the JSON
	// output when they are empty strings.
	Result   string `json:"result,omitempty"`
	Failure  string `json:"failure,omitempty"`
}

func FailProgress

func FailProgress() Progress

type ProgressReporter

type ProgressReporter func(progress int, result string) error

ProgressReporter can be used by a job Handler to report back the execution progress.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler controls execution of background jobs.

func NewScheduler

func NewScheduler(
	store Store,
	executor *Executor,
	mxManager lock.MutexManager,
	pubsubService pubsub.PubSub,
	instanceID string,
	maxRunning int,
	retentionTime time.Duration,
) (*Scheduler, error)

func ProvideScheduler

func ProvideScheduler(
	store Store,
	executor *Executor,
	mutexManager lock.MutexManager,
	pubsubService pubsub.PubSub,
	config Config,
) (*Scheduler, error)

func (*Scheduler) AddRecurring

func (s *Scheduler) AddRecurring(
	ctx context.Context,
	jobUID,
	jobType,
	cronDef string,
	maxDur time.Duration,
) error

func (*Scheduler) CancelJob

func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error

CancelJob cancels a currently running or scheduled job.

func (*Scheduler) GetJobProgress

func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error)

func (*Scheduler) GetJobProgressForGroup

func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error)

func (*Scheduler) PurgeJobByUID

func (s *Scheduler) PurgeJobByUID(ctx context.Context, jobUID string) error

func (*Scheduler) PurgeJobsByGroupID

func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (int64, error)

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run runs the background job scheduler. It's a blocking call. It blocks until the provided context is done.

func (*Scheduler) RunJob

func (s *Scheduler) RunJob(ctx context.Context, def Definition) error

RunJob runs a single job of the type Definition.Type. All parameters a job Handler receives must be inside the Definition.Data string (as JSON or whatever the job Handler can interpret).

func (*Scheduler) RunJobs

func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definition) error

RunJobs runs several jobs. It's more efficient than calling RunJob several times because it locks the DB only once.

func (*Scheduler) WaitJobsDone

func (s *Scheduler) WaitJobsDone(ctx context.Context)

WaitJobsDone waits until execution of all jobs has finished. It is intended to be used for graceful shutdown, after the Run method has finished.

type State

type State string

State represents state of a background job.

// Values of State. State is a string type, so the literal on the right is the
// value that appears wherever a State is serialized (for example the "state"
// JSON field of Progress and StateChange).
const (
	JobStateScheduled State = "scheduled"
	JobStateRunning   State = "running"
	JobStateFinished  State = "finished"
	JobStateFailed    State = "failed"
	JobStateCanceled  State = "canceled"
)

State enumeration.

func (State) Enum

func (State) Enum() []interface{}

func (State) IsCompleted

func (s State) IsCompleted() bool

func (State) Sanitize

func (s State) Sanitize() (State, bool)

type StateChange

// StateChange is a JSON-tagged message describing a job's state and progress;
// DecodeStateChange builds one from a byte payload.
// NOTE(review): presumably the payload published on PubSubTopicStateChange —
// confirm against the publisher.
type StateChange struct {
	UID      string `json:"uid"`
	Type     string `json:"type"`
	State    State  `json:"state"`
	Progress int    `json:"progress"`
	// Result and Failure have no omitempty, so they are always present in the
	// JSON output, even when empty.
	Result   string `json:"result"`
	Failure  string `json:"failure"`
}

func DecodeStateChange

func DecodeStateChange(payload []byte) (*StateChange, error)

type Store

// Store is the persistence interface for Job records. It is a dependency of
// both NewExecutor and NewScheduler.
type Store interface {
	// Find fetches a job by its unique identifier.
	Find(ctx context.Context, uid string) (*Job, error)

	// ListByGroupID fetches all jobs for a group ID.
	ListByGroupID(ctx context.Context, groupID string) ([]*Job, error)

	// DeleteByGroupID deletes all jobs for a group ID.
	// NOTE(review): the int64 result is presumably the number of deleted jobs — confirm.
	DeleteByGroupID(ctx context.Context, groupID string) (int64, error)

	// Create is used to create a new job.
	Create(ctx context.Context, job *Job) error

	// Upsert will insert the job in the database if the job didn't already exist,
	// or it will update the existing one but only if its definition has changed.
	Upsert(ctx context.Context, job *Job) error

	// UpdateDefinition is used to update a job definition.
	UpdateDefinition(ctx context.Context, job *Job) error

	// UpdateExecution is used to update a job before and after execution.
	UpdateExecution(ctx context.Context, job *Job) error

	// UpdateProgress is used to update a job's progress data.
	UpdateProgress(ctx context.Context, job *Job) error

	// CountRunning returns the number of jobs that are currently being run.
	CountRunning(ctx context.Context) (int, error)

	// ListReady returns a list of at most limit jobs that are ready for execution at time now.
	ListReady(ctx context.Context, now time.Time, limit int) ([]*Job, error)

	// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
	ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*Job, error)

	// NextScheduledTime returns the scheduled time of the next ready job.
	NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error)

	// DeleteOld removes non-recurring jobs that have finished execution or have failed.
	// NOTE(review): the int64 result is presumably the number of removed jobs — confirm.
	DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)

	// DeleteByUID deletes a job by its unique identifier.
	DeleteByUID(ctx context.Context, jobUID string) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL