Documentation ¶
Overview ¶
Package derrgroup provides synchronization, error propagation, and a cancellation callback for groups of goroutines working on subtasks of a common task.
derrgroup is a fork of golang.org/x/sync/errgroup commit 6e8e738ad208923de99951fe0b48239bfd864f28 (2020-06-04). It is forked to provide only the things that cannot reasonably be implemented as a layer on top of it. For example, it is impossible to add goroutine enumeration on top of errgroup without duplicating all of errgroup's synchronization/locking. Anything that can reasonably be implemented *on top of* derrgroup is not included in derrgroup:
- Managing `context.Context`s (errgroup does some of this, but derrgroup removed it because it can trivially be implemented on top of derrgroup)
- Signal handling
- Logging
- Hard/soft cancellation
- Having `Wait()` time out on a shutdown that takes too long
Those are all good and useful things to have. But they should be implemented in a layer *on top of* derrgroup. "derrgroup.Group" was originally called "llGroup" for "low-level group"; it is intentionally low-level in order to be a clean primitive for other things to build on top of.
Right now, there are at least 3 Go implementations of "group" functionality in use at Datawire (amb-sidecar/group, entrypoint/group, and pkg/supervisor), each of which offers some subset of the above. derrgroup offers them a common, robust base. If you're writing new application code, you should use one of those, and not use derrgroup directly. If you're writing a new "group" abstraction, you should use derrgroup instead of implementing your own locking/synchronization.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GoroutineState ¶
type GoroutineState int
const (
	GoroutineRunning GoroutineState = iota
	GoroutineExited
	GoroutineErrored
)
func (GoroutineState) String ¶
func (s GoroutineState) String() string
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
A Group is a collection of goroutines working on subtasks that are part of the same overall task.
A zero Group is valid and does not cancel on error.
Example (JustErrors) ¶
JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.
package main

import (
	"fmt"
	"net/http"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(url, func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}
Output:
Example (Parallel) ¶
Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.
package main

import (
	"context"
	"fmt"
	"os"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		ctx, cancel := context.WithCancel(ctx)
		g := errgroup.NewGroup(cancel)

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			g.Go(fmt.Sprintf("search-%d", i), func() error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
Output:

web result for "golang"
image result for "golang"
video result for "golang"
Example (Pipeline) ¶
Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines.
package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	ctx, cancel := context.WithCancel(ctx)
	g := errgroup.NewGroup(cancel)
	paths := make(chan string)

	g.Go("walk", func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(fmt.Sprintf("digestor-%d", i), func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}
Output:
func NewGroup ¶
func NewGroup(cancel func()) *Group
NewGroup returns a new Group.
The provided 'cancel' function is called the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.
func (*Group) Go ¶
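Go calls the given function in a new goroutine. As the examples above show, it takes a string naming the goroutine followed by the func() error to run.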
The first call to return a non-nil error cancels the group; its error will be returned by Wait.
func (*Group) List ¶
func (g *Group) List() map[string]GoroutineState
List returns a listing of all goroutines launched with Go.
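A higher layer (logging, for instance) might render such a listing for humans. The sketch below is one way to do that under stated assumptions: it redeclares `GoroutineState` and its constants locally so that it runs without derrgroup, and `label` and `formatListing` are hypothetical helpers. The strings produced by `label` are invented for illustration and are not necessarily what the real `String` method returns.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// GoroutineState mirrors the type documented above; it is redeclared here
// only so the sketch is self-contained.
type GoroutineState int

const (
	GoroutineRunning GoroutineState = iota
	GoroutineExited
	GoroutineErrored
)

// label is a hypothetical helper; the labels are assumptions made for this
// sketch, not the output of derrgroup's String method.
func label(s GoroutineState) string {
	switch s {
	case GoroutineRunning:
		return "running"
	case GoroutineExited:
		return "exited"
	case GoroutineErrored:
		return "errored"
	default:
		return fmt.Sprintf("state(%d)", int(s))
	}
}

// formatListing renders a map shaped like the result of List as one
// "name: state" line per goroutine, sorted by name for stable output.
func formatListing(list map[string]GoroutineState) string {
	names := make([]string, 0, len(list))
	for name := range list {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		fmt.Fprintf(&b, "%s: %s\n", name, label(list[name]))
	}
	return b.String()
}

func main() {
	fmt.Print(formatListing(map[string]GoroutineState{
		"walk":       GoroutineExited,
		"digestor-0": GoroutineRunning,
		"digestor-1": GoroutineErrored,
	}))
}
```

Sorting the names matters because Go map iteration order is unspecified; without it, two consecutive listings of the same group could print in different orders.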