Documentation ¶
Index ¶
- func NewScheduler(executor Executor, checkpointer SchedulableService, ...) (*TreeScheduler, *SchedulerMetrics, error)
- func ValidateSchedule(c string) error
- func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc
- func WithOnErrorFn(fn ErrorFunc) treeSchedulerOptFunc
- func WithTime(t clock.Clock) treeSchedulerOptFunc
- type ErrUnrecoverable
- type ErrorFunc
- type Executor
- type ID
- type Item
- type NoopScheduler
- type Schedulable
- type SchedulableService
- type Schedule
- type Scheduler
- type SchedulerMetrics
- type TreeScheduler
Functions ¶
func NewScheduler ¶
func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error)
NewScheduler gives us a new TreeScheduler and SchedulerMetrics when given an Executor, a SchedulableService, and zero or more options. Schedulers should be initialized with this function.
func ValidateSchedule ¶
ValidateSchedule returns an error if the cron string is invalid.
func WithMaxConcurrentWorkers ¶
func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc
WithMaxConcurrentWorkers is an option that sets the max number of concurrent workers that a TreeScheduler will use.
func WithOnErrorFn ¶
func WithOnErrorFn(fn ErrorFunc) treeSchedulerOptFunc
WithOnErrorFn is an option that sets the error function that gets called when there is an error in a TreeScheduler. It is useful for injecting logging or special error handling.
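The With... functions above appear to follow Go's functional options pattern. The following is a minimal sketch of that pattern, not the package's implementation: the types scheduler, optFunc, and errorFunc, the helpers withMaxWorkers and withOnError, and the default of 4 workers are all hypothetical stand-ins, since the real option type is unexported and the ErrorFunc signature is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// errorFunc is a hypothetical stand-in for ErrorFunc; the real signature
// is not shown in this document.
type errorFunc func(err error)

// scheduler is a hypothetical stand-in for TreeScheduler.
type scheduler struct {
	maxWorkers int
	onErr      errorFunc
}

// optFunc is an option: it mutates the scheduler and may reject bad input.
type optFunc func(s *scheduler) error

// withMaxWorkers sets the worker limit (assumed to require at least 1).
func withMaxWorkers(n int) optFunc {
	return func(s *scheduler) error {
		if n < 1 {
			return errors.New("max workers must be at least 1")
		}
		s.maxWorkers = n
		return nil
	}
}

// withOnError installs the callback invoked when an error occurs.
func withOnError(fn errorFunc) optFunc {
	return func(s *scheduler) error {
		s.onErr = fn
		return nil
	}
}

// newScheduler sets defaults first, then applies each option in order.
func newScheduler(opts ...optFunc) (*scheduler, error) {
	s := &scheduler{maxWorkers: 4, onErr: func(error) {}}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newScheduler(
		withMaxWorkers(8),
		withOnError(func(err error) { fmt.Println("scheduler error:", err) }),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("workers:", s.maxWorkers)
}
```

Because each option is a plain function value, callers can pass zero or more of them, and new options can be added later without changing the constructor signature.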
Types ¶
type ErrUnrecoverable ¶
type ErrUnrecoverable struct {
// contains filtered or unexported fields
}
func (*ErrUnrecoverable) Error ¶
func (e *ErrUnrecoverable) Error() string
func (*ErrUnrecoverable) Unwrap ¶
func (e *ErrUnrecoverable) Unwrap() error
type ErrorFunc ¶
ErrorFunc is a function for error handling. It is a good way to inject logging into a TreeScheduler.
type Executor ¶
type Executor interface {
	// Execute is used to execute runs for any schedulable object.
	// The executor can go through manual runs, clean currently running, and then create a new run based on `now`.
	// If `now` is zero we can just do the first two steps (this is how we would trigger manual runs).
	// Errors returned from the execute request imply that this attempt has failed and
	// should be put back in the scheduler and re-executed at a later time. We will add scheduler-specific errors
	// so the time can be configurable.
	Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error
}
Executor is a system used by the scheduler to actually execute the schedulable item.
type ID ¶
type ID uint64
ID duplicates the influxdb ID so users of the scheduler don't have to import influxdb for the ID.
type Item ¶
Item is a task in the scheduler.
type NoopScheduler ¶
type NoopScheduler struct{}
NoopScheduler is a no-op scheduler. It is used when we don't want the standard scheduler to run (e.g. when "--no-tasks" flag is present).
func (*NoopScheduler) Release ¶
func (n *NoopScheduler) Release(taskID ID) error
Release is a mocked Scheduler.Release method.
func (*NoopScheduler) Schedule ¶
func (n *NoopScheduler) Schedule(task Schedulable) error
Schedule is a mocked Scheduler.Schedule method.
type Schedulable ¶
type Schedulable interface {
	// ID is the unique identifier for this Schedulable
	ID() ID
	// Schedule defines the frequency for which this Schedulable should be
	// queued for execution.
	Schedule() Schedule
	// Offset defines a negative or positive duration that should be added
	// to the scheduled time, resulting in the instance running earlier or later
	// than the scheduled time.
	Offset() time.Duration
	// LastScheduled specifies last time this Schedulable was queued
	// for execution.
	LastScheduled() time.Time
}
Schedulable is the interface that encapsulates work that is to be executed on a specified schedule.
type SchedulableService ¶
type SchedulableService interface {
	// UpdateLastScheduled notifies the instance that it was scheduled for
	// execution at the specified time
	UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error
}
SchedulableService encapsulates the work necessary to schedule a job.
type Schedule ¶
type Schedule struct {
// contains filtered or unexported fields
}
Schedule is an object that represents a valid schedule of runs.
func NewSchedule ¶
type Scheduler ¶
type Scheduler interface {
	// Schedule adds the specified task to the scheduler.
	Schedule(task Schedulable) error
	// Release removes the specified task from the scheduler.
	Release(taskID ID) error
}
Scheduler is an example interface of a Scheduler. // todo(lh): remove this once we start building the actual scheduler
type SchedulerMetrics ¶
type SchedulerMetrics struct {
// contains filtered or unexported fields
}
func NewSchedulerMetrics ¶
func NewSchedulerMetrics(te *TreeScheduler) *SchedulerMetrics
func (*SchedulerMetrics) PrometheusCollectors ¶
func (em *SchedulerMetrics) PrometheusCollectors() []prometheus.Collector
PrometheusCollectors satisfies the prom.PrometheusCollector interface.
type TreeScheduler ¶
type TreeScheduler struct {
// contains filtered or unexported fields
}
TreeScheduler is a Scheduler based on a btree. It calls Executor in order per ID. That means that, for a specific ID, you are guaranteed the following:
- After creation, the scheduler should automatically call ExecutorFunc when a task should run, as defined by its Schedulable.
- The scheduler should not be able to get into a state where it blocks Release and Schedule indefinitely.
- Schedule should add a Schedulable to the set of scheduled tasks, and Release should remove a task from it.
- Calls to ExecutorFunc should be serial in time on a per-taskID basis, i.e., the run at 12:00 will go before the run at 12:01.
Design:
The core of the scheduler is a btree keyed by time, a nonce, and a task ID, and a map keyed by task ID and containing a nonce and a time (called a uniqueness index from now on). The map is to ensure task uniqueness in the tree, so we can replace or delete tasks in the tree.
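The ordering implied by a key of time, nonce, and task ID can be sketched as a comparison function plus a side map. This is an illustration only: the key and indexEntry types are hypothetical, the field order of the comparison is an assumption read off the sentence above, and a sorted slice stands in for the btree.

```go
package main

import (
	"fmt"
	"sort"
)

// key is a hypothetical tree key: trigger time, then nonce, then task ID.
type key struct {
	when  int64 // trigger time, Unix seconds
	nonce int
	id    uint64
}

// less orders keys by time first, breaking ties by nonce and then by ID.
func less(a, b key) bool {
	if a.when != b.when {
		return a.when < b.when
	}
	if a.nonce != b.nonce {
		return a.nonce < b.nonce
	}
	return a.id < b.id
}

// indexEntry is what the uniqueness index stores per task ID, so the
// matching tree key can be rebuilt when replacing or deleting a task.
type indexEntry struct {
	when  int64
	nonce int
}

// sortKeys sorts in tree order; a sorted slice stands in for the btree here.
func sortKeys(keys []key) {
	sort.Slice(keys, func(i, j int) bool { return less(keys[i], keys[j]) })
}

func main() {
	keys := []key{
		{when: 120, nonce: 0, id: 7},
		{when: 60, nonce: 1, id: 9},
		{when: 60, nonce: 0, id: 3},
	}
	sortKeys(keys)

	index := make(map[uint64]indexEntry)
	for _, k := range keys {
		index[k.id] = indexEntry{when: k.when, nonce: k.nonce}
	}

	fmt.Println(keys[0].id, index[7].when) // 3 120
}
```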
Scheduling in the tree consists of a main loop that feeds a fixed set of workers, each with their own communication channel. Distribution is handled by hashing the TaskID (to ensure uniform distribution) and then distributing over those channels evenly based on the hashed ID. This is to ensure that all tasks of the same ID go to the same worker.
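The hash-based distribution described above can be sketched as follows. The hash function is not named in this document, so FNV-1a from the standard library is used purely as an assumption; workerFor and the channel setup are likewise hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// workerFor maps a task ID to a worker index in [0, workers).
// FNV-1a is an assumption; any stable hash gives the same property:
// the same ID always lands on the same worker.
func workerFor(id uint64, workers int) int {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], id)
	h := fnv.New64a()
	h.Write(buf[:]) // Write on a hash.Hash never returns an error
	return int(h.Sum64() % uint64(workers))
}

func main() {
	const workers = 4

	// One buffered channel per worker.
	chans := make([]chan uint64, workers)
	for i := range chans {
		chans[i] = make(chan uint64, 8)
	}

	// Repeated IDs are routed to the same channel every time.
	for _, id := range []uint64{1, 2, 3, 1, 2, 1} {
		chans[workerFor(id, workers)] <- id
	}

	for i, ch := range chans {
		close(ch)
		for id := range ch {
			fmt.Printf("worker %d got task %d\n", i, id)
		}
	}
}
```

Since each worker drains its own channel in order, routing every run of a given ID to one worker is what keeps runs for that ID serial.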
The workers call ExecutorFunc, handle any errors, and update the LastScheduled time both internally and via the Checkpointer.
The main loop:
The main loop waits on a time.Timer to grab the task with the minimum time. Once it successfully grabs a task ready to trigger, it will start walking the btree from the item nearest
Putting a task into the scheduler:
Adding a task to the scheduler acquires a write lock, grabs the task from the uniqueness map, and replaces the item in the uniqueness index and btree. If the new task would trigger sooner than the current soonest-triggering task, the Timer is replaced when the task is added. Finally, it releases the write lock.
Removing a task from the scheduler:
Removing a task from the scheduler acquires a write lock, deletes the task from the uniqueness index and from the btree, then releases the lock. We do not have to readjust the time on delete, because, if the minimum task isn't ready yet, the main loop just resets the timer and keeps going.
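The add and remove procedures described above can be sketched together. This is a simplified illustration, not the package's code: the sched type is hypothetical, a single map stands in for both the uniqueness index and the btree, and put returns a flag instead of actually replacing a time.Timer.

```go
package main

import (
	"fmt"
	"sync"
)

// sched is a hypothetical, simplified scheduler state.
type sched struct {
	mu    sync.Mutex
	index map[uint64]int64 // task ID -> trigger time (Unix seconds)
}

// soonest returns the minimum trigger time. The caller must hold mu.
func (s *sched) soonest() (int64, bool) {
	var best int64
	found := false
	for _, when := range s.index {
		if !found || when < best {
			best, found = when, true
		}
	}
	return best, found
}

// put adds or replaces a task under the lock and reports whether the
// timer would need replacing because the task triggers sooner than
// anything previously scheduled.
func (s *sched) put(id uint64, when int64) (resetTimer bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	oldMin, had := s.soonest()
	s.index[id] = when // replaces any existing entry for this ID
	return !had || when < oldMin
}

// remove deletes a task under the lock. The timer is left alone: if it
// fires and the minimum task is not ready, the main loop resets it.
func (s *sched) remove(id uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.index, id)
}

func main() {
	s := &sched{index: make(map[uint64]int64)}
	fmt.Println(s.put(1, 100)) // true: first task
	fmt.Println(s.put(2, 200)) // false: later than the current minimum
	fmt.Println(s.put(3, 50))  // true: sooner than the current minimum
	s.remove(3)

	s.mu.Lock()
	next, _ := s.soonest()
	s.mu.Unlock()
	fmt.Println(next) // 100
}
```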
func (*TreeScheduler) Release ¶
func (s *TreeScheduler) Release(taskID ID) error
Release releases a task. Release also cancels the running task. Task deletion would be faster if the tree supported deleting ranges.
func (*TreeScheduler) Schedule ¶
func (s *TreeScheduler) Schedule(sch Schedulable) error
Schedule puts a Schedulable on the TreeScheduler.
func (*TreeScheduler) Stop ¶
func (s *TreeScheduler) Stop()
func (*TreeScheduler) When ¶
func (s *TreeScheduler) When() time.Time
When gives us the next time the scheduler will run a task.