derrgroup

package
v1.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2020 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package derrgroup provides synchronization, error propagation, and cancellation callback for groups of goroutines working on subtasks of a common task.

derrgroup is a fork of golang.org/x/sync/errgroup commit 6e8e738ad208923de99951fe0b48239bfd864f28 (2020-06-04). It is forked to provide only things that cannot reasonably be implemented on top of itself; it is impossible to add goroutine enumeration on top of errgroup without duplicating and doubling up on all of errgroup's synchronization/locking. Anything that can reasonably be implemented *on top of* derrgroup is not included in derrgroup:

  • Managing `context.Contexts`s (this is something that errgroup kind of does, but derrgroup ripped out, because it can trivially be implemented on top of derrgroup)
  • Signal handling
  • Logging
  • Hard/soft cancellation
  • Having `Wait()` timeout on a shutdown that takes too long

Those are all good and useful things to have. But they should be implemented in a layer *on top of* derrgroup. "derrgroup.Group" was originally called "llGroup" for "low-level group"; it is intentionally low-level in order the be a clean primitive for other things to build on top of.

Right now, there are at least 3 Go implementations of "group" functionality in use at Datawire (pkg/dgroup, entrypoint/group, and pkg/supervisor), which each offer some subset of the above. derrgroup offers to them a common robust base. If you're writing new application code, you should use one of those, and not use derrgroup directly. If you're writing a new "group" abstraction, you should use derrgroup instead of implementing your own locking/synchronization.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GoroutineState

type GoroutineState int
const (
	GoroutineRunning GoroutineState = iota
	GoroutineExited
	GoroutineErrored
)

func (GoroutineState) String

func (s GoroutineState) String() string

type Group

type Group struct {
	// contains filtered or unexported fields
}

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

A zero Group is valid and does not cancel on error.

Example (JustErrors)

JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.

package main

import (
	"fmt"
	"net/http"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(url, func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}
Output:

Example (Parallel)

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.

package main

import (
	"context"
	"fmt"
	"os"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		ctx, cancel := context.WithCancel(ctx)
		g := errgroup.NewGroup(cancel)

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			g.Go(fmt.Sprintf("search-%d", i), func() error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

web result for "golang"
image result for "golang"
video result for "golang"
Example (Pipeline)

Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines.

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	errgroup "github.com/datawire/ambassador/pkg/derrgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	ctx, cancel := context.WithCancel(ctx)
	g := errgroup.NewGroup(cancel)
	paths := make(chan string)

	g.Go("walk", func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(fmt.Sprintf("digestor-%d", i), func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}
Output:

func NewGroup

func NewGroup(cancel func()) *Group

NewGroup returns a new Group.

The provided 'cancel' function is called the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

func (*Group) Go

func (g *Group) Go(name string, f func() error)

Go calls the given function in a new goroutine.

The first call to return a non-nil error cancels the group; its error will be returned by Wait.

func (*Group) List

func (g *Group) List() map[string]GoroutineState

List returns a listing of all goroutines launched with Go.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL