Documentation ¶
Overview ¶
The stream package provides utilities for working with observable streams. Any type that implements the Observable interface can be transformed and consumed with these utilities.
Index ¶
- Variables
- func AlwaysRetry(err error) bool
- func Discard[T any](ctx context.Context, src Observable[T])
- func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), ...)
- func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
- func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
- type FuncObservable
- type MulticastOpt
- type Observable
- func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]
- func Distinct[T comparable](src Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func Error[T any](err error) Observable[T]
- func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]
- func FromChannel[T any](in <-chan T) Observable[T]
- func FromSlice[T any](items []T) Observable[T]
- func Just[T any](item T) Observable[T]
- func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
- func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))
- func Range(from, to int) Observable[int]
- func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]
- func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
- func Stuck[T any]() Observable[T]
- func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
- func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
- type RetryFunc
- type ToChannelOpt
Constants ¶
This section is empty.
Variables ¶
var (
	// Emit the latest seen item when subscribing.
	EmitLatest = func(o *mcastOpts) { o.emitLatest = true }
)
Multicast options
Functions ¶
func AlwaysRetry ¶
func AlwaysRetry(err error) bool
AlwaysRetry always asks for a retry regardless of the error.
func Discard ¶
func Discard[T any](ctx context.Context, src Observable[T])
Discard discards all items from 'src'.
func First ¶
func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
First returns the first item from the 'src' observable and then cancels the subscription. If the observable completes without emitting any items, an io.EOF error is returned.
func Last ¶
func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
Last returns the last item from 'src' observable.
func ObserveWithWaitGroup ¶
func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error))
ObserveWithWaitGroup is like Observe(), but adds to a WaitGroup and calls Done() when complete.
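One way to read this description: the WaitGroup is incremented before subscribing and released from inside the completion callback. The sketch below assumes exactly that and nothing more. observeWithWaitGroup, fromSlice and observableFunc are hypothetical local names, not the package's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Observable repeats the documented interface so the sketch compiles alone.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

// observableFunc is a local stand-in that adapts a function to Observable.
type observableFunc[T any] func(ctx context.Context, next func(T), complete func(error))

func (f observableFunc[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical source that emits items from its own goroutine.
func fromSlice[T any](items []T) Observable[T] {
	return observableFunc[T](func(ctx context.Context, next func(T), complete func(error)) {
		go func() {
			for _, item := range items {
				if err := ctx.Err(); err != nil {
					complete(err)
					return
				}
				next(item)
			}
			complete(nil)
		}()
	})
}

// observeWithWaitGroup adds to wg before subscribing and calls Done only
// after the caller's complete callback has run.
func observeWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error)) {
	wg.Add(1)
	src.Observe(ctx, next, func(err error) {
		complete(err)
		wg.Done()
	})
}

// sumWithWaitGroup shows the calling pattern: subscribe, then Wait.
func sumWithWaitGroup(items []int) int {
	var wg sync.WaitGroup
	sum := 0
	observeWithWaitGroup[int](context.Background(), &wg, fromSlice(items),
		func(x int) { sum += x },
		func(error) {},
	)
	wg.Wait() // returns once the stream has completed
	return sum
}

func main() {
	fmt.Println(sumWithWaitGroup([]int{1, 2, 3})) // prints "6"
}
```

Calling Add before Observe matters here: if the source completed before Add ran, Wait could return early or Done could underflow the counter.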
func ToChannel ¶
func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
ToChannel converts an observable into a channel.
Types ¶
type FuncObservable ¶
FuncObservable implements the Observable interface with a function.
type MulticastOpt ¶
type MulticastOpt func(o *mcastOpts)
type Observable ¶
type Observable[T any] interface {
	// Observe a stream of values as long as the given context is valid.
	// 'next' is called for each item, and finally 'complete' is called
	// when the stream is complete, or an error has occurred.
	//
	// Observable implementations are allowed to call 'next' and 'complete'
	// from any goroutine, but never concurrently.
	Observe(ctx context.Context, next func(T), complete func(error))
}
Observable defines the Observe method for observing a stream of values.
func Debounce ¶
func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]
Debounce emits an item only after the specified duration has elapsed since the previous item was emitted. Of the items that arrive in the meantime, only the latest is emitted.
In:  a b c d e |->
Out: a     d e |->
func Distinct ¶
func Distinct[T comparable](src Observable[T]) Observable[T]
Distinct skips adjacent equal values.
func Empty ¶
func Empty[T any]() Observable[T]
Empty creates an empty observable that completes immediately.
func Error ¶
func Error[T any](err error) Observable[T]
Error creates an observable that fails immediately with the given error.
func Filter ¶
func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]
Filter only emits the values for which the provided predicate returns true.
func FromChannel ¶
func FromChannel[T any](in <-chan T) Observable[T]
FromChannel creates an observable from a channel. The channel is consumed by the first observer.
func FromSlice ¶
func FromSlice[T any](items []T) Observable[T]
FromSlice converts a slice into an Observable.
func Map ¶
func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map applies a function onto values of an observable and emits the resulting values.
func Multicast ¶
func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))
Multicast creates an observable that "multicasts" the emitted items to all observers.
func Range ¶
func Range(from, to int) Observable[int]
Range creates an observable that emits the integers from 'from' up to, but not including, 'to'.
func Reduce ¶
func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]
Reduce takes an initial state and a function 'reduce' that is called on each element along with the current state. It returns an observable with a single item: the state produced by the last call to 'reduce'.
func Retry ¶
func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
Retry resubscribes to the observable if it completes with an error.
func Stuck ¶
func Stuck[T any]() Observable[T]
Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.
func Throttle ¶
func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
Throttle limits the rate at which items are emitted.
func ToMulticast ¶
func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
ToMulticast makes 'src' a multicast observable, i.e. each observer will observe the same sequence.
type RetryFunc ¶
RetryFunc decides whether the processing should be retried given the error.
func BackoffRetry ¶
BackoffRetry retries with an exponential backoff.
func LimitRetries ¶
LimitRetries limits the number of retries with the given retry method, e.g. LimitRetries(BackoffRetry(time.Millisecond, time.Second), 5).
type ToChannelOpt ¶
type ToChannelOpt func(*toChannelOpts)
func WithBufferSize ¶
func WithBufferSize(n int) ToChannelOpt
WithBufferSize sets the buffer size of the channel returned by ToChannel.
func WithErrorChan ¶
func WithErrorChan(errCh chan error) ToChannelOpt
WithErrorChan asks ToChannel to send completion error to the provided channel.