Documentation ¶
Index ¶
- func All[T any](in <-chan T, errs ...<-chan error) ([]T, error)
- func Await[T any](ctx context.Context, in <-chan T, errs <-chan error) (T, error)
- func Before[Value any](in <-chan Value, fn func(Value) []Value) <-chan Value
- func BeforeContext[Value any](ctx context.Context, in <-chan Value, fn func(Value) []Value) <-chan Value
- func Concurrent[T any](c chan T) func(context.Context, ...T) error
- func ConcurrentContext[T any](ctx context.Context, c chan T) func(...T) error
- func Drain[T any](ctx context.Context, in <-chan T, errs ...<-chan error) ([]T, error)
- func FanIn[T any](in ...<-chan T) (_ <-chan T, stop func())
- func FanInAll[T any](in ...<-chan T) <-chan T
- func FanInContext[T any](ctx context.Context, in ...<-chan T) <-chan T
- func Filter[T any](in <-chan T, filters ...func(T) bool) <-chan T
- func ForEach[T any](ctx context.Context, fn func(el T), errFn func(error), in <-chan T, ...)
- func Map[To, From any](ctx context.Context, in <-chan From, mapper func(From) To) <-chan To
- func New[T any](in []T) <-chan T
- func NewConcurrent[T any](vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func())
- func NewConcurrentContext[T any](ctx context.Context, vals ...T) (_ <-chan T, _push func(...T) error, _close func())
- func Take[T any](ctx context.Context, n int, in <-chan T, errs ...<-chan error) ([]T, error)
- func Walk[T any](ctx context.Context, walkFn func(T) error, in <-chan T, errs ...<-chan error) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func All ¶ added in v0.2.0
All drains the given channel and returns its elements. All is an alias for Drain(context.Background(), in, errs...).
func Await ¶
Await awaits the next element or error (whichever comes first) from the provided channels. Await returns either an element OR an error, never both. If ctx is canceled before an element is received, ctx.Err() is returned.
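The race between element, error, and cancellation can be illustrated with a single select statement. The following is a minimal stdlib-only sketch, not the package's implementation: awaitSketch and awaitDemo are hypothetical names, and the handling of closed channels is an assumption that the text above does not specify.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// awaitSketch is a hypothetical stand-in for Await: it returns the first
// element, the first error, or ctx.Err(), whichever arrives first.
func awaitSketch[T any](ctx context.Context, in <-chan T, errs <-chan error) (T, error) {
	var zero T
	select {
	case <-ctx.Done():
		return zero, ctx.Err()
	case err, ok := <-errs:
		if !ok {
			// Assumption: a closed error channel is reported as an error.
			return zero, errors.New("error channel closed")
		}
		return zero, err
	case el, ok := <-in:
		if !ok {
			// Assumption: a closed input channel is reported as an error.
			return zero, errors.New("input channel closed")
		}
		return el, nil
	}
}

// awaitDemo awaits one buffered element, then awaits with a canceled context.
func awaitDemo() (el int, canceled bool, err error) {
	in := make(chan int, 1)
	in <- 42
	el, err = awaitSketch[int](context.Background(), in, nil)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	// Both channels are nil here, so only the canceled context can fire.
	_, ctxErr := awaitSketch[int](ctx, nil, nil)
	return el, errors.Is(ctxErr, context.Canceled), err
}

func main() {
	el, canceled, err := awaitDemo()
	fmt.Println(el, canceled, err)
}
```

In the sketch, at most one select case runs per call, which is why the result is either an element or an error and never both.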
func Before ¶
func Before[Value any](in <-chan Value, fn func(Value) []Value) <-chan Value
Before returns a new channel that is filled with the elements from the input channel. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.
If the input channel or fn is nil, the input channel is returned directly.
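The ordering described above (values returned by fn first, then the element itself) may be easier to see in code. This is a hedged, stdlib-only sketch of that behavior; beforeSketch and beforeDemo are hypothetical names and do not come from the package.

```go
package main

import "fmt"

// beforeSketch is a hypothetical stand-in for Before: for every element it
// first forwards the values returned by fn, then the element itself.
func beforeSketch[T any](in <-chan T, fn func(T) []T) <-chan T {
	if in == nil || fn == nil {
		return in // nothing to do, hand back the input unchanged
	}
	out := make(chan T)
	go func() {
		defer close(out)
		for el := range in {
			for _, v := range fn(el) {
				out <- v
			}
			out <- el
		}
	}()
	return out
}

// beforeDemo prepends v-1 before every element v.
func beforeDemo() []int {
	in := make(chan int, 2)
	in <- 10
	in <- 20
	close(in)

	out := beforeSketch[int](in, func(v int) []int { return []int{v - 1} })
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(beforeDemo())
}
```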
func BeforeContext ¶
func BeforeContext[Value any](ctx context.Context, in <-chan Value, fn func(Value) []Value) <-chan Value
BeforeContext returns a new channel that is filled with the elements from the input channel. The returned channel is closed when the input channel is closed, or when ctx is canceled. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.
If the input channel or fn is nil, the input channel is returned directly.
func Concurrent ¶ added in v0.2.0
Concurrent returns a `push` function for the provided channel. The `push` function tries to push values into the channel and accepts a Context that can cancel the push operation.
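A cancelable push is typically a select between the send and ctx.Done(). The sketch below shows that idea under stated assumptions: concurrentSketch and concurrentDemo are hypothetical names, and stopping at the first value that cannot be sent is an assumption about how a partial push is handled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// concurrentSketch is a hypothetical stand-in for Concurrent: the returned
// push function sends each value unless ctx is canceled first.
func concurrentSketch[T any](c chan T) func(context.Context, ...T) error {
	return func(ctx context.Context, vals ...T) error {
		for _, v := range vals {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case c <- v:
			}
		}
		return nil
	}
}

// concurrentDemo fills a buffer of two, then tries a third push with a
// canceled context so the blocked send is abandoned.
func concurrentDemo() (buffered int, canceled bool, err error) {
	c := make(chan int, 2)
	push := concurrentSketch(c)
	err = push(context.Background(), 1, 2)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	blocked := push(ctx, 3) // buffer is full, only ctx.Done() is ready
	return len(c), errors.Is(blocked, context.Canceled), err
}

func main() {
	buffered, canceled, err := concurrentDemo()
	fmt.Println(buffered, canceled, err)
}
```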
func ConcurrentContext ¶ added in v0.2.0
ConcurrentContext does the same as Concurrent, but uses the provided Context for every push call.
func Drain ¶
Drain drains the given channel and returns its elements.
Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error, the already drained elements and the error are returned. Similarly, when ctx is canceled, the drained elements and ctx.Err() are returned.
Drain returns as soon as the input channel is closed or an error is received. It does not wait for the error channels to be closed.
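The return conditions of Drain can be sketched as a loop over a three-way select. This is a stdlib-only illustration rather than the package's code: drainSketch and drainDemo are hypothetical names, and merging the error channels through helper goroutines is just one possible approach.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// drainSketch is a hypothetical stand-in for Drain. It collects elements
// until in is closed, an error arrives, or ctx is canceled.
func drainSketch[T any](ctx context.Context, in <-chan T, errs ...<-chan error) ([]T, error) {
	errc := make(chan error)
	stop := make(chan struct{})
	defer close(stop) // releases the forwarding goroutines on return

	for _, ch := range errs {
		go func(ch <-chan error) {
			for {
				select {
				case <-stop:
					return
				case err, ok := <-ch:
					if !ok {
						return
					}
					select {
					case errc <- err:
					case <-stop:
						return
					}
				}
			}
		}(ch)
	}

	var out []T
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case err := <-errc:
			return out, err // elements drained so far are still returned
		case el, ok := <-in:
			if !ok {
				return out, nil // error channels are not awaited
			}
			out = append(out, el)
		}
	}
}

// drainDemo drains a closed channel, then drains one that only yields an error.
func drainDemo() (vals []int, failed error, err error) {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	vals, err = drainSketch[int](context.Background(), in)

	errs := make(chan error, 1)
	errs <- errors.New("boom")
	_, failed = drainSketch[int](context.Background(), make(chan int), errs)
	return vals, failed, err
}

func main() {
	vals, failed, err := drainDemo()
	fmt.Println(vals, failed, err)
}
```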
func FanIn ¶
func FanIn[T any](in ...<-chan T) (_ <-chan T, stop func())
FanIn returns a single receive-only channel from multiple receive-only channels. When the returned stop function is called or every input channel is closed, the returned channel is closed.
If len(in) == 0, FanIn returns a closed channel.
Multiple calls to stop have no effect.
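A common way to get these semantics is one forwarding goroutine per input, a sync.WaitGroup to close the output, and a sync.Once to make stop idempotent. The sketch below follows that pattern as an illustration only: fanInSketch and fanInDemo are hypothetical names, and the package may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// fanInSketch is a hypothetical stand-in for FanIn.
func fanInSketch[T any](in ...<-chan T) (<-chan T, func()) {
	out := make(chan T)
	stopped := make(chan struct{})
	var once sync.Once
	stop := func() { once.Do(func() { close(stopped) }) } // safe to call repeatedly

	var wg sync.WaitGroup
	wg.Add(len(in))
	for _, ch := range in {
		go func(ch <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-stopped:
					return
				case v, ok := <-ch:
					if !ok {
						return
					}
					select {
					case out <- v:
					case <-stopped:
						return
					}
				}
			}
		}(ch)
	}
	go func() {
		wg.Wait() // returns immediately when there are no inputs
		close(out)
	}()
	return out, stop
}

// fanInDemo merges two channels and checks the empty-input case.
func fanInDemo() (count, sum int, emptyClosed bool) {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)

	out, stop := fanInSketch[int](a, b)
	for v := range out {
		count++
		sum += v
	}
	stop()
	stop() // repeated calls are no-ops

	empty, _ := fanInSketch[int]()
	_, ok := <-empty
	return count, sum, !ok
}

func main() {
	fmt.Println(fanInDemo())
}
```

The order in which merged elements arrive is not deterministic, so the demo only checks their count and sum.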
func FanInAll ¶
func FanInAll[T any](in ...<-chan T) <-chan T
FanInAll returns FanIn(in...) but without the stop function.
func FanInContext ¶
FanInContext returns a single receive-only channel from multiple receive-only channels. When the provided ctx is canceled or every input channel is closed, the returned channel is closed.
If len(in) == 0, FanInContext returns a closed channel.
func Filter ¶
Filter returns a new channel with the same type as the input channel and fills it with the elements from the input channel. The provided filter functions are called for every element. If any of the filters returns false for an element, that element is not pushed into the returned channel.
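The "all filters must pass" rule can be sketched as follows. This is a stdlib-only illustration with hypothetical names (filterSketch, filterDemo), not the package's implementation.

```go
package main

import "fmt"

// filterSketch is a hypothetical stand-in for Filter: an element is
// forwarded only if every filter returns true for it.
func filterSketch[T any](in <-chan T, filters ...func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
	next:
		for el := range in {
			for _, keep := range filters {
				if !keep(el) {
					continue next // one rejecting filter drops the element
				}
			}
			out <- el
		}
	}()
	return out
}

// filterDemo keeps the even numbers greater than 2 out of 1..6.
func filterDemo() []int {
	in := make(chan int, 6)
	for i := 1; i <= 6; i++ {
		in <- i
	}
	close(in)

	even := func(v int) bool { return v%2 == 0 }
	big := func(v int) bool { return v > 2 }

	var got []int
	for v := range filterSketch[int](in, even, big) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(filterDemo())
}
```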
func ForEach ¶
func ForEach[T any]( ctx context.Context, fn func(el T), errFn func(error), in <-chan T, errs ...<-chan error, )
ForEach iterates over the provided channels. For every element e it calls fn(e), and for every error e it calls errFn(e), until all channels are closed or ctx is canceled.
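One way to loop "until all channels are closed" is to set a channel variable to nil once it is closed, so that its select case can no longer fire. The sketch below uses that idiom. It is simplified to a single error channel, which is an assumption made for brevity, and forEachSketch and forEachDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// forEachSketch is a hypothetical, simplified stand-in for ForEach that
// accepts a single error channel instead of a variadic list.
func forEachSketch[T any](ctx context.Context, fn func(T), errFn func(error), in <-chan T, errs <-chan error) {
	for in != nil || errs != nil {
		select {
		case <-ctx.Done():
			return
		case el, ok := <-in:
			if !ok {
				in = nil // a nil channel never becomes ready again
				continue
			}
			fn(el)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			errFn(err)
		}
	}
}

// forEachDemo sums three elements and counts one error.
func forEachDemo() (sum, errCount int) {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	errs := make(chan error, 1)
	errs <- errors.New("boom")
	close(errs)

	forEachSketch[int](context.Background(),
		func(v int) { sum += v },
		func(error) { errCount++ },
		in, errs)
	return sum, errCount
}

func main() {
	fmt.Println(forEachDemo())
}
```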
func Map ¶
Map maps the elements from the provided `in` channel using the provided `mapper` and sends the mapped values to the returned channel. The returned channel is closed when the input channel is closed or ctx is canceled.
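Note that the type parameters are ordered [To, From], so an explicit instantiation names the target type first. The following stdlib-only sketch illustrates the documented behavior. mapSketch and mapDemo are hypothetical names, and the second select on the send is an assumption about how cancellation interacts with a slow consumer.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// mapSketch is a hypothetical stand-in for Map.
func mapSketch[To, From any](ctx context.Context, in <-chan From, mapper func(From) To) <-chan To {
	out := make(chan To)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- mapper(v):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// mapDemo converts the integers 1..3 into their decimal strings.
func mapDemo() []string {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	// Target type first, source type second.
	out := mapSketch[string, int](context.Background(), in, strconv.Itoa)
	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(mapDemo())
}
```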
func New ¶
func New[T any](in []T) <-chan T
New returns a channel that is filled with the given values. The channel is closed after all elements have been pushed into the channel.
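As a rough illustration, a slice can be turned into a closed, pre-filled channel as shown below. newSketch and newDemo are hypothetical names, and using a buffer sized to the slice is an assumption made to keep the sketch free of goroutines. The package may fill the channel differently.

```go
package main

import "fmt"

// newSketch is a hypothetical stand-in for New. Assumption: the channel is
// buffered to len(in), so it can be filled and closed before returning.
func newSketch[T any](in []T) <-chan T {
	out := make(chan T, len(in))
	for _, v := range in {
		out <- v
	}
	close(out)
	return out
}

// newDemo reads everything back from the channel.
func newDemo() []string {
	var got []string
	for v := range newSketch([]string{"a", "b", "c"}) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(newDemo())
}
```

Because the channel is closed after the last element, a plain range loop over it terminates on its own.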
func NewConcurrent ¶ added in v0.2.0
func NewConcurrent[T any](vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func())
NewConcurrent creates a channel of the given type and returns the channel and a `push` function. The `push` function tries to push a value into the channel and accepts a Context to cancel the push operation. The returned `close` function closes the channel when called. The `close` function is thread-safe and may be called multiple times. If values are provided to NewConcurrent, the buffer of the channel is set to the number of values and the values are pushed into the channel before returning.
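    str, push, close := NewConcurrent(1, 2, 3)
    go func() {
        // The buffer already holds 1, 2, 3, so push blocks until All receives.
        defer close() // lets All return once every value has been pushed
        push(context.TODO(), 4, 5, 6)
    }()
    vals, err := All(str)
    // handle err
    // vals == []int{1, 2, 3, 4, 5, 6}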
Use the Concurrent function to create a `push` function for an existing channel.
func NewConcurrentContext ¶ added in v0.2.0
func NewConcurrentContext[T any](ctx context.Context, vals ...T) (_ <-chan T, _push func(...T) error, _close func())
NewConcurrentContext does the same as NewConcurrent, but uses the provided Context for every push call.
func Take ¶ added in v0.3.7
Take receives elements from the input channel until it has received n elements or the input channel is closed. It returns a slice containing the received elements. If any error occurs during the process, it is returned as the second return value.
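The two stop conditions (n elements received, or input closed) can be sketched as follows. This stdlib-only illustration is simplified to a single error channel. takeSketch and takeDemo are hypothetical names, and returning the elements received so far alongside an error is an assumption that the text above does not confirm.

```go
package main

import (
	"context"
	"fmt"
)

// takeSketch is a hypothetical, simplified stand-in for Take that accepts
// a single error channel instead of a variadic list.
func takeSketch[T any](ctx context.Context, n int, in <-chan T, errs <-chan error) ([]T, error) {
	if n <= 0 {
		return nil, nil
	}
	out := make([]T, 0, n)
	for len(out) < n {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed error channel: stop selecting on it
				continue
			}
			return out, err
		case el, ok := <-in:
			if !ok {
				return out, nil // input closed before n elements arrived
			}
			out = append(out, el)
		}
	}
	return out, nil
}

// takeDemo takes 3 of 5 elements, then asks for 5 from a channel that
// closes after 2.
func takeDemo() (first, short []int, err error) {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	first, err = takeSketch[int](context.Background(), 3, in, nil)
	if err != nil {
		return nil, nil, err
	}

	closed := make(chan int, 2)
	closed <- 1
	closed <- 2
	close(closed)
	short, err = takeSketch[int](context.Background(), 5, closed, nil)
	return first, short, err
}

func main() {
	fmt.Println(takeDemo())
}
```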
func Walk ¶
func Walk[T any]( ctx context.Context, walkFn func(T) error, in <-chan T, errs ...<-chan error, ) error
Walk receives from the given channel until it and all provided error channels are closed, ctx is canceled, or any of the provided error channels receives an error. For every element e that is received from the input channel, walkFn(e) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.
Example:
    var bus event.Bus
    in, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
    // handle err
    err = Walk(context.TODO(), func(e event.Event) error {
        log.Printf("Received %q event: %v", e.Name(), e)
        return nil // walkFn must return an error value
    }, in, errs)
    // handle err
Types ¶
This section is empty.