streams

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2022 License: Apache-2.0 Imports: 2 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Await

func Await[T any](ctx context.Context, in <-chan T, errs <-chan error) (T, error)

Await awaits the next element or error (whatever happens first) from the provided channels. Await returns either an element OR an error, never both. If ctx is canceled before an element is received, ctx.Err() is returned.

func Before

func Before[Value any](in <-chan Value, fn func(Value) []Value) <-chan Value

Before returns a new channel that is filled with the elements from the input channel. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.

If the input channel or fn is nil, the input channel is returned directly.

func BeforeContext

func BeforeContext[Value any](ctx context.Context, in <-chan Value, fn func(Value) []Value) <-chan Value

BeforeContext returns a new channel that is filled with the elements from the input channel. The returned channel is closed when the input channel is closed, or when ctx is canceled. Before sending an element into the returned channel, fn(el) is called. The values returned by fn are first sent into the returned channel, then the element from the input channel.

If the input channel or fn is nil, the input channel is returned directly.

func Drain

func Drain[T any](ctx context.Context, in <-chan T, errs ...<-chan error) ([]T, error)

Drain drains the given channel and returns its elements.

Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error, the already drained elements and the error are returned. Similarly, when ctx is canceled, the drained elements and ctx.Err() are returned.

Drain returns when the input channel is closed or if it encounters an error and does not wait for the error channels to be closed.

func FanIn

func FanIn[T any](in ...<-chan T) (_ <-chan T, stop func())

FanIn returns a single receive-only channel from multiple receive-only channels. When the returned stop function is called or every input channel is closed, the returned channel is closed.

If len(in) == 0, FanIn returns a closed channel.

Multiple calls to stop have no effect.

func FanInAll

func FanInAll[T any](in ...<-chan T) <-chan T

FanInAll returns FanIn(in...) but without the stop function.

func FanInContext

func FanInContext[T any](ctx context.Context, in ...<-chan T) <-chan T

FanInContext returns a single receive-only channel from multiple receive-only channels. When the provided ctx is canceled or every input channel is closed, the returned channel is closed.

If len(in) == 0, FanInContext returns a closed channel.

func Filter

func Filter[T any](in <-chan T, filters ...func(T) bool) <-chan T

Filter returns a new channel with the same type as the input channel and fills it with the elements from the input channel. The provided filter functions are called for every element. If any of the filters returns false for an element, that element is not pushed into the returned channel.

func ForEach

func ForEach[T any](
	ctx context.Context,
	fn func(el T),
	errFn func(error),
	in <-chan T,
	errs ...<-chan error,
)

ForEach iterates over the provided channels and for every element e calls calls fn(e) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.

func Map

func Map[To, From any](ctx context.Context, in <-chan From, mapper func(From) To) <-chan To

Map maps the elements from the provided `in` channel using the provided `mapper` and sends the mapped values to the returned channel. The returned channel is closed when the input channel is closed or ctx is canceled.

func New

func New[T any](in []T) <-chan T

New returns a channel that is filled with the given values. The channel is closed after all elements have been pushed into the channel.

func Walk

func Walk[T any](
	ctx context.Context,
	walkFn func(T) error,
	in <-chan T,
	errs ...<-chan error,
) error

Walk receives from the given channel until it and and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every element e that is received from the input channel, walkFn(e) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Example:

var bus event.Bus
in, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
// handle err
err := Walk(context.TODO(), func(e event) {
	log.Println(fmt.Sprintf("Received %q event: %v", e.Name(), e))
}, in, errs)
// handle err

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL