syncerr

package
v0.2.0
Published: May 15, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package syncerr provides synchronization utilities for working with errors generated by goroutines.

In this package, a "task" is a function with a simple signature that can be run as part of a "group" of goroutines. The groups (ParallelGroup and CoordinatedGroup) handle the outcome of running these tasks in a unified fashion: you wait for the *group* to finish and check the outcome as a single error status.

All tasks have the signature:

type task func() error

All groups share the interface:

type taskGroup interface {
	Go(func() error, ...string)
	Wait() error
}
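
To show why a shared interface is useful, here is a minimal sketch that uses only the standard library. The names firstErrGroup, runAll, succeeding, and failing are hypothetical and exist only for this illustration; firstErrGroup is a stand-in, not the syncerr implementation. The helper runAll depends only on the two methods quoted above, so any group type with those methods could be passed to it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// task and taskGroup mirror the signatures quoted above.
type task func() error

type taskGroup interface {
	Go(func() error, ...string)
	Wait() error
}

// firstErrGroup is a hypothetical stand-in (not the syncerr implementation):
// it runs every task and remembers the first non-nil error it sees.
type firstErrGroup struct {
	wg   sync.WaitGroup
	once sync.Once
	err  error
}

// Go starts f in its own goroutine. Task names are ignored in this sketch.
func (g *firstErrGroup) Go(f func() error, _ ...string) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.once.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until every started task has returned.
func (g *firstErrGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

// runAll depends only on the interface, not on a concrete group type.
func runAll(g taskGroup, tasks ...task) error {
	for _, t := range tasks {
		g.Go(t)
	}
	return g.Wait()
}

func succeeding() error { return nil }
func failing() error    { return errors.New("boom") }

func main() {
	fmt.Println(runAll(new(firstErrGroup), succeeding, failing, succeeding)) // boom
}
```

Because only one task fails here, the result is deterministic; with several failing tasks, which error arrives first depends on scheduling.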

CoordinatedGroup

If you want the tasks to run until at least one task returns an error, and receive the first error as the outcome, use CoordinatedGroup. CoordinatedGroups are modeled directly on errgroups: https://pkg.go.dev/golang.org/x/sync/errgroup.

> In fact, the taskGroup interface is implemented by "golang.org/x/sync/errgroup".

group, ctx := syncerr.NewCoordinatedGroup(ctx)
group.Go(taskRunner, "task", "1")
// ...
if err := group.Wait(); err != nil {
	// ...
}

The terser default constructor, NewGroup, also returns a CoordinatedGroup:

group, ctx := syncerr.NewGroup(ctx) // Same as syncerr.NewCoordinatedGroup.

ParallelGroup

If you want all tasks to run to completion and have their errors coalesced, use ParallelGroup:

group := new(syncerr.ParallelGroup)
group.Go(taskRunner, "task", "1")
// ...
err := group.Wait()
if merr, ok := err.(*errors.MultiError); ok {
	fmt.Println(merr.Unwrap())
}

ParallelGroup also provides the WaitForMultiError method, which returns an *errors.MultiError directly so that no type assertion on the result is needed:

group := new(syncerr.ParallelGroup)
group.Go(taskRunner, "task", "1")
// ...
merr := group.WaitForMultiError()
fmt.Println(merr.Unwrap())
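
The "run everything, keep every error" behavior can be approximated with the standard library alone. The sketch below is an illustration under stated assumptions, not how ParallelGroup is implemented: collectAll and failWith are hypothetical names, and errors.Join (Go 1.20 or later) stands in for the MultiError type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// collectAll is a hypothetical stand-in (not the ParallelGroup
// implementation): it runs every task to completion and returns all
// non-nil errors, in whatever order the tasks happened to finish.
func collectAll(tasks ...func() error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, t := range tasks {
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			if err := t(); err != nil {
				mu.Lock() // Guard the shared slice against concurrent appends.
				errs = append(errs, err)
				mu.Unlock()
			}
		}(t)
	}
	wg.Wait()
	return errs
}

// failWith returns a task that always fails with the given message.
func failWith(msg string) func() error {
	return func() error { return errors.New(msg) }
}

func main() {
	errs := collectAll(
		func() error { return nil },
		failWith("task 1 failed"),
		failWith("task 2 failed"),
	)
	// errors.Join folds the slice into one error value; it returns nil
	// when there is nothing to join.
	joined := errors.Join(errs...)
	fmt.Println(len(errs), joined != nil) // 2 true
}
```

Unlike the first-error approach, no task's failure is dropped, at the cost of always waiting for the slowest task.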
Example (ParallelGroup)

ParallelGroups are a version of CoordinatedGroups optimized for running a set of tasks that must be completed before moving on in a routine, while retaining access to all errors generated by any task.

We can use a zero-value group to synchronize on all tasks completing (as with CoordinatedGroups) and wait on the result. To make it easier to range over the outcome, we can also retrieve an errors.MultiError directly.

package main

import (
	"fmt"

	"github.com/secureworks/errors"
	"github.com/secureworks/errors/syncerr"
)

func main() {
	printTask := func(workload string, i int) error {
		if i%2 == 0 {
			return errors.New("error with task: " + workload)
		}
		fmt.Print("\ntask: ", workload)
		return nil
	}

	group := new(syncerr.ParallelGroup)
	for i, name := range []string{"wk", "wk", "wk", "wk"} {
		i, name := i, name // https://golang.org/doc/faq#closures_and_goroutines
		group.Go(func() error { return printTask(name, i) })
	}
	merr := group.WaitForMultiError()

	fmt.Println()
	fmt.Println()

	for _, err := range merr.Unwrap() {
		fmt.Println(err)
	}

}
Output:

task: wk
task: wk

error with task: wk
error with task: wk
Example (Pipeline)

CoordinatedGroups are a good fit for building pipelines: they need only a small layer of synchronization on top. The example below shows a three-step pipeline that cancels all incomplete steps if any one step returns an error.

package main

import (
	"context"
	"fmt"

	"github.com/secureworks/errors/syncerr"
)

func main() {
	// Define a workload unit and pipelines for messaging.
	type workload struct{ V string }
	pipelineReadIn := make(chan workload, 5)
	pipelineMapTo := make(chan workload, 5)
	pipelineResultOut := make(chan string, 5)

	group, ctx := syncerr.NewGroup(context.Background())

	// Step 1: generate values.
	group.Go(func() error {
		defer close(pipelineReadIn)
		for _, wk := range []workload{{"w"}, {"w"}, {"w"}, {"w"}, {"?"}} {
			select {
			case pipelineReadIn <- wk:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Step 2: map values.
	group.Go(func() error {
		defer close(pipelineMapTo)
		for wk := range pipelineReadIn {
			select {
			case pipelineMapTo <- workload{V: wk.V + "k"}:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Step 3: workers print the path.
	const numWorkers = 2
	for i := 0; i < numWorkers; i++ {
		group.Go(func() error {
			for wk := range pipelineMapTo {
				if wk.V != "wk" {
					return fmt.Errorf("pipeline broken: invalid workload found: %q", wk.V)
				}
				select {
				case pipelineResultOut <- wk.V:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		}, "worker N")
	}

	go func() { // Sync.
		_ = group.Wait()
		close(pipelineResultOut)
	}()

	for str := range pipelineResultOut {
		fmt.Println(str)
	}
	if err := group.Wait(); err != nil {
		fmt.Println(err)
	}

	// The order of results is nondeterministic, so this example has no testable output.
}
Output:

Example (SimplestCoordinatedGroup)

CoordinatedGroups are a great, easy shorthand for running a set of tasks that must be completed before moving on in a routine: there's no need to write out the sync.WaitGroup logic.

In the example below we don't even need to set up the shared cancellation context: a zero-value group is enough to synchronize on all tasks completing. If you need to retain the error status of every task rather than only the first error, use a ParallelGroup instead.

package main

import (
	"fmt"

	"github.com/secureworks/errors/syncerr"
)

func main() {
	printTask := func(workload string) {
		fmt.Print("\ntask: ", workload)
	}

	group := new(syncerr.CoordinatedGroup)
	for _, name := range []string{"wk", "wk", "wk", "wk"} {
		name := name // https://golang.org/doc/faq#closures_and_goroutines
		group.Go(func() error { printTask(name); return nil })
	}
	_ = group.Wait()

}
Output:

task: wk
task: wk
task: wk
task: wk

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CoordinatedGroup

type CoordinatedGroup struct {
	// contains filtered or unexported fields
}

CoordinatedGroup is a collection of goroutines working on subtasks that are part of the same overall task, and which coordinate to cancel the overall task when any subtask fails.

func NewCoordinatedGroup

func NewCoordinatedGroup(ctx context.Context) (group *CoordinatedGroup, innerCtx context.Context)

NewCoordinatedGroup returns a new CoordinatedGroup derived from the given context, along with the "inner context", which is cancelled when any subtask returns an error or when all tasks are complete.

func NewGroup

func NewGroup(ctx context.Context) (group *CoordinatedGroup, innerCtx context.Context)

NewGroup returns a new CoordinatedGroup derived from the given context, along with the "inner context", which is cancelled when any subtask returns an error or when all tasks are complete.

NewGroup is an alias of NewCoordinatedGroup.

func (*CoordinatedGroup) Go

func (g *CoordinatedGroup) Go(f func() error, taskNames ...string)

Go registers and runs a new subtask for the CoordinatedGroup. The first call to return a non-nil error cancels the group; its error will be returned by Wait.

Go also accepts a variadic list of task names that are appended to any errors this subtask generates.

To keep the interface simple, the task runner takes no parameters: if you want to pass in the context supplied to the group, for example, capture it in a closure:

group, _ := syncerr.NewCoordinatedGroup(ctx)
group.Go(func() error {
	return someTask(ctx)
}, "someTask", "1")

Use this pattern for running tasks that need any number of parameters.
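
The description of task names above does not say what the resulting error looks like. As a rough sketch of the idea only, the hypothetical helper withTaskNames below attaches names to an error with the standard library; the message format it produces is an assumption and may differ from what syncerr generates.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errTimeout is a sample sentinel error used only in this sketch.
var errTimeout = errors.New("timeout")

// withTaskNames is hypothetical: it shows one way task names could be
// appended to an error. The real syncerr format is not specified here.
func withTaskNames(err error, taskNames ...string) error {
	if err == nil || len(taskNames) == 0 {
		return err
	}
	// %w keeps the original error reachable through errors.Is and errors.As.
	return fmt.Errorf("%w: %s", err, strings.Join(taskNames, ": "))
}

func main() {
	err := withTaskNames(errTimeout, "someTask", "1")
	fmt.Println(err)                        // timeout: someTask: 1
	fmt.Println(errors.Is(err, errTimeout)) // true
}
```

Wrapping with %w rather than formatting the message as a plain string means callers can still match on the underlying error after the names are added.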

func (*CoordinatedGroup) Wait

func (g *CoordinatedGroup) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.

Tasks that may not end on their own are responsible for returning once the group's context is cancelled.
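
A task that honors cancellation might look like the sketch below. It uses only the standard library: a plain context.WithCancel context stands in for the group's inner context, and pollUntilCancelled and runDemo are hypothetical names introduced for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntilCancelled is a hypothetical long-running task. It would never
// return on its own, so it checks ctx.Done() on every iteration.
func pollUntilCancelled(ctx context.Context, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // context.Canceled or context.DeadlineExceeded
		case <-ticker.C:
			// One unit of work would go here.
		}
	}
}

// runDemo cancels the context shortly after starting the task, which is
// roughly what happens to sibling tasks when one task in a group fails.
func runDemo() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		time.Sleep(20 * time.Millisecond)
		cancel()
	}()
	return pollUntilCancelled(ctx, 5*time.Millisecond)
}

func main() {
	fmt.Println(runDemo()) // context canceled
}
```

Without the ctx.Done() case the loop would run forever, and a call to Wait on the surrounding group would never return.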

type ParallelGroup

type ParallelGroup struct {
	// contains filtered or unexported fields
}

ParallelGroup is a collection of goroutines working on subtasks that are part of the same overall task, and which return errors that need to be handled or coalesced.

There is no factory function to create a ParallelGroup since the zero value is a viable instance.

group := new(syncerr.ParallelGroup)

func (*ParallelGroup) Go

func (g *ParallelGroup) Go(f func() error, taskNames ...string)

Go registers and runs a new subtask for the ParallelGroup.

Go also accepts a variadic list of task names that are appended to any errors this subtask generates.

To keep the interface simple, the task runner takes no parameters: if you want to pass in a context to cancel the task, for example, capture it in a closure:

group := new(syncerr.ParallelGroup)
group.Go(func() error {
	return someTask(ctx)
}, "someTask", "1")

Use this pattern for running tasks that need any number of parameters.

func (*ParallelGroup) Wait

func (g *ParallelGroup) Wait() error

Wait blocks on either all workers completing or the group's context being cancelled. All errors generated by the workers are returned.

func (*ParallelGroup) WaitForMultiError

func (g *ParallelGroup) WaitForMultiError() *errors.MultiError

WaitForMultiError blocks on either all workers completing or the group's context being cancelled. All errors generated by the workers are returned as an errors.MultiError.
