Documentation ¶
Index ¶
- func Collect[T any](stream Stream[T]) ([]T, error)
- func CollectPages[T any](stream Stream[[]T]) ([]T, error)
- func Drain[T any](stream Stream[T]) error
- func Take[T any](stream Stream[T], n int) ([]T, bool)
- type Stream
- func Chain[T any](streams ...Stream[T]) Stream[T]
- func Empty[T any]() Stream[T]
- func Fail[T any](err error) Stream[T]
- func FilterMap[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]
- func Flatten[T any](stream Stream[Stream[T]]) Stream[T]
- func Func[T any](fn func() (T, error), doneFuncs ...func()) Stream[T]
- func MapErr[T any](stream Stream[T], fn func(error) error) Stream[T]
- func MapWhile[A, B any](stream Stream[A], fn func(A) (B, bool)) Stream[B]
- func MergeStreams[T any, U, V any](streamA Stream[T], streamB Stream[U], less func(a T, b U) bool, ...) Stream[V]
- func Once[T any](item T) Stream[T]
- func OnceFunc[T any](fn func() (T, error)) Stream[T]
- func PageFunc[T any](fn func() ([]T, error), doneFuncs ...func()) Stream[T]
- func RateLimit[T any](stream Stream[T], wait func() error) Stream[T]
- func Skip[T any](stream Stream[T], n int) Stream[T]
- func Slice[T any](items []T) Stream[T]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Collect ¶
Collect aggregates a stream into a slice. If an error is hit, the items observed thus far are still returned, but they may not represent the complete set.
func CollectPages ¶
CollectPages aggregates a paginated stream into a slice. If an error is hit, the pages observed thus far are still returned, but they may not represent the complete set.
Types ¶
type Stream ¶
type Stream[T any] interface { // Next attempts to advance the stream to the next item. If false is returned, // then no more items are available. Next() and Item() must not be called after the // first time Next() returns false. Next() bool // Item gets the current item. Invoking Item() is only safe if Next was previously // invoked *and* returned true. Invoking Item() before invoking Next(), or after Next() // returned false may cause panics or other unpredictable behavior. Whether or not the // item returned is safe for access after the stream is advanced again is dependent // on the implementation and should be documented (e.g. an I/O based stream might // re-use an underlying buffer). Item() T // Done checks for any errors that occurred during streaming and informs the stream // that we've finished consuming items from it. Invoking Next() or Item() after Done() // has been called is not permitted. Done may trigger cleanup operations, but unlike Close() // the error reported is specifically related to failures that occurred *during* streaming, // meaning that if Done() returns an error, there is a high likelihood that the complete // set of values was not observed. For this reason, Done() should always be checked explicitly // rather than deferred as Close() might be. Done() error }
Stream is a generic interface for streaming APIs. This package was built with the intention of making it easier to write streaming resource getters, and may not be be suitable for applications outside of that specific usecase. Streams may panic if misused. See the Collect function for an example of the correct consumption pattern.
NOTE: streams almost always perform worse than slices in go. unless you're dealing with a resource that scales linearly with cluster size, you are probably better off just working with slices.
func FilterMap ¶
FilterMap maps a stream of type A into a stream of type B, filtering out items when fn returns false.
func Func ¶
Func builds a stream from a closure. The supplied closure *must* return io.EOF if no more items are available. Failure to return io.EOF (or some other error) may cause infinite loops. Cleanup functions may be optionally provided which will be run on close. If wrapping a paginated API, consider using PageFunc instead.
func MapErr ¶
MapErr maps over the error returned by Done(). The supplied function is called for all invocations of Done(), meaning that it can change, suppress, or create errors as needed.
func MapWhile ¶
MapWhile maps a stream of type A into a stream of type B, halting early if fn returns false.
func MergeStreams ¶
func MergeStreams[T any, U, V any]( streamA Stream[T], streamB Stream[U], less func(a T, b U) bool, convertA func(item T) V, convertB func(item U) V, ) Stream[V]
MergeStreams merges two streams and returns a single stream which uses the provided less function to determine which item to yield.
func OnceFunc ¶
OnceFunc builds a stream from a closure that will yield exactly zero or one items. This stream is the lazy equivalent of the Once/Fail/Empty combinators. A nil error value results in a single-element stream. An error value of io.EOF results in an empty stream. All other error values result in a failing stream.
func PageFunc ¶
PageFunc is equivalent to Func except that it performs internal depagination. As with Func, the supplied closure *must* return io.EOF if no more items are available. Failure to return io.EOF (or some other error) may result in infinite loops.
func RateLimit ¶
RateLimit applies a rate-limiting function to a stream s.t. calls to Next() block on the supplied function before calling the inner stream. If the function returns an error, the inner stream is not polled and Next() returns false. The wait function may return io.EOF to indicate a graceful/expected halting condition. Any other error value is treated as unexpected and will be bubbled up via Done() unless an error from the inner stream takes precedence.