Documentation ¶
Overview ¶
Package parallel contains generic, type-safe functions for managing various kinds of concurrent logic.
Index ¶
- func Consumers[F ~func(context.Context, int, T) error, T any](ctx context.Context, n int, f F) (func(T) error, func() error)
- func Pool[F ~func(T) (U, error), T, U any](n int, f F) func(T) (U, error)
- func Producers[F ~func(context.Context, int, func(T) error) error, T any](ctx context.Context, n int, f F) iter.Of[T]
- func Protect[T any](val T) (reader func() T, writer func(T), closer func())
- func Values[F ~func(context.Context, int) (T, error), T any](ctx context.Context, n int, f F) ([]T, error)
- type Error
Functions ¶
func Consumers ¶
func Consumers[F ~func(context.Context, int, T) error, T any](ctx context.Context, n int, f F) (func(T) error, func() error)
Consumers launches n parallel workers, each consuming values supplied by the caller.
When a value arrives, an idle worker calls the function f to consume it. This callback receives the worker's number (in the range 0 through n-1) and the value.
The caller receives two callbacks: one for sending a value to the workers via an internal channel, and one for closing that channel, signaling the end of input and causing the workers to exit normally.
The value-sending callback may block until a worker is available to consume the value.
An error from any worker cancels them all. This error is returned from the close-channel callback. After any error, the value-sending callback also returns an error, but not the original one. To obtain the original error, the caller should still invoke the close callback.
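The error-handling contract above can be illustrated with a minimal, standard-library-only sketch. The helper names consumers and demo are hypothetical, and this is not the library's implementation; it only models the described behavior, where the send callback reports a cancellation error and the close callback reports the worker's original error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// consumers is a hypothetical, simplified stand-in for the behavior described
// above. It is NOT the library's implementation.
func consumers[T any](ctx context.Context, n int, f func(context.Context, int, T) error) (send func(T) error, closefn func() error) {
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan T) // unbuffered, so send blocks until a worker is free

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case val, ok := <-ch:
					if !ok {
						return // channel closed: normal exit
					}
					if err := f(ctx, worker, val); err != nil {
						once.Do(func() { firstErr = err }) // remember the original error
						cancel()                           // stop the other workers
						return
					}
				}
			}
		}(i)
	}

	send = func(val T) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // a cancellation error, not the original one
		case ch <- val:
			return nil
		}
	}
	closefn = func() error {
		close(ch)
		wg.Wait()
		cancel()
		return firstErr
	}
	return send, closefn
}

// demo uses a single worker that rejects the value 3, then keeps sending
// until the send callback reports an error.
func demo() (sendErr, closeErr error) {
	send, closefn := consumers(context.Background(), 1, func(_ context.Context, _ int, val int) error {
		if val == 3 {
			return errors.New("value 3 rejected")
		}
		return nil
	})
	for i := 1; i <= 5 && sendErr == nil; i++ {
		sendErr = send(i)
	}
	return sendErr, closefn()
}

func main() {
	sendErr, closeErr := demo()
	fmt.Println("send:", sendErr)
	fmt.Println("close:", closeErr)
}
```

In this sketch the caller stops sending as soon as send fails, and still calls the close callback to learn why.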
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v3/parallel"
)

func main() {
	ctx := context.Background()

	// One of three goroutines prints incoming values.
	send, closefn := parallel.Consumers(ctx, 3, func(_ context.Context, _, val int) error {
		fmt.Println(val)
		return nil
	})

	// Caller produces values.
	for i := 1; i <= 5; i++ {
		err := send(i)
		if err != nil {
			panic(err)
		}
	}
	if err := closefn(); err != nil {
		panic(err)
	}
}
Output (order may vary):
1
2
3
4
5
func Pool ¶
func Pool[F ~func(T) (U, error), T, U any](n int, f F) func(T) (U, error)
Pool permits up to n concurrent calls to a function f. The caller receives a callback for requesting a worker from this pool. When no worker is available, the callback blocks until one becomes available. Then it invokes f and returns the result.
Each call of the callback is synchronous. Any desired concurrency is the responsibility of the caller.
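One common way to get this kind of bounded concurrency is a buffered channel used as a counting semaphore. The sketch below assumes that technique; the names limit and maxInFlight are hypothetical, and nothing here is taken from the library's source. It measures the peak number of simultaneous calls to confirm that the bound holds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limit is a hypothetical helper (not the library's code) that wraps f so
// that at most n calls to f are in flight at any moment.
func limit[T, U any](n int, f func(T) (U, error)) func(T) (U, error) {
	sem := make(chan struct{}, n) // counting semaphore with n slots
	return func(val T) (U, error) {
		sem <- struct{}{}        // acquire a slot; blocks while n calls are running
		defer func() { <-sem }() // release the slot when f returns
		return f(val)
	}
}

// maxInFlight sends `callers` concurrent requests through a pool of size n
// and reports the highest number of simultaneous calls to f that it observed.
func maxInFlight(n, callers int) int64 {
	var cur, peak atomic.Int64
	pool := limit(n, func(v int) (int, error) {
		c := cur.Add(1)
		for { // raise peak to c if c is larger
			p := peak.Load()
			if c <= p || peak.CompareAndSwap(p, c) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulate work
		cur.Add(-1)
		return -v, nil
	})

	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pool(i) // synchronous call; concurrency comes from the caller's goroutines
		}(i)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak stayed within limit:", maxInFlight(3, 10) <= 3)
}
```

As in the description above, the wrapper itself is synchronous; the goroutines belong to the caller.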
Example ¶
package main

import (
	"fmt"
	"sync"

	"github.com/bobg/go-generics/v3/parallel"
)

func main() {
	// Three workers available, each negating its input.
	pool := parallel.Pool(3, func(n int) (int, error) {
		return -n, nil
	})

	var wg sync.WaitGroup

	// Ten goroutines requesting work from those three workers.
	for i := 1; i <= 10; i++ {
		i := i // Go loop-var pitfall
		wg.Add(1)
		go func() {
			neg, err := pool(i)
			if err != nil {
				panic(err)
			}
			fmt.Println(neg)
			wg.Done()
		}()
	}

	wg.Wait()
}
Output (order may vary):
-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
func Producers ¶
func Producers[F ~func(context.Context, int, func(T) error) error, T any](ctx context.Context, n int, f F) iter.Of[T]
Producers launches n parallel workers, each running the function f.
Each worker receives its worker number (in the range 0 through n-1) and a callback to use for producing a value. If the callback returns an error, the worker should exit with that error.
The callback that the worker uses to produce a value may block until the caller is able to consume the value.
An error from any worker cancels them all.
The caller gets an iterator over the values produced.
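The producer side can be sketched with the standard library alone. In the sketch below, a plain channel plus an error function stands in for the iterator (an assumption made so the example is self-contained); the names producers and collect are hypothetical, and this is not the library's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// producers is a hypothetical, simplified model of the behavior described
// above. The returned error function is meant to be called only after the
// channel has been drained.
func producers[T any](ctx context.Context, n int, f func(context.Context, int, func(T) error) error) (<-chan T, func() error) {
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan T) // unbuffered: a worker's send blocks until the caller receives

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	send := func(val T) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // workers should exit when they see this
		case ch <- val:
			return nil
		}
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			if err := f(ctx, worker, send); err != nil {
				once.Do(func() { firstErr = err })
				cancel() // an error from any worker cancels them all
			}
		}(i)
	}
	go func() {
		wg.Wait()
		cancel()
		close(ch) // lets the caller's range loop finish
	}()
	return ch, func() error { return firstErr }
}

// collect runs n workers that each produce their worker number, and returns
// the values sorted, because arrival order is not deterministic.
func collect(n int) ([]int, error) {
	ch, errfn := producers(context.Background(), n, func(_ context.Context, worker int, send func(int) error) error {
		return send(worker)
	})
	var got []int
	for val := range ch {
		got = append(got, val)
	}
	sort.Ints(got)
	return got, errfn()
}

func main() {
	got, err := collect(5)
	fmt.Println(got, err)
}
```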
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v3/parallel"
)

func main() {
	ctx := context.Background()

	// Five goroutines each produce their worker number and then exit.
	it := parallel.Producers(ctx, 5, func(_ context.Context, n int, send func(int) error) error {
		return send(n)
	})

	// Caller consumes the produced values.
	for it.Next() {
		fmt.Println(it.Val())
	}
	if err := it.Err(); err != nil {
		panic(err)
	}
}
Output (order may vary):
0
1
2
3
4
func Protect ¶
func Protect[T any](val T) (reader func() T, writer func(T), closer func())
Protect offers safe concurrent access to a protected value. It is a "share memory by communicating" alternative to protecting the value with sync.RWMutex.
The caller gets back three functions: a reader for getting the protected value, a writer for updating it, and a closer for releasing resources when no further reads or writes are needed.
Any number of calls to the reader may run concurrently. If T is a "reference type" (see below) then the caller should not make any changes to the value it receives from the reader.
A call to the writer prevents other reader and writer calls from running until it is done. It waits for pending calls to finish before it executes. After a call to the writer, future reader calls will receive the updated value.
The closer should be called to release resources when no more reader or writer calls are needed. Calling any of the functions (reader, writer, or closer) after a call to the closer may cause a panic.
The term "reference type" here means a type (such as pointer, slice, map, channel, function, and interface) that allows a caller C to make changes that will be visible to other callers outside of C's scope. In other words, if the type is int and caller A does this:
val := reader()
val++
it will not affect the value that caller B sees when it does its own call to reader(). But if the type is *int and caller A does this:
val := reader()
*val++
then the change in the pointed-to value _will_ be seen by caller B.
For more on the fuzzy concept of "reference types" in Go, see https://github.com/go101/go101/wiki/About-the-terminology-%22reference-type%22-in-Go
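The difference between the two cases can be checked with plain closures. The reader functions below are hypothetical stand-ins that only model what kind of value a caller gets back; they involve no locking and are not the library's reader. The third function shows one possible way, assumed here for illustration, to work safely with a slice: copy it before modifying it.

```go
package main

import "fmt"

// valueCase models T = int: the caller receives a copy of the value.
func valueCase() (local, shared int) {
	protected := 4
	reader := func() int { return protected }

	val := reader()
	val++ // changes only the caller's copy

	return val, reader()
}

// pointerCase models T = *int: the caller receives a pointer to shared data.
func pointerCase() int {
	n := 4
	reader := func() *int { return &n }

	val := reader()
	*val++ // changes the pointed-to value that every caller sees

	return *reader()
}

// copyCase models T = []int: copying before modifying keeps the shared
// backing array untouched.
func copyCase() (local, shared []int) {
	protected := []int{1, 2, 3}
	reader := func() []int { return protected }

	local = append([]int(nil), reader()...) // private copy
	local[0] = 99

	return local, reader()
}

func main() {
	local, shared := valueCase()
	fmt.Println(local, shared) // 5 4
	fmt.Println(pointerCase()) // 5
	l, s := copyCase()
	fmt.Println(l, s) // [99 2 3] [1 2 3]
}
```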
Example ¶
package main

import (
	"fmt"
	"sync"

	"github.com/bobg/go-generics/v3/parallel"
)

func main() {
	// A caller is supplied with a reader and a writer
	// for purposes of accessing and updating the protected value safely
	// (in this case an int, initially 4).
	reader, writer, closer := parallel.Protect(4)
	defer closer()

	// Call the reader in three concurrent goroutines, each printing the protected value.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			fmt.Println(reader())
			wg.Done()
		}()
	}
	wg.Wait()

	// Increment the protected value.
	writer(reader() + 1)

	// Call the reader in three concurrent goroutines, each printing the protected value.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			fmt.Println(reader())
			wg.Done()
		}()
	}
	wg.Wait()
}
Output:
4
4
4
5
5
5
func Values ¶
func Values[F ~func(context.Context, int) (T, error), T any](ctx context.Context, n int, f F) ([]T, error)
Values produces a slice of n values using n parallel workers, each running the function f.
Each worker receives its worker number (in the range 0 through n-1).
An error from any worker cancels them all. The first error is returned to the caller.
The resulting slice has length n. The value at position i comes from worker i.
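The slot-per-worker layout can be sketched using only the standard library. The names values and squares are hypothetical, and this is an illustration of the described contract rather than the library's implementation. Because worker i writes only to position i, the result slice needs no lock.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// values is a hypothetical, simplified model of the behavior described above.
func values[T any](ctx context.Context, n int, f func(context.Context, int) (T, error)) ([]T, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	result := make([]T, n)
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			val, err := f(ctx, i)
			if err != nil {
				once.Do(func() {
					firstErr = err // keep only the first error
					cancel()       // signal the other workers to stop
				})
				return
			}
			result[i] = val // each worker writes only its own slot
		}(i)
	}
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	return result, nil
}

// squares fills position i with i*i, computed by worker i.
func squares(n int) ([]int, error) {
	return values(context.Background(), n, func(_ context.Context, i int) (int, error) {
		return i * i, nil
	})
}

func main() {
	got, err := squares(5)
	fmt.Println(got, err) // [0 1 4 9 16] <nil>
}
```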
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/bobg/go-generics/v3/parallel"
)

func main() {
	ctx := context.Background()

	// Five goroutines, each placing its worker number in the corresponding slot of the result slice.
	values, err := parallel.Values(ctx, 5, func(_ context.Context, n int) (int, error) {
		return n, nil
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(values)
}
Output: [0 1 2 3 4]