Documentation
Index ¶
- Constants
- Variables
- type Config
- type CronJob
- type CronJobInput
- type DeleteOptions
- type Job
- type JobWithSchedule
- type ListJobsOptions
- type PauseResumeOptions
- type RawJob
- type Repository
- type RepositoryGorm
- func (r *RepositoryGorm) AddJobs(jobs []JobWithSchedule) error
- func (r *RepositoryGorm) DeleteJobsByIds(jobsID []int64) error
- func (r *RepositoryGorm) ErrorTypeIfMismatchCount() error
- func (r *RepositoryGorm) GetAllJobsToExecute() ([]JobWithSchedule, error)
- func (r *RepositoryGorm) GetJob(jobID int64) (JobWithSchedule, error)
- func (r *RepositoryGorm) GetJobsByIds(jobsID []int64) ([]JobWithSchedule, error)
- func (r *RepositoryGorm) ListJobs(options ToListOptions) ([]RawJob, error)
- func (r *RepositoryGorm) PauseJobs(jobs []RawJob) error
- func (r *RepositoryGorm) ResumeJobs(jobs []JobWithSchedule) error
- func (r *RepositoryGorm) SetCronId(jobs []JobWithSchedule) error
- func (r *RepositoryGorm) SetCronIdAndChangeScheduleAndJobInput(jobs []JobWithSchedule) error
- type RepositoryGormConfig
- type SchedulerConfig
- type SmallBen
- func (s *SmallBen) AddJobs(jobs []Job) error
- func (s *SmallBen) DeleteJobs(options *DeleteOptions) error
- func (s *SmallBen) ErrorTypeIfMismatchCount() error
- func (s *SmallBen) ListJobs(options *ListJobsOptions) ([]Job, error)
- func (s *SmallBen) PauseJobs(options *PauseResumeOptions) error
- func (s *SmallBen) RegisterMetrics(registry *prometheus.Registry) error
- func (s *SmallBen) ResumeJobs(options *PauseResumeOptions) error
- func (s *SmallBen) Start() error
- func (s *SmallBen) Stop()
- func (s *SmallBen) UpdateJobs(scheduleInfo []UpdateOption) error
- type ToListOptions
- type UpdateOption
Constants ¶
const DefaultCronID = int64(0)
DefaultCronID is the CronID of a job that has not been inserted yet.
Variables ¶
var DefaultLogger cron.Logger
DefaultLogger is the Zap logger that is used in case none is provided.
var (
	// ErrUpdateOptionInvalid is returned when the fields
	// of UpdateOption are invalid.
	// This error is returned when the combination
	// of the fields is not valid (i.e., both nil).
	// For error in the CronExpression field,
	// the specific error set by the library is returned,
	ErrUpdateOptionInvalid = errors.New("invalid option")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// SchedulerConfig configures the scheduler
	SchedulerConfig SchedulerConfig
	// Logger is the logger to use.
	Logger logr.Logger
}
Config is the struct configuring the overall SmallBen object.
type CronJob ¶
type CronJob interface {
Run(input CronJobInput)
}
CronJob is the interface that jobs have to implement. It contains a single method, `Run`.
type CronJobInput ¶
type CronJobInput struct {
	// JobID is the ID of the current job.
	JobID int64
	// GroupID is the GroupID of the current job.
	GroupID int64
	// SuperGroupID is the SuperGroupID of the current job.
	SuperGroupID int64
	// CronExpression is the interval of execution, as specified on job creation.
	CronExpression string
	// OtherInputs contains the other inputs of the job.
	OtherInputs map[string]interface{}
}
CronJobInput is the input passed to the Run function.
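As an illustration of how a job might implement this interface, here is a minimal sketch. `CronJobInput` and `CronJob` are redeclared locally only so that the snippet compiles on its own; `GreetJob`, its `Prefix` field, and the `Message` helper are hypothetical names that are not part of the package.

```go
package main

import "fmt"

// CronJobInput mirrors the documented struct. It is redeclared here
// only to keep the sketch self-contained.
type CronJobInput struct {
	JobID          int64
	GroupID        int64
	SuperGroupID   int64
	CronExpression string
	OtherInputs    map[string]interface{}
}

// CronJob mirrors the documented interface.
type CronJob interface {
	Run(input CronJobInput)
}

// GreetJob is a hypothetical job type used for illustration.
type GreetJob struct {
	Prefix string
}

// Message builds the text for the given input (hypothetical helper).
// Reading from a nil OtherInputs map is safe and yields the fallback name.
func (g *GreetJob) Message(input CronJobInput) string {
	name, ok := input.OtherInputs["name"].(string)
	if !ok {
		name = "unknown"
	}
	return fmt.Sprintf("%s job %d: hello, %s", g.Prefix, input.JobID, name)
}

// Run satisfies the CronJob interface.
func (g *GreetJob) Run(input CronJobInput) {
	fmt.Println(g.Message(input))
}

// Compile-time check that *GreetJob implements CronJob.
var _ CronJob = (*GreetJob)(nil)

func main() {
	job := &GreetJob{Prefix: "[demo]"}
	job.Run(CronJobInput{
		JobID:          1,
		CronExpression: "@every 1m",
		OtherInputs:    map[string]interface{}{"name": "ben"},
	})
}
```

Keeping the job's state in exported fields seems a reasonable choice here, since the package documents that jobs are serialized with Gob, which only encodes exported fields.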
type DeleteOptions ¶
type DeleteOptions struct {
	PauseResumeOptions
	// Paused specifies whether to delete paused
	// jobs or not (or do not care about it).
	Paused *bool
}
DeleteOptions governs the behavior of the DeleteJobs method.
type Job ¶
type Job struct {
	// ID is a unique ID identifying the rawJob object.
	// It is chosen by the user.
	ID int64
	// GroupID is the ID of the group this rawJob is inserted in.
	GroupID int64
	// SuperGroupID specifies the ID of the super group
	// where this group is contained in.
	SuperGroupID int64
	// CronExpression specifies the scheduling of the job.
	CronExpression string
	// Job is the real unit of work to be executed
	Job CronJob
	// JobInput is the additional input to pass to the inner Job.
	JobInput map[string]interface{}
	// contains filtered or unexported fields
}
Job is the struct used to interact with SmallBen.
func (*Job) CreatedAt ¶ added in v0.9.5
CreatedAt returns the time when this Job has been added to the scheduler.
type JobWithSchedule ¶
type JobWithSchedule struct {
// contains filtered or unexported fields
}
JobWithSchedule is a RawJob object with a cron.Schedule object in it. The schedule can be accessed by using the Schedule() method. This object should be created only by calling the method toJobWithSchedule().
func (*JobWithSchedule) BuildJob ¶
func (j *JobWithSchedule) BuildJob() (RawJob, error)
BuildJob builds the raw version of the inner job, by encoding it. In particular, the encoding is done as follows:
- for the serialized job, it is encoded in Gob and then in base64
- for the job input, it is encoded in json.
This is needed since, when converting from a `RawJob` to a `JobWithSchedule`, the binary serialization of the Job is not kept in memory.
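The two encodings described above can be sketched with the standard library alone. This is not the package's implementation: `Runner`, `EchoJob`, `encodeJob`, `decodeJob`, and `encodeInput` are hypothetical names, and the use of `base64.StdEncoding` is an assumption, since the documentation does not say which base64 variant is used.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// Runner is a local stand-in for the job interface.
type Runner interface{ Run() }

// EchoJob is a hypothetical job type.
type EchoJob struct{ Text string }

func (e *EchoJob) Run() { fmt.Println(e.Text) }

func init() {
	// Gob must know the concrete types that travel inside interface values.
	gob.Register(&EchoJob{})
}

// encodeJob gob-encodes the job as an interface value,
// then base64-encodes the resulting bytes.
func encodeJob(job Runner) (string, error) {
	var buf bytes.Buffer
	// A pointer to the interface is passed so that gob records the concrete type.
	if err := gob.NewEncoder(&buf).Encode(&job); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// decodeJob reverses encodeJob.
func decodeJob(s string) (Runner, error) {
	raw, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return nil, err
	}
	var job Runner
	if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&job); err != nil {
		return nil, err
	}
	return job, nil
}

// encodeInput serializes the job input as JSON.
func encodeInput(input map[string]interface{}) (string, error) {
	b, err := json.Marshal(input)
	return string(b), err
}

func main() {
	encoded, err := encodeJob(&EchoJob{Text: "hello"})
	if err != nil {
		panic(err)
	}
	input, err := encodeInput(map[string]interface{}{"name": "ben"})
	if err != nil {
		panic(err)
	}
	fmt.Println("serialized job:", encoded)
	fmt.Println("serialized input:", input)

	job, err := decodeJob(encoded)
	if err != nil {
		panic(err)
	}
	job.Run()
}
```

In this sketch, a job type that is never passed to `gob.Register` would fail to encode, so a registration step like the one in `init` is likely needed for any custom job type that goes through Gob as an interface value.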
type ListJobsOptions ¶
type ListJobsOptions struct {
	// Paused controls the `paused` field.
	// If paused = true list all jobs that have been paused.
	// If paused = false list all jobs that have not been paused.
	// If paused = nil list all jobs no matter if they have been paused or not.
	Paused *bool
	// GroupIDs filters the jobs in the given Group ID.
	// if nil, it is ignored.
	// It makes ListJobs returning all the jobs whose GroupID
	// is in GroupIDs
	GroupIDs []int64
	// SuperGroupIDs filters the jobs by the given Super Group ID.
	// if nil, it is ignored.
	// It makes ListJobs returning all the jobs whose SuperGroupID
	// is in SuperGroupIDs
	SuperGroupIDs []int64
	// JobIDs filters the jobs by the given job ID.
	// This option logically overrides other options
	// since it is the most specific.
	JobIDs []int64
}
ListJobsOptions defines the options to use when listing the jobs. All options are *combined*, i.e., with an `AND`.
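To make the `AND` semantics and the three-state `Paused` pointer concrete, here is a small in-memory sketch. The real filtering happens in the storage backend; `jobRow`, `contains`, and `matches` are hypothetical helpers, and treating `JobIDs` as one more `AND` filter is an assumption of this sketch.

```go
package main

import "fmt"

// ListJobsOptions mirrors the documented struct (redeclared locally).
type ListJobsOptions struct {
	Paused        *bool
	GroupIDs      []int64
	SuperGroupIDs []int64
	JobIDs        []int64
}

// jobRow is a hypothetical, trimmed-down view of a stored job.
type jobRow struct {
	ID           int64
	GroupID      int64
	SuperGroupID int64
	Paused       bool
}

// contains reports whether id is in ids.
func contains(ids []int64, id int64) bool {
	for _, v := range ids {
		if v == id {
			return true
		}
	}
	return false
}

// matches reports whether row satisfies every filter that is set.
// A nil field means "do not filter on this".
func matches(row jobRow, o ListJobsOptions) bool {
	if o.Paused != nil && row.Paused != *o.Paused {
		return false
	}
	if o.GroupIDs != nil && !contains(o.GroupIDs, row.GroupID) {
		return false
	}
	if o.SuperGroupIDs != nil && !contains(o.SuperGroupIDs, row.SuperGroupID) {
		return false
	}
	if o.JobIDs != nil && !contains(o.JobIDs, row.ID) {
		return false
	}
	return true
}

func main() {
	paused := true
	opts := ListJobsOptions{Paused: &paused, GroupIDs: []int64{1}}
	rows := []jobRow{
		{ID: 1, GroupID: 1, SuperGroupID: 1, Paused: true},
		{ID: 2, GroupID: 1, SuperGroupID: 1, Paused: false},
		{ID: 3, GroupID: 2, SuperGroupID: 1, Paused: true},
	}
	for _, r := range rows {
		fmt.Println(r.ID, matches(r, opts))
	}
}
```

Because `Paused` is a `*bool`, callers need an addressable variable (as with `paused` above) to ask for `true` or `false`, while leaving the field unset keeps both paused and non-paused jobs.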
type PauseResumeOptions ¶
type PauseResumeOptions struct {
	// JobIDs specifies which jobs will be
	// paused or resumed.
	JobIDs []int64
	// GroupIDs specifies the group ids
	// whose jobs will be paused or resumed.
	GroupIDs []int64
	// SuperGroupIDs specifies the super group ids
	// whose jobs will be paused or resumed.
	SuperGroupIDs []int64
}
PauseResumeOptions governs the behavior of the PauseJobs and ResumeJobs methods. The fields that are set are combined when querying the database.
type RawJob ¶
type RawJob struct {
	// ID is a unique ID identifying the rawJob object.
	// It is chosen by the user.
	ID int64 `gorm:"primaryKey,column:id"`
	// GroupID is the ID of the group this rawJob is inserted in.
	GroupID int64 `gorm:"column:group_id"`
	// SuperGroupID specifies the ID of the super group
	// where this group is contained in.
	SuperGroupID int64 `gorm:"column:super_group_id"`
	// CronID is the ID of the cron rawJob as assigned by the scheduler
	// internally.
	CronID int64 `gorm:"column:cron_id"`
	// CronExpression specifies the scheduling of the job.
	CronExpression string `gorm:"column:cron_expression"`
	// Paused specifies whether this rawJob has been paused.
	Paused bool `gorm:"column:paused"`
	// CreatedAt specifies when this rawJob has been created.
	CreatedAt time.Time `gorm:"column:created_at"`
	// UpdatedAt specifies the last time this object has been updated,
	// i.e., paused/resumed/schedule updated.
	UpdatedAt time.Time `gorm:"column:updated_at"`
	// SerializedJob is the base64(gob-encoded byte array)
	// of the interface executing this rawJob
	SerializedJob string `gorm:"column:serialized_job"`
	// SerializedJobInput is the base64(gob-encoded byte array)
	// of the map containing the argument for the job.
	SerializedJobInput string `gorm:"column:serialized_job_input"`
}
RawJob models a raw job as stored in the database.
func (*RawJob) ToJobWithSchedule ¶
func (j *RawJob) ToJobWithSchedule() (JobWithSchedule, error)
ToJobWithSchedule returns a JobWithSchedule object from the current RawJob, by copy. It returns errors in case the given schedule is not valid, or in case the conversion of the rawJob interface/input fails. It does NOT copy the byte arrays from j.
type Repository ¶
type Repository interface {
	// AddJobs adds `jobs` to the backend. This operation
	// must be atomic.
	AddJobs(jobs []JobWithSchedule) error
	// GetJob returns the job whose ID is `jobID`, returning
	// an error in case the required job has not been found.
	// That error must be of the type returned by ErrorTypeIfMismatchCount().
	GetJob(jobID int64) (JobWithSchedule, error)
	// PauseJobs pause `jobs`, i.e., marks them as paused.
	// This operation must be atomic.
	//
	// It must return an error of type ErrorTypeIfMismatchCount() in case
	// the number of updated jobs is different than the number
	// of required jobs, i.e., `len(jobs)`.
	// It should update all the jobs it can, i.e., not stopping
	// at the first ID not found.
	PauseJobs(jobs []RawJob) error
	// ResumeJobs resumes `jobs`, i.e., marks them as non-paused.
	// This operation must be atomic.
	//
	// It must return an error of type ErrorTypeIfMismatchCount() in case
	// the number of updated jobs is different than the number
	// of required jobs, i.e., `len(jobs)`.
	// It should update all the jobs it can, i.e., not stopping
	// at the first ID not found.
	ResumeJobs(jobs []JobWithSchedule) error
	// GetAllJobsToExecute returns all jobs the scheduler
	// should execute on startup, i.e., all jobs whose `paused` field
	// is set to `false`.
	GetAllJobsToExecute() ([]JobWithSchedule, error)
	// GetJobsByIds returns an array of jobs whose id is in `jobsID`.
	//
	// It must return an error of type ErrorTypeIfMismatchCount() in case
	// the number of returned jobs is different than the number
	// of required jobs, i.e., `len(jobsID)`.
	GetJobsByIds(jobsID []int64) ([]JobWithSchedule, error)
	// DeleteJobsByIds deletes all the jobs whose id is in `jobsID`.
	//
	// It must return an error of type ErrorTypeIfMismatchCount() in case
	// the number of deleted jobs is different than the number
	// of required jobs, i.e., `len(jobsID)`.
	DeleteJobsByIds(jobsID []int64) error
	// SetCronId updates the `cron_id` field of `jobs`.
	//
	// It must return an error of type ErrorTypeIfMismatchCount() in case
	// the number of updated jobs is different than the number
	// of required jobs, i.e., `len(jobs)`.
	SetCronId(jobs []JobWithSchedule) error
	// SetCronIdAndChangeSchedule updates the `cron_id`, `cron_expression`
	// and `job_input` fields of RawJob.
	//
	// It must return an error of type ErrorType() in case
	// the number of updated jobs is different than the number
	// of required jobs, i.e., `len(jobs)`.
	SetCronIdAndChangeScheduleAndJobInput(jobs []JobWithSchedule) error
	// ListJobs list all the jobs present in the job storage backend,
	// according to `options`.
	//
	// If options is `nil`, no filtering is applied.
	ListJobs(options ToListOptions) ([]RawJob, error)
	// ErrorTypeIfMismatchCount specifies the error type
	// to return in case there is a mismatch count
	// between the number of jobs involved in a backend operation
	// and the number of supposed jobs that should have been involved
	// in such an operation.
	ErrorTypeIfMismatchCount() error
}
Repository is the interface that storage backends should implement.
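The mismatch-count contract that recurs in the interface comments can be illustrated with a toy in-memory store. This is only a sketch of the contract, not a full `Repository` implementation: `memoryStore` and `errMismatch` are hypothetical names, and only one method is shown.

```go
package main

import (
	"errors"
	"fmt"
)

// errMismatch is a hypothetical sentinel error for this sketch.
var errMismatch = errors.New("mismatch between affected and requested jobs")

// memoryStore is a toy backend keyed by job ID.
type memoryStore struct {
	jobs map[int64]string
}

// ErrorTypeIfMismatchCount exposes the sentinel so that callers can compare against it.
func (m *memoryStore) ErrorTypeIfMismatchCount() error {
	return errMismatch
}

// DeleteJobsByIds deletes every job it can find, then reports a mismatch
// if fewer jobs were deleted than requested.
func (m *memoryStore) DeleteJobsByIds(jobsID []int64) error {
	deleted := 0
	for _, id := range jobsID {
		if _, ok := m.jobs[id]; ok {
			delete(m.jobs, id)
			deleted++
		}
	}
	if deleted != len(jobsID) {
		return m.ErrorTypeIfMismatchCount()
	}
	return nil
}

func main() {
	store := &memoryStore{jobs: map[int64]string{1: "a", 2: "b"}}

	// Job 3 does not exist, so the call reports a mismatch
	// even though job 1 has been deleted.
	err := store.DeleteJobsByIds([]int64{1, 3})
	if errors.Is(err, store.ErrorTypeIfMismatchCount()) {
		fmt.Println("some of the requested jobs were not found")
	}
	fmt.Println("remaining jobs:", len(store.jobs))
}
```

Exposing the error through a method rather than a package-level variable lets each backend pick its own error value, while callers can still detect the condition with `errors.Is`.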
type RepositoryGorm ¶
type RepositoryGorm struct {
// contains filtered or unexported fields
}
RepositoryGorm implements the Repository interface by means of GORM.
func NewRepositoryGorm ¶
func NewRepositoryGorm(config *RepositoryGormConfig) (*RepositoryGorm, error)
NewRepositoryGorm returns an instance of the repository connecting to the given database.
func (*RepositoryGorm) AddJobs ¶
func (r *RepositoryGorm) AddJobs(jobs []JobWithSchedule) error
AddJobs adds `jobs` to the database. This operation can fail if the job serialization fails, or because of database errors.
func (*RepositoryGorm) DeleteJobsByIds ¶
func (r *RepositoryGorm) DeleteJobsByIds(jobsID []int64) error
DeleteJobsByIds deletes the jobs whose IDs are in `jobsID`, returning an error of type gorm.ErrRecordNotFound if the number of deleted jobs is less than the length of `jobsID`.
func (*RepositoryGorm) ErrorTypeIfMismatchCount ¶
func (r *RepositoryGorm) ErrorTypeIfMismatchCount() error
ErrorTypeIfMismatchCount returns the error returned during operations where the number of involved jobs is different than the number of expected jobs that should have been involved.
func (*RepositoryGorm) GetAllJobsToExecute ¶
func (r *RepositoryGorm) GetAllJobsToExecute() ([]JobWithSchedule, error)
GetAllJobsToExecute returns all the jobs whose `paused` field is set to `false`.
func (*RepositoryGorm) GetJob ¶
func (r *RepositoryGorm) GetJob(jobID int64) (JobWithSchedule, error)
GetJob returns the JobWithSchedule whose id is `jobID`. In case the job is not found, an error of type gorm.ErrRecordNotFound is returned.
func (*RepositoryGorm) GetJobsByIds ¶
func (r *RepositoryGorm) GetJobsByIds(jobsID []int64) ([]JobWithSchedule, error)
GetJobsByIds returns all the jobs whose IDs are in `jobsID`. It returns an error of type `gorm.ErrRecordNotFound` in case fewer jobs are found than requested.
func (*RepositoryGorm) ListJobs ¶
func (r *RepositoryGorm) ListJobs(options ToListOptions) ([]RawJob, error)
ListJobs lists all jobs according to `options`. If `options` is nil, no filtering is applied, thus returning all the jobs.
func (*RepositoryGorm) PauseJobs ¶
func (r *RepositoryGorm) PauseJobs(jobs []RawJob) error
PauseJobs pauses the jobs whose IDs are in `jobs`. It returns the error `gorm.ErrRecordNotFound` in case the number of updated rows is different from the length of `jobs`.
func (*RepositoryGorm) ResumeJobs ¶
func (r *RepositoryGorm) ResumeJobs(jobs []JobWithSchedule) error
ResumeJobs resumes the jobs whose IDs are in `jobs`. It returns the error `gorm.ErrRecordNotFound` in case the number of updated rows is different from the length of `jobs`.
func (*RepositoryGorm) SetCronId ¶
func (r *RepositoryGorm) SetCronId(jobs []JobWithSchedule) error
SetCronId updates the cron_id field of `jobs`.
func (*RepositoryGorm) SetCronIdAndChangeScheduleAndJobInput ¶ added in v0.10.0
func (r *RepositoryGorm) SetCronIdAndChangeScheduleAndJobInput(jobs []JobWithSchedule) error
SetCronIdAndChangeScheduleAndJobInput updates the fields `cron_id`, `cron_expression` and `serialized_job_input` of jobs.
In particular, the job input must have been set internally, since this call will encode the job input.
type RepositoryGormConfig ¶
type RepositoryGormConfig struct {
	// Dialector is the dialector to use to connect to the database
	Dialector gorm.Dialector
	// Config is the configuration to use to connect to the database.
	Config gorm.Config
}
RepositoryGormConfig configures the GORM-backed repository, i.e., how it connects to the database.
type SchedulerConfig ¶
type SchedulerConfig struct {
	// DelayIfStillRunning delays a job starting
	// if that job has not finished yet.
	// Equivalent to attaching: https://pkg.go.dev/github.com/robfig/cron/v3#DelayIfStillRunning
	DelayIfStillRunning bool
	// SkipIfStillRunning skips a job starting
	// if that job has not finished yet.
	// Equivalent to attaching: https://pkg.go.dev/github.com/robfig/cron/v3#SkipIfStillRunning
	SkipIfStillRunning bool
	// WithSeconds enable seconds-grained scheduling.
	// Equivalent to: https://pkg.go.dev/github.com/robfig/cron/v3#WithSeconds
	WithSeconds bool
	// WithLocation sets the location for the scheduler.
	// Equivalent to: https://pkg.go.dev/github.com/robfig/cron/v3#WithLocation
	WithLocation *time.Location
	// contains filtered or unexported fields
}
SchedulerConfig contains the configuration for the scheduler. It exposes most of the options for configuring cron through this struct instead of using the Option-style pattern. Note that the parser used to parse cron entries cannot be set: only the default cron parser works, i.e., there is no way to set the option https://pkg.go.dev/github.com/robfig/cron/v3#WithParser.
type SmallBen ¶
type SmallBen struct {
// contains filtered or unexported fields
}
SmallBen is the struct managing the persistent scheduler state. SmallBen is *goroutine-safe*, since all accesses are protected by a read-write lock.
func New ¶
func New(repository Repository, config *Config) *SmallBen
New creates a new instance of SmallBen. It takes as input the repository and the configuration for the scheduler.
func (*SmallBen) DeleteJobs ¶
func (s *SmallBen) DeleteJobs(options *DeleteOptions) error
DeleteJobs permanently deletes jobs according to options. It returns an error of type repository.ErrorTypeIfMismatchCount() if the number of deleted jobs does not match the expected one.
func (*SmallBen) ErrorTypeIfMismatchCount ¶ added in v0.9.2
func (s *SmallBen) ErrorTypeIfMismatchCount() error
ErrorTypeIfMismatchCount returns the error returned during operations where the number of involved jobs is different than the number of expected jobs that should have been involved.
func (*SmallBen) ListJobs ¶
func (s *SmallBen) ListJobs(options *ListJobsOptions) ([]Job, error)
ListJobs returns the jobs according to `options`. It may fail in case of:
- backend error
- deserialization error
func (*SmallBen) PauseJobs ¶
func (s *SmallBen) PauseJobs(options *PauseResumeOptions) error
PauseJobs pauses the jobs according to the filter defined in options. If no jobs matching options are found, an error of type ErrorTypeIfMismatchCount is returned.
func (*SmallBen) RegisterMetrics ¶
func (s *SmallBen) RegisterMetrics(registry *prometheus.Registry) error
RegisterMetrics registers the prometheus metrics to registry. If registry is nil, then they are registered to the default registry.
func (*SmallBen) ResumeJobs ¶
func (s *SmallBen) ResumeJobs(options *PauseResumeOptions) error
ResumeJobs restarts the jobs according to options. Any matching jobs that were not paused keep running as before. In case of errors during the last steps of the execution, the jobs are removed from the scheduler. If no jobs matching options are found, an error of type ErrorTypeIfMismatchCount is returned.
func (*SmallBen) Start ¶
func (s *SmallBen) Start() error
Start starts the SmallBen, by starting the inner scheduler and filling it with the jobs that need to be executed. This call is idempotent and goroutine-safe.
func (*SmallBen) Stop ¶
func (s *SmallBen) Stop()
Stop stops the SmallBen. This call will block until all *running* jobs have finished their current execution.
func (*SmallBen) UpdateJobs ¶ added in v0.10.0
func (s *SmallBen) UpdateJobs(scheduleInfo []UpdateOption) error
UpdateJobs updates the scheduler's internal state according to `scheduleInfo`. In particular, two things can be updated:
* the schedule of the Job
* the JobOtherInputs of the Job.
In case of errors, it is guaranteed that, in the worst case, jobs will be removed from the scheduler while still being in the database with the old schedule and old JobOtherInputs.
type ToListOptions ¶
type ToListOptions interface {
// contains filtered or unexported methods
}
ToListOptions is an interface implemented by structs that can be converted to a ListOptions struct.
type UpdateOption ¶ added in v0.10.0
type UpdateOption struct {
	// JobID is the ID of the Job to update.
	JobID int64
	// CronExpression is the new schedule of the Job.
	// If nil, it is ignored, i.e., the schedule
	// is not changed.
	CronExpression *string
	// JobOtherInputs is the new OtherInputs of the Job.
	// If nil, it is ignored, i.e.,
	// the Job input is not changed.
	JobOtherInputs *map[string]interface{}
}
UpdateOption is the struct used to update a Job.
An update on a Job consists of changing the schedule, by using the field `CronExpression`, or the Job input, by using the field `JobOtherInputs`.
If none of those fields are specified, i.e., they are both nil, the struct is considered invalid.
func (*UpdateOption) Valid ¶ added in v0.10.0
func (u *UpdateOption) Valid() error
Valid returns whether the fields in this struct are valid. If the struct is valid, no errors are returned.
UpdateOption is considered valid if at least one of the fields CronExpression and JobOtherInputs is not nil and, when CronExpression is set, the cron string can be parsed.
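The validity rule can be sketched as follows. The types are redeclared locally so that the snippet is self-contained, and `parseCron` is a hypothetical stand-in that only rejects blank strings; the package relies on the cron library's parser instead and returns that parser's own error.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ErrUpdateOptionInvalid mirrors the documented variable (redeclared locally).
var ErrUpdateOptionInvalid = errors.New("invalid option")

// UpdateOption mirrors the documented struct (redeclared locally).
type UpdateOption struct {
	JobID          int64
	CronExpression *string
	JobOtherInputs *map[string]interface{}
}

// parseCron is a hypothetical stand-in for a real cron parser:
// it accepts any non-blank expression.
func parseCron(expr string) error {
	if strings.TrimSpace(expr) == "" {
		return errors.New("empty cron expression")
	}
	return nil
}

// Valid sketches the documented rule: at least one field must be set,
// and the cron expression, when present, must parse.
func (u *UpdateOption) Valid() error {
	if u.CronExpression == nil && u.JobOtherInputs == nil {
		return ErrUpdateOptionInvalid
	}
	if u.CronExpression != nil {
		return parseCron(*u.CronExpression)
	}
	return nil
}

func main() {
	expr := "@every 5s"
	inputs := map[string]interface{}{"name": "ben"}
	options := []UpdateOption{
		{JobID: 1},                          // nothing to update
		{JobID: 2, CronExpression: &expr},   // new schedule only
		{JobID: 3, JobOtherInputs: &inputs}, // new inputs only
	}
	for _, o := range options {
		fmt.Println(o.JobID, o.Valid())
	}
}
```

Pointer fields are what make the "leave unchanged" case expressible: a nil `CronExpression` differs from an empty string, and a nil `JobOtherInputs` differs from an empty map.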