stream

package
v1.14.0-snapshot.6
Published: Jul 17, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

The stream package provides utilities for working with observable streams. Any type that implements the Observable interface can be transformed and consumed with these utilities.

Constants

This section is empty.

Variables

var (
	// Emit the latest seen item when subscribing.
	EmitLatest = func(o *mcastOpts) { o.emitLatest = true }
)

Multicast options

Functions

func AlwaysRetry

func AlwaysRetry(err error) bool

AlwaysRetry always asks for a retry regardless of the error.

func Discard

func Discard[T any](ctx context.Context, src Observable[T])

Discard discards all items from 'src'.

func First

func First[T any](ctx context.Context, src Observable[T]) (item T, err error)

First returns the first item from the 'src' observable and then cancels the subscription. It blocks until the first item is observed or the stream completes. If the observable completes without emitting any items, io.EOF is returned.

func Last

func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)

Last returns the last item from the 'src' observable. It blocks until the stream has completed. If no items are observed, io.EOF is returned.

func ObserveWithWaitGroup

func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error))

ObserveWithWaitGroup is like Observe(), but adds to a WaitGroup and calls Done() when complete.
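
One plausible shape for this helper, and how it lets a caller wait for several subscriptions at once, is sketched below. The types are redeclared locally, and `observeWithWaitGroup`, `rangeObs` and `sumBoth` are hypothetical names used only for illustration; the real function may differ in detail.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented types, so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// rangeObs is a hypothetical stand-in for Range: it emits from..to-1.
func rangeObs(from, to int) Observable[int] {
	return FuncObservable[int](func(ctx context.Context, next func(int), complete func(error)) {
		go func() {
			for i := from; i < to && ctx.Err() == nil; i++ {
				next(i)
			}
			complete(ctx.Err())
		}()
	})
}

// observeWithWaitGroup registers with the WaitGroup before subscribing and
// releases it only after the caller's 'complete' callback has returned.
func observeWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error)) {
	wg.Add(1)
	src.Observe(ctx, next, func(err error) {
		complete(err)
		wg.Done()
	})
}

// sumBoth observes two sources and waits for both to complete.
func sumBoth() (int, int) {
	var (
		wg   sync.WaitGroup
		a, b int
	)
	ctx := context.Background()
	observeWithWaitGroup(ctx, &wg, rangeObs(1, 4), func(x int) { a += x }, func(error) {})
	observeWithWaitGroup(ctx, &wg, rangeObs(10, 12), func(x int) { b += x }, func(error) {})
	wg.Wait() // both streams have completed once this returns
	return a, b
}

func main() {
	a, b := sumBoth()
	fmt.Println(a, b) // 6 21
}
```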

func ToChannel

func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T

ToChannel converts an observable into a channel. When the provided context is cancelled the underlying subscription is cancelled and the channel is closed. To receive completion errors use WithErrorChan.

items := ToChannel(ctx, Range(1,4))
a := <- items
b := <- items
c := <- items
_, ok := <- items
  => a=1, b=2, c=3, ok=false

func ToSlice

func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)

ToSlice converts an Observable into a slice.

ToSlice(ctx, Range(1,4))
  => ([]int{1,2,3}, nil)

Types

type FuncObservable

type FuncObservable[T any] func(context.Context, func(T), func(error))

FuncObservable implements the Observable interface with a function.

This provides a convenient way of creating new observables without having to introduce a new type:

var Ones Observable[int] =
	FuncObservable[int](
		func(ctx context.Context, next func(int), complete func(error)) {
			go func() {
				defer complete(nil)
				for ctx.Err() == nil {
					next(1)
				}
			}()
		})

versus with a new type:

type onesObservable struct {}

func (o onesObservable) Observe(ctx context.Context, next func(int), complete func(error)) {
	go func() {
		defer complete(nil)
		for ctx.Err() == nil {
			next(1)
		}
	}()
}

func (FuncObservable[T]) Observe

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error))

type MulticastOpt

type MulticastOpt func(o *mcastOpts)

type Observable

type Observable[T any] interface {
	// Observe a stream of values as long as the given context is valid.
	// 'next' is called for each item, and finally 'complete' is called
	// when the stream is complete, or an error has occurred.
	//
	// Observable implementations are allowed to call 'next' and 'complete'
	// from any goroutine, but never concurrently.
	Observe(ctx context.Context, next func(T), complete func(error))
}

Observable defines the Observe method for observing a stream of values.

Also see https://reactivex.io/documentation/observable.html for in-depth description of observables.

For interactive diagrams see https://rxmarbles.com/.

func Debounce

func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]

Debounce emits an item only after the specified duration has elapsed since the previous item was emitted. Only the latest item is emitted.

In:  a b c d e |->
Out: a d e |->

func Distinct

func Distinct[T comparable](src Observable[T]) Observable[T]

Distinct skips adjacent equal values.

Distinct(FromSlice([]int{1,1,2,2,3}))
  => [1,2,3]

func Empty

func Empty[T any]() Observable[T]

Empty creates an "empty" observable that completes immediately.

xs, err := ToSlice(ctx, Empty[int]())
  => xs == []int{}, err == nil

func Error

func Error[T any](err error) Observable[T]

Error creates an observable that fails immediately with given error.

failErr := errors.New("fail")
xs, err := ToSlice(ctx, Error[int](failErr))
  => xs == []int{}, err == failErr

func Filter

func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]

Filter only emits the values for which the provided predicate returns true.

Filter(Range(1,4), func(x int) bool { return x%2 == 0 })
  => [2]

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an observable from a channel. The channel is consumed by the first observer.

values := make(chan int)
go func() {
	values <- 1
	values <- 2
	values <- 3
	close(values)
}()
obs := FromChannel(values)
xs, err := ToSlice(ctx, obs)
  => xs == []int{1,2,3}, err == nil

xs, err = ToSlice(ctx, obs)
  => xs == []int{}, err == nil

func FromSlice

func FromSlice[T any](items []T) Observable[T]

FromSlice converts a slice into an Observable.

ToSlice(ctx, FromSlice([]int{1,2,3}))
  => []int{1,2,3}

func Just

func Just[T any](item T) Observable[T]

Just creates an observable that emits a single item and completes.

xs, err := ToSlice(ctx, Just(1))
  => xs == []int{1}, err == nil

func Map

func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]

Map applies a function onto values of an observable and emits the resulting values.

Map(Range(1,4), func(x int) int { return x * 2})
  => [2,4,6]

func Multicast

func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))

Multicast creates an observable that "multicasts" the emitted items to all observers.

mcast, next, complete := Multicast[int]()
next(1) // no observers, none receives this
sub1 := ToChannel(ctx, mcast, WithBufferSize(10))
sub2 := ToChannel(ctx, mcast, WithBufferSize(10))
next(2)
next(3)
complete(nil)
  => sub1 == sub2 == [2,3]

mcast, next, complete = Multicast[int](EmitLatest)
next(1)
next(2) // "EmitLatest" tells Multicast to keep this
x, err := First(ctx, mcast)
  => x == 2, err == nil

func Range

func Range(from, to int) Observable[int]

Range creates an observable that emits integers in range from...to-1.

ToSlice(ctx, Range(1,4))
  => []int{1,2,3}

func Reduce

func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]

Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single item: the state produced by the last call to 'reduce'.

Reduce(Range(1,4), 0, func(sum, item int) int { return sum + item })
  => [(0+1+2+3)] => [6]

func Retry

func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]

Retry resubscribes to the observable if it completes with an error.

func Stuck

func Stuck[T any]() Observable[T]

Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.

func Throttle

func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]

Throttle limits the rate at which items are emitted.

func ToMulticast

func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))

ToMulticast makes 'src' a multicast observable, i.e. each observer will observe the same sequence. Useful for fanning out items to multiple observers from a source that is consumed by the act of observing.

mcast, connect := ToMulticast(FromChannel(values))
a := ToChannel(ctx, mcast, WithBufferSize(10))
b := ToChannel(ctx, mcast, WithBufferSize(10))
connect(ctx) // start!
  => a and b receive the same items

type RetryFunc

type RetryFunc func(err error) bool

RetryFunc decides whether the processing should be retried given the error.

func BackoffRetry

func BackoffRetry(shouldRetry RetryFunc, minBackoff, maxBackoff time.Duration) RetryFunc

BackoffRetry retries with an exponential backoff.

func LimitRetries

func LimitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc

LimitRetries limits the number of retries with the given retry method, e.g. LimitRetries(BackoffRetry(AlwaysRetry, time.Millisecond, time.Second), 5)

type ToChannelOpt

type ToChannelOpt func(*toChannelOpts)

func WithBufferSize

func WithBufferSize(n int) ToChannelOpt

WithBufferSize sets the buffer size of the channel returned by ToChannel.

func WithErrorChan

func WithErrorChan(errCh chan error) ToChannelOpt

WithErrorChan asks ToChannel to send completion error to the provided channel.
