scheduler

package
v2.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2023 License: MIT Imports: 12 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewScheduler

func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error)

NewScheduler gives us a new TreeScheduler and SchedulerMetrics when given an Executor, a SchedulableService, and zero or more options. Schedulers should be initialized with this function.

func ValidateSchedule

func ValidateSchedule(c string) error

ValidSchedule returns an error if the cron string is invalid.

func WithMaxConcurrentWorkers

func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc

WithMaxConcurrentWorkers is an option that sets the max number of concurrent workers that a TreeScheduler will use.

func WithOnErrorFn

func WithOnErrorFn(fn ErrorFunc) treeSchedulerOptFunc

WithOnErrorFn is an option that sets the error function that gets called when there is an error in a TreeScheduler. its useful for injecting logging or special error handling.

func WithTime

func WithTime(t clock.Clock) treeSchedulerOptFunc

WithTime is an option for NewScheduler that allows you to inject a clock.Clock from ben johnson's github.com/benbjohnson/clock library, for testing purposes.

Types

type ErrUnrecoverable

type ErrUnrecoverable struct {
	// contains filtered or unexported fields
}

func (*ErrUnrecoverable) Error

func (e *ErrUnrecoverable) Error() string

func (*ErrUnrecoverable) Unwrap

func (e *ErrUnrecoverable) Unwrap() error

type ErrorFunc

type ErrorFunc func(ctx context.Context, taskID ID, scheduledFor time.Time, err error)

ErrorFunc is a function for error handling. It is a good way to inject logging into a TreeScheduler.

type Executor

type Executor interface {
	// Execute is used to execute run's for any schedulable object.
	// the executor can go through manual runs, clean currently running, and then create a new run based on `now`.
	// if Now is zero we can just do the first 2 steps (This is how we would trigger manual runs).
	// Errors returned from the execute request imply that this attempt has failed and
	// should be put back in scheduler and re executed at a alter time. We will add scheduler specific errors
	// so the time can be configurable.
	Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error
}

Executor is a system used by the scheduler to actually execute the scheduleable item.

type ID

type ID uint64

ID duplicates the influxdb ID so users of the scheduler don't have to import influxdb for the ID.

type Item

type Item struct {
	Offset int64
	// contains filtered or unexported fields
}

Item is a task in the scheduler.

func (Item) Less

func (it Item) Less(bItem btree.Item) bool

Less tells us if one Item is less than another

func (Item) Next

func (it Item) Next() time.Time

func (Item) When

func (it Item) When() time.Time

type NoopScheduler

type NoopScheduler struct{}

NoopScheduler is a no-op scheduler. It is used when we don't want the standard scheduler to run (e.g. when "--no-tasks" flag is present).

func (*NoopScheduler) Release

func (n *NoopScheduler) Release(taskID ID) error

Release is a mocked Scheduler.Release method.

func (*NoopScheduler) Schedule

func (n *NoopScheduler) Schedule(task Schedulable) error

Schedule is a mocked Scheduler.Schedule method.

func (*NoopScheduler) Stop

func (n *NoopScheduler) Stop()

Stop is a mocked stop method.

type Schedulable

type Schedulable interface {
	// ID is the unique identifier for this Schedulable
	ID() ID

	// Schedule defines the frequency for which this Schedulable should be
	// queued for execution.
	Schedule() Schedule

	// Offset defines a negative or positive duration that should be added
	// to the scheduled time, resulting in the instance running earlier or later
	// than the scheduled time.
	Offset() time.Duration

	// LastScheduled specifies last time this Schedulable was queued
	// for execution.
	LastScheduled() time.Time
}

Schedulable is the interface that encapsulates work that is to be executed on a specified schedule.

type SchedulableService

type SchedulableService interface {

	// UpdateLastScheduled notifies the instance that it was scheduled for
	// execution at the specified time
	UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error
}

SchedulableService encapsulates the work necessary to schedule a job

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

Schedule is an object a valid schedule of runs

func NewSchedule

func NewSchedule(unparsed string, lastScheduledAt time.Time) (Schedule, time.Time, error)

func (Schedule) Next

func (s Schedule) Next(from time.Time) (time.Time, error)

Next returns the next time after from that a schedule should trigger on.

type Scheduler

type Scheduler interface {

	// Schedule adds the specified task to the scheduler.
	Schedule(task Schedulable) error

	// Release removes the specified task from the scheduler.
	Release(taskID ID) error
}

Scheduler is a example interface of a Scheduler. // todo(lh): remove this once we start building the actual scheduler

type SchedulerMetrics

type SchedulerMetrics struct {
	// contains filtered or unexported fields
}

func NewSchedulerMetrics

func NewSchedulerMetrics(te *TreeScheduler) *SchedulerMetrics

func (*SchedulerMetrics) PrometheusCollectors

func (em *SchedulerMetrics) PrometheusCollectors() []prometheus.Collector

PrometheusCollectors satisfies the prom.PrometheusCollector interface.

type TreeScheduler

type TreeScheduler struct {
	// contains filtered or unexported fields
}

TreeScheduler is a Scheduler based on a btree. It calls Executor in-order per ID. That means you are guaranteed that for a specific ID,

- The scheduler should, after creation, automatically call ExecutorFunc, when a task should run as defined by its Schedulable.

- the scheduler's should not be able to get into a state where blocks Release and Schedule indefinitely.

- Schedule should add a Schedulable to being scheduled, and Release should remove a task from being scheduled.

- Calling of ExecutorFunc should be serial in time on a per taskID basis. I.E.: the run at 12:00 will go before the run at 12:01.

Design:

The core of the scheduler is a btree keyed by time, a nonce, and a task ID, and a map keyed by task ID and containing a nonce and a time (called a uniqueness index from now on). The map is to ensure task uniqueness in the tree, so we can replace or delete tasks in the tree.

Scheduling in the tree consists of a main loop that feeds a fixed set of workers, each with their own communication channel. Distribution is handled by hashing the TaskID (to ensure uniform distribution) and then distributing over those channels evenly based on the hashed ID. This is to ensure that all tasks of the same ID go to the same worker.

The workers call ExecutorFunc handle any errors and update the LastScheduled time internally and also via the Checkpointer.

The main loop:

The main loop waits on a time.Timer to grab the task with the minimum time. Once it successfully grabs a task ready to trigger, it will start walking the btree from the item nearest

Putting a task into the scheduler:

Adding a task to the scheduler acquires a write lock, grabs the task from the uniqueness map, and replaces the item in the uniqueness index and btree. If new task would trigger sooner than the current soonest triggering task, it replaces the Timer when added to the scheduler. Finally it releases the write lock.

Removing a task from the scheduler:

Removing a task from the scheduler acquires a write lock, deletes the task from the uniqueness index and from the btree, then releases the lock. We do not have to readjust the time on delete, because, if the minimum task isn't ready yet, the main loop just resets the timer and keeps going.

func (*TreeScheduler) Release

func (s *TreeScheduler) Release(taskID ID) error

Release releases a task. Release also cancels the running task. Task deletion would be faster if the tree supported deleting ranges.

func (*TreeScheduler) Schedule

func (s *TreeScheduler) Schedule(sch Schedulable) error

Schedule put puts a Schedulable on the TreeScheduler.

func (*TreeScheduler) Stop

func (s *TreeScheduler) Stop()

func (*TreeScheduler) When

func (s *TreeScheduler) When() time.Time

When gives us the next time the scheduler will run a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL