Documentation ¶
Overview ¶
Package wait provides a safer alternative to sync.WaitGroup. It can also be used in place of the errgroup package, but it does not implement streaming as that package can. For streaming, we provide a better alternative in our stagepipe framework.
This package can use our goroutines.Pool types for more control over concurrency, and it records OTEL spans that describe what is happening in your goroutines.
Here is a basic example:
	g := wait.Group{Name: "Print me"}
	for i := 0; i < 100; i++ {
		i := i
		g.Go(ctx, func(ctx context.Context) error {
			fmt.Println(i)
			return nil
		})
	}
	if err := g.Wait(ctx); err != nil {
		// Handle error
	}
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FuncCall ¶
FuncCall is a function call that can be used in various functions or methods in this package.
type Group ¶
	type Group struct {
		// Pool is an optional goroutines.Pool for concurrency control and reuse.
		Pool goroutines.Pool

		// CancelOnErr holds a CancelFunc that will be called if any goroutine
		// returns an error. This will automatically be called when Wait() is
		// finished and then reset to nil to allow reuse.
		CancelOnErr context.CancelFunc

		// Name provides an optional name for a WaitGroup for the purpose of
		// OTEL logging information.
		Name string

		// PoolOptions are the options to use when submitting jobs to the Pool.
		// If you have set PoolOptions but have not supplied a pool or the pool
		// doesn't support the option, the result is undefined.
		PoolOptions []goroutines.SubmitOption

		// contains filtered or unexported fields
	}
Group provides a Group implementation that allows launching goroutines in a safer way by handling the .Add() and .Done() calls of a standard sync.WaitGroup for you. This prevents problems where you forget to increment or decrement the sync.WaitGroup. In addition, you can use a goroutines.Pool object to allow concurrency control and goroutine reuse (if you don't, it just uses a goroutine per call).
It provides a Running() method that keeps track of how many goroutines are running. This can be used with the goroutines.Pool stats to understand what goroutines are in use. It has a CancelOnErr field to allow mimicking the golang.org/x/sync/errgroup package.
Finally, we provide OTEL support in the Group, which can be named via the Group.Name string. This will provide span messages on the current span when Wait() is called and record any errors in the span.
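To make the bookkeeping described above concrete, here is a minimal sketch that uses only the standard library. The safeGroup type, its cancelOnErr field, and the demo function are hypothetical names invented for illustration; this is not the package's implementation, and it leaves out pools and OTEL entirely. It only shows the general idea of hiding Add/Done, counting running functions, and cancelling a context when a function fails.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// safeGroup is a hypothetical, simplified stand-in for the ideas described
// above. It is NOT the wait.Group implementation.
type safeGroup struct {
	wg      sync.WaitGroup
	running atomic.Int64 // functions started that have not yet returned

	mu   sync.Mutex
	errs []error

	// cancelOnErr, if set, is called whenever a function returns an error.
	// Calling a context.CancelFunc more than once is safe.
	cancelOnErr context.CancelFunc
}

// Go runs f in its own goroutine and does the Add/Done bookkeeping itself,
// so callers cannot forget either call.
func (g *safeGroup) Go(ctx context.Context, f func(context.Context) error) {
	g.wg.Add(1)
	g.running.Add(1)
	go func() {
		defer g.wg.Done()
		defer g.running.Add(-1) // runs before wg.Done (defers are LIFO)
		if err := f(ctx); err != nil {
			g.mu.Lock()
			g.errs = append(g.errs, err)
			g.mu.Unlock()
			if g.cancelOnErr != nil {
				g.cancelOnErr()
			}
		}
	}()
}

// Running reports how many functions have started and not yet returned.
func (g *safeGroup) Running() int { return int(g.running.Load()) }

// Wait blocks until every function has returned and joins their errors.
func (g *safeGroup) Wait() error {
	g.wg.Wait()
	g.mu.Lock()
	defer g.mu.Unlock()
	return errors.Join(g.errs...)
}

// demo starts ten tasks, one of which fails, and returns the number of tasks
// still running after Wait together with the joined error.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	g := &safeGroup{cancelOnErr: cancel}
	for i := 0; i < 10; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		g.Go(ctx, func(ctx context.Context) error {
			if i == 3 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	err := g.Wait()
	return g.Running(), err
}

func main() {
	running, err := demo()
	fmt.Println(running, err) // prints: 0 task 3 failed
}
```

The sketch requires Go 1.20 or later for errors.Join. A real implementation would likely also need to decide whether tasks submitted after cancellation should still run, which this sketch does not address.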
Example (Cancel_on_err) ¶
CancelOnErr illustrates how to use WaitGroup to do parallel tasks and cancel all remaining tasks if a single task has an error.
	ctx, cancel := context.WithCancel(context.Background())
	p, _ := pooled.New("poolName", 10)

	wg := Group{Pool: p, CancelOnErr: cancel}
	for i := 0; i < 10000; i++ {
		i := i
		wg.Go(
			ctx,
			func(ctx context.Context) error {
				if i == 100 {
					return errors.New("error")
				}
				return nil
			},
		)
	}

	if err := wg.Wait(ctx); err != nil {
		fmt.Println(err)
	}
Output: error
Example (Just_errors) ¶
JustErrors illustrates the use of WaitGroup in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from errgroup.Group from golang.org/x/sync/errgroup.
	ctx := context.Background()
	wg := Group{}

	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		wg.Go(ctx, func(ctx context.Context) error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}

	// Wait for all HTTP fetches to complete. Report success only when
	// no fetch returned an error.
	if err := wg.Wait(ctx); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
Output:
Example (Parallel) ¶
Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling. This example is derived from errgroup.Group from golang.org/x/sync/errgroup.
	Google := func(ctx context.Context, query string) ([]Result, error) {
		wg := Group{}

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			wg.Go(ctx, func(context.Context) error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := wg.Wait(ctx); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
Output:
	web result for "golang"
	image result for "golang"
	video result for "golang"
func (*Group) Go ¶
Go spins off a goroutine that executes f(ctx). This will use the underlying goroutines.Pool if provided. If you have set PoolOptions but have not supplied a pool or the pool doesn't support the option, the result is undefined.
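The choice between a pool and a plain goroutine can be sketched roughly as follows. Everything here is an assumption made for illustration: the submitter interface, limitedPool, spawn, and peakConcurrency are hypothetical names, and none of them reflect the real goroutines.Pool API or its SubmitOption values. The sketch only shows the general shape of "use the pool if one is set, otherwise start a goroutine", with a semaphore-based pool standing in for concurrency control.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// submitter is a hypothetical, minimal pool interface used only in this
// sketch. It is NOT the goroutines.Pool API.
type submitter interface {
	Submit(f func())
}

// limitedPool is a hypothetical pool that lets at most cap(sem) functions
// run at the same time.
type limitedPool struct {
	sem chan struct{}
}

func newLimitedPool(n int) *limitedPool {
	return &limitedPool{sem: make(chan struct{}, n)}
}

// Submit blocks until a slot is free, then runs f in a new goroutine and
// releases the slot when f returns.
func (p *limitedPool) Submit(f func()) {
	p.sem <- struct{}{}
	go func() {
		defer func() { <-p.sem }()
		f()
	}()
}

// spawn runs f through pool when one is provided and in a plain goroutine
// otherwise. The WaitGroup bookkeeping is identical on both paths.
func spawn(pool submitter, wg *sync.WaitGroup, f func()) {
	wg.Add(1)
	run := func() {
		defer wg.Done()
		f()
	}
	if pool == nil {
		go run()
		return
	}
	pool.Submit(run)
}

// peakConcurrency runs the given number of short tasks through spawn and
// returns the highest number observed executing at the same time.
func peakConcurrency(pool submitter, tasks int) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		current int
		peak    int
	)
	for i := 0; i < tasks; i++ {
		spawn(pool, &wg, func() {
			mu.Lock()
			current++
			if current > peak {
				peak = current
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulate work

			mu.Lock()
			current--
			mu.Unlock()
		})
	}
	wg.Wait()
	return peak
}

func main() {
	// With a pool of size 2, the peak never exceeds 2.
	fmt.Println("pooled peak:", peakConcurrency(newLimitedPool(2), 20))
	// Without a pool, every task gets its own goroutine, so the peak
	// depends on scheduling and can be as high as the task count.
	fmt.Println("unpooled peak:", peakConcurrency(nil, 20))
}
```

Note that spawn compares the interface value against nil, so callers of this sketch should pass a literal nil rather than a nil *limitedPool when no pool is wanted.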