stream

package
v0.0.0-...-67058d9
Warning

This package is not in the latest version of its module.
Published: Nov 21, 2024 License: Apache-2.0 Imports: 3 Imported by: 10

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](stream Stream[T]) ([]T, error)

Collect aggregates a stream into a slice. If an error is hit, the items observed thus far are still returned, but they may not represent the complete set.
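The consumption pattern that Collect embodies can be sketched as below. This is a minimal illustration, not the package's source: the Stream interface is redeclared locally so the block compiles on its own, and sliceStream, fromSlice, collect, and errBoom are hypothetical names introduced only for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Stream is redeclared locally (mirroring the interface documented on this
// page) so the sketch compiles without importing the real package.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream is a hypothetical slice-backed stream; Done reports err.
type sliceStream[T any] struct {
	items []T
	idx   int
	err   error
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return s.err }

// fromSlice is an illustrative constructor, not part of the documented API.
func fromSlice[T any](err error, items ...T) Stream[T] {
	return &sliceStream[T]{items: items, err: err}
}

// errBoom is a made-up failure used to show partial results.
var errBoom = errors.New("boom")

// collect loops on Next/Item, then checks Done explicitly instead of
// deferring it: a non-nil error means the slice may be incomplete.
func collect[T any](s Stream[T]) ([]T, error) {
	var out []T
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	// The items seen before the failure are still returned with the error.
	items, err := collect(fromSlice(errBoom, "a", "b"))
	fmt.Println(items, err)
}
```

Callers that care about completeness should treat a non-nil error as "the slice may be truncated" and not discard the error just because some items came back.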

func CollectPages

func CollectPages[T any](stream Stream[[]T]) ([]T, error)

CollectPages aggregates a paginated stream into a slice. If an error is hit, the pages observed thus far are still returned, but they may not represent the complete set.

func Drain

func Drain[T any](stream Stream[T]) error

Drain consumes a stream to completion.

func Take

func Take[T any](stream Stream[T], n int) ([]T, bool)

Take takes the next n items from a stream. It returns a slice of the items and the result of the last call to stream.Next().
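A rough sketch of how the boolean result can drive batch processing follows. The Stream interface is redeclared locally, and take, sliceStream, and fromSlice are hypothetical stand-ins written for this example; the real Take may differ in details such as preallocation.

```go
package main

import "fmt"

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return nil }

func fromSlice[T any](items ...T) Stream[T] {
	return &sliceStream[T]{items: items}
}

// take pulls up to n items and reports the result of the last Next call.
// A false result means the stream is exhausted and Next must not be
// called again.
func take[T any](s Stream[T], n int) ([]T, bool) {
	var items []T
	for i := 0; i < n; i++ {
		if !s.Next() {
			return items, false
		}
		items = append(items, s.Item())
	}
	return items, true
}

func main() {
	s := fromSlice(1, 2, 3, 4, 5)
	for {
		batch, more := take(s, 2)
		fmt.Println(batch, more)
		if !more {
			break // stop polling once Next has returned false
		}
	}
	if err := s.Done(); err != nil {
		fmt.Println("stream failed:", err)
	}
}
```

Because the stream contract forbids calling Next after it has returned false, the loop stops as soon as the boolean is false and only then calls Done.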

Types

type Stream

type Stream[T any] interface {
	// Next attempts to advance the stream to the next item. If false is returned,
	// then no more items are available. Next() and Item() must not be called after the
	// first time Next() returns false.
	Next() bool
	// Item gets the current item. Invoking Item() is only safe if Next was previously
	// invoked *and* returned true. Invoking Item() before invoking Next(), or after Next()
	// returned false may cause panics or other unpredictable behavior. Whether or not the
	// item returned is safe for access after the stream is advanced again is dependent
	// on the implementation and should be documented (e.g. an I/O based stream might
	// re-use an underlying buffer).
	Item() T
	// Done checks for any errors that occurred during streaming and informs the stream
	// that we've finished consuming items from it. Invoking Next() or Item() after Done()
	// has been called is not permitted. Done may trigger cleanup operations, but unlike Close()
	// the error reported is specifically related to failures that occurred *during* streaming,
	// meaning that if Done() returns an error, there is a high likelihood that the complete
	// set of values was not observed. For this reason, Done() should always be checked explicitly
	// rather than deferred as Close() might be.
	Done() error
}

Stream is a generic interface for streaming APIs. This package was built with the intention of making it easier to write streaming resource getters, and may not be suitable for applications outside of that specific use case. Streams may panic if misused. See the Collect function for an example of the correct consumption pattern.

NOTE: streams almost always perform worse than slices in Go. Unless you're dealing with a resource that scales linearly with cluster size, you are probably better off just working with slices.

func Chain

func Chain[T any](streams ...Stream[T]) Stream[T]

Chain joins multiple streams in order, fully consuming one before moving to the next.
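One way such a combinator could be built is sketched below. It is an illustration under assumptions, not the package's implementation: the Stream interface is redeclared locally, and chain, chainStreams, collect, fromSlice, and errBoom are hypothetical names. In particular, halting at the first failed stream and finalizing the remaining ones in Done is a design choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
	err   error
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return s.err }

func fromSlice[T any](err error, items ...T) Stream[T] {
	return &sliceStream[T]{items: items, err: err}
}

var errBoom = errors.New("boom")

// chain consumes streams[0] fully before moving on to the next stream.
type chain[T any] struct {
	streams []Stream[T]
	err     error
}

func (c *chain[T]) Next() bool {
	for len(c.streams) > 0 && c.err == nil {
		if c.streams[0].Next() {
			return true
		}
		// The head stream is exhausted: finalize it before moving on.
		c.err = c.streams[0].Done()
		c.streams = c.streams[1:]
	}
	return false
}

func (c *chain[T]) Item() T { return c.streams[0].Item() }

func (c *chain[T]) Done() error {
	// Give streams that were never reached a chance to clean up.
	for _, s := range c.streams {
		if err := s.Done(); err != nil && c.err == nil {
			c.err = err
		}
	}
	c.streams = nil
	return c.err
}

func chainStreams[T any](streams ...Stream[T]) Stream[T] {
	return &chain[T]{streams: streams}
}

func collect[T any](s Stream[T]) ([]T, error) {
	var out []T
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	fmt.Println(collect(chainStreams(fromSlice(nil, 1, 2), fromSlice(nil, 3))))
}
```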

func Empty

func Empty[T any]() Stream[T]

Empty creates an empty stream (equivalent to Fail(nil)).

func Fail

func Fail[T any](err error) Stream[T]

Fail creates an empty stream that fails immediately with the supplied error.

func FilterMap

func FilterMap[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]

FilterMap maps a stream of type A into a stream of type B, filtering out items when fn returns false.
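The combined map-and-filter step can be illustrated with a small sketch that parses strings into integers and drops the ones that fail to parse. The Stream interface is redeclared locally; filterMap, filterMapStream, parseInts, sliceStream, and fromSlice are hypothetical names, and the real implementation may be structured differently.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return nil }

func fromSlice[T any](items ...T) Stream[T] {
	return &sliceStream[T]{items: items}
}

// filterMap advances the inner stream until fn accepts an item, then
// exposes the mapped value through Item.
type filterMap[A, B any] struct {
	inner Stream[A]
	fn    func(A) (B, bool)
	item  B
}

func (f *filterMap[A, B]) Next() bool {
	for f.inner.Next() {
		if mapped, ok := f.fn(f.inner.Item()); ok {
			f.item = mapped
			return true
		}
	}
	return false
}

func (f *filterMap[A, B]) Item() B { return f.item }

func (f *filterMap[A, B]) Done() error { return f.inner.Done() }

func filterMapStream[A, B any](s Stream[A], fn func(A) (B, bool)) Stream[B] {
	return &filterMap[A, B]{inner: s, fn: fn}
}

// parseInts keeps only the inputs that parse as integers.
func parseInts(in ...string) ([]int, error) {
	s := filterMapStream(fromSlice(in...), func(raw string) (int, bool) {
		n, err := strconv.Atoi(raw)
		return n, err == nil
	})
	var out []int
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	fmt.Println(parseInts("1", "x", "3"))
}
```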

func Flatten

func Flatten[T any](stream Stream[Stream[T]]) Stream[T]

Flatten flattens a stream of streams into a single stream of items.

func Func

func Func[T any](fn func() (T, error), doneFuncs ...func()) Stream[T]

Func builds a stream from a closure. The supplied closure *must* return io.EOF if no more items are available. Failure to return io.EOF (or some other error) may cause infinite loops. Cleanup functions may be optionally provided which will be run on close. If wrapping a paginated API, consider using PageFunc instead.
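The io.EOF convention described above can be sketched as follows. This is an illustration only: the Stream interface is redeclared locally, funcStream, newFunc, and countTo are hypothetical names, and running the cleanup functions inside Done is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// funcStream is a hypothetical closure-backed stream.
type funcStream[T any] struct {
	fn        func() (T, error)
	doneFuncs []func()
	item      T
	err       error
}

func (f *funcStream[T]) Next() bool {
	f.item, f.err = f.fn()
	return f.err == nil
}

func (f *funcStream[T]) Item() T { return f.item }

func (f *funcStream[T]) Done() error {
	// Assumption: cleanup functions run when the consumer calls Done.
	for _, cleanup := range f.doneFuncs {
		cleanup()
	}
	if errors.Is(f.err, io.EOF) {
		return nil // io.EOF marks a clean end of stream, not a failure
	}
	return f.err
}

func newFunc[T any](fn func() (T, error), doneFuncs ...func()) Stream[T] {
	return &funcStream[T]{fn: fn, doneFuncs: doneFuncs}
}

// countTo streams 1..n from a closure that signals the end with io.EOF.
func countTo(n int) (items []int, cleaned bool, err error) {
	i := 0
	s := newFunc(func() (int, error) {
		if i >= n {
			return 0, io.EOF // without this the consumer would loop forever
		}
		i++
		return i, nil
	}, func() { cleaned = true })
	for s.Next() {
		items = append(items, s.Item())
	}
	err = s.Done()
	return items, cleaned, err
}

func main() {
	fmt.Println(countTo(3))
}
```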

func MapErr

func MapErr[T any](stream Stream[T], fn func(error) error) Stream[T]

MapErr maps over the error returned by Done(). The supplied function is called for all invocations of Done(), meaning that it can change, suppress, or create errors as needed.
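Because the mapping function is invoked on every call to Done, including successful ones, it has to handle a nil error. The sketch below shows a wrapper that adds context to failures and passes nil through unchanged. The Stream interface is redeclared locally; mapErr, mapErrStream, annotate, drainAnnotated, and errBoom are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
	err   error
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return s.err }

func fromSlice[T any](err error, items ...T) Stream[T] {
	return &sliceStream[T]{items: items, err: err}
}

var errBoom = errors.New("boom")

// mapErr forwards Next and Item and rewrites the result of Done.
type mapErr[T any] struct {
	inner Stream[T]
	fn    func(error) error
}

func (m *mapErr[T]) Next() bool { return m.inner.Next() }

func (m *mapErr[T]) Item() T { return m.inner.Item() }

func (m *mapErr[T]) Done() error { return m.fn(m.inner.Done()) }

func mapErrStream[T any](s Stream[T], fn func(error) error) Stream[T] {
	return &mapErr[T]{inner: s, fn: fn}
}

// annotate is called even when err is nil, so it must pass nil through.
func annotate(err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("listing items: %w", err)
}

// drainAnnotated consumes a two-item stream that fails with inner.
func drainAnnotated(inner error) error {
	s := mapErrStream(fromSlice(inner, 1, 2), annotate)
	for s.Next() {
		_ = s.Item()
	}
	return s.Done()
}

func main() {
	fmt.Println(drainAnnotated(errBoom))
	fmt.Println(drainAnnotated(nil))
}
```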

func MapWhile

func MapWhile[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]

MapWhile maps a stream of type A into a stream of type B, halting early if fn returns false.

func MergeStreams

func MergeStreams[T any, U, V any](
	streamA Stream[T],
	streamB Stream[U],
	less func(a T, b U) bool,
	convertA func(item T) V,
	convertB func(item U) V,
) Stream[V]

MergeStreams merges two streams and returns a single stream which uses the provided less function to determine which item to yield.
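A two-way merge of this shape needs one item of lookahead per input. The sketch below shows one possible structure, assuming both inputs are already sorted and that less(a, b) returning true means the item from the first stream is yielded first. The Stream interface is redeclared locally, and head, merged, mergeSorted, sliceStream, and fromSlice are hypothetical names; the real implementation's tie-breaking and error handling may differ.

```go
package main

import "fmt"

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}
func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }
func (s *sliceStream[T]) Done() error { return nil }

func fromSlice[T any](items ...T) Stream[T] { return &sliceStream[T]{items: items} }

// head buffers one item and remembers exhaustion, so Next is never called
// again on a stream that already returned false.
type head[T any] struct {
	s    Stream[T]
	item T
	ok   bool // item holds an unconsumed value
	end  bool // s.Next() has returned false
}

func (h *head[T]) fill() bool {
	if !h.ok && !h.end {
		if h.s.Next() {
			h.item, h.ok = h.s.Item(), true
		} else {
			h.end = true
		}
	}
	return h.ok
}

type merged[T, U, V any] struct {
	a        head[T]
	b        head[U]
	less     func(T, U) bool
	convertA func(T) V
	convertB func(U) V
	cur      V
}

func (m *merged[T, U, V]) Next() bool {
	hasA, hasB := m.a.fill(), m.b.fill()
	switch {
	case hasA && (!hasB || m.less(m.a.item, m.b.item)):
		m.cur, m.a.ok = m.convertA(m.a.item), false
	case hasB:
		m.cur, m.b.ok = m.convertB(m.b.item), false
	default:
		return false
	}
	return true
}
func (m *merged[T, U, V]) Item() V { return m.cur }

// Done finalizes both inputs; preferring A's error is an arbitrary choice here.
func (m *merged[T, U, V]) Done() error {
	errA, errB := m.a.s.Done(), m.b.s.Done()
	if errA != nil {
		return errA
	}
	return errB
}

func mergeSorted(ints []int, floats []float64) ([]float64, error) {
	var s Stream[float64] = &merged[int, float64, float64]{
		a:        head[int]{s: fromSlice(ints...)},
		b:        head[float64]{s: fromSlice(floats...)},
		less:     func(a int, b float64) bool { return float64(a) < b },
		convertA: func(a int) float64 { return float64(a) },
		convertB: func(b float64) float64 { return b },
	}
	var out []float64
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	fmt.Println(mergeSorted([]int{1, 4, 6}, []float64{2.5, 3.5, 5}))
}
```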

func Once

func Once[T any](item T) Stream[T]

Once creates a stream that yields a single item.

func OnceFunc

func OnceFunc[T any](fn func() (T, error)) Stream[T]

OnceFunc builds a stream from a closure that will yield exactly zero or one items. This stream is the lazy equivalent of the Once/Fail/Empty combinators. A nil error value results in a single-element stream. An error value of io.EOF results in an empty stream. All other error values result in a failing stream.

func PageFunc

func PageFunc[T any](fn func() ([]T, error), doneFuncs ...func()) Stream[T]

PageFunc is equivalent to Func except that it performs internal depagination. As with Func, the supplied closure *must* return io.EOF if no more items are available. Failure to return io.EOF (or some other error) may result in infinite loops.
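Internal depagination can be sketched as a loop that serves items from the current page and fetches the next page only when the current one is empty. The Stream interface is redeclared locally; pageStream, newPageFunc, and listAll are hypothetical names, and the in-memory pages stand in for a real paginated API.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// pageStream is a hypothetical page-backed stream.
type pageStream[T any] struct {
	fn   func() ([]T, error)
	page []T
	item T
	err  error
}

func (p *pageStream[T]) Next() bool {
	for {
		if len(p.page) > 0 {
			// Serve the next buffered item from the current page.
			p.item, p.page = p.page[0], p.page[1:]
			return true
		}
		if p.err != nil {
			return false
		}
		// An empty page with a nil error simply triggers another fetch,
		// which is why a closure that never errors can loop forever.
		p.page, p.err = p.fn()
	}
}

func (p *pageStream[T]) Item() T { return p.item }

func (p *pageStream[T]) Done() error {
	if errors.Is(p.err, io.EOF) {
		return nil // io.EOF marks a clean end of stream
	}
	return p.err
}

func newPageFunc[T any](fn func() ([]T, error)) Stream[T] {
	return &pageStream[T]{fn: fn}
}

// listAll walks in-memory pages the way a caller might walk a paginated API.
func listAll(pages [][]string) ([]string, error) {
	next := 0
	s := newPageFunc(func() ([]string, error) {
		if next >= len(pages) {
			return nil, io.EOF
		}
		page := pages[next]
		next++
		return page, nil
	})
	var out []string
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	fmt.Println(listAll([][]string{{"a", "b"}, {}, {"c"}}))
}
```

The consumer sees a flat sequence of items and never observes page boundaries, including the empty page in the middle.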

func RateLimit

func RateLimit[T any](stream Stream[T], wait func() error) Stream[T]

RateLimit applies a rate-limiting function to a stream such that calls to Next() block on the supplied function before calling the inner stream. If the function returns an error, the inner stream is not polled and Next() returns false. The wait function may return io.EOF to indicate a graceful/expected halting condition. Any other error value is treated as unexpected and will be bubbled up via Done() unless an error from the inner stream takes precedence.
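The interaction between the wait function, Next(), and Done() can be sketched as follows. The Stream interface is redeclared locally; rateLimited, rateLimit, firstN, sliceStream, and fromSlice are hypothetical names. The wait function here sleeps briefly as a stand-in for a real limiter and returns io.EOF once a fixed budget is spent, to show the graceful halting case.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// Stream is redeclared locally so the sketch compiles on its own.
type Stream[T any] interface {
	Next() bool
	Item() T
	Done() error
}

// sliceStream and fromSlice are hypothetical helpers for the demo.
type sliceStream[T any] struct {
	items []T
	idx   int
}

func (s *sliceStream[T]) Next() bool {
	s.idx++
	return s.idx <= len(s.items)
}

func (s *sliceStream[T]) Item() T { return s.items[s.idx-1] }

func (s *sliceStream[T]) Done() error { return nil }

func fromSlice[T any](items ...T) Stream[T] {
	return &sliceStream[T]{items: items}
}

// rateLimited blocks on wait before every poll of the inner stream.
type rateLimited[T any] struct {
	inner   Stream[T]
	wait    func() error
	waitErr error
}

func (r *rateLimited[T]) Next() bool {
	if err := r.wait(); err != nil {
		r.waitErr = err // the inner stream is not polled
		return false
	}
	return r.inner.Next()
}

func (r *rateLimited[T]) Item() T { return r.inner.Item() }

func (r *rateLimited[T]) Done() error {
	if err := r.inner.Done(); err != nil {
		return err // an inner-stream error takes precedence
	}
	if errors.Is(r.waitErr, io.EOF) {
		return nil // io.EOF from wait is an expected halt
	}
	return r.waitErr
}

func rateLimit[T any](s Stream[T], wait func() error) Stream[T] {
	return &rateLimited[T]{inner: s, wait: wait}
}

// firstN allows at most budget polls of the inner stream.
func firstN(budget int, items ...int) ([]int, error) {
	wait := func() error {
		if budget <= 0 {
			return io.EOF // graceful halt: Done reports no error
		}
		budget--
		time.Sleep(time.Millisecond) // stand-in for a real limiter
		return nil
	}
	s := rateLimit(fromSlice(items...), wait)
	var out []int
	for s.Next() {
		out = append(out, s.Item())
	}
	return out, s.Done()
}

func main() {
	fmt.Println(firstN(2, 1, 2, 3))
}
```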

func Skip

func Skip[T any](stream Stream[T], n int) Stream[T]

Skip skips the first n items from a stream. Zero/negative values of n have no effect.

func Slice

func Slice[T any](items []T) Stream[T]

Slice constructs a stream from a slice.
