taskgroup

package module
v0.13.2
Published: Nov 22, 2024 License: BSD-3-Clause Imports: 4 Imported by: 49

README


A *taskgroup.Group represents a group of goroutines working on related tasks. New tasks can be added to the group at will, and the caller can wait until all tasks are complete. Errors are automatically collected and delivered synchronously to a user-provided callback. This does not replace the full generality of Go's built-in features, but it simplifies some of the plumbing for common concurrent tasks.

Here is a working example in the Go Playground.

Rationale

Go provides powerful concurrency primitives, including goroutines, channels, select, and the standard library's sync package. In some common situations, however, managing goroutine lifetimes can become unwieldy using only what is built in.

For example, consider the case of copying a large directory tree: Walk through a source directory recursively, creating a parallel target directory structure and starting a goroutine to copy each of the files concurrently. In outline:

func copyTree(source, target string) error {
    // adjustPath and copyFile are helpers not shown here.
    err := filepath.Walk(source, func(path string, fi os.FileInfo, err error) error {
        if err != nil {
            return err // fi may be nil when err != nil
        }
        adjusted := adjustPath(path)
        if fi.IsDir() {
            return os.MkdirAll(adjusted, 0755)
        }
        go copyFile(adjusted, target)
        return nil
    })
    if err != nil {
        // ... clean up the output directory ...
    }
    return err
}

This solution is deficient, however, as it does not provide any way to detect when all the file copies are finished. To do that we will typically use a sync.WaitGroup:

var wg sync.WaitGroup
...
wg.Add(1)
go func() {
    defer wg.Done()
    copyFile(adjusted, target)
}()
...
wg.Wait() // block until all the tasks signal done

In addition, we need to handle errors. Copies might fail (the disk may fill, or there might be a permissions error). For some applications it might suffice to log the error and continue, but usually in case of error we should back out and clean up the partial state.

To do that, we need to capture the return value from the function inside the goroutine, and that will require us either to add a lock or to plumb in another channel:

errs := make(chan error)
...
go copyFile(adjusted, target, errs)

Since multiple operations can be running in parallel, we will also need another goroutine to drain the errors channel and accumulate the results somewhere:

var failures []error
go func() {
    for e := range errs {
        failures = append(failures, e)
    }
}()
...
wg.Wait()
close(errs)

Once the work is finished, we must also detect when the error collector is done, so we can examine the failures without a data race. We'll need another channel or wait group to signal for this:

var failures []error
edone := make(chan struct{})
go func() {
    defer close(edone)
    for e := range errs {
        failures = append(failures, e)
    }
}()
...
wg.Wait()   // all the workers are done
close(errs) // signal the error collector to stop
<-edone     // wait for the error collector to be done

Another issue is that if one of the file copies fails, we don't necessarily want to wait for all the copies to finish before reporting the error; we want to stop everything and clean up the whole operation. Typically we would do this using a context.Context:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
...
go copyFile(ctx, adjusted, target, errs)

Now copyFile will have to check whether ctx has been canceled:

func copyFile(ctx context.Context, source, target string, errs chan<- error) {
    if ctx.Err() != nil {
        return // the operation was canceled; skip this copy
    }
    // ... do the copy as normal, or send an error on errs ...
}

Finally, we want the ability to limit the number of concurrent copies. Even if the host has plenty of memory and CPU, unbounded concurrency is likely to run us out of file descriptors. To handle this we might use a semaphore or a throttling channel:

throttle := make(chan struct{}, 64) // allow up to 64 concurrent copies
...
wg.Add(1)
go func() {
    throttle <- struct{}{} // block until the throttle has a free slot
    defer func() { wg.Done(); <-throttle }()
    copyFile(ctx, adjusted, target, errs)
}()

So far, we're up to four channels (errs, edone, the context's Done channel, and throttle) plus a wait group. The point is that while these tools can express what we want, it is tedious to wire them all together and keep track of the current state of the system.

The taskgroup package exists to handle the plumbing for the common case of a group of tasks that are all working on a related outcome (e.g., copying a directory structure), and where an error on the part of any single task may be grounds for terminating the work as a whole.

The package provides a taskgroup.Group type that has built-in support for some of these concerns:

  • Limiting the number of active goroutines.
  • Collecting and filtering errors.
  • Waiting for completion and delivering status.

A taskgroup.Group collects error values from each task and can deliver them to a user-provided callback. The callback can filter them or take other actions (such as cancellation). Invocations of the callback are all done from a single goroutine so it is safe to manipulate local resources without a lock.

A group does not directly support cancellation, but integrates cleanly with the standard context package. A context.CancelFunc can be used as a trigger to signal the whole group when an error occurs.

Overview

A task is expressed as a func() error, and is added to a group using the Go method:

var g taskgroup.Group
g.Go(myTask)

Any number of tasks may be added, and it is safe to do so from multiple goroutines concurrently. To wait for the tasks to finish, use:

err := g.Wait()

Wait blocks until all the tasks in the group have returned, and then reports the first non-nil error returned by any of the worker tasks.

An implementation of this example can be found in examples/copytree/copytree.go.

Filtering Errors

The taskgroup.New function takes an optional callback to be invoked for each non-nil error reported by a task in the group. The callback may choose to propagate, replace, or discard the error. For example, suppose we want to ignore "file not found" errors from a copy operation:

g := taskgroup.New(func(err error) error {
    if os.IsNotExist(err) {
        return nil // ignore files that do not exist
    }
    return err
})

This mechanism can also be used to trigger a context cancellation if a task fails, for example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

g := taskgroup.New(cancel)

Now, if a task in g reports an error, it will cancel the context, allowing any other running tasks to observe a context cancellation and bail out.

Controlling Concurrency

The Limit method supports limiting the number of concurrently active goroutines in the group. It returns a StartFunc that adds goroutines to the group; when the limit is reached, calls to the StartFunc block until some of the goroutines already running have finished.

For example:

// Allow at most 3 concurrently-active goroutines in the group.
g, start := taskgroup.New(nil).Limit(3)

// Start tasks by calling the function returned by Limit:
start(task1)
start(task2)
start(task3)
start(task4) // blocks until one of the previous tasks is finished
// ...
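The blocking behavior of the start function can be modeled with a buffered channel used as a counting semaphore. This standard-library-only sketch is an assumption about the mechanism, not taskgroup's code: `limited` and `peakConcurrency` are hypothetical, and unlike Limit the sketch requires n > 0.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limited returns a start function that blocks its caller while n tasks
// are active, then runs the task in a new goroutine tracked by wg.
func limited(wg *sync.WaitGroup, n int) func(func()) {
	slots := make(chan struct{}, n) // n must be > 0 in this sketch
	return func(task func()) {
		slots <- struct{}{} // blocks here when all n slots are taken
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }() // free the slot when the task ends
			task()
		}()
	}
}

// peakConcurrency runs total short tasks through limited(n) and reports
// the largest number of tasks observed running at the same time.
func peakConcurrency(n, total int) int64 {
	var wg sync.WaitGroup
	var active, peak atomic.Int64
	start := limited(&wg, n)
	for i := 0; i < total; i++ {
		start(func() {
			cur := active.Add(1)
			defer active.Add(-1)
			for { // record cur as the new peak if it is larger
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
		})
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak <= 3:", peakConcurrency(3, 20) <= 3)
}
```

Acquiring the slot in the caller, before the goroutine starts, is what makes the fourth start call in the example above block rather than spawning a goroutine that waits.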

Solo Tasks

In some cases it is useful to start a single background task to handle an isolated concern (elsewhere sometimes described as a "promise" or a "future").

For example, suppose we want to run some expensive background cleanup task while we take care of other work. Rather than create a whole group for a single goroutine, we can create a solo task using the Go or Run functions:

s := taskgroup.Go(func() error {
    for _, v := range itemsToClean {
        if err := cleanup(v); err != nil {
            return err
        }
    }
    return nil
})

Once we're ready, we can Wait for this task to collect its result:

if err := s.Wait(); err != nil {
    log.Printf("WARNING: Cleanup failed: %v", err)
}

Solo tasks are also helpful for functions that return a value. For example, suppose we want to read a file while we handle other matters. The Call function creates a solo task from such a function:

s := taskgroup.Call(func() ([]byte, error) {
    return os.ReadFile(filePath)
})

As before, we can Wait for the result when we're ready:

// N.B.: Wait returns a taskgroup.Result, whose Get method unpacks
// it into a value and an error like a normal function call.
data, err := s.Wait().Get()
if err != nil {
    log.Fatalf("Read configuration: %v", err)
}
doThingsWith(data)
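A solo task needs little more than a goroutine and a channel that is closed when it finishes. The sketch below is a hypothetical, standard-library-only analogue of Single, Go, Call, and Result.Get (named `single`, `goSingle`, `callSingle`, and `result` here); it illustrates the pattern and is not taskgroup's implementation.

```go
package main

import (
	"fmt"
	"os"
)

// single runs one function in the background and holds its value.
type single[T any] struct {
	done  chan struct{}
	value T
}

// goSingle starts task in a new goroutine.
func goSingle[T any](task func() T) *single[T] {
	s := &single[T]{done: make(chan struct{})}
	go func() {
		defer close(s.done) // the close happens after value is written
		s.value = task()
	}()
	return s
}

// Wait blocks until the task finishes and returns its value.
func (s *single[T]) Wait() T {
	<-s.done
	return s.value
}

// result pairs a value with an error.
type result[T any] struct {
	Value T
	Err   error
}

// Get unpacks r like an ordinary (T, error) return.
func (r result[T]) Get() (T, error) { return r.Value, r.Err }

// callSingle adapts a (T, error) function into a solo task.
func callSingle[T any](task func() (T, error)) *single[result[T]] {
	return goSingle(func() result[T] {
		v, err := task()
		return result[T]{Value: v, Err: err}
	})
}

func main() {
	s := callSingle(func() ([]byte, error) {
		return os.ReadFile("config.json") // hypothetical path
	})
	// ... other work could happen here ...
	if _, err := s.Wait().Get(); err != nil {
		fmt.Println("read failed:", err)
	}
}
```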

Gathering Results

One common use for a background task is accumulating the results from a batch of concurrent workers. This could be handled by a solo task, as described above, but it is a common enough pattern that the library provides a Gatherer type to handle it specifically.

To use it, pass a function to Gather to receive the values:

var g taskgroup.Group

var sum int
c := taskgroup.Gather(g.Go, func(v int) { sum += v })

The Call, Run, and Report methods of c can now be used to start tasks in g that yield values, and deliver those values to the accumulator:

  • c.Call takes a func() (T, error), returning a value and an error. If the task reports an error, that error is returned as usual. Otherwise, its non-error value is gathered by the callback.

  • c.Run takes a func() T, returning only a value, which is gathered by the callback.

  • c.Report takes a func(func(T)) error, which allows a task to report multiple values to the gatherer via a "report" callback. The task itself returns only an error, but it may call its argument any number of times to gather values.

Calls to the callback are serialized so that it is safe to access state without additional locking:

// Report an error, no value is gathered.
c.Call(func() (int, error) {
    return -1, errors.New("bad")
})

// No error, so the value 25 is gathered.
c.Call(func() (int, error) {
    return 25, nil
})

// Gather a random integer.
c.Run(func() int { return rand.Intn(1000) })

// Gather the values 10, 20, and 30.
//
// Note that even if the function reports an error, any values it sent
// before returning are still gathered.
c.Report(func(report func(int)) error {
    report(10)
    report(20)
    report(30)
    return nil
})

Once all the tasks passed to the gatherer are complete, it is safe to access the values accumulated by the callback:

g.Wait()  // wait for tasks to finish

// Now you can access the values accumulated by c.
fmt.Println(sum)
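A gatherer can be approximated with a wait group plus a mutex that serializes calls to the callback. This is a hypothetical, standard-library-only sketch (`gatherer`, `sumAll`), not taskgroup's Gatherer; here a mutex provides the serialization and Wait returns the first task error, whereas the library may work differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// gatherer runs value-producing tasks and feeds their values to gather,
// one call at a time.
type gatherer[T any] struct {
	wg     sync.WaitGroup
	mu     sync.Mutex // serializes gather and guards err
	err    error
	gather func(T)
}

// report delivers a single value to the callback under the lock.
func (g *gatherer[T]) report(v T) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.gather(v)
}

// Report runs f in a new goroutine; values passed to report are gathered
// even if f later returns an error.
func (g *gatherer[T]) Report(f func(report func(T)) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(g.report); err != nil {
			g.mu.Lock()
			if g.err == nil {
				g.err = err
			}
			g.mu.Unlock()
		}
	}()
}

// Call gathers f's value only when f succeeds.
func (g *gatherer[T]) Call(f func() (T, error)) {
	g.Report(func(report func(T)) error {
		v, err := f()
		if err != nil {
			return err
		}
		report(v)
		return nil
	})
}

// Wait blocks until all tasks finish and returns the first task error.
func (g *gatherer[T]) Wait() error {
	g.wg.Wait()
	return g.err
}

func sumAll() (int, error) {
	var sum int
	g := &gatherer[int]{gather: func(v int) { sum += v }}
	g.Call(func() (int, error) { return -1, errors.New("bad") })
	g.Call(func() (int, error) { return 25, nil })
	g.Report(func(report func(int)) error {
		report(10)
		report(20)
		report(30)
		return nil
	})
	err := g.Wait()
	return sum, err // safe to read sum: all tasks have finished
}

func main() {
	sum, err := sumAll()
	fmt.Println(sum, err) // 85 bad
}
```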

Documentation

Overview

Package taskgroup manages collections of cooperating goroutines. It defines a Group that handles waiting for goroutine termination and the propagation of error values. The caller may provide a callback to filter and respond to task errors.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Gatherer added in v0.11.0

type Gatherer[T any] struct {
	// contains filtered or unexported fields
}

A Gatherer manages a group of Task functions that report values, and gathers the values they return.

Example
const numTasks = 25
input := rand.Perm(500)

// Start a bunch of tasks to find elements in the input...
g := taskgroup.New(nil)

var total int
c := taskgroup.Gather(g.Go, func(v int) {
	total += v
})

for i := range numTasks {
	target := i + 1
	c.Call(func() (int, error) {
		for _, v := range input {
			if v == target {
				return v, nil
			}
		}
		return 0, errors.New("not found")
	})
}

// Wait for the searchers to finish; once Wait returns, every value has been gathered.
g.Wait()

// Now get the final result.
fmt.Println(total)
Output:

325

func Gather added in v0.11.0

func Gather[T any](run func(Task), gather func(T)) *Gatherer[T]

Gather creates a new empty gatherer that uses run to execute tasks returning values of type T.

If gather != nil, values reported by successful tasks are passed to the function, otherwise such values are discarded. Calls to gather are synchronized to a single goroutine.

If run == nil, Gather will panic.

func (*Gatherer[T]) Call added in v0.11.0

func (g *Gatherer[T]) Call(f func() (T, error))

Call runs f in g. If f reports an error, the error is propagated to the runner; otherwise the non-error value reported by f is gathered.

func (*Gatherer[T]) Report added in v0.11.0

func (g *Gatherer[T]) Report(f func(report func(T)) error)

Report runs f in g. Any values passed to report are gathered. If f reports an error, that error is propagated to the runner. Any values sent before f returns are still gathered, even if f reports an error.

Example
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/creachadair/taskgroup"
)

func main() {
	type val struct {
		who string
		v   int
	}

	g := taskgroup.New(nil)
	c := taskgroup.Gather(g.Go, func(z val) {
		fmt.Println(z.who, z.v)
	})

	// The Report method passes its argument a function to report multiple
	// values to the collector.
	c.Report(func(report func(v val)) error {
		for i := range 3 {
			report(val{"even", 2 * i})
		}
		return nil
	})
	// Multiple reporters are fine.
	c.Report(func(report func(v val)) error {
		for i := range 3 {
			report(val{"odd", 2*i + 1})
		}
		// An error from a reporter is propagated like any other task error.
		return errors.New("no bueno")
	})
	err := g.Wait()
	if err == nil || err.Error() != "no bueno" {
		log.Fatalf("Unexpected error: %v", err)
	}
}
Output:

even 0
odd 1
even 2
odd 3
even 4
odd 5

func (*Gatherer[T]) Run added in v0.11.0

func (g *Gatherer[T]) Run(f func() T)

Run runs f in g, and gathers the value it reports.

type Group

type Group struct {
	// contains filtered or unexported fields
}

A Group manages a collection of cooperating goroutines. Add new tasks to the group with Group.Go and Group.Run. Call Group.Wait to wait for the tasks to complete. A zero value is ready for use, but must not be copied after its first use.

The group collects any errors returned by the tasks in the group. The first non-nil error reported by any task (and not otherwise filtered) is returned from the Wait method.

Example
package main

import (
	"fmt"

	"github.com/creachadair/taskgroup"
)

func main() {
	msg := make(chan string)
	g := taskgroup.New(nil)
	g.Run(func() {
		msg <- "ping"
		fmt.Println(<-msg)
	})
	g.Run(func() {
		fmt.Println(<-msg)
		msg <- "pong"
	})
	g.Wait()
	fmt.Println("<done>")

}
Output:

ping
pong
<done>

func New

func New(ef any) *Group

New constructs a new empty group with the specified error filter. See Group.OnError for a description of how errors are filtered. If ef == nil, no filtering is performed.

Example (Cancel)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/creachadair/taskgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const badTask = 5

	// Construct a group in which any task error cancels the context.
	g := taskgroup.New(cancel)

	for i := range 10 {
		g.Go(func() error {
			if i == badTask {
				return fmt.Errorf("task %d failed", i)
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Second):
				return nil
			}
		})
	}

	if err := g.Wait(); err == nil {
		log.Fatal("I expected an error here")
	} else {
		fmt.Println(err.Error())
	}
}
Output:

task 5 failed
Example (Listen)
package main

import (
	"errors"
	"fmt"
	"log"
	"strings"

	"github.com/creachadair/taskgroup"
)

func main() {
	// The taskgroup itself will only report the first non-nil task error, but
	// you can use an error listener to accumulate all of them.
	// Calls to the listener are synchronized, so we don't need a lock.
	var all []error
	g := taskgroup.New(func(e error) {
		all = append(all, e)
	})
	g.Go(func() error { return errors.New("badness 1") })
	g.Go(func() error { return errors.New("badness 2") })
	g.Go(func() error { return errors.New("badness 3") })

	if err := g.Wait(); err == nil || !strings.Contains(err.Error(), "badness") {
		log.Fatalf("Unexpected error: %v", err)
	}
	fmt.Println(errors.Join(all...))
}
Output:

badness 1
badness 2
badness 3

func (*Group) Go

func (g *Group) Go(task Task)

Go runs task in a new goroutine in g.

func (*Group) Limit

func (g *Group) Limit(n int) (*Group, StartFunc)

Limit returns g and a StartFunc that starts each task passed to it in g, allowing no more than n tasks to be active concurrently. If n ≤ 0, no limit is enforced.

The limiting mechanism is optional, and the underlying group is not restricted. A call to the start function will block until a slot is available, but calling g.Go directly will add a task unconditionally and will not take up a limiter slot.

This is a shorthand for constructing a Throttle with capacity n and calling its Limit method. If n ≤ 0, the start function is equivalent to g.Go, which enforces no limit. To share a throttle among multiple groups, construct the throttle separately.

Example
var p peakValue

g, start := taskgroup.New(nil).Limit(4)
for range 100 {
	start.Run(func() {
		p.inc()
		defer p.dec()
		time.Sleep(1 * time.Microsecond)
	})
}
g.Wait()
fmt.Printf("Max active ≤ 4: %v\n", p.max <= 4)
Output:

Max active ≤ 4: true

func (*Group) OnError added in v0.11.0

func (g *Group) OnError(ef any) *Group

OnError sets the error filter for g. If ef == nil, the error filter is removed and errors are no longer filtered. Otherwise, each non-nil error reported by a task running in g is passed to ef.

The concrete type of ef must be a function with one of the following signature schemes, or OnError will panic.

If ef is:

func()

then ef is called once per reported error, and the error is not modified.

If ef is:

func(error)

then ef is called with each reported error, and the error is not modified.

If ef is:

func(error) error

then ef is called with each reported error, and its result replaces the reported value. This permits ef to suppress or replace the error value selectively.

Calls to ef are synchronized so that it is safe for ef to manipulate local data structures without additional locking. It is safe to call OnError while tasks are active in g.

func (*Group) Run added in v0.9.2

func (g *Group) Run(task func())

Run runs task in a new goroutine in g. The resulting task reports a nil error.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all the goroutines currently active in the group have returned, and all reported errors have been delivered to the callback. It returns the first non-nil error reported by any of the goroutines in the group and not filtered by an OnError callback.

As with sync.WaitGroup, new tasks can be added to g during a call to Wait only if the group contains at least one active task when Wait is called and continuously thereafter until the last concurrent call to g.Go returns.

Wait may be called from at most one goroutine at a time. After Wait has returned, the group is ready for reuse.

type Result added in v0.6.0

type Result[T any] struct {
	Value T
	Err   error
}

A Result is a pair of an arbitrary value and an error.

func (Result[T]) Get added in v0.6.1

func (r Result[T]) Get() (T, error)

Get returns the fields of r as results. It is a convenience method for unpacking the results of a Call.

Typical usage:

s := taskgroup.Call(func() (int, error) { ... })
v, err := s.Wait().Get()

type Single added in v0.4.0

type Single[T any] struct {
	// contains filtered or unexported fields
}

A Single manages a single background goroutine. The task is started when the value is first created, and the caller can use the Wait method to block until it has exited.

Example
package main

import (
	"fmt"
	"io"
	"log"
	"time"

	"github.com/creachadair/taskgroup"
)

type slowReader struct {
	n int
	d time.Duration
}

func (s *slowReader) Read(data []byte) (int, error) {
	if s.n == 0 {
		return 0, io.EOF
	}
	time.Sleep(s.d)
	nr := min(len(data), s.n)
	s.n -= nr
	for i := range nr {
		data[i] = 'x'
	}
	return nr, nil
}

func main() {
	// A fake reader to simulate a slow file read.
	// 2500 bytes and each read takes 50ms.
	sr := &slowReader{2500, 50 * time.Millisecond}

	// Start a task to read the "file" in the background.
	fmt.Println("start")
	s := taskgroup.Call(func() ([]byte, error) {
		return io.ReadAll(sr)
	})

	fmt.Println("work, work")
	data, err := s.Wait().Get()
	if err != nil {
		log.Fatalf("Read failed: %v", err)
	}
	fmt.Println("done")
	fmt.Println(len(data), "bytes")

}
Output:

start
work, work
done
2500 bytes

func Call added in v0.6.0

func Call[T any](task func() (T, error)) *Single[Result[T]]

Call starts task in a new goroutine. The caller must call Wait to wait for the task to return and collect its result.

func Go added in v0.5.0

func Go[T any](task func() T) *Single[T]

Go runs task in a new goroutine. The caller must call Wait to wait for the task to return and collect its value.

func Run added in v0.9.2

func Run(task func()) *Single[error]

Run runs task in a new goroutine. The caller must call Wait to wait for the task to return. The error reported by Wait is always nil.

func (*Single[T]) Wait added in v0.5.0

func (s *Single[T]) Wait() T

Wait blocks until the task monitored by s has completed and returns the value it reported.

type StartFunc added in v0.13.1

type StartFunc func(Task)

A StartFunc executes each Task passed to it in a Group.

func (StartFunc) Go added in v0.13.1

func (s StartFunc) Go(task Task)

Go is a legibility shorthand for calling s with task.

func (StartFunc) Run added in v0.13.1

func (s StartFunc) Run(f func())

Run is a legibility shorthand for calling s with a task that runs f and reports a nil error.

type Task

type Task func() error

A Task function is the basic unit of work in a Group. Errors reported by tasks are collected and reported by the group.

func NoError deprecated added in v0.5.1

func NoError(f func()) Task

NoError adapts f to a Task that executes f and reports a nil error.

Deprecated: Use Group.Run or taskgroup.Run instead.

type Throttle added in v0.11.0

type Throttle struct {
	// contains filtered or unexported fields
}

A Throttle rate-limits the number of concurrent goroutines that can execute in parallel to some fixed number. A zero Throttle is ready for use, but imposes no limit on parallel execution.

func NewThrottle added in v0.11.0

func NewThrottle(n int) Throttle

NewThrottle constructs a Throttle with a capacity of n goroutines. If n ≤ 0, the resulting Throttle imposes no limit.

func (Throttle) Limit added in v0.11.0

func (t Throttle) Limit(g *Group) StartFunc

Limit returns a function that starts each Task passed to it in g, respecting the rate limit imposed by t. Each call to Limit yields a fresh start function, and all the functions returned share the capacity of t.
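Sharing works because every start function draws from the same pool of slots. In this hypothetical, standard-library-only sketch, `throttle` is a buffered channel (nil meaning "no limit", mirroring a zero Throttle), and two wait groups stand in for two groups; it is not taskgroup's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// throttle is a counting semaphore; a nil throttle imposes no limit.
type throttle chan struct{}

// newThrottle returns a throttle with capacity n, or nil if n <= 0.
func newThrottle(n int) throttle {
	if n <= 0 {
		return nil
	}
	return make(throttle, n)
}

// limit returns a start function for wg. All start functions derived
// from the same throttle share its capacity.
func (t throttle) limit(wg *sync.WaitGroup) func(func()) {
	return func(task func()) {
		if t != nil {
			t <- struct{}{} // wait for a shared slot
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t != nil {
				defer func() { <-t }() // return the slot
			}
			task()
		}()
	}
}

// sharedPeak runs tasks in two groups under one throttle of capacity n
// and reports the most tasks seen running at once across both groups.
func sharedPeak(n int) int64 {
	t := newThrottle(n)
	var g1, g2 sync.WaitGroup
	start1, start2 := t.limit(&g1), t.limit(&g2)

	var active, peak atomic.Int64
	task := func() {
		cur := active.Add(1)
		defer active.Add(-1)
		for { // record cur as the new peak if it is larger
			old := peak.Load()
			if cur <= old || peak.CompareAndSwap(old, cur) {
				break
			}
		}
		time.Sleep(time.Millisecond)
	}
	for i := 0; i < 10; i++ {
		start1(task)
		start2(task)
	}
	g1.Wait()
	g2.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak <= 2:", sharedPeak(2) <= 2)
}
```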

Directories

Path Synopsis
examples
copytree
Binary copytree is an example program to demonstrate the use of the group and throttle packages to manage concurrency.
