supervisor

package
v1.13.0-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Example
// We are going to create a server and a client worker. The
// server needs to first bind to a port before it is ready to
// use. By listing the dependency between the client and the
// server worker, the client will not be started until the
// server signals that it is ready.
ctx := context.Background()
s := WithContext(ctx)
var addr string
s.Supervise(&Worker{
	Name: "server",
	Work: func(p *Process) error {
		// :0 will ask for an unused port
		l, err := net.Listen("tcp", ":0")
		if err != nil {
			return err
		}
		defer l.Close()
		// store the unused port that was allocated so
		// that the client knows where to talk to
		addr = l.Addr().String()
		p.Logf("listening on %s", addr)

		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("hello"))
			s.Shutdown()
		})

		srv := &http.Server{}
		// launch an anonymous child worker to serve requests
		p.Go(func(p *Process) error {
			return srv.Serve(l)
		})

		fmt.Println("server ready")
		// signal that we are ready
		p.Ready()

		<-p.Shutdown() // await graceful shutdown signal
		return srv.Shutdown(p.Context())
	},
})
s.Supervise(&Worker{
	Name:     "client",
	Requires: []string{"server"},
	Work: func(p *Process) error {
		resp, err := http.Get(fmt.Sprintf("http://%s", addr))
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		bytes, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		fmt.Printf("client %s\n", string(bytes))
		return nil
	},
})
errors := s.Run()
for _, err := range errors {
	log.Println(err.Error())
}
Output:

server ready
client hello

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustRun

func MustRun(name string, f func(*Process) error)

MustRun is like Run, but panics if there are errors.

func Run

func Run(name string, f func(*Process) error) []error

Run creates a single-purpose Supervisor and runs a worker function with it.

func WorkFunc

func WorkFunc(fn interface{}, args ...interface{}) func(*Process) error

WorkFunc creates a work function from a function whose signature includes a process plus additional arguments.

Types

type Cmd

type Cmd struct {
	*exec.Cmd
	// contains filtered or unexported fields
}

A Cmd is like an os/exec.Cmd, but logs what happens on stdin/stdout/stderr, and has a slightly different API.

func Command

func Command(prefix, name string, args ...string) (result *Cmd)

Command creates a single purpose supervisor and uses it to produce and return a *supervisor.Cmd.

func (*Cmd) Capture

func (c *Cmd) Capture(stdin io.Reader) (output string, err error)

Capture runs a command with the supplied input and captures the output as a string.

func (*Cmd) CaptureErr

func (c *Cmd) CaptureErr(stdin io.Reader) (output string, err error)

CaptureErr runs a command with the supplied input and captures stdout and stderr as a string.

func (*Cmd) MustCapture

func (c *Cmd) MustCapture(stdin io.Reader) (output string)

MustCapture is like Capture, but panics if there is an error.

func (*Cmd) MustCaptureErr

func (c *Cmd) MustCaptureErr(stdin io.Reader) (output string)

MustCaptureErr is like CaptureErr, but panics if there is an error.

func (*Cmd) Run

func (c *Cmd) Run() error

Run is like os/exec.Cmd.Run.

func (*Cmd) Start

func (c *Cmd) Start() error

Start is like os/exec.Cmd.Start.

func (*Cmd) Wait

func (c *Cmd) Wait() error

Wait is like os/exec.Cmd.Wait.

type Logger

type Logger func(ctx context.Context, format string, v ...interface{})

Logger is what Supervisor may use as a logging backend.

type Process

type Process struct {
	// contains filtered or unexported fields
}

A Process represents a goroutine being run from a Worker.

func (*Process) Command

func (p *Process) Command(name string, args ...string) *Cmd

Command creates a command that automatically logs inputs, outputs, and exit codes to the process logger.

func (*Process) Context

func (p *Process) Context() context.Context

Context returns the Process' context.

func (*Process) Debug added in v1.5.1

func (p *Process) Debug(obj interface{})

We would _like_ to have Debug and Debugf, but we can't really support that with dlog right now. So for now, these are no-ops.

func (*Process) Debugf added in v1.5.1

func (p *Process) Debugf(format string, args ...interface{})

func (*Process) Do

func (p *Process) Do(fn func() error) (err error)

Do is shorthand for proper shutdown handling while doing a potentially blocking activity. This method will return nil if the activity completes normally and an error if the activity panics or returns an error.

If you want to know whether the work was aborted or might still be running when Do returns, then use DoClean like so:

aborted := errors.New("aborted")

err := p.DoClean(..., func() { return aborted })

if err == aborted {
  ...
}

func (*Process) DoClean

func (p *Process) DoClean(fn, clean func() error) error

DoClean is the same as Process.Do() but executes the supplied clean function on abort.

func (*Process) Go

func (p *Process) Go(fn func(*Process) error) *Worker

Go is shorthand for launching a child worker... it is named "<parent>[<child-count>]".

func (*Process) GoName

func (p *Process) GoName(name string, fn func(*Process) error) *Worker

GoName is shorthand for launching a named worker... it is named "<parent>.<name>".

func (*Process) Log

func (p *Process) Log(obj interface{})

Log is used for logging...

func (*Process) Logf

func (p *Process) Logf(format string, args ...interface{})

Logf is used for logging...

func (*Process) Ready

func (p *Process) Ready()

Ready is called by the Process' Worker to notify the supervisor that it is now ready.

func (*Process) Shutdown

func (p *Process) Shutdown() <-chan struct{}

Shutdown is used for graceful shutdown...

func (*Process) Supervisor

func (p *Process) Supervisor() *Supervisor

Supervisor returns the Supervisor that is managing this Process.

func (*Process) Worker

func (p *Process) Worker() *Worker

Worker returns the Worker that this Process is running.

type Supervisor

type Supervisor struct {
	Logger Logger
	// contains filtered or unexported fields
}

A supervisor provides an abstraction for managing a group of related goroutines, and provides:

- startup and shutdown ordering based on dependencies - both graceful and hard shutdown - error propagation - retry - logging

Example
ctx := context.Background()
s := WithContext(ctx)
for idx, url := range []string{"https://www.google.com", "https://www.bing.com"} {
	url_capture := url
	s.Supervise(&Worker{
		Name: fmt.Sprintf("url-%d", idx),
		Work: func(p *Process) error {
			resp, err := http.Get(url)
			if err != nil {
				return err
			}
			defer resp.Body.Close()
			fmt.Printf("url %s: %s\n", url_capture, resp.Status)
			return nil
		},
	})
}

// The Run() method will block until all workers are done. Any
// and all background errors in workers, including panics are
// returned by Run().
errors := s.Run()
for _, err := range errors {
	fmt.Println(err)
}
Output:

url https://www.bing.com: 200 OK
url https://www.google.com: 200 OK

func WithContext

func WithContext(ctx context.Context) *Supervisor

func (*Supervisor) Get

func (s *Supervisor) Get(name string) *Worker

Gets the worker with the specified name. Will return nil if no such worker exists.

func (*Supervisor) Run

func (s *Supervisor) Run() []error

A supervisor will run until all its workers exit. There are multiple ways workers can exit:

  • normally (returning a non-nil error)
  • error (either via return result or panic)
  • graceful shutdown request
  • canceled context

A normal exit does not trigger any special action other than causing Run to return if it is the last worker.

If a worker exits with an error, the behavior depends on the value of the Retry flag on the worker. If Retry is true, the worker will be restarted. If not the supervisor shutdown sequence is triggerd.

The supervisor shutdown sequence can be deliberately triggered by invoking supervisor.Shutdown(). This can be done from any goroutine including workers.

The graceful shutdown sequence shuts down workers in an order that respects worker dependencies.

func (*Supervisor) Shutdown

func (s *Supervisor) Shutdown()

Triggers a graceful shutdown sequence. This can be invoked from any goroutine.

func (*Supervisor) Supervise

func (s *Supervisor) Supervise(worker *Worker)

Supervise adds a Worker to be run as a Process when s.Run() is called.

type Worker

type Worker struct {
	Name     string               // the name of the worker
	Work     func(*Process) error // the function to perform the work
	Requires []string             // a list of required worker names
	Retry    bool                 // whether or not to retry on error
	// contains filtered or unexported fields
}

A Worker represents a managed goroutine being prepared or run.

I (LukeShu) don't think a Worker can be reused after being run by a Supervisor.

func (*Worker) Error

func (w *Worker) Error() string

func (*Worker) Restart

func (w *Worker) Restart()

Restart is used to cause a finished Worker to restart. It can only be called on Workers that are done. The only way to be sure a worker is done is to call Wait() on it, e.g.:

...
worker.Shutdown()
worker.Wait()
worker.Restart()
...

func (*Worker) Shutdown

func (w *Worker) Shutdown()

Shutdown shuts down the worker. Note that if the worker has other workers that depend on it, the shutdown won't actually be initiated until those dependent workers exit.

func (*Worker) Wait

func (w *Worker) Wait()

Wait blocks until the worker is done.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL