Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Context ¶
type Context struct {
	*Dispatcher
	// contains filtered or unexported fields
}
A Context is created for every running Process.
type Delegate ¶
type Delegate interface {
	// JobScheduled is invoked when a Job is scheduled as a new Process, and is waiting to start.
	JobScheduled(process *Process)
	// JobBlocked is invoked when a Job's Process is due to run, but is waiting for one or more other Processes to finish.
	JobBlocked(process *Process, blockers []*Process)
	// JobStarting is invoked just before a Job's Process is started.
	JobStarting(process *Process)
	// JobProgressed is invoked whenever Context.Progress is called from a running Process.
	JobProgressed(process *Process, payload any)
	// JobEnded is invoked whenever a Job's Process ends, whether by completing, being superseded, or having its
	// context cancelled. Every invocation of JobScheduled will eventually be followed by JobEnded.
	JobEnded(process *Process, err error)
}
Delegate implementations can be assigned to any Dispatcher (i.e. a Scheduler or a Context), to receive notifications about changes in Process state.
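As a rough illustration of the interface above, the following sketch records one line per notification. The Job and Process types are pared-down local stand-ins declared only so the example compiles by itself, and eventLog is a hypothetical name. The mutex is a precaution, since the documentation does not say which goroutines invoke the Delegate.

```go
package main

import (
	"fmt"
	"sync"
)

// Job and Process are minimal local stand-ins for this package's types,
// declared here only so the sketch is self-contained.
type Job struct{ Tag any }
type Process struct{ Job *Job }

// Delegate mirrors the interface documented above.
type Delegate interface {
	JobScheduled(process *Process)
	JobBlocked(process *Process, blockers []*Process)
	JobStarting(process *Process)
	JobProgressed(process *Process, payload any)
	JobEnded(process *Process, err error)
}

// eventLog is a hypothetical Delegate that records one line per notification.
type eventLog struct {
	mu    sync.Mutex
	lines []string
}

// add appends "<event>: <tag>" under the lock.
func (l *eventLog) add(event string, p *Process) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lines = append(l.lines, fmt.Sprintf("%s: %v", event, p.Job.Tag))
}

func (l *eventLog) JobScheduled(p *Process)             { l.add("scheduled", p) }
func (l *eventLog) JobBlocked(p *Process, _ []*Process) { l.add("blocked", p) }
func (l *eventLog) JobStarting(p *Process)              { l.add("starting", p) }
func (l *eventLog) JobProgressed(p *Process, _ any)     { l.add("progressed", p) }

// JobEnded distinguishes a clean end from an end with an error.
func (l *eventLog) JobEnded(p *Process, err error) {
	if err != nil {
		l.add("failed", p)
		return
	}
	l.add("ended", p)
}

// Compile-time check that eventLog satisfies Delegate.
var _ Delegate = (*eventLog)(nil)

func main() {
	rec := &eventLog{}
	p := &Process{Job: &Job{Tag: "backup"}}
	// Simulate the notifications a Dispatcher might send for one process.
	rec.JobScheduled(p)
	rec.JobStarting(p)
	rec.JobEnded(p, nil)
	for _, line := range rec.lines {
		fmt.Println(line)
	}
}
```

In real use, a value like this would be assigned to the Delegate field of a Scheduler or Context before any child processes are running.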
type Dispatcher ¶
type Dispatcher struct {
	// Delegate receives notifications whenever a Process started by the Dispatcher, or one of its descendants, is
	// scheduled, blocked, starting, progressed, or ended.
	//
	// Care should be taken to assign Delegate when no child processes are running, to avoid races. If it is necessary
	// to add and/or remove event handlers while processes are running, consider using events.Handler as your Delegate,
	// which can be safely modified from multiple goroutines.
	Delegate Delegate
	// contains filtered or unexported fields
}
A Dispatcher can be used to schedule new jobs, and track their progress through its Delegate.
Dispatcher is used by the Scheduler and Context types. It is not intended to be used on its own. It can, however, be pulled out and passed to more complex scheduling functions that are intended to work on either Scheduler or Context values.
func (*Dispatcher) Parallel ¶
func (d *Dispatcher) Parallel(ctx context.Context, concurrency int, jobs ...*Job)
Parallel is identical to Run, but allows maximum concurrency to be specified for the batch.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context, jobs ...*Job)
Run schedules the given jobs to run one after the other, and blocks until they have all ended.
A failing job will not prevent subsequent jobs from running. To halt a batch of jobs on error, use Run with a context that can be cancelled by Delegate.JobEnded when it receives an error.
Internally, handling each context requires an extra goroutine to live from when the given batch is scheduled until the last job starts. If you don't need to be able to stop your jobs, leave ctx as nil to save the cost of a goroutine.
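The advice about halting a batch on error could be sketched as follows. Only the JobEnded method is shown; a full Delegate would need the other four methods as well. Process is a local stand-in, and haltOnError, newHaltOnError, and errExample are hypothetical names. Passing the returned context to Run is not shown, because that requires the real package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Process is a local stand-in for this package's Process type.
type Process struct{}

// haltOnError is a hypothetical Delegate fragment that cancels a batch
// context as soon as any job ends with an error.
type haltOnError struct {
	cancel context.CancelFunc
}

// JobEnded cancels the batch context when err is non-nil.
// Calling cancel more than once is harmless.
func (h *haltOnError) JobEnded(_ *Process, err error) {
	if err != nil {
		h.cancel()
	}
}

// newHaltOnError returns the delegate fragment together with the
// cancellable context that would be passed to Run.
func newHaltOnError() (*haltOnError, context.Context) {
	ctx, cancel := context.WithCancel(context.Background())
	return &haltOnError{cancel: cancel}, ctx
}

// errExample stands in for an error returned by a job's Runner.
var errExample = errors.New("example failure")

func main() {
	h, ctx := newHaltOnError()

	h.JobEnded(&Process{}, nil) // a successful job leaves the context alone
	fmt.Println("after success:", ctx.Err())

	h.JobEnded(&Process{}, errExample) // a failing job cancels the batch
	fmt.Println("after failure:", ctx.Err())
}
```

One consequence of this design is that jobs cancelled before starting will themselves be reported to JobEnded with an error, which triggers cancel again; that is safe because a context.CancelFunc may be called repeatedly.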
func (*Dispatcher) Schedule ¶
Schedule schedules a job to be run in the background, with an optional delay. A positive repeatAfter value will reschedule the job upon its completion with a new delay.
Internally, handling each context requires an extra goroutine to live from when the given job is scheduled until it starts. If you don't need to be able to stop your job, leave ctx as nil to save the cost of a goroutine.
type ErrContextExpired ¶
ErrContextExpired is passed to Delegate.JobEnded when a Process's context expires before it starts.
func (ErrContextExpired) Error ¶
func (e ErrContextExpired) Error() string
func (ErrContextExpired) Unwrap ¶
func (e ErrContextExpired) Unwrap() error
type ErrDiscarded ¶
type ErrDiscarded struct{}
ErrDiscarded is passed to Delegate.JobEnded when a Process is discarded by its Job's Readiness function.
func (ErrDiscarded) Error ¶
func (e ErrDiscarded) Error() string
type ErrSchedulerContextExpired ¶
type ErrSchedulerContextExpired struct {
ErrContextExpired
}
ErrSchedulerContextExpired is passed to Delegate.JobEnded for every unstarted Process of a Scheduler whose context has expired.
type ErrSuperseded ¶
type ErrSuperseded struct {
Replacement *Process
}
ErrSuperseded is passed to Delegate.JobEnded when a Process is superseded by Replacement's Job.Supersedes function before it can start.
func (ErrSuperseded) Error ¶
func (e ErrSuperseded) Error() string
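A JobEnded implementation might want to tell these error types apart. The sketch below does so with errors.As against local stand-in types. Several details are assumptions: the cause field of ErrContextExpired (its declaration is not shown here), the error message strings, and that the errors are passed as values rather than pointers. Note that embedding does not make errors.As match the embedded type, so ErrSchedulerContextExpired and ErrContextExpired each need their own case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for this package's types. The cause field and all
// message strings are hypothetical.
type Process struct{}

type ErrDiscarded struct{}

func (e ErrDiscarded) Error() string { return "discarded" }

type ErrSuperseded struct{ Replacement *Process }

func (e ErrSuperseded) Error() string { return "superseded" }

type ErrContextExpired struct{ cause error }

func (e ErrContextExpired) Error() string { return "context expired: " + e.cause.Error() }
func (e ErrContextExpired) Unwrap() error { return e.cause }

type ErrSchedulerContextExpired struct{ ErrContextExpired }

// errCause stands in for whatever error the expired context reported.
var errCause = errors.New("example cause")

// describe maps the error passed to JobEnded onto a short label.
func describe(err error) string {
	var (
		discarded    ErrDiscarded
		superseded   ErrSuperseded
		schedExpired ErrSchedulerContextExpired
		expired      ErrContextExpired
	)
	switch {
	case err == nil:
		return "completed"
	case errors.As(err, &discarded):
		return "discarded"
	case errors.As(err, &superseded):
		return "superseded"
	case errors.As(err, &schedExpired):
		return "scheduler context expired"
	case errors.As(err, &expired):
		return "context expired"
	default:
		// Anything else is treated as an error from the job itself.
		return "failed"
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(ErrDiscarded{}))
	fmt.Println(describe(ErrSuperseded{Replacement: &Process{}}))
	fmt.Println(describe(ErrSchedulerContextExpired{ErrContextExpired{cause: errCause}}))
	fmt.Println(describe(ErrContextExpired{cause: errCause}))
	fmt.Println(describe(errCause))
}
```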
type Job ¶
type Job struct {
	// Runner is the work to be performed by the Job.
	Runner func(ctx *Context) error
	// Tag can be used by the Job creator to attach whatever data they need to a Job. It can be used in Delegate methods
	// to report activity, and in Supersedes and Readiness functions to identify jobs in other processes. It has no
	// meaning in this package.
	Tag any
	// Supersedes, if not nil, is invoked when a Process is about to start, with a slice of other processes that have
	// not yet started. It should return a (possibly empty) subset of that slice containing processes that are to be
	// superseded by the receiver's process.
	Supersedes func(enqueued []*Process) (superseded []*Process)
	// Readiness, if not nil, is invoked when a Process is about to start, and should return one of Ready(), Discard(),
	// or Blocked(blockers...), where blockers is a subset of the passed slice of running processes. If nil, the Job is
	// assumed to always be ready.
	Readiness func(running []*Process) Readiness
}
A Job is a single unit of work. A Job becomes a Process when it is scheduled to run with a Scheduler or Context.
Jobs should be considered immutable. You can safely schedule a Job more than once, on multiple
type Process ¶
type Process struct {
	Job *Job
	// contains filtered or unexported fields
}
A Process is a Job that has been scheduled.
func (*Process) IsBlocked ¶
IsBlocked is true when the Process is waiting for another Process to end.
func (*Process) IsEnded ¶
IsEnded is true when the Process is finished, has been superseded, or has an expired context.
func (*Process) IsScheduled ¶
IsScheduled is true when the Process is not yet due to run, or has not yet had its Readiness evaluated.
type Readiness ¶
type Readiness struct {
// contains filtered or unexported fields
}
Readiness is the type returned by Job.Readiness that indicates whether a job is ready to run. Its zero value is equivalent to Ready(). Other values can be declared with Blocked(blockers...) and Discard().
func Blocked ¶
Blocked indicates that a job is blocked by another job or jobs, and should wait for them to finish.
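To make the Readiness and Supersedes hooks more concrete, here is a sketch of a job that waits for running jobs with the same Tag and supersedes queued jobs with the same Tag. Everything is declared locally so the example stands alone: the fields inside Readiness are hypothetical (the real ones are unexported and not shown), the signatures of Ready, Discard, and Blocked are assumed from how they are described above, and Job omits Runner for brevity. newExclusiveJob and sameTag are hypothetical helpers.

```go
package main

import "fmt"

// Local stand-ins for this package's types.
type Process struct{ Job *Job }

type Job struct {
	Tag        any
	Supersedes func(enqueued []*Process) (superseded []*Process)
	Readiness  func(running []*Process) Readiness
}

// Readiness stand-in; these fields are invented for the sketch.
type Readiness struct {
	discard  bool
	blockers []*Process
}

func Ready() Readiness                       { return Readiness{} }
func Discard() Readiness                     { return Readiness{discard: true} }
func Blocked(blockers ...*Process) Readiness { return Readiness{blockers: blockers} }

// sameTag returns the processes whose Job.Tag equals tag.
func sameTag(tag string, procs []*Process) []*Process {
	var out []*Process
	for _, p := range procs {
		if p.Job.Tag == tag {
			out = append(out, p)
		}
	}
	return out
}

// newExclusiveJob builds a Job that never runs alongside another job with
// the same tag, and replaces any queued jobs that share its tag.
func newExclusiveJob(tag string) *Job {
	return &Job{
		Tag: tag,
		Supersedes: func(enqueued []*Process) []*Process {
			return sameTag(tag, enqueued)
		},
		Readiness: func(running []*Process) Readiness {
			if blockers := sameTag(tag, running); len(blockers) > 0 {
				return Blocked(blockers...)
			}
			return Ready()
		},
	}
}

func main() {
	job := newExclusiveJob("sync")
	same := &Process{Job: &Job{Tag: "sync"}}
	other := &Process{Job: &Job{Tag: "report"}}

	r := job.Readiness([]*Process{other, same})
	fmt.Println("blockers:", len(r.blockers))

	s := job.Supersedes([]*Process{same, other})
	fmt.Println("superseded:", len(s))
}
```

Both hooks return subsets of the slice they were given, as the Job field comments require.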
type Scheduler ¶
type Scheduler struct {
	*Dispatcher
	// contains filtered or unexported fields
}
A Scheduler is the top level Dispatcher for a group of processes.
A zero value is not usable. Use NewScheduler instead.
func NewScheduler ¶
NewScheduler returns a ready-to-use Scheduler. It starts a scheduling goroutine that will run until the given context expires, after which attempts to schedule new jobs will panic.
In normal use, the given context should be context.Background. Expiring a Scheduler's context stops all schedule processing, including that of running jobs. No more events will be emitted, including end-job events. Goroutines trying to emit those events will freeze and leak. Only expire a Scheduler's context after a clean wrap-up of all jobs, either in testing, or when forcing a process to shut down.