Documentation
¶
Overview ¶
Package stream provides a concurrent, ordered stream implementation.
Index ¶
- func All(parentCtx context.Context, execs ...Executable) ([]any, error)
- func ForEach[T any](input []T, f func(*T))
- func ForEachIdx[T any](input []T, f func(int, *T))
- func Last(parentCtx context.Context, num int, execs ...Executable) ([]any, error)
- func Map[T, R any](input []T, f func(*T) R) []R
- func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)
- func Retry(parentCtx context.Context, retries int, fn Executable) (any, error)
- func Take(parentCtx context.Context, num int, execs ...Executable) ([]any, error)
- func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (any, error)
- type Callback
- type ContextPool
- func (p *ContextPool) Exec(f func(ctx context.Context) error)
- func (p *ContextPool) Wait() error
- func (p *ContextPool) WithCancelOnError() *ContextPool
- func (p *ContextPool) WithFailFast() *ContextPool
- func (p *ContextPool) WithFirstError() *ContextPool
- func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool
- type ErrorPool
- type Executable
- type ExecutableInSequence
- type IndexedExecutableOutput
- type IndexedValue
- type Iterator
- type Mapper
- type MaxRetriesExceededError
- type Pool
- type ResultContextPool
- func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error))
- func (p *ResultContextPool[T]) Wait() ([]T, error)
- func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T]
- func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T]
- func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T]
- func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T]
- func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T]
- type ResultErrorPool
- func (p *ResultErrorPool[T]) Go(f func() (T, error))
- func (p *ResultErrorPool[T]) Wait() ([]T, error)
- func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T]
- func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]
- func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T]
- func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T]
- type ResultPool
- func (p *ResultPool[T]) Go(f func() T)
- func (p *ResultPool[T]) MaxGoroutines() int
- func (p *ResultPool[T]) Wait() []T
- func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]
- func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T]
- func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T]
- type Stream
- type Task
- type WaitGroup
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func All ¶
func All(parentCtx context.Context, execs ...Executable) ([]any, error)
All returns all the outputs from all Executables, order guaranteed.
func ForEach ¶
func ForEach[T any](input []T, f func(*T))
ForEach executes f in parallel over each element in input.
It is safe to mutate the input parameter, which makes it possible to map in place.
ForEach always uses at most runtime.GOMAXPROCS goroutines. It takes roughly 2µs to start up the goroutines and adds an overhead of roughly 50ns per element of input. For a configurable goroutine limit, use a custom Iterator.
func ForEachIdx ¶
ForEachIdx is the same as ForEach except it also provides the index of the element to the callback.
func Map ¶
func Map[T, R any](input []T, f func(*T) R) []R
Map applies f to each element of input, returning the mapped result.
Map always uses at most runtime.GOMAXPROCS goroutines. For a configurable goroutine limit, use a custom Mapper.
func MapErr ¶
MapErr applies f to each element of the input, returning the mapped result and a combined error of all returned errors.
Map always uses at most runtime.GOMAXPROCS goroutines. For a configurable goroutine limit, use a custom Mapper.
func Retry ¶
Retry attempts to get a value from an Executable instead of an Error. It will keeps re-running the Executable when failed no more than `retries` times. Also, when the parent Context canceled, it returns the `Err()` of it immediately.
func Waterfall ¶
func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (any, error)
Waterfall runs `ExecutableInSequence`s one by one, passing previous result to next Executable as input. When an error occurred, it stop the process then returns the error. When the parent Context canceled, it returns the `Err()` of it immediately.
Types ¶
type Callback ¶
type Callback func()
Callback is a function that is returned by a Task. Callbacks are called in the same order that tasks are submitted.
type ContextPool ¶
type ContextPool struct {
// contains filtered or unexported fields
}
ContextPool is a pool that runs tasks that take a context. A new ContextPool should be created with `New().WithContext(ctx)`.
The configuration methods (With*) will panic if they are used after calling Exec() for the first time.
func (*ContextPool) Exec ¶
func (p *ContextPool) Exec(f func(ctx context.Context) error)
Exec submits a task. If it returns an error, the error will be collected and returned by Wait(). If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.
func (*ContextPool) Wait ¶
func (p *ContextPool) Wait() error
Wait cleans up all spawned goroutines, propagates any panics, and returns an error if any of the tasks errored.
func (*ContextPool) WithCancelOnError ¶
func (p *ContextPool) WithCancelOnError() *ContextPool
WithCancelOnError configures the pool to cancel its context as soon as any task returns an error or panics. By default, the pool's context is not canceled until the parent context is canceled.
In this case, all errors returned from the pool after the first will likely be context.Canceled - you may want to also use (*ContextPool).WithFirstError() to configure the pool to only return the first error.
Example ¶
p := NewPool(). WithMaxGoroutines(4). WithContext(context.Background()). WithCancelOnError() for i := 0; i < 3; i++ { i := i p.Exec(func(ctx context.Context) error { if i == 2 { return errors.New("I will cancel all other tasks!") } <-ctx.Done() return nil }) } err := p.Wait() fmt.Println(err)
Output: I will cancel all other tasks!
func (*ContextPool) WithFailFast ¶
func (p *ContextPool) WithFailFast() *ContextPool
WithFailFast is an alias for the combination of WithFirstError and WithCancelOnError. By default, the errors from all tasks are returned and the pool's context is not canceled until the parent context is canceled.
func (*ContextPool) WithFirstError ¶
func (p *ContextPool) WithFirstError() *ContextPool
WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error. This is particularly useful for (*ContextPool).WithCancelOnError(), where all errors after the first are likely to be context.Canceled.
func (*ContextPool) WithMaxGoroutines ¶
func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type ErrorPool ¶
type ErrorPool struct {
// contains filtered or unexported fields
}
ErrorPool is a pool that runs tasks that may return an error. Errors are collected and returned by Wait().
The configuration methods (With*) will panic if they are used after calling Go() for the first time.
A new ErrorPool should be created using `New().WithErrors()`.
Example ¶
p := NewPool().WithErrors() for i := 0; i < 3; i++ { i := i p.Exec(func() error { if i == 2 { return errors.New("oh no!") } return nil }) } err := p.Wait() fmt.Println(err)
Output: oh no!
func (*ErrorPool) Exec ¶
Exec submits a task to the pool. If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.
func (*ErrorPool) Wait ¶
Wait cleans up any spawned goroutines, propagating any panics and returning any errors from tasks.
func (*ErrorPool) WithContext ¶
func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool
WithContext converts the pool to a ContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.
func (*ErrorPool) WithFirstError ¶
WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.
func (*ErrorPool) WithMaxGoroutines ¶
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type Executable ¶
Executable represents a singular logic block. It can be used with several functions.
type ExecutableInSequence ¶
ExecutableInSequence represents one of a sequence of logic blocks.
type IndexedExecutableOutput ¶
type IndexedExecutableOutput struct { Value IndexedValue Err error }
IndexedExecutableOutput stores both output and error values from a Excetable.
type IndexedValue ¶
IndexedValue stores the output of Executables, along with the index of the source Executable for ordering.
type Iterator ¶
type Iterator[T any] struct { // MaxGoroutines controls the maximum number of goroutines // to use on this Iterator's methods. // // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0). MaxGoroutines int }
Iterator can be used to configure the behaviour of ForEach and ForEachIdx. The zero value is safe to use with reasonable defaults.
Iterator is also safe for reuse and concurrent use.
Example ¶
input := []int{1, 2, 3, 4} iterator := Iterator[int]{ MaxGoroutines: len(input) / 2, } iterator.ForEach(input, func(v *int) { if *v%2 != 0 { *v = -1 } }) fmt.Println(input)
Output: [-1 2 -1 4]
func (Iterator[T]) ForEach ¶
func (iter Iterator[T]) ForEach(input []T, f func(*T))
ForEach executes f in parallel over each element in input, using up to the Iterator's configured maximum number of goroutines.
It is safe to mutate the input parameter, which makes it possible to map in place.
It takes roughly 2µs to start up the goroutines and adds an overhead of roughly 50ns per element of input.
func (Iterator[T]) ForEachIdx ¶
ForEachIdx is the same as ForEach except it also provides the index of the element to the callback.
type Mapper ¶
Mapper is an Iterator with a result type R. It can be used to configure the behaviour of Map and MapErr. The zero value is safe to use with reasonable defaults.
Mapper is also safe for reuse and concurrent use.
Example ¶
input := []int{1, 2, 3, 4} mapper := Mapper[int, bool]{ MaxGoroutines: len(input) / 2, } results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 }) fmt.Println(results)
Output: [false true false true]
type MaxRetriesExceededError ¶
type MaxRetriesExceededError struct {
// contains filtered or unexported fields
}
MaxRetriesExceededError stores how many times did an Execution run before exceeding the limit. The retries field holds the value.
func (MaxRetriesExceededError) Error ¶
func (err MaxRetriesExceededError) Error() string
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of goroutines used to execute tasks concurrently.
Tasks are submitted with Go(). Once all your tasks have been submitted, you must call Wait() to clean up any spawned goroutines and propagate any panics.
Goroutines are started lazily, so creating a new pool is cheap. There will never be more goroutines spawned than there are tasks submitted.
The configuration methods (With*) will panic if they are used after calling Go() for the first time.
Pool is efficient, but not zero cost. It should not be used for very short tasks. Startup and teardown come with an overhead of around 1µs, and each task has an overhead of around 300ns.
Example ¶
p := NewPool().WithMaxGoroutines(3) for i := 0; i < 5; i++ { p.Exec(func() { fmt.Println("conc") }) } p.Wait()
Output: conc conc conc conc conc
func (*Pool) Exec ¶
func (p *Pool) Exec(f func())
Exec submits a task to be run in the pool. If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.
func (*Pool) MaxGoroutines ¶
MaxGoroutines returns the maximum size of the pool.
func (*Pool) Wait ¶
func (p *Pool) Wait()
Wait cleans up spawned goroutines, propagating any panics that were raised by a tasks.
func (*Pool) WithContext ¶
func (p *Pool) WithContext(ctx context.Context) *ContextPool
WithContext converts the pool to a ContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.
func (*Pool) WithErrors ¶
WithErrors converts the pool to an ErrorPool so the submitted tasks can return errors.
func (*Pool) WithMaxGoroutines ¶
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type ResultContextPool ¶
type ResultContextPool[T any] struct { // contains filtered or unexported fields }
ResultContextPool is a pool that runs tasks that take a context and return a result. The context passed to the task will be canceled if any of the tasks return an error, which makes its functionality different than just capturing a context with the task closure.
The configuration methods (With*) will panic if they are used after calling Go() for the first time.
func (*ResultContextPool[T]) Go ¶
func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error))
Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.
func (*ResultContextPool[T]) Wait ¶
func (p *ResultContextPool[T]) Wait() ([]T, error)
Wait cleans up all spawned goroutines, propagates any panics, and returns an error if any of the tasks errored.
func (*ResultContextPool[T]) WithCancelOnError ¶
func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T]
WithCancelOnError configures the pool to cancel its context as soon as any task returns an error. By default, the pool's context is not canceled until the parent context is canceled.
func (*ResultContextPool[T]) WithCollectErrored ¶
func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T]
WithCollectErrored configures the pool to still collect the result of a task even if the task returned an error. By default, the result of tasks that errored are ignored and only the error is collected.
func (*ResultContextPool[T]) WithFailFast ¶
func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T]
WithFailFast is an alias for the combination of WithFirstError and WithCancelOnError. By default, the errors from all tasks are returned and the pool's context is not canceled until the parent context is canceled.
func (*ResultContextPool[T]) WithFirstError ¶
func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T]
WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.
func (*ResultContextPool[T]) WithMaxGoroutines ¶
func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T]
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type ResultErrorPool ¶
type ResultErrorPool[T any] struct { // contains filtered or unexported fields }
ResultErrorPool is a pool that executes tasks that return a generic result type and an error. Tasks are executed in the pool with Go(), then the results of the tasks are returned by Wait().
The order of the results is not guaranteed to be the same as the order the tasks were submitted. If your use case requires consistent ordering, consider using the `stream` package or `Map` from the `iter` package.
The configuration methods (With*) will panic if they are used after calling Go() for the first time.
func (*ResultErrorPool[T]) Go ¶
func (p *ResultErrorPool[T]) Go(f func() (T, error))
Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.
func (*ResultErrorPool[T]) Wait ¶
func (p *ResultErrorPool[T]) Wait() ([]T, error)
Wait cleans up any spawned goroutines, propagating any panics and returning the results and any errors from tasks.
func (*ResultErrorPool[T]) WithCollectErrored ¶
func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T]
WithCollectErrored configures the pool to still collect the result of a task even if the task returned an error. By default, the result of tasks that errored are ignored and only the error is collected.
func (*ResultErrorPool[T]) WithContext ¶
func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]
WithContext converts the pool to a ResultContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.
func (*ResultErrorPool[T]) WithFirstError ¶
func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T]
WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.
func (*ResultErrorPool[T]) WithMaxGoroutines ¶
func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T]
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type ResultPool ¶
type ResultPool[T any] struct { // contains filtered or unexported fields }
ResultPool is a pool that executes tasks that return a generic result type. Tasks are executed in the pool with Go(), then the results of the tasks are returned by Wait().
The order of the results is not guaranteed to be the same as the order the tasks were submitted. If your use case requires consistent ordering, consider using the `stream` package or `Map` from the `iter` package.
Example ¶
p := NewWithResults[int]() for i := 0; i < 10; i++ { i := i p.Go(func() int { return i * 2 }) } res := p.Wait() // Result order is nondeterministic, so sort them first sort.Ints(res) fmt.Println(res)
Output: [0 2 4 6 8 10 12 14 16 18]
func NewWithResults ¶
func NewWithResults[T any]() *ResultPool[T]
NewWithResults creates a new ResultPool for tasks with a result of type T.
The configuration methods (With*) will panic if they are used after calling Go() for the first time.
func (*ResultPool[T]) Go ¶
func (p *ResultPool[T]) Go(f func() T)
Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.
func (*ResultPool[T]) MaxGoroutines ¶
func (p *ResultPool[T]) MaxGoroutines() int
MaxGoroutines returns the maximum size of the pool.
func (*ResultPool[T]) Wait ¶
func (p *ResultPool[T]) Wait() []T
Wait cleans up all spawned goroutines, propagating any panics, and returning a slice of results from tasks that did not panic.
func (*ResultPool[T]) WithContext ¶
func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]
WithContext converts the pool to a ResultContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.
func (*ResultPool[T]) WithErrors ¶
func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T]
WithErrors converts the pool to an ResultErrorPool so the submitted tasks can return errors.
func (*ResultPool[T]) WithMaxGoroutines ¶
func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T]
WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is used to execute a stream of tasks concurrently while maintaining the order of the results.
To use a stream, you submit some number of `Task`s, each of which return a callback. Each task will be executed concurrently in the stream's associated Pool, and the callbacks will be executed sequentially in the order the tasks were submitted.
Once all your tasks have been submitted, Wait() must be called to clean up running goroutines and propagate any panics.
In the case of panic during execution of a task or a callback, all other tasks and callbacks will still execute. The panic will be propagated to the caller when Wait() is called.
A Stream is efficient, but not zero cost. It should not be used for very short tasks. Startup and teardown adds an overhead of a couple of microseconds, and the overhead for each task is roughly 500ns. It should be good enough for any task that requires a network call.
Example ¶
times := []int{20, 52, 16, 45, 4, 80} stream := NewStream() for _, millis := range times { dur := time.Duration(millis) * time.Millisecond stream.Exec(func() Callback { time.Sleep(dur) // This will print in the order the tasks were submitted return func() { fmt.Println(dur) } }) } stream.Wait()
Output: 20ms 52ms 16ms 45ms 4ms 80ms
func (*Stream) Exec ¶
Exec schedules a task to be run in the stream's pool. All submitted tasks will be executed concurrently in worker goroutines. Then, the callbacks returned by the tasks will be executed in the order that the tasks were submitted. All callbacks will be executed by the same goroutine, so no synchronization is necessary between callbacks. If all goroutines in the stream's pool are busy, a call to Go() will block until the task can be started.
func (*Stream) Wait ¶
func (s *Stream) Wait()
Wait signals to the stream that all tasks have been submitted. Wait will not return until all tasks and callbacks have been run.
func (*Stream) WithMaxGoroutines ¶
type Task ¶
type Task func() Callback
Task is a task that is submitted to the stream. Submitted tasks will be executed concurrently. It returns a callback that will be called after the task has completed.
type WaitGroup ¶
type WaitGroup struct {
// contains filtered or unexported fields
}
WaitGroup is the primary building block for scoped concurrency. Goroutines can be spawned in the WaitGroup with the Go method, and calling Wait() will ensure that each of those goroutines exits before continuing. Any panics in a child goroutine will be caught and propagated to the caller of Wait().
The zero value of WaitGroup is usable, just like sync.WaitGroup. Also like sync.WaitGroup, it must not be copied after first use.
func (*WaitGroup) Exec ¶
func (h *WaitGroup) Exec(f func())
Exec spawns a new goroutine in the WaitGroup.
func (*WaitGroup) Wait ¶
func (h *WaitGroup) Wait()
Wait will block until all goroutines spawned with Go exit and will propagate any panics spawned in a child goroutine.
func (*WaitGroup) WaitAndRecover ¶
WaitAndRecover will block until all goroutines spawned with Go exit and will return a *panics.Recovered if one of the child goroutines panics.