scheduler

v0.3.0
Warning

This package is not in the latest version of its module.

Published: Jun 24, 2021 License: Apache-2.0 Imports: 13 Imported by: 4

Documentation

Overview

Package scheduler provides a managed API to Goroutines using Lifecycles.

The most basic type of management is using the Schedulable interface with a Scheduler:

worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(100 * time.Millisecond):
            fmt.Println("Mmm... pie.")
        }
    }
})

l := scheduler.NewScheduler(scheduler.OneSchedulable(worker))

sl := l.Start(scheduler.LifecycleStartOptions{})

time.Sleep(1 * time.Second)

// Tell the scheduler to start closing.
sl.Close()

// Wait for all managed routines to finish.
<-sl.Done()

Schedulers terminate when all of their children exit.

You can choose from three canned error behaviors for most lifecycles: ErrorBehaviorDrop, ErrorBehaviorCollect, and ErrorBehaviorTerminate. ErrorBehaviorDrop ignores errors, allowing the lifecycle to continue executing normally. ErrorBehaviorCollect stores all errors returned (potentially allowing for unbounded memory growth, so use with discretion) and provides them when the lifecycle completes. ErrorBehaviorTerminate causes the lifecycle to close as soon as it receives an error. You may implement your own error behaviors by conforming to the ErrorBehavior interface.
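As a rough illustration of a custom error behavior, the sketch below collects errors until a limit is reached and then closes its Done channel. To stay runnable without the module, it re-declares ErrorReporter, ErrorHandler, and ErrorBehavior locally with the shapes documented on this page. The names thresholdBehavior, thresholdHandler, and report are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins mirroring the interfaces documented by this package, so the
// sketch compiles without the module. Real code would use the scheduler types.
type ErrorReporter interface {
	Put(err error)
}

type ErrorHandler interface {
	ErrorReporter
	Done() <-chan struct{}
	Errs() []error
}

type ErrorBehavior interface {
	NewHandler() ErrorHandler
}

// thresholdBehavior is hypothetical: it tolerates limit-1 errors and signals
// termination when the limit-th error is reported. limit must be at least 1.
type thresholdBehavior struct {
	limit int
}

func (b thresholdBehavior) NewHandler() ErrorHandler {
	return &thresholdHandler{limit: b.limit, done: make(chan struct{})}
}

type thresholdHandler struct {
	mu    sync.Mutex
	limit int
	errs  []error
	done  chan struct{}
}

// Put records the error unless the handler has already reached its limit.
func (h *thresholdHandler) Put(err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if len(h.errs) >= h.limit {
		return
	}
	h.errs = append(h.errs, err)
	if len(h.errs) == h.limit {
		close(h.done)
	}
}

func (h *thresholdHandler) Done() <-chan struct{} { return h.done }

// Errs returns a copy so callers cannot mutate the handler's state.
func (h *thresholdHandler) Errs() []error {
	h.mu.Lock()
	defer h.mu.Unlock()
	return append([]error(nil), h.errs...)
}

// report feeds n errors into a fresh handler and returns how many were kept
// and whether the handler asked for termination.
func report(b ErrorBehavior, n int) (kept int, done bool) {
	h := b.NewHandler()
	for i := 0; i < n; i++ {
		h.Put(fmt.Errorf("failure %d", i))
	}
	select {
	case <-h.Done():
		done = true
	default:
	}
	return len(h.Errs()), done
}

func main() {
	// The third error arrives after Done has closed and is not recorded.
	kept, done := report(thresholdBehavior{limit: 2}, 3)
	fmt.Println(kept, done)
}
```

With the real package, a behavior of this shape would presumably be passed to a lifecycle's WithErrorBehavior method.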

If you have a few lifecycles that are parameterized differently and you want to manage them together, the Parent lifecycle aggregates them and runs them in parallel.

This package also provides a more sophisticated lifecycle, Segment. A Segment provides a worker pool and a mechanism for dispatching work. Dispatchers implement the Descriptor interface and work items implement the Process interface. The example above could equivalently be written as follows:

proc := scheduler.ProcessFunc(func(ctx context.Context) error {
    fmt.Println("Mmm... pie.")
    return nil
})

l := scheduler.NewSegment(1, []scheduler.Descriptor{
    scheduler.NewIntervalDescriptor(100*time.Millisecond, proc),
})
// Start, close, and wait on the lifecycle as before.

Descriptors are particularly useful when asynchronously waiting on events from external APIs for processing.

Constants

This section is empty.

Variables

DefaultRecoveryDescriptorBackoffFactory is an exponential backoff starting at 5 milliseconds with a factor of 2, a 10 second cap, and full jitter using the default RNG. It automatically resets the backoff after 30 seconds of inactivity.
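To make those parameters concrete, the following sketch computes the delay ceiling for each retry and applies full jitter to it. It does not use the backoff package. The helper names backoffCeiling and fullJitter are hypothetical, attempts are assumed to be zero-based, and "full jitter" is assumed to mean a uniformly random delay between zero and the ceiling. The 30 second reset is not modeled.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns the un-jittered delay for a zero-based attempt:
// 5ms doubled once per prior attempt, capped at 10s.
func backoffCeiling(attempt int) time.Duration {
	const (
		initial = 5 * time.Millisecond
		limit   = 10 * time.Second
	)
	d := initial
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= limit {
			return limit
		}
	}
	return d
}

// fullJitter picks a uniformly random delay in [0, ceiling].
func fullJitter(ceiling time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(ceiling) + 1))
}

func main() {
	for attempt := 0; attempt < 13; attempt++ {
		c := backoffCeiling(attempt)
		fmt.Printf("attempt %2d: ceiling %v, jittered %v\n", attempt, c, fullJitter(c))
	}
}
```

Under these assumptions the ceiling reaches the 10 second cap on the twelfth attempt (index 11), since 5 ms doubled eleven times is 10.24 s.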

Functions

func CloseWaitContext

func CloseWaitContext(ctx context.Context, lc StartedLifecycle) error

CloseWaitContext terminates the given lifecycle and then gracefully waits for it to shut down.

func NewAdhocDescriptor

func NewAdhocDescriptor() (*AdhocDescriptor, *AdhocSubmitter)

NewAdhocDescriptor returns a bound pair of adhoc descriptor and submitter. Submitting work items through the returned submitter will enqueue them to the returned descriptor.

func WaitContext

func WaitContext(ctx context.Context, lc StartedLifecycle) error

WaitContext waits until the given lifecycle completes or the context is done. If the context is canceled first, the context error is returned.
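The waiting behavior described here can be pictured as a select over the lifecycle's Done channel and the context. The sketch below is an illustrative re-implementation, not the package's code. StartedLifecycle is re-declared locally, fakeLifecycle and waitFor are hypothetical helpers, and returning nil when the lifecycle completes first is an assumption, since the return value in that case is not documented above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// StartedLifecycle is a local stand-in for the interface documented on this page.
type StartedLifecycle interface {
	Done() <-chan struct{}
	Errs() []error
	Close()
}

// waitContext blocks until the lifecycle is done or the context ends.
// Returning nil on completion is an assumption of this sketch.
func waitContext(ctx context.Context, lc StartedLifecycle) error {
	select {
	case <-lc.Done():
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fakeLifecycle is a hypothetical lifecycle that finishes as soon as it is closed.
type fakeLifecycle struct {
	once sync.Once
	done chan struct{}
}

func newFakeLifecycle() *fakeLifecycle {
	return &fakeLifecycle{done: make(chan struct{})}
}

func (f *fakeLifecycle) Done() <-chan struct{} { return f.done }
func (f *fakeLifecycle) Errs() []error         { return nil }
func (f *fakeLifecycle) Close()                { f.once.Do(func() { close(f.done) }) }

// waitFor waits up to 50ms on a fake lifecycle, optionally closing it first.
func waitFor(closeFirst bool) error {
	lc := newFakeLifecycle()
	if closeFirst {
		lc.Close()
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return waitContext(ctx, lc)
}

func main() {
	fmt.Println(waitFor(true))  // lifecycle finished before the deadline
	fmt.Println(waitFor(false)) // deadline passed first, so the context error is returned
}
```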

Types

type AdhocDescriptor

type AdhocDescriptor struct {
	// contains filtered or unexported fields
}

AdhocDescriptor is a descriptor that allows external access to submit work to be scheduled. It is paired with an AdhocSubmitter, which should be given to external clients so that they can submit work.

This descriptor is non-blocking; it will indefinitely queue work, consuming a proportional amount of memory per pending process if the scheduler does not have availability. You may want to rate limit submissions.

func (*AdhocDescriptor) Run

func (ad *AdhocDescriptor) Run(ctx context.Context, pc chan<- Process) error

Run executes this descriptor with the given process channel.

type AdhocSubmitter

type AdhocSubmitter struct {
	// contains filtered or unexported fields
}

AdhocSubmitter is used to submit work to an adhoc descriptor.

Work is always immediately enqueued.

func (*AdhocSubmitter) QueueLen

func (as *AdhocSubmitter) QueueLen() int

QueueLen returns the number of work items in the descriptor's queue. These items have not yet been submitted to the scheduler for processing.

func (*AdhocSubmitter) Submit

func (as *AdhocSubmitter) Submit(p Process) <-chan error

Submit adds a new work item to the descriptor's queue.

type Descriptor

type Descriptor interface {
	Run(ctx context.Context, pc chan<- Process) error
}

Descriptor provides a way to emit work to a scheduler. Descriptors are provided to segment lifecycles and are monitored by them for completion.

Errors returned by descriptors are reported to a handler defined by the error behavior of the scheduler executing them.
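A custom descriptor only needs a Run method that sends processes on the given channel and stops when the context is done. The sketch below re-declares Process and Descriptor locally so that it runs without the module. batchDescriptor, namedProcess, and drain are hypothetical, and drain is only a simplified stand-in for what a segment does. Returning the context error on cancellation is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the interfaces documented on this page.
type Process interface {
	Description() string
	Run(ctx context.Context) error
}

type Descriptor interface {
	Run(ctx context.Context, pc chan<- Process) error
}

// namedProcess is a hypothetical process that does nothing but carry a name.
type namedProcess struct {
	name string
}

func (p namedProcess) Description() string           { return p.name }
func (p namedProcess) Run(ctx context.Context) error { return nil }

// batchDescriptor (hypothetical) emits one process per name and then exits.
type batchDescriptor struct {
	names []string
}

func (d batchDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	for _, name := range d.names {
		select {
		case pc <- namedProcess{name: name}:
		case <-ctx.Done():
			// Stop emitting as soon as the caller asks us to terminate.
			return ctx.Err()
		}
	}
	return nil
}

// drain plays the role of a very small segment: it runs the descriptor, runs
// each emitted process in order, and returns the descriptions of the
// processes that succeeded along with the descriptor's own error.
func drain(d Descriptor) ([]string, error) {
	ctx := context.Background()
	pc := make(chan Process)
	errc := make(chan error, 1)
	go func() {
		defer close(pc)
		errc <- d.Run(ctx, pc)
	}()

	var ran []string
	for p := range pc {
		if err := p.Run(ctx); err == nil {
			ran = append(ran, p.Description())
		}
	}
	return ran, <-errc
}

func main() {
	ran, err := drain(batchDescriptor{names: []string{"mix", "bake", "serve"}})
	fmt.Println(ran, err)
}
```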

type DescriptorError added in v0.2.0

type DescriptorError struct {
	Descriptor Descriptor
	Cause      error
}

func (*DescriptorError) Error added in v0.2.0

func (e *DescriptorError) Error() string

type DescriptorFunc added in v0.3.0

type DescriptorFunc func(ctx context.Context, pc chan<- Process) error

DescriptorFunc converts an arbitrary function to a descriptor.

func (DescriptorFunc) Run added in v0.3.0

func (d DescriptorFunc) Run(ctx context.Context, pc chan<- Process) error

Run calls the underlying function.

type ErrorBehavior

type ErrorBehavior interface {
	// NewHandler returns a new handler that provides the desired error
	// management functionality.
	NewHandler() ErrorHandler
}

ErrorBehavior defines the way a lifecycle handles errors from descriptors and processes.

var (
	// ErrorBehaviorCollect allows all processes to complete and returns a full
	// set of errors when they have finished.
	ErrorBehaviorCollect ErrorBehavior = &errorBehaviorCollect{}

	// ErrorBehaviorTerminate causes the entire lifecycle to clean up and exit
	// when the first error occurs.
	ErrorBehaviorTerminate ErrorBehavior = &errorBehaviorTerminate{}

	// ErrorBehaviorDrop ignores errors, merely logging them for reference.
	ErrorBehaviorDrop ErrorBehavior = &errorBehaviorDrop{}
)

type ErrorHandler

type ErrorHandler interface {
	ErrorReporter

	// Done returns a channel that closes when this error handler cannot accept
	// further submissions.
	Done() <-chan struct{}

	// Errs returns a copy of the errors collected by this error handler.
	Errs() []error
}

ErrorHandler defines a contract that a Scheduler uses to manage errors. When the error handler's Done channel closes, the scheduler begins the process of terminating. The errors returned by the error handler then become the errors returned by the scheduler.

type ErrorReporter

type ErrorReporter interface {
	Put(err error)
}

ErrorReporter allows reporter processes to submit errors to a handler created by a particular error behavior.

type ImmediateDescriptor

type ImmediateDescriptor struct {
	// contains filtered or unexported fields
}

ImmediateDescriptor schedules a given process exactly once and then terminates.

func NewImmediateDescriptor

func NewImmediateDescriptor(process Process) *ImmediateDescriptor

NewImmediateDescriptor creates an immediately-scheduling descriptor for the given process.

func (*ImmediateDescriptor) Run

func (id *ImmediateDescriptor) Run(ctx context.Context, pc chan<- Process) error

Run schedules the process specified in this descriptor.

type IntervalDescriptor

type IntervalDescriptor struct {
	// contains filtered or unexported fields
}

IntervalDescriptor schedules a given process many times with the same time duration between subsequent runs.

The interval refers specifically to the time between executions; it is relative to the time the prior execution ended, not the time said execution started.

For example, starting from midnight (00:00:00), given an interval of 60 seconds, and a process that takes 20 seconds to execute, the second execution will occur around 00:01:20.
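The arithmetic in this example can be checked with a few lines of code. The sketch assumes the first execution starts immediately and that every execution takes the same time; startOffsets is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// startOffsets returns when each of the first n executions starts, relative
// to the first one. Each wait begins when the prior execution ends, so
// consecutive starts are runDuration+interval apart. n must not be negative.
func startOffsets(interval, runDuration time.Duration, n int) []time.Duration {
	offsets := make([]time.Duration, 0, n)
	var next time.Duration
	for i := 0; i < n; i++ {
		offsets = append(offsets, next)
		next += runDuration + interval
	}
	return offsets
}

func main() {
	midnight := time.Date(2021, time.June, 24, 0, 0, 0, 0, time.UTC)
	// Prints 00:00:00, 00:01:20, and 00:02:40 on separate lines.
	for _, off := range startOffsets(60*time.Second, 20*time.Second, 3) {
		fmt.Println(midnight.Add(off).Format("15:04:05"))
	}
}
```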

func NewIntervalDescriptor

func NewIntervalDescriptor(interval time.Duration, process Process) *IntervalDescriptor

NewIntervalDescriptor creates a new descriptor that repeats the given process according to the given interval.

func (*IntervalDescriptor) Run

func (id *IntervalDescriptor) Run(ctx context.Context, pc chan<- Process) error

Run starts scheduling this descriptor's process to the given channel. It will terminate only when the context terminates.

type Lifecycle

type Lifecycle interface {
	// Start starts all the descriptors for the lifecycle and begins handling
	// the processes for them.
	Start(opts LifecycleStartOptions) StartedLifecycle
}

Lifecycle represents a partially or fully configured scheduler instance. Starting a lifecycle will dispatch the descriptors attached to the given lifecycle.

type LifecycleStartOptions

type LifecycleStartOptions struct {
	// Capturer is the error capturer for this lifecycle.
	Capturer trackers.Capturer
}

LifecycleStartOptions are the options all lifecycles must handle when starting.

type ManySchedulable

type ManySchedulable interface {
	// Len returns the number of items of work to schedule. The contract of this
	// interface requires that this be a pure function.
	Len() int

	// Run executes the *i*th schedulable work.
	Run(ctx context.Context, i int, er ErrorReporter)
}

ManySchedulable represents multiple items of work, each of which needs to be scheduled.

func ManySchedulableDescriptor

func ManySchedulableDescriptor(ds []Descriptor, pc chan<- Process) ManySchedulable

ManySchedulableDescriptor adapts a slice of descriptors to the ManySchedulable interface.

func NManySchedulable

func NManySchedulable(size int, delegate Schedulable) ManySchedulable

NManySchedulable creates a new worker pool of the given size. The pool executes the given delegate exactly as many times as the provided size requests.
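One way to picture an adapter like this is a ManySchedulable whose Len reports the pool size and whose Run ignores the index and calls the same delegate. The sketch below is a guess at the shape, not the package's implementation. The interfaces are re-declared locally, and repeated, discard, runAll, and countRuns are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Local stand-ins for the interfaces documented on this page.
type ErrorReporter interface {
	Put(err error)
}

type Schedulable interface {
	Run(ctx context.Context, er ErrorReporter)
}

type ManySchedulable interface {
	Len() int
	Run(ctx context.Context, i int, er ErrorReporter)
}

type SchedulableFunc func(ctx context.Context, er ErrorReporter)

func (sf SchedulableFunc) Run(ctx context.Context, er ErrorReporter) { sf(ctx, er) }

// repeated (hypothetical) runs the same delegate size times.
type repeated struct {
	size     int
	delegate Schedulable
}

func (r repeated) Len() int { return r.size }

func (r repeated) Run(ctx context.Context, i int, er ErrorReporter) {
	r.delegate.Run(ctx, er)
}

// discard is an error reporter that ignores everything it is given.
type discard struct{}

func (discard) Put(error) {}

// runAll starts one goroutine per item and waits for all of them, which is
// roughly the job a scheduler performs for its children.
func runAll(ctx context.Context, ms ManySchedulable) {
	var wg sync.WaitGroup
	for i := 0; i < ms.Len(); i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ms.Run(ctx, i, discard{})
		}(i)
	}
	wg.Wait()
}

// countRuns reports how many times the delegate ran for a pool of the given size.
func countRuns(size int) int32 {
	var runs int32
	worker := SchedulableFunc(func(ctx context.Context, er ErrorReporter) {
		atomic.AddInt32(&runs, 1)
	})
	runAll(context.Background(), repeated{size: size, delegate: worker})
	return atomic.LoadInt32(&runs)
}

func main() {
	fmt.Println(countRuns(4))
}
```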

func OneSchedulable

func OneSchedulable(s Schedulable) ManySchedulable

OneSchedulable conforms a single schedulable work item to the ManySchedulable interface.

type ManySchedulableSlice

type ManySchedulableSlice []Schedulable

ManySchedulableSlice is an implementation of ManySchedulable for a slice of Schedulables.

func (ManySchedulableSlice) Len

func (mss ManySchedulableSlice) Len() int

Len returns the length of the underlying slice.

func (ManySchedulableSlice) Run

Run executes the schedulable work at the *i*th element in the underlying slice.

type PanicError added in v0.2.0

type PanicError struct {
	Cause error
}

func (*PanicError) Error added in v0.2.0

func (e *PanicError) Error() string

type Parent

type Parent struct {
	// contains filtered or unexported fields
}

Parent is a lifecycle that aggregates other lifecycles.

func NewParent

func NewParent(delegates ...Lifecycle) *Parent

NewParent creates a new parent lifecycle composed of the given delegate lifecycles.

func (*Parent) Start

Start starts all the delegate lifecycles that are part of this parent in parallel and waits for them to terminate according to the specified error behavior.

func (*Parent) WithErrorBehavior

func (p *Parent) WithErrorBehavior(errorBehavior ErrorBehavior) *Parent

WithErrorBehavior changes the error behavior for the parent. It does not affect the error behavior of any delegate lifecycles.

type Process

type Process interface {
	Description() string
	Run(ctx context.Context) error
}

Process is the primary unit of work for external users of the scheduler. It can be generalized by the Schedulable interface.

A process may (and should expect to) be run multiple times depending on the configuration of the descriptor that emits it.

Errors returned by a process are reported to a handler defined by the error behavior of the scheduler executing them.

func DescribeProcessFunc

func DescribeProcessFunc(desc string, fn ProcessFunc) Process

DescribeProcessFunc associates the given description with the arbitrary process function and returns it as a process.

type ProcessError added in v0.2.0

type ProcessError struct {
	Request *request.Request
	Process Process
	Cause   error
}

func (*ProcessError) Error added in v0.2.0

func (e *ProcessError) Error() string

type ProcessFunc

type ProcessFunc func(ctx context.Context) error

ProcessFunc converts an arbitrary function to a process.

func (ProcessFunc) Description

func (ProcessFunc) Description() string

Description of an arbitrary function is always "<anonymous>" unless provided by DescribeProcessFunc.
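The relationship between ProcessFunc and DescribeProcessFunc can be sketched with a function type plus a small wrapper that overrides the description. The declarations below are local re-implementations written for illustration only. describedProcess and noop are hypothetical, and the real package may structure this differently.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-in for the Process interface documented on this page.
type Process interface {
	Description() string
	Run(ctx context.Context) error
}

// ProcessFunc adapts a plain function to Process.
type ProcessFunc func(ctx context.Context) error

func (ProcessFunc) Description() string             { return "<anonymous>" }
func (p ProcessFunc) Run(ctx context.Context) error { return p(ctx) }

// describedProcess (hypothetical) embeds a ProcessFunc to reuse its Run
// method and replaces only the description.
type describedProcess struct {
	ProcessFunc
	desc string
}

func (d describedProcess) Description() string { return d.desc }

// DescribeProcessFunc here is a sketch with the documented signature.
func DescribeProcessFunc(desc string, fn ProcessFunc) Process {
	return describedProcess{ProcessFunc: fn, desc: desc}
}

// noop is a placeholder unit of work.
func noop(ctx context.Context) error { return nil }

func main() {
	var anonymous Process = ProcessFunc(noop)
	described := DescribeProcessFunc("bake pie", noop)
	fmt.Println(anonymous.Description())
	fmt.Println(described.Description())
}
```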

func (ProcessFunc) Run

func (p ProcessFunc) Run(ctx context.Context) error

Run calls the underlying function.

type RecoveryDescriptor

type RecoveryDescriptor struct {
	// contains filtered or unexported fields
}

RecoveryDescriptor wraps a given descriptor so that it restarts if the descriptor itself fails. This is useful for descriptors that work off of external information (APIs, events, etc.).

func NewRecoveryDescriptor

func NewRecoveryDescriptor(delegate Descriptor, opts ...RecoveryDescriptorOption) *RecoveryDescriptor

NewRecoveryDescriptor creates a new recovering descriptor wrapping the given delegate descriptor. Default backoff and retry parameters will be used.

func (*RecoveryDescriptor) Run

func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error

Run delegates work to another descriptor, catching any errors and restarting the descriptor immediately if an error occurs. It might return a max retries error. It only terminates when the context is done or the max retries have been exceeded.

type RecoveryDescriptorOption added in v0.2.0

type RecoveryDescriptorOption interface {
	// ApplyToRecoveryDescriptorOptions configures the specified recovery
	// descriptor options for this option.
	ApplyToRecoveryDescriptorOptions(target *RecoveryDescriptorOptions)
}

RecoveryDescriptorOption is a setter for one or more recovery descriptor options.

func RecoveryDescriptorWithBackoffFactory added in v0.2.0

func RecoveryDescriptorWithBackoffFactory(bf *backoff.Factory) RecoveryDescriptorOption

RecoveryDescriptorWithBackoffFactory changes the backoff algorithm to the specified one.

func RecoveryDescriptorWithClock added in v0.2.0

func RecoveryDescriptorWithClock(c clock.Clock) RecoveryDescriptorOption

RecoveryDescriptorWithClock changes the backoff clock to the specified one.

type RecoveryDescriptorOptionFunc added in v0.2.0

type RecoveryDescriptorOptionFunc func(target *RecoveryDescriptorOptions)

RecoveryDescriptorOptionFunc allows a function to be used as a recovery descriptor option.

func (RecoveryDescriptorOptionFunc) ApplyToRecoveryDescriptorOptions added in v0.2.0

func (rdof RecoveryDescriptorOptionFunc) ApplyToRecoveryDescriptorOptions(target *RecoveryDescriptorOptions)

ApplyToRecoveryDescriptorOptions configures the specified recovery descriptor options by calling this function.

type RecoveryDescriptorOptions

type RecoveryDescriptorOptions struct {
	// BackoffFactory is the backoff algorithm to use when a guarded descriptor
	// fails. If not specified, a sensible default
	// (DefaultRecoveryDescriptorBackoffFactory) is used.
	BackoffFactory *backoff.Factory

	// Clock is the clock implementation used to perform the backoff.
	Clock clock.Clock
}

RecoveryDescriptorOptions contains fields that allow backoff and retry parameters to be set.

func (*RecoveryDescriptorOptions) ApplyOptions added in v0.2.0

func (o *RecoveryDescriptorOptions) ApplyOptions(opts []RecoveryDescriptorOption)

ApplyOptions runs each of the given options against this options struct.

type RepeatingDescriptor

type RepeatingDescriptor struct {
	// contains filtered or unexported fields
}

RepeatingDescriptor schedules a given process repeatedly. It does not allow process executions to overlap; i.e., an execution of a process immediately follows the completion of the prior execution.

func NewRepeatingDescriptor

func NewRepeatingDescriptor(process Process) *RepeatingDescriptor

NewRepeatingDescriptor creates a new repeating descriptor that emits the given process.

func (*RepeatingDescriptor) Run

func (rd *RepeatingDescriptor) Run(ctx context.Context, pc chan<- Process) error

Run starts scheduling this descriptor's process. It terminates only when the context is done.

type RestartableDescriptor added in v0.3.0

type RestartableDescriptor struct {
	// contains filtered or unexported fields
}

RestartableDescriptor provides a mechanism for interrupting a delegate descriptor. It will cancel the context of any running delegates that it manages. When those delegates then exit, they will be automatically started again.

This descriptor does not return until its parent context is done or its delegate descriptor exits with an error. If its delegate descriptor finishes without an error, it can be restarted.

func NewRestartableDescriptor added in v0.3.0

func NewRestartableDescriptor(delegate Descriptor) (*RestartableDescriptor, func())

NewRestartableDescriptor returns a new descriptor that can be restarted by calling a function.

func (*RestartableDescriptor) Run added in v0.3.0

func (rd *RestartableDescriptor) Run(ctx context.Context, pc chan<- Process) error

type Schedulable

type Schedulable interface {
	// Run executes work with the expectation that the work will take all
	// possible measures to cleanly terminate when the given context is done.
	// Any errors that occur may be reported to the given error reporter.
	Run(ctx context.Context, er ErrorReporter)
}

Schedulable represents an individual item of work that can be scheduled in this package. It is the most general form of work for managed execution.

func SchedulableDescriptor

func SchedulableDescriptor(d Descriptor, pc chan<- Process) Schedulable

SchedulableDescriptor adapts a descriptor to the Schedulable interface.

func SchedulableLifecycle

func SchedulableLifecycle(l Lifecycle, opts LifecycleStartOptions) Schedulable

SchedulableLifecycle adapts a lifecycle to the Schedulable interface.

func SchedulableProcess

func SchedulableProcess(p Process) Schedulable

SchedulableProcess makes the given process conform to the Schedulable interface.

type SchedulableFunc

type SchedulableFunc func(ctx context.Context, er ErrorReporter)

SchedulableFunc makes the given function conform to the Schedulable interface.

func (SchedulableFunc) Run

func (sf SchedulableFunc) Run(ctx context.Context, er ErrorReporter)

Run calls the underlying function.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler provides a generic mechanism for managing parallel work. It extends Goroutines by providing consistent error handling and external termination.

All lifecycles in this package use one or more schedulers to execute. Generally, you should not need to use this lifecycle, but should prefer one of the other more concrete lifecycles. However, in conjunction with the Schedulable interface adapters for lifecycles, descriptors, and processes, it is relatively easy to implement new scheduling algorithms if desired.

Example
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/puppetlabs/leg/scheduler"
)

func main() {
	c := int32(5)

	worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) {
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}

			if atomic.AddInt32(&c, -1) < 0 {
				return
			}

			fmt.Println("Mmm... pie.")
		}
	})

	l := scheduler.NewScheduler(scheduler.OneSchedulable(worker))

	sl := l.Start(scheduler.LifecycleStartOptions{})
	<-sl.Done()

}
Output:

Mmm... pie.
Mmm... pie.
Mmm... pie.
Mmm... pie.
Mmm... pie.

func NewScheduler

func NewScheduler(children ManySchedulable) *Scheduler

NewScheduler creates a scheduler to manage all of the schedulable children provided.

func (*Scheduler) Start

Start starts this scheduler, asynchronously dispatching and managing all of the provided children.

func (*Scheduler) WithErrorBehavior

func (s *Scheduler) WithErrorBehavior(behavior ErrorBehavior) *Scheduler

WithErrorBehavior sets the error behavior for this scheduler.

func (*Scheduler) WithEventHandler

func (s *Scheduler) WithEventHandler(handler SchedulerEventHandler) *Scheduler

WithEventHandler adds an event handler to the list of event handlers to trigger.

type SchedulerEventHandler

type SchedulerEventHandler interface {
	// OnDone is fired when the scheduler terminates.
	OnDone()
}

SchedulerEventHandler defines an interface by which a client can respond to lifecycle events.

type SchedulerEventHandlerFuncs

type SchedulerEventHandlerFuncs struct {
	// OnDoneFunc is the function to call when the scheduler terminates.
	OnDoneFunc func()
}

SchedulerEventHandlerFuncs allows a client to partially implement an event handler, choosing to receive only the events for which they provide handlers.

func (*SchedulerEventHandlerFuncs) OnDone

func (sehf *SchedulerEventHandlerFuncs) OnDone()

OnDone fires OnDoneFunc if non-nil.

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment is a bounded executor for processes.

It manages a slice of descriptors, which are responsible for emitting the processes to schedule. Each descriptor is run concurrently when the segment is started.

When all descriptors terminate, a segment automatically terminates. It attempts to complete all remaining work before terminating.

The concurrency of the segment defines the size of the worker pool that handles processes. If all workers are busy, the channel used by descriptors to emit processes will block until a process completes.
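The blocking behavior described above is what a fixed pool of workers reading from an unbuffered channel produces. The sketch below uses only the standard library and does not reflect how Segment is implemented internally. maxInFlight is a hypothetical helper that reports the highest number of jobs observed running at once, and concurrency must be at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxInFlight runs the given number of short jobs on a pool of concurrency
// workers and returns the peak number of jobs running simultaneously.
func maxInFlight(concurrency, jobs int) int {
	// Unbuffered: a send blocks until some worker is free to receive.
	pc := make(chan func())

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
		peak     int
	)

	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range pc {
				mu.Lock()
				inFlight++
				if inFlight > peak {
					peak = inFlight
				}
				mu.Unlock()

				job()

				mu.Lock()
				inFlight--
				mu.Unlock()
			}
		}()
	}

	// This loop plays the role of a descriptor emitting work.
	for j := 0; j < jobs; j++ {
		pc <- func() { time.Sleep(time.Millisecond) }
	}
	close(pc)
	wg.Wait()
	return peak
}

func main() {
	// The peak never exceeds the pool size, however many jobs are emitted.
	fmt.Println(maxInFlight(2, 10) <= 2)
}
```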

func NewSegment

func NewSegment(concurrency int, descriptors []Descriptor) *Segment

NewSegment creates a new segment with the given worker pool size (concurrency) and slice of descriptors.

func (*Segment) Start

Start starts this segment, creating a worker pool of size equal to the concurrency of this segment and executing all descriptors.

func (*Segment) WithDescriptorErrorBehavior

func (s *Segment) WithDescriptorErrorBehavior(behavior ErrorBehavior) *Segment

WithDescriptorErrorBehavior sets the error behavior for descriptors in this segment.

func (*Segment) WithErrorBehavior

func (s *Segment) WithErrorBehavior(behavior ErrorBehavior) *Segment

WithErrorBehavior sets the error behavior for descriptors and processes for this segment.

func (*Segment) WithProcessErrorBehavior

func (s *Segment) WithProcessErrorBehavior(behavior ErrorBehavior) *Segment

WithProcessErrorBehavior sets the error behavior for processes run by this segment.

type StartedLifecycle

type StartedLifecycle interface {
	// Done returns a channel that closes when the lifecycle terminates.
	Done() <-chan struct{}

	// Errs returns the errors associated with this lifecycle. If the lifecycle
	// is not yet closed, this method returns nil.
	Errs() []error

	// Close terminates descriptors, dropping any processes emitted by those
	// descriptors, and asks any running processes to terminate.
	Close()
}

StartedLifecycle represents a fully configured, operating scheduler.
