Documentation ¶
Overview ¶
Package taskgroup manages collections of cooperating goroutines. It defines a Group that handles waiting for goroutine termination and the propagation of error values. The caller may provide a callback to filter and respond to task errors.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Gatherer ¶ added in v0.11.0
type Gatherer[T any] struct { // contains filtered or unexported fields }
A Gatherer manages a group of Task functions that report values, and gathers the values they return.
Example ¶
const numTasks = 25 input := rand.Perm(500) // Start a bunch of tasks to find elements in the input... g := taskgroup.New(nil) var total int c := taskgroup.Gather(g.Go, func(v int) { total += v }) for i := range numTasks { target := i + 1 c.Call(func() (int, error) { for _, v := range input { if v == target { return v, nil } } return 0, errors.New("not found") }) } // Wait for the searchers to finish, then signal the collector to stop. g.Wait() // Now get the final result. fmt.Println(total)
Output: 325
func Gather ¶ added in v0.11.0
Gather creates a new empty gatherer that uses run to execute tasks returning values of type T.
If gather != nil, values reported by successful tasks are passed to the function, otherwise such values are discarded. Calls to gather are synchronized to a single goroutine.
If run == nil, Gather will panic.
func (*Gatherer[T]) Call ¶ added in v0.11.0
Call runs f in g. If f reports an error, the error is propagated to the runner; otherwise the non-error value reported by f is gathered.
func (*Gatherer[T]) Report ¶ added in v0.11.0
Report runs f in g. Any values passed to report are gathered. If f reports an error, that error is propagated to the runner. Any values sent before f returns are still gathered, even if f reports an error.
Example ¶
package main import ( "errors" "fmt" "log" "github.com/creachadair/taskgroup" ) func main() { type val struct { who string v int } g := taskgroup.New(nil) c := taskgroup.Gather(g.Go, func(z val) { fmt.Println(z.who, z.v) }) // The Report method passes its argument a function to report multiple // values to the collector. c.Report(func(report func(v val)) error { for i := range 3 { report(val{"even", 2 * i}) } return nil }) // Multiple reporters are fine. c.Report(func(report func(v val)) error { for i := range 3 { report(val{"odd", 2*i + 1}) } // An error from a reporter is propagated like any other task error. return errors.New("no bueno") }) err := g.Wait() if err == nil || err.Error() != "no bueno" { log.Fatalf("Unexpected error: %v", err) } }
Output: even 0 odd 1 even 2 odd 3 even 4 odd 5
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
A Group manages a collection of cooperating goroutines. Add new tasks to the group with Group.Go and Group.Run. Call Group.Wait to wait for the tasks to complete. A zero value is ready for use, but must not be copied after its first use.
The group collects any errors returned by the tasks in the group. The first non-nil error reported by any task (and not otherwise filtered) is returned from the Wait method.
Example ¶
package main import ( "fmt" "github.com/creachadair/taskgroup" ) func main() { msg := make(chan string) g := taskgroup.New(nil) g.Run(func() { msg <- "ping" fmt.Println(<-msg) }) g.Run(func() { fmt.Println(<-msg) msg <- "pong" }) g.Wait() fmt.Println("<done>") }
Output: ping pong <done>
func New ¶
New constructs a new empty group with the specified error filter. See Group.OnError for a description of how errors are filtered. If ef == nil, no filtering is performed.
Example (Cancel) ¶
package main import ( "context" "fmt" "log" "time" "github.com/creachadair/taskgroup" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() const badTask = 5 // Construct a group in which any task error cancels the context. g := taskgroup.New(cancel) for i := range 10 { g.Go(func() error { if i == badTask { return fmt.Errorf("task %d failed", i) } select { case <-ctx.Done(): return ctx.Err() case <-time.After(10 * time.Second): return nil } }) } if err := g.Wait(); err == nil { log.Fatal("I expected an error here") } else { fmt.Println(err.Error()) } }
Output: task 5 failed
Example (Listen) ¶
package main import ( "errors" "fmt" "log" "strings" "github.com/creachadair/taskgroup" ) func main() { // The taskgroup itself will only report the first non-nil task error, but // you can use an error listener used to accumulate all of them. // Calls to the listener are synchronized, so we don't need a lock. var all []error g := taskgroup.New(func(e error) { all = append(all, e) }) g.Go(func() error { return errors.New("badness 1") }) g.Go(func() error { return errors.New("badness 2") }) g.Go(func() error { return errors.New("badness 3") }) if err := g.Wait(); err == nil || !strings.Contains(err.Error(), "badness") { log.Fatalf("Unexpected error: %v", err) } fmt.Println(errors.Join(all...)) }
Output: badness 1 badness 2 badness 3
func (*Group) Limit ¶
Limit returns g and a StartFunc that starts each task passed to it in g, allowing no more than n tasks to be active concurrently. If n ≤ 0, no limit is enforced.
The limiting mechanism is optional, and the underlying group is not restricted. A call to the start function will block until a slot is available, but calling g.Go directly will add a task unconditionally and will not take up a limiter slot.
This is a shorthand for constructing a Throttle with capacity n and calling its Limit method. If n ≤ 0, the start function is equivalent to g.Go, which enforces no limit. To share a throttle among multiple groups, construct the throttle separately.
Example ¶
var p peakValue g, start := taskgroup.New(nil).Limit(4) for range 100 { start.Run(func() { p.inc() defer p.dec() time.Sleep(1 * time.Microsecond) }) } g.Wait() fmt.Printf("Max active ≤ 4: %v\n", p.max <= 4)
Output: Max active ≤ 4: true
func (*Group) OnError ¶ added in v0.11.0
OnError sets the error filter for g. If ef == nil, the error filter is removed and errors are no longer filtered. Otherwise, each non-nil error reported by a task running in g is passed to ef.
The concrete type of ef must be a function with one of the following signature schemes, or OnError will panic.
If ef is:
func()
then ef is called once per reported error, and the error is not modified.
If ef is:
func(error)
then ef is called with each reported error, and the error is not modified.
If ef is:
func(error) error
then ef is called with each reported error, and its result replaces the reported value. This permits ef to suppress or replace the error value selectively.
Calls to ef are synchronized so that it is safe for ef to manipulate local data structures without additional locking. It is safe to call OnError while tasks are active in g.
func (*Group) Run ¶ added in v0.9.2
func (g *Group) Run(task func())
Run runs task in a new goroutine in g. The resulting task reports a nil error.
func (*Group) Wait ¶
Wait blocks until all the goroutines currently active in the group have returned, and all reported errors have been delivered to the callback. It returns the first non-nil error reported by any of the goroutines in the group and not filtered by an OnError callback.
As with sync.WaitGroup, new tasks can be added to g during a call to Wait only if the group contains at least one active task when Wait is called and continuously thereafter until the last concurrent call to g.Go returns.
Wait may be called from at most one goroutine at a time. After Wait has returned, the group is ready for reuse.
type Single ¶ added in v0.4.0
type Single[T any] struct { // contains filtered or unexported fields }
A Single manages a single background goroutine. The task is started when the value is first created, and the caller can use the Wait method to block until it has exited.
Example ¶
package main import ( "fmt" "io" "log" "time" "github.com/creachadair/taskgroup" ) type slowReader struct { n int d time.Duration } func (s *slowReader) Read(data []byte) (int, error) { if s.n == 0 { return 0, io.EOF } time.Sleep(s.d) nr := min(len(data), s.n) s.n -= nr for i := range nr { data[i] = 'x' } return nr, nil } func main() { // A fake reader to simulate a slow file read. // 2500 bytes and each read takes 50ms. sr := &slowReader{2500, 50 * time.Millisecond} // Start a task to read te "file" in the background. fmt.Println("start") s := taskgroup.Call(func() ([]byte, error) { return io.ReadAll(sr) }) fmt.Println("work, work") data, err := s.Wait().Get() if err != nil { log.Fatalf("Read failed: %v", err) } fmt.Println("done") fmt.Println(len(data), "bytes") }
Output: start work, work done 2500 bytes
func Call ¶ added in v0.6.0
Call starts task in a new goroutine. The caller must call Wait to wait for the task to return and collect its result.
func Go ¶ added in v0.5.0
Go runs task in a new goroutine. The caller must call Wait to wait for the task to return and collect its value.
type StartFunc ¶ added in v0.13.1
type StartFunc func(Task)
type Task ¶
type Task func() error
A Task function is the basic unit of work in a Group. Errors reported by tasks are collected and reported by the group.
func NoError
deprecated
added in
v0.5.1
func NoError(f func()) Task
NoError adapts f to a Task that executes f and reports a nil error.
Deprecated: Use Group.Run or taskgroup.Run instead.
type Throttle ¶ added in v0.11.0
type Throttle struct {
// contains filtered or unexported fields
}
A Throttle rate-limits the number of concurrent goroutines that can execute in parallel to some fixed number. A zero Throttle is ready for use, but imposes no limit on parallel execution.
func NewThrottle ¶ added in v0.11.0
NewThrottle constructs a Throttle with a capacity of n goroutines. If n ≤ 0, the resulting Throttle imposes no limit.