eon

package module
v0.1.1
Published: Jan 29, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

README

Eon

Task scheduling for Golang

go get github.com/hx/eon

Overview

Eon is an in-process task scheduler. Its key advantages include:

  • Delay and repeat jobs.
  • Debounce and dedupe multiple requests for the same job.
  • Supersede jobs with other jobs (e.g. "reload all" replaces "reload one").
  • Finely tunable concurrency limits with FIFO queuing.
  • Events for scheduling, starting, progressing, and finishing jobs, providing flexible UI integration.
  • Jobs can enqueue child jobs, complete with event bubbling.
  • Context-based cancellation at any level, including the root scheduler.

Usage

Typical usage of Eon involves a single Scheduler spanning the lifetime of a process.

package main

import (
    "context"
    "fmt"
    "github.com/hx/eon"
    "os/signal"
    "syscall"
)

func main() {
    ctx, stop := signal.NotifyContext(
        context.Background(),
        syscall.SIGINT,
        syscall.SIGTERM,
    )
    defer stop() // Stop listening for signals when main returns

    scheduler := eon.NewScheduler(ctx)

    scheduler.Run(ctx, &eon.Job{
        Runner: func(ctx *eon.Context) error {
            _, err := fmt.Println("Hello, world!")
            return err
        },
    })

    <-ctx.Done() // Wait for an INT or TERM signal
}

Timing

Jobs can be scheduled with delays.

To delay a job by 10 seconds:

scheduler.Schedule(ctx, 10*time.Second, 0, &eon.Job{ … })

Jobs can also be made to self-reschedule on completion, by specifying a positive repeatAfter value.

To repeat the above job with one-minute pauses:

scheduler.Schedule(ctx, 10*time.Second, time.Minute, &eon.Job{ … })

This is not the same as a one-minute interval. It is the amount of time between a process finishing and the next process starting. The total interval is the repeatAfter value plus the duration of the process itself.

Jobs that return errors are not rescheduled.

Parallelism

The Run function in the example above accepts one or more Job values, and runs them one after the other. It is equivalent to calling the Parallel function with a concurrency value of 1.

You can also specify maximum concurrency:

jobs := make([]*eon.Job, len(users))
for i, user := range users {
    jobs[i] = reapExpiredAuthTokens(user)
}

// Process a maximum of 3 users at once
scheduler.Parallel(ctx, 3, jobs...)

Specify concurrency as 0 to set no limit.

Child jobs

Processes (running jobs) can schedule children using their Context arguments.

scheduler.Run(
    ctx,
    &eon.Job{
        Tag: "parent job",
        Runner: func(ctx *eon.Context) error {
            ctx.Run(
                ctx.Ctx(),
                &eon.Job{
                    Tag: "child job",
                    Runner: func(ctx *eon.Context) error {
                        …
                    },
                },
            )

            return nil
        },
    },
)

Monitoring

To monitor the activity of a Dispatcher, assign a Delegate. Package events contains several implementations, or you can use your own.

scheduler.Delegate = &events.Delegate{
    Scheduled: func(process *eon.Process) {
        fmt.Printf("Process %v scheduled.\n", process.Job.Tag)
    },
    Ended: func(process *eon.Process, err error) {
        fmt.Printf("Process %v ended with %v.\n", process.Job.Tag, err)
    },
}

Other events include Blocked, Starting, and Progressed. See API docs for more information.

Although processes run on their own goroutines, schedulers run on a single goroutine. This includes calls to Delegate methods, which ensures they are invoked in the expected order. If you want to run expensive operations in delegates, consider using extra goroutines so as not to block the scheduler.
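
One way to keep delegate callbacks cheap is to hand each event to a worker goroutine through a buffered channel. The sketch below uses only the standard library; the event and offloader types are hypothetical and stand in for whatever your delegate receives.

```go
package main

import "fmt"

// event is a hypothetical record of what a delegate callback received.
type event struct {
	Tag string
	Err error
}

// offloader passes events to a worker goroutine, so the callback that
// receives them can return quickly.
type offloader struct {
	events chan event
	done   chan struct{}
}

// newOffloader starts the worker. handle runs on the worker goroutine and
// sees events in the order they were submitted.
func newOffloader(buffer int, handle func(event)) *offloader {
	o := &offloader{events: make(chan event, buffer), done: make(chan struct{})}
	go func() {
		defer close(o.done)
		for e := range o.events {
			handle(e)
		}
	}()
	return o
}

// Ended is what a delegate's Ended callback would call. It blocks only when
// the buffer is full.
func (o *offloader) Ended(tag string, err error) { o.events <- event{tag, err} }

// Close stops accepting events and waits for the worker to drain the buffer.
func (o *offloader) Close() {
	close(o.events)
	<-o.done
}

func main() {
	o := newOffloader(16, func(e event) {
		// Expensive work (logging, UI updates, network calls) goes here.
		fmt.Printf("process %q ended with %v\n", e.Tag, e.Err)
	})
	o.Ended("reload one", nil)
	o.Ended("reload all", nil)
	o.Close()
}
```

A single worker preserves event order. Using several workers would raise throughput at the cost of that ordering guarantee.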

Delegates can also be assigned to the Context of any running process. Delegates are invoked in ascending order, with the root (scheduler) delegate being invoked last.
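
The invocation order can be pictured as a walk from the dispatcher that started a process up to the root. This is a simplified model with a hypothetical dispatcher type, not Eon's implementation.

```go
package main

import "fmt"

// dispatcher is a hypothetical stand-in for anything that can hold a
// delegate: a running process's Context, or the root scheduler (nil parent).
type dispatcher struct {
	name     string
	parent   *dispatcher
	delegate func(event string) // nil when no delegate is assigned
}

// emit delivers an event to the delegate of the dispatcher that started the
// process, then to each ancestor in turn, ending at the root.
func (d *dispatcher) emit(event string) {
	for cur := d; cur != nil; cur = cur.parent {
		if cur.delegate != nil {
			cur.delegate(event)
		}
	}
}

// chain builds scheduler -> parent job -> child job and records, in order,
// which delegates see an event emitted at the deepest level.
func chain() []string {
	var order []string
	record := func(name string) func(string) {
		return func(string) { order = append(order, name) }
	}
	scheduler := &dispatcher{name: "scheduler", delegate: record("scheduler")}
	parent := &dispatcher{name: "parent job", parent: scheduler, delegate: record("parent job")}
	child := &dispatcher{name: "child job", parent: parent, delegate: record("child job")}
	child.emit("ended")
	return order
}

func main() {
	fmt.Println(chain())
}
```

In this model the delegate nearest the event is invoked first and the scheduler's delegate last.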

Exclusivity

Specifying a Readiness function lets a job wait, or be discarded, depending on which jobs are currently running (processes).

job := &eon.Job{
    Tag: SomeType{},
    Runner: func(ctx *eon.Context) error {
        …
    },
    Readiness: func(running []*eon.Process) eon.Readiness {
        for _, process := range running {
            if _, ok := process.Job.Tag.(SomeType); ok {
                return eon.Blocked(process)
            }
        }
        return eon.Ready()
    },
}

Readiness functions may return one of:

  • Ready(), indicating that the job is ready to run;
  • Discard(), indicating the job should be discarded (e.g. as a duplicate); or
  • Blocked(processes...), indicating the job conflicts with one or more running processes. Given processes do not affect behaviour, but are included in Blocked events.

Supersession

Before a job starts, it has an opportunity to supersede other scheduled jobs.

scheduler.Run(ctx, &eon.Job{
    Tag:    "delete all access tokens",
    Runner: func(ctx *eon.Context) error { … },
    Supersedes: func(enqueued []*eon.Process) (superseded []*eon.Process) {
        for _, process := range enqueued {
            if process.Job.Tag == "delete all access tokens" ||
                process.Job.Tag == "delete expired access tokens" {
                superseded = append(superseded, process)
            }
        }
        return
    },
})

Common behaviours

Cancel on error

It is often beneficial to halt a sequence of jobs after a job fails.

Eon does not provide an opinionated means to achieve this, since there are innumerable nuances to error propagation, application-specific job wrappers, and approaches to concurrency.

A "run jobs until one fails" function in your application is a reasonable component of a robust solution, and context cancellation is generally the right tool for the job, as demonstrated in this example:

type BatchHalted struct {
    FailedJobIndex int
    Jobs           []*eon.Job
    Err            error
}

func (b BatchHalted) Error() string {
    return fmt.Sprintf("batch halted at job %d of %d: %s", b.FailedJobIndex+1, len(b.Jobs), b.Err)
}

func (b BatchHalted) Unwrap() error { return b.Err }

func RunUntilError(ctx context.Context, dispatcher *eon.Dispatcher, jobs ...*eon.Job) {
    var (
        c, cancel        = context.WithCancelCause(ctx)
        originalDelegate = dispatcher.Delegate
        wg               sync.WaitGroup
        completed        int
    )
    defer cancel(nil) // release the context's resources; no-op if already cancelled
    wg.Add(len(jobs))
    dispatcher.Delegate = events.MultiDelegate{originalDelegate, &events.Delegate{
        Ended: func(process *eon.Process, err error) {
            if err != nil {
                // Only a failure halts the batch.
                cancel(BatchHalted{completed, jobs, err}) // no-op after first call
            }
            wg.Done()
            completed++
        },
    }}
    dispatcher.Run(c, jobs...)
    
    // Here, blocking on the wait group allows synchronous execution in the caller.
    // It also restores the delegate at the correct time. This function could be
    // made asynchronous, in which case, using an events.Handler instead of
    // events.MultiDelegate would be a more suitable concurrency-safe alternative.
    wg.Wait()
    dispatcher.Delegate = originalDelegate
}

TUI reporting

TODO

Documentation

Types

type Context

type Context struct {
	*Dispatcher
	// contains filtered or unexported fields
}

A Context is created for every running Process.

func (*Context) Ctx

func (c *Context) Ctx() context.Context

Ctx is the context.Context with which the process's Job was scheduled.

func (*Context) Progress

func (c *Context) Progress(payload any)

Progress reports the process's progress. The given payload is passed to the Delegate.JobProgressed function of delegates in the receiver and its ancestors.

type Delegate

type Delegate interface {
	// JobScheduled is invoked when a Job is scheduled as a new Process, and is waiting to start.
	JobScheduled(process *Process)

	// JobBlocked is invoked when a Job's Process is due to run, but is waiting for one or more other processes to finish.
	JobBlocked(process *Process, blockers []*Process)

	// JobStarting is invoked just before a Job's Process is started.
	JobStarting(process *Process)

	// JobProgressed is invoked whenever Context.Progress is called from a running Process.
	JobProgressed(process *Process, payload any)

	// JobEnded is invoked whenever a Job's Process ends, whether by completing, being superseded, or having its
	// context cancelled. Every invocation of JobScheduled will eventually be followed by JobEnded.
	JobEnded(process *Process, err error)
}

Delegate implementations can be assigned to any Dispatcher (i.e. a Scheduler or a Context), to receive notifications about changes in Process state.

type Dispatcher

type Dispatcher struct {
	// Delegate receives notifications whenever a Process started by the Dispatcher, or one of its descendants, is
	// scheduled, blocked, starting, progressed, or ended.
	//
	// Care should be taken to assign Delegate when no child processes are running, to avoid races. If it is necessary
	// to add and/or remove event handlers while processes are running, consider using events.Handler as your Delegate,
	// which can be safely modified from multiple goroutines.
	Delegate Delegate
	// contains filtered or unexported fields
}

A Dispatcher can be used to schedule new jobs, and track their progress through its Delegate.

Dispatcher is used by the Scheduler and Context types. It is not intended to be used on its own. It can, however, be pulled out and passed to more complex scheduling functions that are intended to work on either Scheduler or Context values.

func (*Dispatcher) Parallel

func (d *Dispatcher) Parallel(ctx context.Context, concurrency int, jobs ...*Job)

Parallel is identical to Run, but allows maximum concurrency to be specified for the batch.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context, jobs ...*Job)

Run schedules the given jobs to run one after the other, and blocks until they have all ended.

A failing job will not prevent subsequent jobs from running. To halt a batch of jobs on error, use Run with a context that can be cancelled by Delegate.JobEnded when it receives an error.

Internally, handling each context requires an extra goroutine to live from when the given batch is scheduled until the last job starts. If you don't need to be able to stop your jobs, leave ctx as nil to save the cost of a goroutine.

func (*Dispatcher) Schedule

func (d *Dispatcher) Schedule(ctx context.Context, delay, repeatAfter time.Duration, job *Job)

Schedule schedules a job to be run in the background, with an optional delay. A positive repeatAfter value will reschedule the job upon its completion with a new delay.

Internally, handling each context requires an extra goroutine to live from when the given job is scheduled until it starts. If you don't need to be able to stop your job, leave ctx as nil to save the cost of a goroutine.

type ErrContextExpired

type ErrContextExpired struct {
	Context context.Context
}

ErrContextExpired is passed to Delegate.JobEnded when a Process's context expires before it starts.

func (ErrContextExpired) Error

func (e ErrContextExpired) Error() string

func (ErrContextExpired) Unwrap

func (e ErrContextExpired) Unwrap() error

type ErrDiscarded

type ErrDiscarded struct{}

ErrDiscarded is passed to Delegate.JobEnded when a Process is discarded by its Job's Readiness function.

func (ErrDiscarded) Error

func (e ErrDiscarded) Error() string

type ErrSchedulerContextExpired

type ErrSchedulerContextExpired struct {
	ErrContextExpired
}

ErrSchedulerContextExpired is passed to Delegate.JobEnded for every unstarted Process of a Scheduler whose context has expired.

type ErrSuperseded

type ErrSuperseded struct {
	Replacement *Process
}

ErrSuperseded is passed to Delegate.JobEnded when a Process is superseded by Replacement's Job.Supersedes function before it can start.

func (ErrSuperseded) Error

func (e ErrSuperseded) Error() string

type Job

type Job struct {
	// Runner is the work to be performed by the Job.
	Runner func(ctx *Context) error

	// Tag can be used by the Job creator to attach whatever data they need to a Job. It can be used in Delegate methods
	// to report activity, and in Supersedes and Readiness functions to identify jobs in other processes. It has no
	// meaning in this package.
	Tag any

	// Supersedes, if not nil, is invoked when a Process is about to start, with a slice of other processes that have
	// not yet started. It should return a (possibly empty) subset of that slice containing processes that are to be
	// superseded by the receiver's process.
	Supersedes func(enqueued []*Process) (superseded []*Process)

	// Readiness, if not nil, is invoked when a Process is about to start, and should return one of Ready(), Discard(),
	// or Blocked(blockers...), where blockers is a subset of the passed slice of running processes. If nil, the Job is
	// assumed to always be ready.
	Readiness func(running []*Process) Readiness
}

A Job is a single unit of work. A Job becomes a Process when it is scheduled to run with a Scheduler or Context.

Jobs should be considered immutable. You can safely schedule a Job more than once, on multiple

type Process

type Process struct {
	Job *Job
	// contains filtered or unexported fields
}

A Process is a Job that has been scheduled.

func (*Process) IsBlocked

func (p *Process) IsBlocked() bool

IsBlocked is true when the Process is waiting for another Process to end.

func (*Process) IsEnded

func (p *Process) IsEnded() bool

IsEnded is true when the Process is finished, has been superseded, or has an expired context.

func (*Process) IsRunning

func (p *Process) IsRunning() bool

IsRunning is true when the Process is running.

func (*Process) IsScheduled

func (p *Process) IsScheduled() bool

IsScheduled is true when the Process is not yet due to run, or has not yet had its Readiness evaluated.

func (*Process) Parent

func (p *Process) Parent() *Process

Parent is the Process whose Context was used to schedule this Process. If the Process was scheduled directly on a Scheduler, Parent will be nil.

type Readiness

type Readiness struct {
	// contains filtered or unexported fields
}

Readiness is the type returned by Job.Readiness that indicates whether a job is ready to run. Its zero value is equivalent to Ready(). Other values can be declared with Blocked(blockers...) and Discard().

func Blocked

func Blocked(blockers ...*Process) Readiness

Blocked indicates that a job is blocked by another job or jobs, and should wait for them to finish.

func Discard

func Discard() Readiness

Discard indicates that a job should not run, and should be discarded. If it is a repeating job, it will not be rescheduled.

func Ready

func Ready() Readiness

Ready indicates that a job is ready to run.

type Scheduler

type Scheduler struct {
	*Dispatcher
	// contains filtered or unexported fields
}

A Scheduler is the top level Dispatcher for a group of processes.

A zero value is not usable. Use NewScheduler instead.

func NewScheduler

func NewScheduler(ctx context.Context) (scheduler *Scheduler)

NewScheduler returns a ready-to-use Scheduler. It starts a scheduling goroutine that will run until the given context expires, after which newly scheduled jobs will panic.

In normal use, the given context should be context.Background. Expiring a Scheduler's context stops all schedule processing, including that of running jobs. No more events will be emitted, including end-job events. Goroutines trying to emit those events will freeze and leak. Only expire a Scheduler's context after a clean wrap-up of all jobs, either in testing, or when forcing a process to shut down.
