Documentation ¶
Overview ¶
The stream package provides utilities for working with observable streams. Any type that implements the Observable interface can be transformed and consumed with these utilities.
Index ¶
- Variables
- func AlwaysRetry(err error) bool
- func Discard[T any](ctx context.Context, src Observable[T])
- func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
- func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), ...)
- func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
- func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
- func ToTruncatingChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
- type FuncObservable
- type MulticastOpt
- type Observable
- func Buffer[Buf any, T any](src Observable[T], bufferSize int, waitTime time.Duration, ...) Observable[Buf]
- func Concat[T any](srcs ...Observable[T]) Observable[T]
- func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]
- func Distinct[T comparable](src Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func Error[T any](err error) Observable[T]
- func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]
- func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
- func FromChannel[T any](in <-chan T) Observable[T]
- func FromSlice[T any](items []T) Observable[T]
- func Just[T any](item T) Observable[T]
- func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
- func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))
- func Range(from, to int) Observable[int]
- func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]
- func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
- func Stuck[T any]() Observable[T]
- func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
- func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
- type RetryFunc
- type ToChannelOpt
Constants ¶
This section is empty.
Variables ¶
var (
    // Emit the latest seen item when subscribing.
    EmitLatest = func(o *mcastOpts) { o.emitLatest = true }
)
Multicast options
Functions ¶
func AlwaysRetry ¶
func AlwaysRetry(err error) bool
AlwaysRetry always asks for a retry regardless of the error.
func Discard ¶
func Discard[T any](ctx context.Context, src Observable[T])
Discard discards all items from 'src'.
func First ¶
func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
First returns the first item from 'src' observable and then cancels the subscription. Blocks until first item is observed or the stream is completed. If the observable completes without emitting items then io.EOF error is returned.
func Last ¶
func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
Last returns the last item from 'src' observable. Blocks until the stream has been completed. If no items are observed then io.EOF error is returned.
func ObserveWithWaitGroup ¶
func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error))
ObserveWithWaitGroup is like Observe(), but adds to a WaitGroup and calls Done() when complete.
func ToChannel ¶
func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
ToChannel converts an observable into a channel. When the provided context is cancelled the underlying subscription is cancelled and the channel is closed. To receive completion errors use WithErrorChan.
    items := ToChannel(ctx, Range(1,4))
    a := <-items
    b := <-items
    c := <-items
    _, ok := <-items
    => a=1, b=2, c=3, ok=false
func ToSlice ¶
func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
ToSlice converts an Observable into a slice.
ToSlice(ctx, Range(1,4)) => ([]int{1,2,3}, nil)
func ToTruncatingChannel ¶
func ToTruncatingChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
ToTruncatingChannel is like ToChannel but with a local buffer to decouple the source observable from the observer. It is useful when the source observable cannot be delayed by a slow consumer and it is safe for the consumer to lose intermediate items while busy.
Types ¶
type FuncObservable ¶
FuncObservable implements the Observable interface with a function.
This provides a convenient way of creating new observables without having to introduce a new type:
    var Ones Observable[int] =
        FuncObservable[int](
            func(ctx context.Context, next func(int), complete func(error)) {
                go func() {
                    defer complete(nil)
                    for ctx.Err() == nil {
                        next(1)
                    }
                }()
            })
versus with a new type:
    type onesObservable struct {}

    func (o onesObservable) Observe(ctx context.Context, next func(int), complete func(error)) {
        go func() {
            defer complete(nil)
            for ctx.Err() == nil {
                next(1)
            }
        }()
    }
type MulticastOpt ¶
type MulticastOpt func(o *mcastOpts)
type Observable ¶
type Observable[T any] interface {
    // Observe a stream of values as long as the given context is valid.
    // 'next' is called for each item, and finally 'complete' is called
    // when the stream is complete, or an error has occurred.
    //
    // Observable implementations are allowed to call 'next' and 'complete'
    // from any goroutine, but never concurrently.
    Observe(ctx context.Context, next func(T), complete func(error))
}
Observable defines the Observe method for observing a stream of values.
Also see https://reactivex.io/documentation/observable.html for in-depth description of observables.
For interactive diagrams see https://rxmarbles.com/.
func Buffer ¶
func Buffer[Buf any, T any](src Observable[T], bufferSize int, waitTime time.Duration, bufferItem func(Buf, T) Buf) Observable[Buf]
Buffer collects items into a buffer using the given buffering function and emits the buffer when 'waitTime' has elapsed. Buffer does not emit empty buffers.
    In:  a b c |->
    Out: [a,b] [c] |->
func Concat ¶
func Concat[T any](srcs ...Observable[T]) Observable[T]
Concat takes one or more observables of the same type and emits the items from each of them in order.
func Debounce ¶
func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]
Debounce emits an item only after the specified duration has lapsed since the previous item was emitted. Only the latest item is emitted.
    In:  a b c d e |->
    Out: a d e |->
func Distinct ¶
func Distinct[T comparable](src Observable[T]) Observable[T]
Distinct skips adjacent equal values.
    Distinct(FromSlice([]int{1,1,2,2,3})) => [1,2,3]
func Empty ¶
func Empty[T any]() Observable[T]
Empty creates an "empty" observable that completes immediately.
    xs, err := ToSlice(ctx, Empty[int]())
    => xs == []int{}, err == nil
func Error ¶
func Error[T any](err error) Observable[T]
Error creates an observable that fails immediately with given error.
    failErr := errors.New("fail")
    xs, err := ToSlice(ctx, Error[int](failErr))
    => xs == []int{}, err == failErr
func Filter ¶
func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]
Filter only emits the values for which the provided predicate returns true.
    Filter(Range(1,4), func(x int) bool { return x%2 == 0 }) => [2]
func FlatMap ¶
func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the 'apply' function is flattened to produce a flat stream of Bs.
func FromChannel ¶
func FromChannel[T any](in <-chan T) Observable[T]
FromChannel creates an observable from a channel. The channel is consumed by the first observer.
    values := make(chan int)
    go func() {
        values <- 1
        values <- 2
        values <- 3
        close(values)
    }()
    obs := FromChannel(values)
    xs, err := ToSlice(ctx, obs)
    => xs == []int{1,2,3}, err == nil
    xs, err = ToSlice(ctx, obs)
    => xs == []int{}, err == nil
func FromSlice ¶
func FromSlice[T any](items []T) Observable[T]
FromSlice converts a slice into an Observable.
    ToSlice(ctx, FromSlice([]int{1,2,3})) => []int{1,2,3}
func Just ¶
func Just[T any](item T) Observable[T]
Just creates an observable that emits a single item and completes.
xs, err := ToSlice(ctx, Just(1)) => xs == []int{1}, err == nil
func Map ¶
func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map applies a function onto values of an observable and emits the resulting values.
Map(Range(1,4), func(x int) int { return x * 2}) => [2,4,6]
func Multicast ¶
func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))
Multicast creates an observable that "multicasts" the emitted items to all observers.
    mcast, next, complete := Multicast[int]()
    next(1) // no observers, so nobody receives this
    sub1 := ToChannel(ctx, mcast, WithBufferSize(10))
    sub2 := ToChannel(ctx, mcast, WithBufferSize(10))
    next(2)
    next(3)
    complete(nil)
    => sub1 == sub2 == [2,3]

    mcast, next, complete = Multicast[int](EmitLatest)
    next(1)
    next(2) // "EmitLatest" tells Multicast to keep this
    x, err := First(ctx, mcast)
    => x == 2, err == nil
func Range ¶
func Range(from, to int) Observable[int]
Range creates an observable that emits the integers from 'from' up to, but not including, 'to'.
    ToSlice(ctx, Range(1,4)) => []int{1,2,3}
func Reduce ¶
func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]
Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single item: the state produced by the last call to 'reduce'.
Reduce(Range(1,4), 0, func(sum, item int) int { return sum + item }) => [(0+1+2+3)] => [6]
func Retry ¶
func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
Retry resubscribes to the observable if it completes with an error.
func Stuck ¶
func Stuck[T any]() Observable[T]
Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.
func Throttle ¶
func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
Throttle limits the rate at which items are emitted.
func ToMulticast ¶
func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
ToMulticast makes 'src' a multicast observable, i.e. each observer will observe the same sequence. Useful for fanning out items to multiple observers from a source that is consumed by the act of observing.
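    mcast, connect := ToMulticast(FromChannel(values))
    // Pseudocode: ToSlice blocks until the stream completes, so the two
    // calls are assumed to run concurrently with connect.
    a := ToSlice(ctx, mcast)
    b := ToSlice(ctx, mcast)
    connect(ctx) // start!
    => a == b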
type RetryFunc ¶
RetryFunc decides whether the processing should be retried given the error.
func BackoffRetry ¶
BackoffRetry retries with an exponential backoff.
func LimitRetries ¶
LimitRetries limits the number of retries with the given retry method. For example:
    LimitRetries(BackoffRetry(time.Millisecond, time.Second), 5)
type ToChannelOpt ¶
type ToChannelOpt func(*toChannelOpts)
func WithBufferSize ¶
func WithBufferSize(n int) ToChannelOpt
WithBufferSize sets the buffer size of the channel returned by ToChannel.
func WithErrorChan ¶
func WithErrorChan(errCh chan error) ToChannelOpt
WithErrorChan asks ToChannel to send completion error to the provided channel.