Documentation ¶
Overview ¶
Example ¶
// We are going to create a server and a client worker. The
// server needs to first bind to a port before it is ready to
// use. By listing the dependency between the client and the
// server worker, the client will not be started until the
// server signals that it is ready.

ctx := context.Background()
s := WithContext(ctx)

var addr string

s.Supervise(&Worker{
	Name: "server",
	Work: func(p *Process) error {
		// :0 will ask for an unused port
		l, err := net.Listen("tcp", ":0")
		if err != nil {
			return err
		}
		defer l.Close()

		// store the unused port that was allocated so
		// that the client knows where to talk to
		addr = l.Addr().String()
		p.Logf("listening on %s", addr)

		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("hello"))
			s.Shutdown()
		})

		srv := &http.Server{}
		// launch an anonymous child worker to serve requests
		p.Go(func(p *Process) error {
			return srv.Serve(l)
		})

		fmt.Println("server ready")
		// signal that we are ready
		p.Ready()

		<-p.Shutdown() // await graceful shutdown signal
		return srv.Shutdown(p.Context())
	},
})

s.Supervise(&Worker{
	Name:     "client",
	Requires: []string{"server"},
	Work: func(p *Process) error {
		resp, err := http.Get(fmt.Sprintf("http://%s", addr))
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		bytes, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		fmt.Printf("client %s\n", string(bytes))
		return nil
	},
})

errors := s.Run()
for _, err := range errors {
	log.Println(err.Error())
}

Output:
server ready
client hello
Index ¶
- func MustRun(name string, f func(*Process) error)
- func Run(name string, f func(*Process) error) []error
- func WorkFunc(fn interface{}, args ...interface{}) func(*Process) error
- type Cmd
- func (c *Cmd) Capture(stdin io.Reader) (output string, err error)
- func (c *Cmd) CaptureErr(stdin io.Reader) (output string, err error)
- func (c *Cmd) MustCapture(stdin io.Reader) (output string)
- func (c *Cmd) MustCaptureErr(stdin io.Reader) (output string)
- func (c *Cmd) Run() error
- func (c *Cmd) Start() error
- func (c *Cmd) Wait() error
- type DefaultLogger
- type Logger
- type Process
- func (p *Process) Command(name string, args ...string) *Cmd
- func (p *Process) Context() context.Context
- func (p *Process) Debug(obj interface{})
- func (p *Process) Debugf(format string, args ...interface{})
- func (p *Process) Do(fn func() error) (err error)
- func (p *Process) DoClean(fn, clean func() error) (err error)
- func (p *Process) Go(fn func(*Process) error) *Worker
- func (p *Process) GoName(name string, fn func(*Process) error) *Worker
- func (p *Process) Log(obj interface{})
- func (p *Process) Logf(format string, args ...interface{})
- func (p *Process) Ready()
- func (p *Process) Shutdown() <-chan struct{}
- func (p *Process) Supervisor() *Supervisor
- func (p *Process) Worker() *Worker
- type Supervisor
- type Worker
Examples ¶
Functions ¶
Types ¶
type Cmd ¶
A Cmd is like an os/exec.Cmd, but logs what happens on stdin/stdout/stderr, and has a slightly different API.
func Command ¶
Command creates a single purpose supervisor and uses it to produce and return a *supervisor.Cmd.
func (*Cmd) Capture ¶
Capture runs a command with the supplied input and captures the output as a string.
func (*Cmd) CaptureErr ¶
CaptureErr runs a command with the supplied input and captures stdout and stderr as a string.
func (*Cmd) MustCapture ¶
MustCapture is like Capture, but panics if there is an error.
func (*Cmd) MustCaptureErr ¶
MustCaptureErr is like CaptureErr, but panics if there is an error.
type DefaultLogger ¶
type DefaultLogger struct{}
DefaultLogger is a logger that uses the stdlib "log" package default logger.
func (*DefaultLogger) Printf ¶
func (d *DefaultLogger) Printf(format string, v ...interface{})
Printf implements the Logger interface.
type Logger ¶
type Logger interface {
Printf(format string, v ...interface{})
}
Logger is what Supervisor may use as a logging backend.
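Because Logger is a single-method interface, any type with a matching Printf can serve as a backend. The following is a minimal sketch, not part of the package: the Logger interface is copied locally so the example compiles on its own, and bufferLogger is a hypothetical type that collects log lines in memory behind a mutex, since workers may log from several goroutines.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Logger is a local copy of the interface documented above, so that this
// sketch is self-contained.
type Logger interface {
	Printf(format string, v ...interface{})
}

// bufferLogger is a hypothetical Logger that stores each message as one
// line in an in-memory buffer.
type bufferLogger struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Printf formats the message and appends it, followed by a newline.
func (b *bufferLogger) Printf(format string, v ...interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	fmt.Fprintf(&b.buf, format, v...)
	b.buf.WriteByte('\n')
}

// String returns everything logged so far.
func (b *bufferLogger) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.String()
}

func main() {
	bl := &bufferLogger{}
	var l Logger = bl // compile-time check that bufferLogger satisfies Logger
	l.Printf("worker %s started", "server")
	fmt.Print(bl.String())
}
```

Since Supervisor exposes an exported Logger field, a value like this could presumably be assigned to it before calling Run; that wiring is an assumption and is not shown here.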
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
A Process represents a goroutine being run from a Worker.
func (*Process) Command ¶
Command creates a command that automatically logs inputs, outputs, and exit codes to the process logger.
func (*Process) Debug ¶ added in v1.5.1
func (p *Process) Debug(obj interface{})
Debug and Debugf are currently no-ops. We would _like_ to support them, but we can't really do that with dlog right now.
func (*Process) Do ¶
Do is shorthand for proper shutdown handling while doing a potentially blocking activity. This method will return nil if the activity completes normally and an error if the activity panics or returns an error.
If you want to know whether the work was aborted or might still be running when Do returns, then use DoClean like so:
aborted := errors.New("aborted")
err := p.DoClean(..., func() error { return aborted })
if err == aborted {
	...
}
func (*Process) DoClean ¶
DoClean is the same as Process.Do() but executes the supplied clean function on abort.
func (*Process) Go ¶
Go is shorthand for launching a child worker... it is named "<parent>[<child-count>]".
func (*Process) GoName ¶
GoName is shorthand for launching a named worker... it is named "<parent>.<name>".
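The two naming schemes can be illustrated with plain string formatting. This is a small sketch based only on the patterns quoted above; childName and namedChild are hypothetical helpers, and whether the package counts children from 0 or 1 is not stated, so the count is passed in as a parameter.

```go
package main

import "fmt"

// childName formats the name of an anonymous child, as launched by Go:
// "<parent>[<child-count>]".
func childName(parent string, count int) string {
	return fmt.Sprintf("%s[%d]", parent, count)
}

// namedChild formats the name of a named child, as launched by GoName:
// "<parent>.<name>".
func namedChild(parent, name string) string {
	return fmt.Sprintf("%s.%s", parent, name)
}

func main() {
	fmt.Println(childName("server", 2))       // server[2]
	fmt.Println(namedChild("server", "http")) // server.http
}
```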
func (*Process) Ready ¶
func (p *Process) Ready()
Ready is called by the Process' Worker to notify the supervisor that it is now ready.
func (*Process) Shutdown ¶
func (p *Process) Shutdown() <-chan struct{}
Shutdown is used for graceful shutdown...
func (*Process) Supervisor ¶
func (p *Process) Supervisor() *Supervisor
Supervisor returns the Supervisor that is managing this Process.
type Supervisor ¶
type Supervisor struct {
	Logger Logger
	// contains filtered or unexported fields
}
A Supervisor manages a group of related goroutines and provides:
- startup and shutdown ordering based on dependencies
- both graceful and hard shutdown
- error propagation
- retry
- logging
Example ¶
ctx := context.Background()
s := WithContext(ctx)

for idx, url := range []string{"https://www.google.com", "https://www.bing.com"} {
	// copy the loop variable so that each worker sees its own URL
	url_capture := url
	s.Supervise(&Worker{
		Name: fmt.Sprintf("url-%d", idx),
		Work: func(p *Process) error {
			resp, err := http.Get(url_capture)
			if err != nil {
				return err
			}
			defer resp.Body.Close()
			fmt.Printf("url %s: %s\n", url_capture, resp.Status)
			return nil
		},
	})
}

// The Run() method will block until all workers are done. Any
// and all background errors in workers, including panics, are
// returned by Run().
errors := s.Run()
for _, err := range errors {
	fmt.Println(err)
}

Output:
url https://www.bing.com: 200 OK
url https://www.google.com: 200 OK
func WithContext ¶
func WithContext(ctx context.Context) *Supervisor
func (*Supervisor) Get ¶
func (s *Supervisor) Get(name string) *Worker
Get returns the worker with the specified name, or nil if no such worker exists.
func (*Supervisor) Run ¶
func (s *Supervisor) Run() []error
A supervisor will run until all its workers exit. There are multiple ways workers can exit:
- normally (returning a nil error)
- error (either via return result or panic)
- graceful shutdown request
- canceled context
A normal exit does not trigger any special action other than causing Run to return if it is the last worker.
If a worker exits with an error, the behavior depends on the value of the Retry flag on the worker. If Retry is true, the worker will be restarted. If not, the supervisor shutdown sequence is triggered.
The supervisor shutdown sequence can be deliberately triggered by invoking supervisor.Shutdown(). This can be done from any goroutine including workers.
The graceful shutdown sequence shuts down workers in an order that respects worker dependencies.
func (*Supervisor) Shutdown ¶
func (s *Supervisor) Shutdown()
Shutdown triggers a graceful shutdown sequence. It can be invoked from any goroutine.
func (*Supervisor) Supervise ¶
func (s *Supervisor) Supervise(worker *Worker)
Supervise adds a Worker to be run as a Process when s.Run() is called.
type Worker ¶
type Worker struct {
	Name     string               // the name of the worker
	Work     func(*Process) error // the function to perform the work
	Requires []string             // a list of required worker names
	Retry    bool                 // whether or not to retry on error
	// contains filtered or unexported fields
}
A Worker represents a managed goroutine being prepared or run.
I (LukeShu) don't think a Worker can be reused after being run by a Supervisor.
func (*Worker) Restart ¶
func (w *Worker) Restart()
Restart is used to cause a finished Worker to restart. It can only be called on Workers that are done. The only way to be sure a worker is done is to call Wait() on it, e.g.:
...
worker.Shutdown()
worker.Wait()
worker.Restart()
...