threadgroup

package module
v0.0.0-...-e9331fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2018 License: MIT Imports: 2 Imported by: 6

README

threadgroup

Threadgroup is a utility to facilitate clean and quick shutdown of related, long-running threads or resources. Threads or resources can call Add to signal that shutdown should be blocked until they have finished, and then can call Done when they have finished. Calling Stop will block until all resources have called Done, and will return an error if future resources attempt to call Add.

Threadgroup has two helper functions, OnStop and AfterStop, which can help to clean up resources which are intended to run for the life of the group. Functions added to the threadgroup with OnStop will be called immediately after Stop is called, before waiting for all existing threads to return. OnStop is frequently called with resources like a net.Listener, where you want to halt new connections immediately. AfterStop will be called after waiting for all resources to return. AfterStop is frequently used for resources like loggers, which need to be closed but not until they are not needed anymore.

Finally, IsStopped returns a channel that gets closed when Stop is called, which can be passed as a cancel channel to things like net.Dial to facilitate shutting down quickly when Stop is called.

Example:

var tg threadgroup.ThreadGroup

// Create the logger and set it to shutdown upon closing.
log := NewLogger()
tg.AfterStop(func() error {
	return log.Close()
})

// Create a thread to repeatedly dial a remote address with quick shutdown.
go func() {
	// Block shutdown until this thread has completed.
	err := tg.Add()
	if err != nil {
		return
	}
	defer tg.Done()

	// Repeatedly perform a dial. Latency means the dial could take up to a
	// minute, which would delay shutdown without a cancel chan.
	for {
		// Perform the dial, but abort quickly if 'Stop' is called.
		dialer := &net.Dialer{
			Cancel:  tg.StopChan(),
			Timeout: time.Minute,
		}
		conn, err := dialer.Dial("tcp", 8.8.8.8)
		if err == nil {
			conn.Close()
		}

		// Sleep for an hour, but abort quickly if 'Stop' is called.
		select {
		case <-time.After(time.Hour):
			continue
		case <-tg.StopChan():
			return
		}
	}

	// Close will not be called on the logger until after this Println has been
	// called, because AfterStop functions do not run until after all threads
	// have called tg.Done().
	log.Println("closed cleanly")
}()

// Create a long running thread to listen on the network.
go func() {
	// Block shutdown until this thread has completed.
	err := tg.Add()
	if err != nil {
		return
	}
	defer tg.Done()

	// Create the listener.
	listener, err := net.Listen("tcp", ":12345")
	if err != nil {
		return
	}
	// Close the listener as soon as 'Stop' is called, no need to wait for the
	// other resources to shut down.
	tg.OnStop(func() error {
		return listener.Close()
	})

	for {
		conn, err := listener.Accept()
		if err != nil {
			// Accept will return an error as soon as the listener is closed.
			return
		}
		conn.Close()
	}

}()

// Calling Stop will result in a quick, organized shutdown that closes all
// long-running resources.
err := tg.Stop()
if err != nil {
	fmt.Println(err)
}

Documentation

Overview

Package threadgroup provides a utility for performing organized clean shutdown and quick shutdown of long running groups of threads, such as networking threads, background threads, or resources like databases.

The OnStop and AfterStop functions are helpers which enable shutdown code to be inlined with resource allocation, similar to defer. The difference is that `OnStop` and `AfterStop` will be called following tg.Stop, instead of when the parent function goes out of scope.

Index

Constants

This section is empty.

Variables

View Source
var ErrStopped = errors.New("ThreadGroup already stopped")

ErrStopped is returned by ThreadGroup methods if Stop has already been called.

Functions

This section is empty.

Types

type ThreadGroup

type ThreadGroup struct {
	// contains filtered or unexported fields
}

A ThreadGroup is a one-time-use object to manage the life cycle of a group of threads. It is a sync.WaitGroup that provides functions for coordinating actions and shutting down threads. After Stop() is called, the thread group is no longer useful.

It is safe to call Add(), Done(), and Stop() concurrently.

func (*ThreadGroup) Add

func (tg *ThreadGroup) Add() error

Add increments the thread group counter.

func (*ThreadGroup) AfterStop

func (tg *ThreadGroup) AfterStop(fn func() error) error

AfterStop ensures that a function will be called after Stop() has been called and after all running routines have called Done(). The functions will be called in reverse order to how they were added, similar to defer. If Stop() has already been called, the input function will be called immediately, and a composition of ErrStopped and the error from calling fn will be returned.

The primary use of AfterStop is to allow code that opens and closes resources to be positioned next to each other. The purpose is similar to `defer`, except for resources that outlive the function which creates them.

func (*ThreadGroup) Done

func (tg *ThreadGroup) Done()

Done decrements the thread group counter.

func (*ThreadGroup) OnStop

func (tg *ThreadGroup) OnStop(fn func() error) error

OnStop ensures that a function will be called after Stop() has been called, and before blocking until all running routines have called Done(). It is safe to use OnStop to coordinate the closing of long-running threads. The OnStop functions will be called in the reverse order in which they were added, similar to defer. If Stop() has already been called, the input function will be called immediately, and a composition of ErrStopped and the error from calling fn will be returned.

func (*ThreadGroup) Stop

func (tg *ThreadGroup) Stop() error

Stop will close the stop channel of the thread group, then call all 'OnStop' functions in reverse order, then will wait until the thread group counter reaches zero, then will call all of the 'AfterStop' functions in reverse order.

The errors returned by the OnStop and AfterStop functions will be composed into a single error.

func (*ThreadGroup) StopChan

func (tg *ThreadGroup) StopChan() <-chan struct{}

StopChan provides read-only access to the ThreadGroup's stopChan. Callers should select on StopChan in order to interrupt long-running reads (such as time.After).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL