Documentation ¶
Overview ¶
Package cron is an implementation of a job scheduler to run within a worker or a server. It allows developers to configure flexible schedules to run jobs, and trigger retries on failure.
Index ¶
- Constants
- Variables
- func Deref(t *time.Time) time.Time
- func FormatTime(t time.Time) string
- func IsJobAlreadyLoaded(err error) bool
- func IsJobCancelled(ctx context.Context) bool
- func IsJobNotLoaded(err error) bool
- func IsTaskNotFound(err error) bool
- func IsWeekDay(day time.Weekday) bool
- func IsWeekendDay(day time.Weekday) bool
- func Max(t1, t2 time.Time) time.Time
- func Min(t1, t2 time.Time) time.Time
- func NewEventListener(listener func(e *Event)) logger.Listener
- func Now() time.Time
- func Optional(t time.Time) *time.Time
- func SetDefault(jm *JobManager)
- func Since(t time.Time) time.Duration
- type Any
- type AtomicCounter
- type AtomicFlag
- type CancellationSignalReciever
- type Config
- type DailySchedule
- type EnabledProvider
- type Error
- type Event
- func (e Event) Complete() bool
- func (e Event) Elapsed() time.Duration
- func (e Event) Err() error
- func (e Event) IsEnabled() bool
- func (e Event) IsWritable() bool
- func (e Event) TaskName() string
- func (e *Event) WithAnnotation(key, value string) *Event
- func (e *Event) WithElapsed(d time.Duration) *Event
- func (e *Event) WithErr(err error) *Event
- func (e *Event) WithFlag(f logger.Flag) *Event
- func (e *Event) WithHeadings(headings ...string) *Event
- func (e *Event) WithIsEnabled(isEnabled bool) *Event
- func (e *Event) WithIsWritable(isWritable bool) *Event
- func (e *Event) WithLabel(key, value string) *Event
- func (e *Event) WithTaskName(taskName string) *Event
- func (e *Event) WithTimestamp(ts time.Time) *Event
- func (e Event) WriteJSON() logger.JSONObj
- func (e Event) WriteText(tf logger.TextFormatter, buf *bytes.Buffer)
- type EventShouldWriteOutputProvider
- type EventTriggerListenersProvider
- type ImmediateSchedule
- type IntervalSchedule
- type Job
- type JobFactory
- func (jf *JobFactory) Action() TaskAction
- func (jf *JobFactory) Execute(ctx context.Context) error
- func (jf *JobFactory) IsEnabled() bool
- func (jf *JobFactory) Name() string
- func (jf *JobFactory) Schedule() Schedule
- func (jf *JobFactory) ShowMessages() bool
- func (jf *JobFactory) Timeout() time.Duration
- func (jf *JobFactory) WithAction(action TaskAction) *JobFactory
- func (jf *JobFactory) WithIsEnabledProvider(provider func() bool) *JobFactory
- func (jf *JobFactory) WithName(name string) *JobFactory
- func (jf *JobFactory) WithSchedule(schedule Schedule) *JobFactory
- func (jf *JobFactory) WithShowMessagesProvider(provider func() bool) *JobFactory
- func (jf *JobFactory) WithTimeout(timeout time.Duration) *JobFactory
- type JobManager
- func (jm *JobManager) CancelTask(taskName string) (err error)
- func (jm *JobManager) DisableJob(jobName string) error
- func (jm *JobManager) DisableJobs(jobNames ...string) error
- func (jm *JobManager) EnableJob(jobName string) error
- func (jm *JobManager) EnableJobs(jobNames ...string) error
- func (jm *JobManager) HasJob(jobName string) (hasJob bool)
- func (jm *JobManager) HeartbeatInterval() time.Duration
- func (jm *JobManager) IsDisabled(jobName string) (value bool)
- func (jm *JobManager) IsRunning(taskName string) (isRunning bool)
- func (jm *JobManager) Job(jobName string) (job Job)
- func (jm *JobManager) LoadJob(job Job) error
- func (jm *JobManager) LoadJobs(jobs ...Job) error
- func (jm *JobManager) Logger() *logger.Logger
- func (jm *JobManager) ReadAllJobs(action func(jobs map[string]*JobMeta))
- func (jm *JobManager) RunAllJobs() (err error)
- func (jm *JobManager) RunJob(jobName string) error
- func (jm *JobManager) RunJobs(jobNames ...string) (err error)
- func (jm *JobManager) RunTask(task Task) error
- func (jm *JobManager) Start()
- func (jm *JobManager) Status() *Status
- func (jm *JobManager) Stop()
- func (jm *JobManager) Tracer() Tracer
- func (jm *JobManager) WithDefaultHeartbeat() *JobManager
- func (jm *JobManager) WithHeartbeatInterval(interval time.Duration) *JobManager
- func (jm *JobManager) WithHighPrecisionHeartbeat() *JobManager
- func (jm *JobManager) WithLogger(log *logger.Logger) *JobManager
- func (jm *JobManager) WithTracer(tracer Tracer) *JobManager
- type JobMeta
- type Labels
- type OnCancellationReceiver
- type OnCompleteReceiver
- type OnDemandSchedule
- type OnStartReceiver
- type OnTheHour
- type OnTheHourAt
- type OnTheQuarterHour
- type OnceAtSchedule
- type ResumeProvider
- type Schedule
- func DailyAt(hour, minute, second int) Schedule
- func Every(interval time.Duration) Schedule
- func EveryHour() Schedule
- func EveryHourAt(minute int) Schedule
- func EveryHourOnTheHour() Schedule
- func EveryMinute() Schedule
- func EveryQuarterHour() Schedule
- func EverySecond() Schedule
- func OnDemand() Schedule
- func OnceAt(t time.Time) Schedule
- func WeekdaysAt(hour, minute, second int) Schedule
- func WeekendsAt(hour, minute, second int) Schedule
- func WeeklyAt(hour, minute, second int, days ...time.Weekday) Schedule
- type SerialProvider
- type State
- type Status
- type StatusProvider
- type Task
- type TaskAction
- type TaskMeta
- type TaskStatus
- type TimeoutProvider
- type TraceFinisher
- type Tracer
- type Vars
Constants ¶
const ( // DefaultHeartbeatInterval is the interval between schedule next run checks. DefaultHeartbeatInterval = 100 * time.Millisecond // DefaultHighPrecisionHeartbeatInterval is the high precision interval between schedule next run checks. DefaultHighPrecisionHeartbeatInterval = 10 * time.Millisecond )
const ( // FlagStarted is an event flag. FlagStarted logger.Flag = "cron.started" // FlagFailed is an event flag. FlagFailed logger.Flag = "cron.failed" // FlagComplete is an event flag. FlagComplete logger.Flag = "cron.complete" // FlagCancelled is an event flag. FlagCancelled logger.Flag = "cron.cancelled" )
const ( // AllDaysMask is a bitmask of all the days of the week. AllDaysMask = 1<<uint(time.Sunday) | 1<<uint(time.Monday) | 1<<uint(time.Tuesday) | 1<<uint(time.Wednesday) | 1<<uint(time.Thursday) | 1<<uint(time.Friday) | 1<<uint(time.Saturday) // WeekDaysMask is a bitmask of all the weekdays of the week. WeekDaysMask = 1<<uint(time.Monday) | 1<<uint(time.Tuesday) | 1<<uint(time.Wednesday) | 1<<uint(time.Thursday) | 1<<uint(time.Friday) //WeekendDaysMask is a bitmask of the weekend days of the week. WeekendDaysMask = 1<<uint(time.Sunday) | 1<<uint(time.Saturday) )
NOTE: we have to use shifts here because in their infinite wisdom google didn't make these values powers of two for masking
const (
// EnvVarHeartbeatInterval is an environment variable name.
EnvVarHeartbeatInterval = "CRON_HEARTBEAT_INTERVAL"
)
Variables ¶
var ( // DaysOfWeek are all the time.Weekday in an array for utility purposes. DaysOfWeek = []time.Weekday{ time.Sunday, time.Monday, time.Tuesday, time.Wednesday, time.Thursday, time.Friday, time.Saturday, } // WeekDays are the business time.Weekday in an array. WeekDays = []time.Weekday{ time.Monday, time.Tuesday, time.Wednesday, time.Thursday, time.Friday, } // WeekWeekEndDaysDays are the weekend time.Weekday in an array. WeekendDays = []time.Weekday{ time.Sunday, time.Saturday, } // Epoch is unix epoch saved for utility purposes. Epoch = time.Unix(0, 0) )
NOTE: time.Zero()? what's that?
Functions ¶
func IsJobAlreadyLoaded ¶
IsJobAlreadyLoaded returns if the error is a job already loaded error.
func IsJobCancelled ¶
IsJobCancelled check if a job is cancelled
func IsJobNotLoaded ¶
IsJobNotLoaded returns if the error is a job not loaded error.
func IsTaskNotFound ¶
IsTaskNotFound returns if the error is a task not found error.
func IsWeekendDay ¶
IsWeekendDay returns if the day is a monday->friday.
func NewEventListener ¶
NewEventListener returns a new event listener.
Types ¶
type AtomicCounter ¶
type AtomicCounter struct {
// contains filtered or unexported fields
}
AtomicCounter is a counter to help with atomic operations.
func (*AtomicCounter) Get ¶
func (ac *AtomicCounter) Get() (value int32)
Get returns the counter value.
type AtomicFlag ¶
type AtomicFlag struct {
// contains filtered or unexported fields
}
AtomicFlag is a boolean value that is syncronized.
type CancellationSignalReciever ¶
type CancellationSignalReciever func()
CancellationSignalReciever is a function that can be used as a receiver for cancellation signals.
type Config ¶
type Config struct {
HeartbeatInterval time.Duration `json:"heartbeatInterval" yaml:"heartbeatInterval" env:"CRON_HEARTBEAT_INTERVAL"`
}
Config is the config object.
func MustNewConfigFromEnv ¶
func MustNewConfigFromEnv() *Config
MustNewConfigFromEnv returns a new config set from environment variables, it will panic if there is an error.
func NewConfigFromEnv ¶
NewConfigFromEnv creates a new config from the environment.
type DailySchedule ¶
DailySchedule is a schedule that fires every day that satisfies the DayOfWeekMask at the given TimeOfDayUTC.
func (DailySchedule) GetNextRunTime ¶
func (ds DailySchedule) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime implements Schedule.
type EnabledProvider ¶
type EnabledProvider interface {
Enabled() bool
}
EnabledProvider is an optional interface that will allow jobs to control if they're enabled.
type Event ¶
Event is an event.
func (Event) IsWritable ¶
IsWritable determines if the event is written to the logger output.
func (*Event) WithAnnotation ¶
WithAnnotation adds an annotation to the event.
func (*Event) WithElapsed ¶
WithElapsed sets the elapsed time.
func (*Event) WithHeadings ¶
WithHeadings sets the headings.
func (*Event) WithIsEnabled ¶
WithIsEnabled sets if the event is enabled
func (*Event) WithIsWritable ¶
WithIsWritable sets if the event is writable.
func (*Event) WithTaskName ¶
WithTaskName sets the task name.
func (*Event) WithTimestamp ¶
WithTimestamp sets the message timestamp.
type EventShouldWriteOutputProvider ¶
type EventShouldWriteOutputProvider interface {
ShouldWriteOutput() bool
}
EventShouldWriteOutputProvider is a type that enables or disables logger output for events.
type EventTriggerListenersProvider ¶
type EventTriggerListenersProvider interface {
ShouldTriggerListeners() bool
}
EventTriggerListenersProvider is a type that enables or disables logger listeners.
type ImmediateSchedule ¶
ImmediateSchedule fires immediately with an optional subsequent schedule..
func Immediately ¶
func Immediately() *ImmediateSchedule
Immediately Returns a schedule that casues a job to run immediately on start, with an optional subsequent schedule.
func (*ImmediateSchedule) GetNextRunTime ¶
func (i *ImmediateSchedule) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime implements Schedule.
func (*ImmediateSchedule) Then ¶
func (i *ImmediateSchedule) Then(then Schedule) Schedule
Then allows you to specify a subsequent schedule after the first run.
type IntervalSchedule ¶
IntervalSchedule is as chedule that fires every given interval with an optional start delay.
func (IntervalSchedule) GetNextRunTime ¶
func (i IntervalSchedule) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime implements Schedule.
type JobFactory ¶
type JobFactory struct {
// contains filtered or unexported fields
}
JobFactory allows for job creation w/o a fully formed struct.
func (*JobFactory) Action ¶
func (jf *JobFactory) Action() TaskAction
Action returns the job action.
func (*JobFactory) Execute ¶
func (jf *JobFactory) Execute(ctx context.Context) error
Execute runs the job action if it's set.
func (*JobFactory) IsEnabled ¶
func (jf *JobFactory) IsEnabled() bool
IsEnabled returns if the job is enabled.
func (*JobFactory) Schedule ¶
func (jf *JobFactory) Schedule() Schedule
Schedule returns the job schedule.
func (*JobFactory) ShowMessages ¶
func (jf *JobFactory) ShowMessages() bool
ShowMessages returns if the job should trigger logging events.
func (*JobFactory) Timeout ¶
func (jf *JobFactory) Timeout() time.Duration
Timeout returns the job timeout.
func (*JobFactory) WithAction ¶
func (jf *JobFactory) WithAction(action TaskAction) *JobFactory
WithAction sets the job action.
func (*JobFactory) WithIsEnabledProvider ¶
func (jf *JobFactory) WithIsEnabledProvider(provider func() bool) *JobFactory
WithIsEnabledProvider sets the enabled provider for the job.
func (*JobFactory) WithName ¶
func (jf *JobFactory) WithName(name string) *JobFactory
WithName sets the job name.
func (*JobFactory) WithSchedule ¶
func (jf *JobFactory) WithSchedule(schedule Schedule) *JobFactory
WithSchedule sets the schedule for the job.
func (*JobFactory) WithShowMessagesProvider ¶
func (jf *JobFactory) WithShowMessagesProvider(provider func() bool) *JobFactory
WithShowMessagesProvider sets the enabled provider for the job.
func (*JobFactory) WithTimeout ¶
func (jf *JobFactory) WithTimeout(timeout time.Duration) *JobFactory
WithTimeout sets the timeout.
type JobManager ¶
JobManager is the main orchestration and job management object.
func Default ¶
func Default() *JobManager
Default returns a shared instance of a JobManager. If unset, it will initialize it with `New()`.
func MustNewFromEnv ¶
func MustNewFromEnv() *JobManager
MustNewFromEnv returns a new job manager from the environment.
func NewFromConfig ¶
func NewFromConfig(cfg *Config) *JobManager
NewFromConfig returns a new job manager from a given config.
func NewFromEnv ¶
func NewFromEnv() (*JobManager, error)
NewFromEnv returns a new job manager from the environment.
func (*JobManager) CancelTask ¶
func (jm *JobManager) CancelTask(taskName string) (err error)
CancelTask cancels (sends the cancellation signal) to a running task.
func (*JobManager) DisableJob ¶
func (jm *JobManager) DisableJob(jobName string) error
DisableJob stops a job from running but does not unload it.
func (*JobManager) DisableJobs ¶
func (jm *JobManager) DisableJobs(jobNames ...string) error
DisableJobs disables a variadic list of job names.
func (*JobManager) EnableJob ¶
func (jm *JobManager) EnableJob(jobName string) error
EnableJob enables a job that has been disabled.
func (*JobManager) EnableJobs ¶
func (jm *JobManager) EnableJobs(jobNames ...string) error
EnableJobs enables a variadic list of job names.
func (*JobManager) HasJob ¶
func (jm *JobManager) HasJob(jobName string) (hasJob bool)
HasJob returns if a jobName is loaded or not.
func (*JobManager) HeartbeatInterval ¶
func (jm *JobManager) HeartbeatInterval() time.Duration
HeartbeatInterval returns the current heartbeat interval.
func (*JobManager) IsDisabled ¶
func (jm *JobManager) IsDisabled(jobName string) (value bool)
IsDisabled returns if a job is disabled.
func (*JobManager) IsRunning ¶
func (jm *JobManager) IsRunning(taskName string) (isRunning bool)
IsRunning returns if a task is currently running.
func (*JobManager) Job ¶
func (jm *JobManager) Job(jobName string) (job Job)
Job returns a job instance by name.
func (*JobManager) LoadJobs ¶
func (jm *JobManager) LoadJobs(jobs ...Job) error
LoadJobs loads a variadic list of jobs.
func (*JobManager) Logger ¶
func (jm *JobManager) Logger() *logger.Logger
Logger returns the diagnostics agent.
func (*JobManager) ReadAllJobs ¶
func (jm *JobManager) ReadAllJobs(action func(jobs map[string]*JobMeta))
ReadAllJobs allows the consumer to do something with the full job list, using a read lock.
func (*JobManager) RunAllJobs ¶
func (jm *JobManager) RunAllJobs() (err error)
RunAllJobs runs every job that has been loaded in the JobManager at once.
func (*JobManager) RunJob ¶
func (jm *JobManager) RunJob(jobName string) error
RunJob runs a job by jobName on demand.
func (*JobManager) RunJobs ¶
func (jm *JobManager) RunJobs(jobNames ...string) (err error)
RunJobs runs a variadic list of job names.
func (*JobManager) Start ¶
func (jm *JobManager) Start()
Start begins the schedule runner for a JobManager.
func (*JobManager) Stop ¶
func (jm *JobManager) Stop()
Stop stops the schedule runner for a JobManager.
func (*JobManager) Tracer ¶
func (jm *JobManager) Tracer() Tracer
Tracer returns the manager's tracer.
func (*JobManager) WithDefaultHeartbeat ¶
func (jm *JobManager) WithDefaultHeartbeat() *JobManager
WithDefaultHeartbeat sets the heartbeat interval to the default interval and returns the job manager.
func (*JobManager) WithHeartbeatInterval ¶
func (jm *JobManager) WithHeartbeatInterval(interval time.Duration) *JobManager
WithHeartbeatInterval sets the heartbeat interval explicitly and returns the job manager.
func (*JobManager) WithHighPrecisionHeartbeat ¶
func (jm *JobManager) WithHighPrecisionHeartbeat() *JobManager
WithHighPrecisionHeartbeat sets the heartbeat interval to the high precision interval and returns the job manager.
func (*JobManager) WithLogger ¶
func (jm *JobManager) WithLogger(log *logger.Logger) *JobManager
WithLogger sets the logger and returns a reference to the job manager.
func (*JobManager) WithTracer ¶
func (jm *JobManager) WithTracer(tracer Tracer) *JobManager
WithTracer sets the manager's tracer.
type JobMeta ¶
type JobMeta struct { Name string `json:"name"` Job Job `json:"-"` Disabled bool `json:"disabled"` Schedule Schedule `json:"-"` EnabledProvider func() bool `json:"-"` NextRunTime time.Time `json:"nextRunTime"` LastRunTime time.Time `json:"lastRunTime"` }
JobMeta is runtime metadata for a job.
type OnCancellationReceiver ¶
type OnCancellationReceiver interface {
OnCancellation()
}
OnCancellationReceiver is an interface that allows a task to be signaled when it has been canceled.
type OnCompleteReceiver ¶
type OnCompleteReceiver interface {
OnComplete(err error)
}
OnCompleteReceiver is an interface that allows a task to be signaled when it has been completed.
type OnDemandSchedule ¶
type OnDemandSchedule struct{}
OnDemandSchedule is a schedule that runs on demand.
func (OnDemandSchedule) GetNextRunTime ¶
func (ods OnDemandSchedule) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime gets the next run time.
type OnStartReceiver ¶
type OnStartReceiver interface {
OnStart()
}
OnStartReceiver is an interface that allows a task to be signaled when it has started.
type OnTheHour ¶
type OnTheHour struct{}
OnTheHour is a schedule that fires every hour on the 00th minute.
type OnTheHourAt ¶
type OnTheHourAt struct {
Minute int
}
OnTheHourAt is a schedule that fires every hour on the given minute.
func (OnTheHourAt) GetNextRunTime ¶
func (o OnTheHourAt) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime implements the chronometer Schedule api.
type OnTheQuarterHour ¶
type OnTheQuarterHour struct{}
OnTheQuarterHour is a schedule that fires every 15 minutes, on the quarter hours.
func (OnTheQuarterHour) GetNextRunTime ¶
func (o OnTheQuarterHour) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime implements the chronometer Schedule api.
type OnceAtSchedule ¶
OnceAtSchedule is a schedule.
func (OnceAtSchedule) GetNextRunTime ¶
func (oa OnceAtSchedule) GetNextRunTime(after *time.Time) *time.Time
GetNextRunTime returns the next runtime.
type ResumeProvider ¶
type ResumeProvider interface { State() interface{} Resume(state interface{}) error }
ResumeProvider is an interface that allows a task to be resumed.
type Schedule ¶
type Schedule interface { // GetNextRuntime should return the next runtime after a given previous runtime. If `after` is <nil> it should be assumed // the job hasn't run yet. If <nil> is returned by the schedule it is inferred that the job should not run again. GetNextRunTime(*time.Time) *time.Time }
Schedule is a type that provides a next runtime after a given previous runtime.
func EveryHourAt ¶
EveryHourAt returns a schedule that fires every hour at a given minute.
func EveryHourOnTheHour ¶
func EveryHourOnTheHour() Schedule
EveryHourOnTheHour returns a schedule that fires every 60 minutes on the 00th minute.
func EveryMinute ¶
func EveryMinute() Schedule
EveryMinute returns a schedule that fires every minute.
func EveryQuarterHour ¶
func EveryQuarterHour() Schedule
EveryQuarterHour returns a schedule that fires every 15 minutes, on the quarter hours (0, 15, 30, 45)
func EverySecond ¶
func EverySecond() Schedule
EverySecond returns a schedule that fires every second.
func OnDemand ¶
func OnDemand() Schedule
OnDemand returns an on demand schedule, or a schedule that only allows the job to be run explicitly by calling `RunJob` on the `JobManager`.
func WeekdaysAt ¶
WeekdaysAt returns a schedule that fires every week day at the given hour, minut and second.
func WeekendsAt ¶
WeekendsAt returns a schedule that fires every weekend day at the given hour, minut and second.
type SerialProvider ¶
type SerialProvider interface {
Serial()
}
SerialProvider is an optional interface that prohibits a task from running multiple times in parallel.
type StatusProvider ¶
type StatusProvider interface {
Status() string
}
StatusProvider is an interface that allows a task to report its status.
type Task ¶
Task is an interface that structs can satisfy to allow them to be run as tasks.
func NewSerialTask ¶
func NewSerialTask(action TaskAction) Task
NewSerialTask creates a task that run only serially, provided an action and a policy
func NewSerialTaskWithName ¶
func NewSerialTaskWithName(name string, action TaskAction) Task
NewSerialTaskWithName creates a task that can only be run serially given an action, name, and policy
func NewTask ¶
func NewTask(action TaskAction) Task
NewTask returns a new task wrapper for a given TaskAction.
func NewTaskWithName ¶
func NewTaskWithName(name string, action TaskAction) Task
NewTaskWithName returns a new task wrapper with a given name for a given TaskAction.
type TaskAction ¶
TaskAction is an function that can be run as a task
type TaskMeta ¶
type TaskMeta struct { Name string `json:"name"` Task Task `json:"-"` StartTime time.Time `json:"startTime"` Timeout time.Time `json:"timeout"` Context context.Context `json:"-"` Cancel context.CancelFunc `json:"-"` }
TaskMeta is metadata for a running task.
type TaskStatus ¶
type TaskStatus struct { Name string `json:"name"` State State `json:"state"` Status string `json:"status,omitempty"` LastRunTime string `json:"last_run_time,omitempty"` NextRunTime string `json:"next_run_time,omitempy"` RunningFor string `json:"running_for,omitempty"` Serial bool `json:"serial_execution,omitempty"` }
TaskStatus is the basic format of a status of a task.
type TimeoutProvider ¶
TimeoutProvider is an interface that allows a task to be timed out.
type TraceFinisher ¶
TraceFinisher is a finisher for traces.