Documentation ¶
Overview ¶
Package scheduler provides a managed API to Goroutines using Lifecycles.
The most basic type of management is using the Schedulable interface with a Scheduler:
worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) { for { select { case <-ctx.Done(): return case <-time.After(100 * time.Millisecond): fmt.Println("Mmm... pie.") } } }) l := scheduler.NewScheduler(scheduler.OneSchedulable(worker)) sl := l.Start(scheduler.LifecycleStartOptions{}) time.Sleep(1 * time.Second) // Tell the scheduler to start closing. sl.Close() // Wait for all managed routines to finish. <-sl.Done()
Schedulers terminate when all of their children exit.
You can choose from three canned error behaviors for most lifecycles: ErrorBehaviorDrop, ErrorBehaviorCollect, and ErrorBehaviorTerminate. ErrorBehaviorDrop ignores errors, allowing the lifecycle to continue executing normally. ErrorBehaviorCollect stores all errors returned (potentially allowing for unbounded memory growth, so use with discretion) and provides them when the lifecycle completes. ErrorBehaviorTerminate causes the lifecycle to close as soon as it receives an error. You may implement your own error behaviors by conforming to the ErrorBehavior interface.
If you have a few lifecycles that are parameterized differently and you want to manage them together, the Parent lifecycle aggregates them and runs them in parallel.
This package also provides a more sophisticated lifecycle, Segment. A Segment provides a worker pool and a mechanism for dispatching work. Dispatchers implement the Descriptor interface and work items implement the Process interface. The example above could equivalently be written as follows:
proc := scheduler.ProcessFunc(func(ctx context.Context) error { fmt.Println("Mmm... pie.") return nil }) l := scheduler.NewSegment(1, []scheduler.Descriptor{ scheduler.NewIntervalDescriptor(100*time.Millisecond, proc), }) // Start, close, and wait on the lifecycle as before.
Descriptors are particularly useful when asynchronously waiting on events from external APIs for processing.
Index ¶
- Variables
- func CloseWaitContext(ctx context.Context, lc StartedLifecycle) error
- func NewAdhocDescriptor() (*AdhocDescriptor, *AdhocSubmitter)
- func WaitContext(ctx context.Context, lc StartedLifecycle) error
- type AdhocDescriptor
- type AdhocSubmitter
- type Descriptor
- type DescriptorError
- type DescriptorFunc
- type ErrorBehavior
- type ErrorHandler
- type ErrorReporter
- type ImmediateDescriptor
- type IntervalDescriptor
- type Lifecycle
- type LifecycleStartOptions
- type ManySchedulable
- type ManySchedulableSlice
- type PanicError
- type Parent
- type Process
- type ProcessError
- type ProcessFunc
- type RecoveryDescriptor
- type RecoveryDescriptorOption
- type RecoveryDescriptorOptionFunc
- type RecoveryDescriptorOptions
- type RepeatingDescriptor
- type RestartableDescriptor
- type Schedulable
- type SchedulableFunc
- type Scheduler
- type SchedulerEventHandler
- type SchedulerEventHandlerFuncs
- type Segment
- type StartedLifecycle
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultRecoveryDescriptorBackoffFactory = backoff.Build( backoff.ResetAfter( backoff.Build( backoff.Exponential(5*time.Millisecond, 2.0), backoff.NonSliding, ), 30*time.Second, ), backoff.MaxBound(10*time.Second), backoff.FullJitter(), )
DefaultRecoveryDescriptorBackoffFactory is an exponential backoff starting at 5 milliseconds with a factor of 2, a 10 second cap, and full jitter using the default RNG. It automatically resets the backoff after 30 seconds of inactivity.
Functions ¶
func CloseWaitContext ¶
func CloseWaitContext(ctx context.Context, lc StartedLifecycle) error
CloseWaitContext terminates the given lifecycle and then gracefully waits for it to shut down.
func NewAdhocDescriptor ¶
func NewAdhocDescriptor() (*AdhocDescriptor, *AdhocSubmitter)
NewAdhocDescriptor returns a bound pair of adhoc descriptor and submitter. Submitting work items through the returned submitter will enqueue them to the returned descriptor.
func WaitContext ¶
func WaitContext(ctx context.Context, lc StartedLifecycle) error
WaitContext waits until the given lifecycle completes or the context is done. If the context is canceled first, the context error is returned.
Types ¶
type AdhocDescriptor ¶
type AdhocDescriptor struct {
// contains filtered or unexported fields
}
AdhocDescriptor is a descriptor that allows external access to submit work to be scheduled. It is paired with an AdhocSubmitter, which should be provided to external clients to receive the work.
This descriptor is non-blocking; it will indefinitely queue work, consuming a proportional amount of memory per pending process if the scheduler does not have availability. You may want to rate limit submissions.
type AdhocSubmitter ¶
type AdhocSubmitter struct {
// contains filtered or unexported fields
}
AdhocSubmitter is used to submit work to an adhoc descriptor.
Work is always immediately enqueued.
func (*AdhocSubmitter) QueueLen ¶
func (as *AdhocSubmitter) QueueLen() int
QueueLen returns the number of work items in the descriptor's queue. These items have not yet been submitted to the scheduler for processing.
func (*AdhocSubmitter) Submit ¶
func (as *AdhocSubmitter) Submit(p Process) <-chan error
Submit adds a new work item to the descriptor's queue.
type Descriptor ¶
Descriptor provides a way to emit work to a scheduler. Descriptors are provided to segment lifecycles and are monitored by them for completion.
Errors returned by descriptors are reported to a handler defined by the error behavior of the scheduler executing them.
type DescriptorError ¶ added in v0.2.0
type DescriptorError struct { Descriptor Descriptor Cause error }
func (*DescriptorError) Error ¶ added in v0.2.0
func (e *DescriptorError) Error() string
type DescriptorFunc ¶ added in v0.3.0
DescriptorFunc converts an arbitrary function to a descriptor.
type ErrorBehavior ¶
type ErrorBehavior interface { // NewHandler returns a new handler that provides the desired error // management functionality. NewHandler() ErrorHandler }
ErrorBehavior defines the way a lifecycle handles errors from descriptors and processes.
var ( // ErrorBehaviorCollect allows all processes to complete and returns a full // set of errors when they have finished. ErrorBehaviorCollect ErrorBehavior = &errorBehaviorCollect{} // ErrorBehaviorTerminate causes the entire lifecycle to clean up and exit // when the first error occurs. ErrorBehaviorTerminate ErrorBehavior = &errorBehaviorTerminate{} // ErrorBehaviorDrop ignores errors, merely logging them for reference. ErrorBehaviorDrop ErrorBehavior = &errorBehaviorDrop{} )
type ErrorHandler ¶
type ErrorHandler interface { ErrorReporter // Done returns a channel that closes when this error handler cannot accept // further submissions. Done() <-chan struct{} // Errors returns a copy of the errors collected by this error handler. Errs() []error }
ErrorHandler defines a contract that a Scheduler uses to manage errors. When the error handler's Done channel closes, the scheduler begins the process of terminating. The errors returned by the error handler then become the errors returned by the scheduler.
type ErrorReporter ¶
type ErrorReporter interface {
Put(err error)
}
ErrorReporter allows reporter processes to submit errors to a handler created by a particular error behavior.
type ImmediateDescriptor ¶
type ImmediateDescriptor struct {
// contains filtered or unexported fields
}
ImmediateDescriptor schedules a given process exactly once and then terminates.
func NewImmediateDescriptor ¶
func NewImmediateDescriptor(process Process) *ImmediateDescriptor
NewImmediateDescriptor creates an immediately-scheduling descriptor for the given process.
type IntervalDescriptor ¶
type IntervalDescriptor struct {
// contains filtered or unexported fields
}
IntervalDescriptor schedules a given process many times with the same time duration between subsequent runs.
The interval refers specifically to the time between executions; it is relative to the time the prior execution ended, not the time said execution started.
For example, starting from midnight (00:00:00), given an interval of 60 seconds, and a process that takes 20 seconds to execute, the second execution will occur around 00:01:20.
func NewIntervalDescriptor ¶
func NewIntervalDescriptor(interval time.Duration, process Process) *IntervalDescriptor
NewIntervalDescriptor creates a new descriptor that repeats the given process according to the given interval.
type Lifecycle ¶
type Lifecycle interface { // Start starts all the descriptors for the lifecycle and begins handling // the processes for them. Start(opts LifecycleStartOptions) StartedLifecycle }
Lifecycle represents a partially or fully configured scheduler instance. Starting a lifecycle will dispatch the descriptors attached to the given lifecycle.
type LifecycleStartOptions ¶
type LifecycleStartOptions struct { // Capturer is the error capturer for this lifecycle. Capturer trackers.Capturer }
LifecycleStartOptions are the options all lifecycles must handle when starting.
type ManySchedulable ¶
type ManySchedulable interface { // Len returns the number of items of work to schedule. The contract of this // interface requires that this be a pure function. Len() int // Run executes the *i*th schedulable work. Run(ctx context.Context, i int, er ErrorReporter) }
ManySchedulable represents multiple items of work, each of which needs to be scheduled.
func ManySchedulableDescriptor ¶
func ManySchedulableDescriptor(ds []Descriptor, pc chan<- Process) ManySchedulable
ManySchedulableDescriptor adapts a slice of descriptors to the ManySchedulable interface.
func NManySchedulable ¶
func NManySchedulable(size int, delegate Schedulable) ManySchedulable
NManySchedulable creates a new worker pool of the given size. The pool will execute the given delegate exactly the number of times as requested by the provided size.
func OneSchedulable ¶
func OneSchedulable(s Schedulable) ManySchedulable
OneSchedulable conforms a single schedulable work item to the ManySchedulable interface.
type ManySchedulableSlice ¶
type ManySchedulableSlice []Schedulable
ManySchedulableSlice is an implementation of ManySchedulable for a slice of Schedulables.
func (ManySchedulableSlice) Len ¶
func (mss ManySchedulableSlice) Len() int
Len returns the length of the underlying slice.
func (ManySchedulableSlice) Run ¶
func (mss ManySchedulableSlice) Run(ctx context.Context, i int, er ErrorReporter)
Run executes the schedulable work at the *i*th element in the underlying slice.
type PanicError ¶ added in v0.2.0
type PanicError struct {
Cause error
}
func (*PanicError) Error ¶ added in v0.2.0
func (e *PanicError) Error() string
type Parent ¶
type Parent struct {
// contains filtered or unexported fields
}
Parent is a lifecycle that aggregates other lifecycles.
func NewParent ¶
NewParent creates a new parent lifecycle comprised of the given delegate lifecycles.
func (*Parent) Start ¶
func (p *Parent) Start(opts LifecycleStartOptions) StartedLifecycle
Start starts all the delegate lifecycles that are part of this parent in parallel and waits for them to terminate according to the specified error behavior.
func (*Parent) WithErrorBehavior ¶
func (p *Parent) WithErrorBehavior(errorBehavior ErrorBehavior) *Parent
WithErrorBehavior changes the error behavior for the parent. It does not affect the error behavior of any delegate lifecycles.
type Process ¶
Process is the primary unit of work for external users of the scheduler. It can be generalized by the Schedulable interface.
A process may (and should expect to) be run multiple times depending on the configuration of the descriptor that emits it.
Errors returned by a process are reported to a handler defined by the error behavior of the scheduler executing them.
func DescribeProcessFunc ¶
func DescribeProcessFunc(desc string, fn ProcessFunc) Process
DescribeProcessFunc associates the given description with the arbitrary process function and returns it as a process.
type ProcessError ¶ added in v0.2.0
func (*ProcessError) Error ¶ added in v0.2.0
func (e *ProcessError) Error() string
type ProcessFunc ¶
ProcessFunc converts an arbitrary function to a process.
func (ProcessFunc) Description ¶
func (ProcessFunc) Description() string
Description of an arbitrary function is always "<anonymous>" unless provided by DescribeProcessFunc.
type RecoveryDescriptor ¶
type RecoveryDescriptor struct {
// contains filtered or unexported fields
}
RecoveryDescriptor wraps a given descriptor so that it restarts if the descriptor itself fails. This is useful for descriptors that work off of external information (APIs, events, etc.).
func NewRecoveryDescriptor ¶
func NewRecoveryDescriptor(delegate Descriptor, opts ...RecoveryDescriptorOption) *RecoveryDescriptor
NewRecoveryDescriptor creates a new recovering descriptor wrapping the given delegate descriptor. Default backoff and retry parameters will be used.
func (*RecoveryDescriptor) Run ¶
func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error
Run delegates work to another descriptor, catching any errors are restarting the descriptor immediately if an error occurs. It might return a max retries error. It only terminates when the context is done or the max retries have been exceeded.
type RecoveryDescriptorOption ¶ added in v0.2.0
type RecoveryDescriptorOption interface { // ApplyToRecoveryDescriptorOptions configures the specified recovery // descriptor options for this option. ApplyToRecoveryDescriptorOptions(target *RecoveryDescriptorOptions) }
RecoveryDescriptorOption is a setter for one or more recovery descriptor options.
func RecoveryDescriptorWithBackoffFactory ¶ added in v0.2.0
func RecoveryDescriptorWithBackoffFactory(bf *backoff.Factory) RecoveryDescriptorOption
RecoveryDescriptorWithBackoffFactory changes the backoff algorithm to the specified one.
func RecoveryDescriptorWithClock ¶ added in v0.2.0
func RecoveryDescriptorWithClock(c clock.Clock) RecoveryDescriptorOption
RecoveryDescriptorWithClock changes the backoff clock to the specified one.
type RecoveryDescriptorOptionFunc ¶ added in v0.2.0
type RecoveryDescriptorOptionFunc func(target *RecoveryDescriptorOptions)
RecoveryDescriptorOptionFunc allows a function to be used as a recovery descriptor option.
func (RecoveryDescriptorOptionFunc) ApplyToRecoveryDescriptorOptions ¶ added in v0.2.0
func (rdof RecoveryDescriptorOptionFunc) ApplyToRecoveryDescriptorOptions(target *RecoveryDescriptorOptions)
ApplyToRecoveryDescriptorOptions configures the specified recovery descriptor options by calling this function.
type RecoveryDescriptorOptions ¶
type RecoveryDescriptorOptions struct { // BackoffFactory is the backoff algorithm to use when a guarded descriptor // fails. If not specified, a sensibile default // (DefaultRecoveryDescriptorBackoffFactory) is used. BackoffFactory *backoff.Factory // Clock is the clock implementation used to perform the backoff. Clock clock.Clock }
RecoveryDescriptorOptions contains fields that allow backoff and retry parameters to be set.
func (*RecoveryDescriptorOptions) ApplyOptions ¶ added in v0.2.0
func (o *RecoveryDescriptorOptions) ApplyOptions(opts []RecoveryDescriptorOption)
ApplyOptions runs each of the given options against this options struct.
type RepeatingDescriptor ¶
type RepeatingDescriptor struct {
// contains filtered or unexported fields
}
RepeatingDescriptor schedules a given process repeatedly. It does not allow process executions to overlap; i.e., an execution of a process immediately follows the completion of the prior execution.
func NewRepeatingDescriptor ¶
func NewRepeatingDescriptor(process Process) *RepeatingDescriptor
NewRepeatingDescriptor creates a new repeating descriptor that emits the given process.
type RestartableDescriptor ¶ added in v0.3.0
type RestartableDescriptor struct {
// contains filtered or unexported fields
}
RestartableDescriptor provides a mechanism for interrupting a delegate descriptor. It will cancel the context of any running delegates that it manages. When those delegates then exit, they will be automatically started again.
This descriptor does not return until its parent context is done or its delegate descriptor exits with an error. If its delegate descriptor finishes without an error, it can be restarted.
func NewRestartableDescriptor ¶ added in v0.3.0
func NewRestartableDescriptor(delegate Descriptor) (*RestartableDescriptor, func())
NewRestartableDescriptor returns a new descriptor that can be restarted by calling a function.
type Schedulable ¶
type Schedulable interface { // Run executes work with the expectation that the work will take all // possible measures to cleanly terminate when the given context is done. // Any errors that occur may be reported to the given error reporter. Run(ctx context.Context, er ErrorReporter) }
Schedulable represents an individual item of work that can be scheduled in this package. It is the most general form of work for managed execution.
func SchedulableDescriptor ¶
func SchedulableDescriptor(d Descriptor, pc chan<- Process) Schedulable
SchedulableDescriptor adapts a descriptor to the Schedulable interface.
func SchedulableLifecycle ¶
func SchedulableLifecycle(l Lifecycle, opts LifecycleStartOptions) Schedulable
SchedulableLifecycle adapts a lifecycle to the Schedulable interface.
func SchedulableProcess ¶
func SchedulableProcess(p Process) Schedulable
SchedulableProcess makes the given process conform to the Schedulable interface.
type SchedulableFunc ¶
type SchedulableFunc func(ctx context.Context, er ErrorReporter)
SchedulableFunc makes the given function conform to the Schedulable interface.
func (SchedulableFunc) Run ¶
func (sf SchedulableFunc) Run(ctx context.Context, er ErrorReporter)
Run calls the underlying function.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler provides a generic mechanism for managing parallel work. It extends Goroutines by providing consistent error handling and external termination.
All lifecycles in this package use one or more schedulers to execute. Generally, you should not need to use this lifecycle, but should prefer one of the other more concrete lifecycles. However, in conjuction with the Schedulable interface adapters for lifecycles, descriptors, and processes, it is relatively easy to implement new scheduling algorithms if desired.
Example ¶
package main import ( "context" "fmt" "sync/atomic" "github.com/puppetlabs/leg/scheduler" ) func main() { c := int32(5) worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) { for { select { case <-ctx.Done(): return default: } if atomic.AddInt32(&c, -1) < 0 { return } fmt.Println("Mmm... pie.") } }) l := scheduler.NewScheduler(scheduler.OneSchedulable(worker)) sl := l.Start(scheduler.LifecycleStartOptions{}) <-sl.Done() }
Output: Mmm... pie. Mmm... pie. Mmm... pie. Mmm... pie. Mmm... pie.
func NewScheduler ¶
func NewScheduler(children ManySchedulable) *Scheduler
NewScheduler creates a scheduler to manage all of the schedulable children provided.
func (*Scheduler) Start ¶
func (s *Scheduler) Start(opts LifecycleStartOptions) StartedLifecycle
Start starts this scheduler, asynchronously dispatching and managing all of the provided children.
func (*Scheduler) WithErrorBehavior ¶
func (s *Scheduler) WithErrorBehavior(behavior ErrorBehavior) *Scheduler
WithErrorBehavior sets the error behavior for this scheduler.
func (*Scheduler) WithEventHandler ¶
func (s *Scheduler) WithEventHandler(handler SchedulerEventHandler) *Scheduler
WithEventHandler adds an event handler to the list of event handlers to trigger.
type SchedulerEventHandler ¶
type SchedulerEventHandler interface {
// OnDone is fired when the scheduler terminates.
OnDone()
}
SchedulerEventHandler defines an interface by which a client can respond to lifecycle events.
type SchedulerEventHandlerFuncs ¶
type SchedulerEventHandlerFuncs struct {
// OnDoneFunc is the function to call when the scheduler terminates.
OnDoneFunc func()
}
SchedulerEventHandlerFuncs allows a client to partially implement an event handler, choosing to receiving only the events they provide handlers for.
func (*SchedulerEventHandlerFuncs) OnDone ¶
func (sehf *SchedulerEventHandlerFuncs) OnDone()
OnDone fires OnDoneFunc if non-nil.
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment is a bounded executor for processes.
It manages a slice of descriptors, which are responsible for emitting the processes to schedule. Each descriptor is run concurrently when the segment is started.
When all descriptors terminate, a segment automatically terminates. It attempts to complete all remaining work before terminating.
The concurrency of the segment defines the size of the worker pool that handle processes. If all workers are busy, the channel used by descriptors to emit processes will block until a process completes.
func NewSegment ¶
func NewSegment(concurrency int, descriptors []Descriptor) *Segment
NewSegment creates a new segment with the given worker pool size (concurrency) and slice of descriptors.
func (*Segment) Start ¶
func (s *Segment) Start(opts LifecycleStartOptions) StartedLifecycle
Start starts this segment, creating a worker pool of size equal to the concurrency of this segment and executing all descriptors.
func (*Segment) WithDescriptorErrorBehavior ¶
func (s *Segment) WithDescriptorErrorBehavior(behavior ErrorBehavior) *Segment
WithDescriptorErrorBehavior sets the error behavior for descriptors in this segment.
func (*Segment) WithErrorBehavior ¶
func (s *Segment) WithErrorBehavior(behavior ErrorBehavior) *Segment
WithErrorBehavior sets the error behavior for descriptors and processes for this segment.
func (*Segment) WithProcessErrorBehavior ¶
func (s *Segment) WithProcessErrorBehavior(behavior ErrorBehavior) *Segment
WithProcessErrorBehavior sets the error behavior for processes run by this segment.
type StartedLifecycle ¶
type StartedLifecycle interface { // Done returns a channel that closes when the lifecycle terminates. Done() <-chan struct{} // Errs returns the errors associated with this lifecycle. If the lifecycle // is not yet closed, this method returns nil. Errs() []error // Close terminates descriptors, dropping any processes emitted by those // descriptors, and asks any running processes to terminate. Close() }
StartedLifecycle represents a fully configured, operating scheduler.