scheduler

Version: v0.0.1
Published: Jun 14, 2024 License: MIT Imports: 12 Imported by: 0

README

Go scheduler


The scheduler manages sequences of planned events, called schedules, and dispatches them on its notification channel C().
When a schedule's time is reached, the schedule is sent as an Entry on that channel.

Sample Usage
	// initialize a scheduler
	sched := scheduler.New()
	_ = sched.Start()
	
	// start a receiver
	go func(sched *scheduler.Scheduler) {
		for {
			select {
			case entry, ok := <-sched.C():
				if !ok {
					// the channel was closed
					return
				}
				s := entry.S()
				// do something with the schedule,
				// potentially in a goroutine:
				// go func(s *scheduler.Schedule) { s.Fn()() }(s)
				s.Fn()()

			// other select cases go here
			}
		}
	}(sched)

	// stop the scheduler when done	
	_ = sched.Stop()

The scheduler can be started either empty (using function Start() as above) or with known schedules using function Run(schedules []*Schedule).

Once the scheduler is running, schedules can be added or removed. The scheduler can also return a dump of the current schedules.

Options

The default scheduler (created with New) is initialized with a buffered channel of size 1. If the receiver is slow to handle schedules and the buffer is full, the scheduler waits at most 20 milliseconds before giving up on dispatching the schedule.

These options are configurable through the Options passed to function NewScheduler.
Below is an example of Options with a channel size of zero that skips a schedule after 1 millisecond when the channel is full:

	return &Options{
		ChannelSize: 0,
		OnChannelFull: OnChannelFull{
			MaxWait: time.Millisecond,
		},
	}
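
The give-up behaviour controlled by MaxWait can be pictured with a plain Go channel. The following is a minimal sketch using only the standard library; trySend is a hypothetical helper, not part of the scheduler package, and the package's internal implementation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts to deliver v on ch and gives up after maxWait.
// It reports whether the value was delivered.
func trySend(ch chan<- int, v int, maxWait time.Duration) bool {
	timer := time.NewTimer(maxWait)
	defer timer.Stop()
	select {
	case ch <- v:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// a buffered channel of size 1, like the default configuration
	ch := make(chan int, 1)

	fmt.Println(trySend(ch, 1, 20*time.Millisecond)) // true: the buffer was empty
	fmt.Println(trySend(ch, 2, 20*time.Millisecond)) // false: buffer full and nobody receives
}
```

With a channel size of zero the send only succeeds if a receiver is ready within maxWait, which is why a very small MaxWait effectively skips schedules for a busy receiver.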

For processes where the monotonic clock may stop while the system clock continues, e.g. when a VM is paused or a laptop hibernates, options may include a ClockHealth attribute. When it is enabled, the clock skew is checked every PollPeriod; when the skew exceeds MaxClockSkew, the scheduler recomputes the delay before firing the next schedule.

	...
	ClockHealth: ClockHealth{
		Enabled:      true,
		MaxClockSkew: time.Second * 3,
		PollPeriod:   time.Second,
	},
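
One way to observe such a skew in Go is to compare the elapsed time measured with and without the monotonic reading carried by time.Time values. The sketch below relies only on the standard library; skew and exceeds are hypothetical helpers, and the scheduler's own check may be implemented differently.

```go
package main

import (
	"fmt"
	"time"
)

// skew returns how far the wall clock moved relative to the monotonic
// clock between two readings obtained from time.Now().
func skew(start, now time.Time) time.Duration {
	mono := now.Sub(start)                   // uses the monotonic readings
	wall := now.Round(0).Sub(start.Round(0)) // Round(0) strips the monotonic readings
	return wall - mono
}

// exceeds reports whether the absolute value of d is larger than limit.
func exceeds(d, limit time.Duration) bool {
	if d < 0 {
		d = -d
	}
	return d > limit
}

func main() {
	start := time.Now()
	time.Sleep(10 * time.Millisecond)
	d := skew(start, time.Now())
	// compare against a limit playing the role of MaxClockSkew
	fmt.Println("skew:", d, "exceeds 3s:", exceeds(d, 3*time.Second))
}
```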

Schedules

The scheduler has helper functions to add simple schedules:

  • firing at a given date: Scheduler.At(at utc.UTC, o interface{}) ScheduleID
  • firing after a given duration: Scheduler.In(d time.Duration, o interface{}) ScheduleID

More sophisticated schedules are added via function Scheduler.Add(sc *Schedule) bool.

Building schedules

A Schedule is built via function NewSchedule(id string, o interface{}, opts ...ScheduleOpt) (*Schedule, error).

Occur

scheduler.Occur provides options to build a one-time schedule or to set the initial date of a recurrent schedule:

  • At( date ) specifies the date when to fire the schedule.
  • In( duration ) specifies a duration after which the schedule must be fired. The exact date is computed when the scheduler receives the schedule.

Recurrent schedules

There are two ways to make a recurrent schedule:

  • call RescheduleAt(t utc.UTC) or RescheduleIn(d time.Duration) on the received schedule in the receiver code, or
  • use a Recur option when building the schedule. Note that for a schedule built using a Recur option, calling RescheduleAt or RescheduleIn has no effect and returns false.

Rescheduling from the receiver might be preferred when a function is attached to the schedule and the goal is to plan the next execution after the attached function has run.
By contrast, when using a Recur option, the next firing date is computed immediately after the schedule is sent to the notification channel.

scheduler.Recur provides options to build recurring schedules:

  • Next(n Nexter) uses a Nexter object to compute the next firing date.
  • Every(d time.Duration) makes the schedule fire every given duration.
  • NextTime(fn func(t time.Time) time.Time) computes the next time the schedule must fire.
  • Cron(schedule cron.Schedule) uses a cron/v3 schedule object.

Note that a Nexter can also be built around cronexpr.

The first date of a recurring schedule is computed via the recurring option when the scheduler receives the schedule unless an initial date was given via an Occur option.
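
As an illustration of the NextTime option, the function below has the func(t time.Time) time.Time shape listed above. It is a sketch only: nextTopOfHour and example are hypothetical names, and which time value the scheduler passes to the function is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// nextTopOfHour returns the first full hour strictly after t.
// Truncate works on absolute time, so the result is aligned on the hour
// for UTC and for zones with a whole-hour offset.
func nextTopOfHour(t time.Time) time.Time {
	return t.Truncate(time.Hour).Add(time.Hour)
}

// example applies nextTopOfHour to a fixed input so the result is easy to check.
func example() string {
	t := time.Date(2024, time.June, 14, 10, 25, 0, 0, time.UTC)
	return nextTopOfHour(t).Format(time.RFC3339)
}

func main() {
	fmt.Println(example()) // 2024-06-14T11:00:00Z
}
```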

Note: when a schedule is recurrent, the scheduler dispatches it to the channel, then computes the next occurrence and plans it. However, if the receiver is slow in handling schedules and the schedule is still in the buffer of the channel, the schedule is not dispatched again; only the next occurrence is computed and planned. This ensures that slow receivers won't lead to overflowing the channel with unhandled schedules.
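
The skip-while-pending idea can be sketched with a flag on each item. This is an assumption about one possible mechanism, written with the standard library only; job, dispatch and handle are hypothetical names and do not reflect the package's internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// job is a stand-in for a recurring schedule.
type job struct {
	name    string
	pending atomic.Bool // true while the job waits unhandled in the channel
}

// dispatch sends j on ch unless a previous occurrence is still waiting.
// It reports whether the job was actually sent.
func dispatch(ch chan *job, j *job) bool {
	if !j.pending.CompareAndSwap(false, true) {
		return false // previous occurrence not handled yet: skip this one
	}
	select {
	case ch <- j:
		return true
	default:
		j.pending.Store(false) // channel full of other jobs: skip as well
		return false
	}
}

// handle is called by the receiver once it has taken j from the channel.
func handle(j *job) {
	j.pending.Store(false)
}

func main() {
	ch := make(chan *job, 4)
	j := &job{name: "tick"}

	fmt.Println(dispatch(ch, j)) // true
	fmt.Println(dispatch(ch, j)) // false: the first occurrence is still buffered
	handle(<-ch)
	fmt.Println(dispatch(ch, j)) // true
}
```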

Limiting recurrent schedules

scheduler.Until options provide conditions to tell the scheduler to stop rescheduling schedules built via a Recur option:

  • Count(count int) limits the number of times the schedule is fired.
  • Date(u utc.UTC) specifies a maximum date for the schedule.
  • Elapsed(d time.Duration) is the maximum duration of the schedule after first dispatching.

If both Date and Elapsed are specified, the earliest resulting date is used.
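
The rule for combining Date and Elapsed amounts to taking the earlier of two instants. Below is a minimal sketch with time.Time standing in for utc.UTC; effectiveEnd and exampleEnd are hypothetical helpers, not functions of the package.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveEnd returns the earlier of an absolute end date and the instant
// reached after elapsed has passed since the first dispatch.
// A zero date or a non-positive elapsed means "not set".
func effectiveEnd(first, date time.Time, elapsed time.Duration) time.Time {
	end := date
	if elapsed > 0 {
		byElapsed := first.Add(elapsed)
		if end.IsZero() || byElapsed.Before(end) {
			end = byElapsed
		}
	}
	return end
}

// exampleEnd combines a date two hours after the first dispatch
// with an elapsed limit of thirty minutes.
func exampleEnd() string {
	first := time.Date(2024, time.June, 14, 12, 0, 0, 0, time.UTC)
	date := first.Add(2 * time.Hour)
	return effectiveEnd(first, date, 30*time.Minute).Format(time.RFC3339)
}

func main() {
	fmt.Println(exampleEnd()) // 2024-06-14T12:30:00Z
}
```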

Documentation

Constants

This section is empty.

Variables

var (
	Occur = occur{} // Occur has functions to give an initial date to a Schedule
	Recur = recur{} // Recur has functions to configure a recurrent Schedule
	Until = until{} // Until has functions to configure termination of a recurrent schedule
)

Functions

This section is empty.

Types

type ClockHealth

type ClockHealth struct {
	Enabled      bool
	MaxClockSkew time.Duration
	PollPeriod   time.Duration
}

ClockHealth allows checking that the system clock is in sync with the internal monotonic clock, for example in cases where the user puts the system to sleep.

type Details

type Details struct {
	ScheduledAt      utc.UTC `json:"scheduled_at"`                // time the Schedule was scheduled
	DispatchedAt     utc.UTC `json:"dispatched_at"`               // when the schedule was sent to the notification channel
	DispatchedCount  int     `json:"dispatched_count,omitempty"`  // how many times the schedule was dispatched
	RescheduledCount int     `json:"rescheduled_count,omitempty"` // how many times the schedule was re-scheduled
}

Details of a Schedule are available for logging or troubleshooting.

type Entry

type Entry interface {
	S() *Schedule
}

type Logger

type Logger = *elog.Log

type Nexter

type Nexter interface {
	// Next returns when next notification after now must occur or utc.Zero
	Next(now utc.UTC, s *Schedule) utc.UTC
}

Nexter is the interface of objects returning the Next UTC date of a Schedule

type NexterFn

type NexterFn func(now utc.UTC, s *Schedule) utc.UTC

NexterFn is a function implementing Nexter

func (NexterFn) Next

func (f NexterFn) Next(now utc.UTC, s *Schedule) utc.UTC

type NexterTime

type NexterTime interface {
	// Next returns when next notification after now must occur or the zero time
	Next(now time.Time, s *Schedule) time.Time
}

NexterTime is the interface of objects returning the Next date of a Schedule

type NexterTimeFn

type NexterTimeFn func(now time.Time, s *Schedule) time.Time

NexterTimeFn is a function implementing NexterTime

func (NexterTimeFn) Next

func (f NexterTimeFn) Next(now time.Time, s *Schedule) time.Time
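
NexterFn and NexterTimeFn follow the common Go pattern of a function type that implements an interface by calling itself. The sketch below reproduces the pattern with local stand-in types (nexter, nexterFn, every are hypothetical and omit the *Schedule argument); it assumes, without confirmation from the package source, that the Next method simply calls the function.

```go
package main

import (
	"fmt"
	"time"
)

// nexter is a local stand-in for the NexterTime interface.
type nexter interface {
	Next(now time.Time) time.Time
}

// nexterFn adapts a plain function to the nexter interface.
type nexterFn func(now time.Time) time.Time

// Next calls the underlying function.
func (f nexterFn) Next(now time.Time) time.Time { return f(now) }

// every returns a nexter whose next date is d after now.
func every(d time.Duration) nexter {
	return nexterFn(func(now time.Time) time.Time { return now.Add(d) })
}

// exampleNext evaluates a 15 minute nexter on a fixed date.
func exampleNext() string {
	now := time.Date(2024, time.June, 14, 12, 0, 0, 0, time.UTC)
	return every(15 * time.Minute).Next(now).Format(time.RFC3339)
}

func main() {
	fmt.Println(exampleNext()) // 2024-06-14T12:15:00Z
}
```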

type OnChannelFull

type OnChannelFull struct {
	MaxWait time.Duration // max duration to wait for dispatching a schedule
}

OnChannelFull options tell the scheduler what to do when the dispatching channel is full

type Options

type Options struct {
	ChannelSize   uint          // size of the notification channel
	Logger        Logger        // internal logger
	OnChannelFull OnChannelFull // what to do when the channel is full
	ClockHealth   ClockHealth   // check the system clock is in sync with the monotonic clock
}

Options are options for the Scheduler

func NewOptions

func NewOptions() *Options

NewOptions returns default options for the Scheduler.

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

Schedule is a planned event.

func MustSchedule

func MustSchedule(id string, o interface{}, opts ...ScheduleOpt) *Schedule

MustSchedule returns a new initialized Schedule or panics if an error occurs.

func NewSchedule

func NewSchedule(id string, o interface{}, opts ...ScheduleOpt) (*Schedule, error)

NewSchedule returns a new initialized Schedule or an error if incompatible options are used. The provided options must set the next scheduled time (e.g. through an Occur or a Recur option).

func (*Schedule) Details

func (s *Schedule) Details() Details

Details returns the 'runtime' Details of the schedule

func (*Schedule) Fn

func (s *Schedule) Fn() func()

Fn returns the object attached to this Schedule as a func(), or nil if the object is not such a function.

func (*Schedule) FnE

func (s *Schedule) FnE() func() error

FnE returns the object attached to this Schedule as a func() error, or nil if the object is not such a function.
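
Since Fn and FnE return nil when the attached object has another type, calling the result directly (as in s.Fn()()) panics in that case. The sketch below shows the kind of type assertion and nil check involved, using hypothetical helpers asFn and run rather than the package's own code.

```go
package main

import "fmt"

// asFn returns o as a func(), or nil when o holds any other type.
func asFn(o interface{}) func() {
	fn, _ := o.(func())
	return fn
}

// run calls the function attached to o, if any, and reports whether it ran.
func run(o interface{}) bool {
	if fn := asFn(o); fn != nil {
		fn()
		return true
	}
	return false
}

func main() {
	fmt.Println(run(func() { fmt.Println("fired") })) // prints "fired", then true
	fmt.Println(run("not a function"))                // false
}
```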

func (*Schedule) ID

func (s *Schedule) ID() ScheduleID

ID returns the id of the schedule

func (*Schedule) Object

func (s *Schedule) Object() interface{}

Object returns the interface object attached to the Schedule

func (*Schedule) RescheduleAt

func (s *Schedule) RescheduleAt(t utc.UTC, o ...interface{}) bool

RescheduleAt reschedules the schedule at the given date. The function returns false if the schedule was built via a Recur option or if the schedule was not dispatched by the scheduler. Returns true if the schedule was sent to the scheduler.

func (*Schedule) RescheduleIn

func (s *Schedule) RescheduleIn(d time.Duration, o ...interface{}) bool

RescheduleIn reschedules the schedule at a date d after now. The date is computed immediately. The function returns false if the schedule was built via a Recur option or if the schedule was not dispatched by the scheduler. Returns true if the schedule was sent to the scheduler.

func (*Schedule) S

func (s *Schedule) S() *Schedule

func (*Schedule) Str

func (s *Schedule) Str() string

Str returns the object attached to this Schedule as a string or the empty string if the object is not a string.

func (*Schedule) String

func (s *Schedule) String() string

func (*Schedule) Time

func (s *Schedule) Time() utc.UTC

Time returns the utc time at which the schedule was planned

type ScheduleID

type ScheduleID string

ScheduleID is the ID of a Schedule

type ScheduleOpt

type ScheduleOpt func(s *Schedule) error

ScheduleOpt is a configuration function of Schedule. The predefined variables Occur, Recur and Until provide such functions.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler represents a sequence of planned events which are notified through channel C().

func New

func New() *Scheduler

New returns a new Scheduler initialized with default options.

Example
sched := scheduler.New()
_ = sched.Start()

wg := sync.WaitGroup{}
wg.Add(1)
go func(sched *scheduler.Scheduler) {
	defer wg.Done()
	// alternatively, range over the channel:
	//for entry := range sched.C() {
	//	entry.S().Fn()()
	//}
	for {
		select {
		case entry, ok := <-sched.C():
			if !ok {
				return
			}
			s := entry.S()
			// do something with the schedule
			// potentially in a goroutine
			// go func(s *scheduler.Schedule) { s.Fn()() }(s)
			s.Fn()()
		}
	}
}(sched)

in := time.Millisecond * 200
sched.At(utc.Now().Add(in), func() { fmt.Println("fired") })
time.Sleep(time.Millisecond * 205)

_ = sched.Stop()
wg.Wait()
Output:

fired

func NewScheduler

func NewScheduler(opts *Options) *Scheduler

NewScheduler returns a new Scheduler initialized with the given options. Default options are used if opts is nil.

func (*Scheduler) Add

func (s *Scheduler) Add(sc *Schedule) bool

Add adds the given schedule to the scheduler. Returns false if the scheduler is not running.

func (*Scheduler) At

func (s *Scheduler) At(at utc.UTC, o interface{}) ScheduleID

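At adds a schedule that will run at the given date and returns its ScheduleID. The return type is a ScheduleID rather than a bool, so a scheduler that is not running is presumably signalled by an empty ID.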

func (*Scheduler) C

func (s *Scheduler) C() chan Entry

C provides a channel of notifications for planned events

func (*Scheduler) Dump

func (s *Scheduler) Dump() []*Schedule

Dump returns a dump of the current schedules

func (*Scheduler) DumpStop

func (s *Scheduler) DumpStop() ([]*Schedule, error)

DumpStop stops the scheduler and returns a dump of current schedules

func (*Scheduler) In

func (s *Scheduler) In(d time.Duration, o interface{}) ScheduleID

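In adds a schedule that will run after the given duration and returns its ScheduleID. The return type is a ScheduleID rather than a bool, so a scheduler that is not running is presumably signalled by an empty ID.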

func (*Scheduler) Outlet

func (s *Scheduler) Outlet() []*Schedule

Outlet returns the schedules that could not be dispatched because the channel was full

func (*Scheduler) Remove

func (s *Scheduler) Remove(id ScheduleID) bool

Remove removes the schedule(s) with the given ScheduleID. Returns false if the scheduler is not running.

func (*Scheduler) Run

func (s *Scheduler) Run(schedules []*Schedule) error

Run starts the scheduler with the given initial Schedule instances

func (*Scheduler) Running

func (s *Scheduler) Running() bool

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop stops the scheduler

type Schedules

type Schedules []*Schedule

Schedules is a slice of *Schedule sortable by their next utc time.

func (Schedules) Len

func (s Schedules) Len() int

func (Schedules) Less

func (s Schedules) Less(i, j int) bool

func (Schedules) Swap

func (s Schedules) Swap(i, j int)
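
Len, Less and Swap are the three methods of sort.Interface, so a Schedules value can be passed to sort.Sort. The sketch below shows the same pattern on local stand-in types (item and items are hypothetical, with time.Time in place of utc.UTC); it assumes Less compares the next firing times, as the type description suggests.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// item is a stand-in for a schedule with an id and a next firing time.
type item struct {
	id string
	at time.Time
}

// items implements sort.Interface, ordering by firing time.
type items []*item

func (s items) Len() int           { return len(s) }
func (s items) Less(i, j int) bool { return s[i].at.Before(s[j].at) }
func (s items) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// sortedIDs builds three items out of order and returns their ids
// after sorting by firing time.
func sortedIDs() []string {
	base := time.Date(2024, time.June, 14, 12, 0, 0, 0, time.UTC)
	list := items{
		{id: "c", at: base.Add(3 * time.Hour)},
		{id: "a", at: base.Add(1 * time.Hour)},
		{id: "b", at: base.Add(2 * time.Hour)},
	}
	sort.Sort(list)

	ids := make([]string, 0, len(list))
	for _, it := range list {
		ids = append(ids, it.id)
	}
	return ids
}

func main() {
	fmt.Println(sortedIDs()) // [a b c]
}
```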
