async

package
v2.0.0-...-4e8a7ef Latest
Warning

This package is not in the latest version of its module.

Published: Feb 4, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package stream provides a concurrent, ordered stream implementation.

Constants

This section is empty.

Variables

This section is empty.

Functions

func All

func All(parentCtx context.Context, execs ...Executable) ([]any, error)

All returns all the outputs from all Executables, order guaranteed.

func ForEach

func ForEach[T any](input []T, f func(*T))

ForEach executes f in parallel over each element in input.

It is safe to mutate the input parameter, which makes it possible to map in place.

ForEach always uses at most runtime.GOMAXPROCS goroutines. It takes roughly 2µs to start up the goroutines and adds an overhead of roughly 50ns per element of input. For a configurable goroutine limit, use a custom Iterator.

func ForEachIdx

func ForEachIdx[T any](input []T, f func(int, *T))

ForEachIdx is the same as ForEach except it also provides the index of the element to the callback.

func Last

func Last(parentCtx context.Context, num int, execs ...Executable) ([]any, error)

Last returns the last `num` values outputted by the Executables.

func Map

func Map[T, R any](input []T, f func(*T) R) []R

Map applies f to each element of input, returning the mapped result.

Map always uses at most runtime.GOMAXPROCS goroutines. For a configurable goroutine limit, use a custom Mapper.

func MapErr

func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)

MapErr applies f to each element of the input, returning the mapped result and a combined error of all returned errors.

MapErr always uses at most runtime.GOMAXPROCS goroutines. For a configurable goroutine limit, use a custom Mapper.

func Retry

func Retry(parentCtx context.Context, retries int, fn Executable) (any, error)

Retry attempts to get a value from an Executable instead of an error. It re-runs the Executable each time it fails, up to `retries` times. If the parent Context is canceled, Retry immediately returns the Context's `Err()`.

func Take

func Take(parentCtx context.Context, num int, execs ...Executable) ([]any, error)

Take returns the first `num` values outputted by the Executables.

func Waterfall

func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (any, error)

Waterfall runs the `ExecutableInSequence`s one by one, passing each result to the next one as input. If an error occurs, it stops and returns that error. If the parent Context is canceled, it immediately returns the Context's `Err()`.

Types

type Callback

type Callback func()

Callback is a function that is returned by a Task. Callbacks are called in the same order that tasks are submitted.

type ContextPool

type ContextPool struct {
	// contains filtered or unexported fields
}

ContextPool is a pool that runs tasks that take a context. A new ContextPool should be created with `NewPool().WithContext(ctx)`.

The configuration methods (With*) will panic if they are used after calling Exec() for the first time.

func (*ContextPool) Exec

func (p *ContextPool) Exec(f func(ctx context.Context) error)

Exec submits a task. If it returns an error, the error will be collected and returned by Wait(). If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.

func (*ContextPool) Wait

func (p *ContextPool) Wait() error

Wait cleans up all spawned goroutines, propagates any panics, and returns an error if any of the tasks errored.

func (*ContextPool) WithCancelOnError

func (p *ContextPool) WithCancelOnError() *ContextPool

WithCancelOnError configures the pool to cancel its context as soon as any task returns an error or panics. By default, the pool's context is not canceled until the parent context is canceled.

In this case, all errors returned from the pool after the first will likely be context.Canceled, so you may also want to use (*ContextPool).WithFirstError() to configure the pool to only return the first error.

Example
p := NewPool().
	WithMaxGoroutines(4).
	WithContext(context.Background()).
	WithCancelOnError()
for i := 0; i < 3; i++ {
	i := i
	p.Exec(func(ctx context.Context) error {
		if i == 2 {
			return errors.New("I will cancel all other tasks!")
		}
		<-ctx.Done()
		return nil
	})
}
err := p.Wait()
fmt.Println(err)
Output:

I will cancel all other tasks!

func (*ContextPool) WithFailFast

func (p *ContextPool) WithFailFast() *ContextPool

WithFailFast is an alias for the combination of WithFirstError and WithCancelOnError. By default, the errors from all tasks are returned and the pool's context is not canceled until the parent context is canceled.

func (*ContextPool) WithFirstError

func (p *ContextPool) WithFirstError() *ContextPool

WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error. This is particularly useful for (*ContextPool).WithCancelOnError(), where all errors after the first are likely to be context.Canceled.

func (*ContextPool) WithMaxGoroutines

func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type ErrorPool

type ErrorPool struct {
	// contains filtered or unexported fields
}

ErrorPool is a pool that runs tasks that may return an error. Errors are collected and returned by Wait().

The configuration methods (With*) will panic if they are used after calling Exec() for the first time.

A new ErrorPool should be created using `NewPool().WithErrors()`.

Example
p := NewPool().WithErrors()
for i := 0; i < 3; i++ {
	i := i
	p.Exec(func() error {
		if i == 2 {
			return errors.New("oh no!")
		}
		return nil
	})
}
err := p.Wait()
fmt.Println(err)
Output:

oh no!

func (*ErrorPool) Exec

func (p *ErrorPool) Exec(f func() error)

Exec submits a task to the pool. If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.

func (*ErrorPool) Wait

func (p *ErrorPool) Wait() error

Wait cleans up any spawned goroutines, propagating any panics and returning any errors from tasks.

func (*ErrorPool) WithContext

func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool

WithContext converts the pool to a ContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.

func (*ErrorPool) WithFirstError

func (p *ErrorPool) WithFirstError() *ErrorPool

WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.

func (*ErrorPool) WithMaxGoroutines

func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type Executable

type Executable func(context.Context) (any, error)

Executable represents a singular logic block. It can be used with several functions.

type ExecutableInSequence

type ExecutableInSequence func(context.Context, any) (any, error)

ExecutableInSequence represents one of a sequence of logic blocks.

type IndexedExecutableOutput

type IndexedExecutableOutput struct {
	Value IndexedValue
	Err   error
}

IndexedExecutableOutput stores both the output and the error value from an Executable.

type IndexedValue

type IndexedValue struct {
	Index int
	Value any
}

IndexedValue stores the output of Executables, along with the index of the source Executable for ordering.

type Iterator

type Iterator[T any] struct {
	// MaxGoroutines controls the maximum number of goroutines
	// to use on this Iterator's methods.
	//
	// If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
	MaxGoroutines int
}

Iterator can be used to configure the behaviour of ForEach and ForEachIdx. The zero value is safe to use with reasonable defaults.

Iterator is also safe for reuse and concurrent use.

Example
input := []int{1, 2, 3, 4}
iterator := Iterator[int]{
	MaxGoroutines: len(input) / 2,
}

iterator.ForEach(input, func(v *int) {
	if *v%2 != 0 {
		*v = -1
	}
})

fmt.Println(input)
Output:

[-1 2 -1 4]

func (Iterator[T]) ForEach

func (iter Iterator[T]) ForEach(input []T, f func(*T))

ForEach executes f in parallel over each element in input, using up to the Iterator's configured maximum number of goroutines.

It is safe to mutate the input parameter, which makes it possible to map in place.

It takes roughly 2µs to start up the goroutines and adds an overhead of roughly 50ns per element of input.

func (Iterator[T]) ForEachIdx

func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T))

ForEachIdx is the same as ForEach except it also provides the index of the element to the callback.

type Mapper

type Mapper[T, R any] Iterator[T]

Mapper is an Iterator with a result type R. It can be used to configure the behaviour of Map and MapErr. The zero value is safe to use with reasonable defaults.

Mapper is also safe for reuse and concurrent use.

Example
input := []int{1, 2, 3, 4}
mapper := Mapper[int, bool]{
	MaxGoroutines: len(input) / 2,
}

results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
Output:

[false true false true]

func (Mapper[T, R]) Map

func (m Mapper[T, R]) Map(input []T, f func(*T) R) []R

Map applies f to each element of input, returning the mapped result.

Map uses up to the Mapper's configured maximum number of goroutines.

func (Mapper[T, R]) MapErr

func (m Mapper[T, R]) MapErr(input []T, f func(*T) (R, error)) ([]R, error)

MapErr applies f to each element of the input, returning the mapped result and a combined error of all returned errors.

MapErr uses up to the Mapper's configured maximum number of goroutines.

type MaxRetriesExceededError

type MaxRetriesExceededError struct {
	// contains filtered or unexported fields
}

MaxRetriesExceededError records how many times an Executable ran before exceeding the retry limit. The unexported retries field holds the count.

func (MaxRetriesExceededError) Error

func (err MaxRetriesExceededError) Error() string

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of goroutines used to execute tasks concurrently.

Tasks are submitted with Exec(). Once all your tasks have been submitted, you must call Wait() to clean up any spawned goroutines and propagate any panics.

Goroutines are started lazily, so creating a new pool is cheap. There will never be more goroutines spawned than there are tasks submitted.

The configuration methods (With*) will panic if they are used after calling Exec() for the first time.

Pool is efficient, but not zero cost. It should not be used for very short tasks. Startup and teardown come with an overhead of around 1µs, and each task has an overhead of around 300ns.

Example
p := NewPool().WithMaxGoroutines(3)
for i := 0; i < 5; i++ {
	p.Exec(func() {
		fmt.Println("conc")
	})
}
p.Wait()
Output:

conc
conc
conc
conc
conc

func NewPool

func NewPool() *Pool

NewPool creates a new Pool.

func (*Pool) Exec

func (p *Pool) Exec(f func())

Exec submits a task to be run in the pool. If all goroutines in the pool are busy, a call to Exec() will block until the task can be started.
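
The blocking behaviour can be pictured with a semaphore. The sketch below is a simplified, hypothetical pool (`limitedPool`, `peakConcurrency` are illustrative names): unlike the pool described here, it starts a fresh goroutine per task, but submission blocks in the same way once the limit is reached.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limitedPool is a hypothetical, simplified pool: a buffered channel acts as
// a semaphore, so exec blocks while `limit` tasks are already running.
type limitedPool struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

func newLimitedPool(limit int) *limitedPool {
	return &limitedPool{sem: make(chan struct{}, limit)}
}

func (p *limitedPool) exec(f func()) {
	p.sem <- struct{}{} // blocks until a slot is free
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.sem }() // release the slot when the task ends
		f()
	}()
}

func (p *limitedPool) wait() { p.wg.Wait() }

// peakConcurrency runs `tasks` short tasks and reports the highest number
// that were ever running at the same time.
func peakConcurrency(limit, tasks int) int64 {
	var running, peak atomic.Int64
	p := newLimitedPool(limit)
	for i := 0; i < tasks; i++ {
		p.exec(func() {
			n := running.Add(1)
			for {
				old := peak.Load()
				if n <= old || peak.CompareAndSwap(old, n) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			running.Add(-1)
		})
	}
	p.wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 10) <= 3) // true
}
```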

func (*Pool) MaxGoroutines

func (p *Pool) MaxGoroutines() int

MaxGoroutines returns the maximum size of the pool.

func (*Pool) Wait

func (p *Pool) Wait()

Wait cleans up spawned goroutines, propagating any panics that were raised by tasks.

func (*Pool) WithContext

func (p *Pool) WithContext(ctx context.Context) *ContextPool

WithContext converts the pool to a ContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.

func (*Pool) WithErrors

func (p *Pool) WithErrors() *ErrorPool

WithErrors converts the pool to an ErrorPool so the submitted tasks can return errors.

func (*Pool) WithMaxGoroutines

func (p *Pool) WithMaxGoroutines(n int) *Pool

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type ResultContextPool

type ResultContextPool[T any] struct {
	// contains filtered or unexported fields
}

ResultContextPool is a pool that runs tasks that take a context and return a result. The context passed to the task will be canceled if any of the tasks return an error, which makes its functionality different than just capturing a context with the task closure.

The configuration methods (With*) will panic if they are used after calling Go() for the first time.

func (*ResultContextPool[T]) Go

func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error))

Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.

func (*ResultContextPool[T]) Wait

func (p *ResultContextPool[T]) Wait() ([]T, error)

Wait cleans up all spawned goroutines, propagates any panics, and returns an error if any of the tasks errored.

func (*ResultContextPool[T]) WithCancelOnError

func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T]

WithCancelOnError configures the pool to cancel its context as soon as any task returns an error. By default, the pool's context is not canceled until the parent context is canceled.

func (*ResultContextPool[T]) WithCollectErrored

func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T]

WithCollectErrored configures the pool to still collect the result of a task even if the task returned an error. By default, the results of tasks that errored are ignored and only the error is collected.

func (*ResultContextPool[T]) WithFailFast

func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T]

WithFailFast is an alias for the combination of WithFirstError and WithCancelOnError. By default, the errors from all tasks are returned and the pool's context is not canceled until the parent context is canceled.

func (*ResultContextPool[T]) WithFirstError

func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T]

WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.

func (*ResultContextPool[T]) WithMaxGoroutines

func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T]

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type ResultErrorPool

type ResultErrorPool[T any] struct {
	// contains filtered or unexported fields
}

ResultErrorPool is a pool that executes tasks that return a generic result type and an error. Tasks are executed in the pool with Go(), then the results of the tasks are returned by Wait().

The order of the results is not guaranteed to be the same as the order the tasks were submitted. If your use case requires consistent ordering, consider using Stream or Map from this package.

The configuration methods (With*) will panic if they are used after calling Go() for the first time.

func (*ResultErrorPool[T]) Go

func (p *ResultErrorPool[T]) Go(f func() (T, error))

Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.

func (*ResultErrorPool[T]) Wait

func (p *ResultErrorPool[T]) Wait() ([]T, error)

Wait cleans up any spawned goroutines, propagating any panics and returning the results and any errors from tasks.

func (*ResultErrorPool[T]) WithCollectErrored

func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T]

WithCollectErrored configures the pool to still collect the result of a task even if the task returned an error. By default, the results of tasks that errored are ignored and only the error is collected.

func (*ResultErrorPool[T]) WithContext

func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]

WithContext converts the pool to a ResultContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.

func (*ResultErrorPool[T]) WithFirstError

func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T]

WithFirstError configures the pool to only return the first error returned by a task. By default, Wait() will return a combined error.

func (*ResultErrorPool[T]) WithMaxGoroutines

func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T]

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type ResultPool

type ResultPool[T any] struct {
	// contains filtered or unexported fields
}

ResultPool is a pool that executes tasks that return a generic result type. Tasks are executed in the pool with Go(), then the results of the tasks are returned by Wait().

The order of the results is not guaranteed to be the same as the order the tasks were submitted. If your use case requires consistent ordering, consider using Stream or Map from this package.

Example
p := NewWithResults[int]()
for i := 0; i < 10; i++ {
	i := i
	p.Go(func() int {
		return i * 2
	})
}
res := p.Wait()
// Result order is nondeterministic, so sort them first
sort.Ints(res)
fmt.Println(res)
Output:

[0 2 4 6 8 10 12 14 16 18]

func NewWithResults

func NewWithResults[T any]() *ResultPool[T]

NewWithResults creates a new ResultPool for tasks with a result of type T.

The configuration methods (With*) will panic if they are used after calling Go() for the first time.

func (*ResultPool[T]) Go

func (p *ResultPool[T]) Go(f func() T)

Go submits a task to the pool. If all goroutines in the pool are busy, a call to Go() will block until the task can be started.

func (*ResultPool[T]) MaxGoroutines

func (p *ResultPool[T]) MaxGoroutines() int

MaxGoroutines returns the maximum size of the pool.

func (*ResultPool[T]) Wait

func (p *ResultPool[T]) Wait() []T

Wait cleans up all spawned goroutines, propagating any panics, and returning a slice of results from tasks that did not panic.

func (*ResultPool[T]) WithContext

func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T]

WithContext converts the pool to a ResultContextPool for tasks that should run under the same context, such that they each respect shared cancellation. For example, WithCancelOnError can be configured on the returned pool to signal that all goroutines should be cancelled upon the first error.

func (*ResultPool[T]) WithErrors

func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T]

WithErrors converts the pool to a ResultErrorPool so the submitted tasks can return errors.

func (*ResultPool[T]) WithMaxGoroutines

func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T]

WithMaxGoroutines limits the number of goroutines in a pool. Defaults to unlimited. Panics if n < 1.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to execute a stream of tasks concurrently while maintaining the order of the results.

To use a stream, you submit some number of `Task`s, each of which returns a callback. Each task will be executed concurrently in the stream's associated Pool, and the callbacks will be executed sequentially in the order the tasks were submitted.

Once all your tasks have been submitted, Wait() must be called to clean up running goroutines and propagate any panics.

In the case of panic during execution of a task or a callback, all other tasks and callbacks will still execute. The panic will be propagated to the caller when Wait() is called.

A Stream is efficient, but not zero cost. It should not be used for very short tasks. Startup and teardown add an overhead of a couple of microseconds, and the overhead for each task is roughly 500ns. It should be good enough for any task that requires a network call.

Example
times := []int{20, 52, 16, 45, 4, 80}

stream := NewStream()
for _, millis := range times {
	dur := time.Duration(millis) * time.Millisecond
	stream.Exec(func() Callback {
		time.Sleep(dur)
		// This will print in the order the tasks were submitted
		return func() { fmt.Println(dur) }
	})
}
stream.Wait()
Output:

20ms
52ms
16ms
45ms
4ms
80ms

func NewStream

func NewStream() *Stream

NewStream creates a new Stream with default settings.

func (*Stream) Exec

func (s *Stream) Exec(f Task)

Exec schedules a task to be run in the stream's pool. All submitted tasks will be executed concurrently in worker goroutines. Then, the callbacks returned by the tasks will be executed in the order that the tasks were submitted. All callbacks will be executed by the same goroutine, so no synchronization is necessary between callbacks. If all goroutines in the stream's pool are busy, a call to Exec() will block until the task can be started.

func (*Stream) Wait

func (s *Stream) Wait()

Wait signals to the stream that all tasks have been submitted. Wait will not return until all tasks and callbacks have been run.

func (*Stream) WithMaxGoroutines

func (s *Stream) WithMaxGoroutines(n int) *Stream

type Task

type Task func() Callback

Task is a task that is submitted to the stream. Submitted tasks will be executed concurrently. It returns a callback that will be called after the task has completed.

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup is the primary building block for scoped concurrency. Goroutines can be spawned in the WaitGroup with the Exec method, and calling Wait() will ensure that each of those goroutines exits before continuing. Any panics in a child goroutine will be caught and propagated to the caller of Wait().

The zero value of WaitGroup is usable, just like sync.WaitGroup. Also like sync.WaitGroup, it must not be copied after first use.
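
Panic propagation of this kind can be sketched on top of sync.WaitGroup. `panicGroup` and `runAndCatch` are hypothetical names; keeping only the first panic value is an assumption, and stack traces are not preserved in this simplified version.

```go
package main

import (
	"fmt"
	"sync"
)

// panicGroup is a hypothetical sketch of a wait group that catches panics in
// child goroutines and re-raises the first one in the caller of wait.
type panicGroup struct {
	wg    sync.WaitGroup
	mu    sync.Mutex
	first any // first recovered panic value, if any
}

func (g *panicGroup) exec(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		defer func() {
			if r := recover(); r != nil {
				g.mu.Lock()
				if g.first == nil {
					g.first = r
				}
				g.mu.Unlock()
			}
		}()
		f()
	}()
}

// wait blocks until every goroutine has exited, then re-panics in the
// calling goroutine if any child panicked.
func (g *panicGroup) wait() {
	g.wg.Wait()
	if g.first != nil {
		panic(g.first)
	}
}

// runAndCatch shows the caller's view: the child panic surfaces from wait.
func runAndCatch() (recovered any) {
	defer func() { recovered = recover() }()
	var g panicGroup // the zero value is ready to use
	g.exec(func() {})
	g.exec(func() { panic("boom") })
	g.wait()
	return nil
}

func main() {
	fmt.Println(runAndCatch()) // boom
}
```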

func NewWaitGroup

func NewWaitGroup() *WaitGroup

NewWaitGroup creates a new WaitGroup.

func (*WaitGroup) Exec

func (h *WaitGroup) Exec(f func())

Exec spawns a new goroutine in the WaitGroup.

func (*WaitGroup) Wait

func (h *WaitGroup) Wait()

Wait will block until all goroutines spawned with Exec exit and will propagate any panics raised in a child goroutine.

func (*WaitGroup) WaitAndRecover

func (h *WaitGroup) WaitAndRecover() *panics.Recovered

WaitAndRecover will block until all goroutines spawned with Exec exit and will return a *panics.Recovered if one of the child goroutines panics.
