package stream

Version: v1.13.9
Published: Nov 13, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

The stream package provides utilities for working with observable streams. Any type that implements the Observable interface can be transformed and consumed with these utilities.

Variables

var (
	// Emit the latest seen item when subscribing.
	EmitLatest = func(o *mcastOpts) { o.emitLatest = true }
)

Multicast options

Functions

func AlwaysRetry

func AlwaysRetry(err error) bool

AlwaysRetry always asks for a retry regardless of the error.

func Discard

func Discard[T any](ctx context.Context, src Observable[T])

Discard discards all items from 'src'.

func First

func First[T any](ctx context.Context, src Observable[T]) (item T, err error)

First returns the first item from the 'src' observable and then cancels the subscription. If the observable completes without emitting any items, an io.EOF error is returned.
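The behaviour described for First can be sketched as follows. This is a minimal illustration, not the package's implementation: Observable and FuncObservable are local copies of the documented types, while fromSlice, first and firstOf are hypothetical stand-ins that emit synchronously so the example runs without importing the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical, synchronous stand-in for FromSlice.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		for _, x := range items {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(x)
		}
		complete(nil)
	})
}

// first keeps the first item, cancels the subscription, and maps an
// empty but successful stream to io.EOF.
func first[T any](ctx context.Context, src Observable[T]) (item T, err error) {
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	taken := false
	done := make(chan error, 1)
	src.Observe(subCtx,
		func(x T) {
			if !taken {
				item, taken = x, true
				cancel() // no further items are needed
			}
		},
		func(e error) { done <- e })

	e := <-done
	switch {
	case taken:
		return item, nil
	case e == nil:
		return item, io.EOF
	default:
		return item, e
	}
}

// firstOf is a small helper used by main.
func firstOf(items []int) (int, error) {
	return first(context.Background(), fromSlice(items))
}

func main() {
	v, err := firstOf([]int{7, 8, 9})
	fmt.Println(v, err) // 7 <nil>

	_, err = firstOf(nil)
	fmt.Println(errors.Is(err, io.EOF)) // true
}
```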

func Last

func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)

Last returns the last item from 'src' observable.

func ObserveWithWaitGroup

func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error))

ObserveWithWaitGroup is like Observe(), but adds to a WaitGroup and calls Done() when complete.

func ToChannel

func ToChannel[T any](ctx context.Context, errs chan<- error, src Observable[T]) <-chan T

ToChannel converts an observable into a channel of items. The completion error, or nil on normal completion, is delivered to the supplied error channel.
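A sketch of how the channel conversion might be consumed, under stated assumptions: toChannel, fromSlice and sumViaChannel are hypothetical local stand-ins, the source emits synchronously, and the error channel is buffered so that completion never blocks. The package's own buffering and goroutine behaviour may differ.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical, synchronous stand-in for FromSlice.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		for _, x := range items {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(x)
		}
		complete(nil)
	})
}

// toChannel forwards items to a channel, closes it on completion, and
// then reports the completion error (or nil) on errs.
func toChannel[T any](ctx context.Context, errs chan<- error, src Observable[T]) <-chan T {
	items := make(chan T)
	go src.Observe(ctx,
		func(x T) {
			select {
			case items <- x:
			case <-ctx.Done(): // do not block forever on an abandoned reader
			}
		},
		func(err error) {
			close(items)
			errs <- err
		})
	return items
}

// sumViaChannel drains the item channel first, then reads the error.
func sumViaChannel(items []int) (int, error) {
	errs := make(chan error, 1)
	sum := 0
	for x := range toChannel(context.Background(), errs, fromSlice(items)) {
		sum += x
	}
	return sum, <-errs
}

func main() {
	fmt.Println(sumViaChannel([]int{1, 2, 3})) // 6 <nil>
}
```

Reading the error only after the item channel is closed keeps the consumer simple: ranging over the items ends exactly when the stream completes.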

func ToSlice

func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)

ToSlice converts an Observable into a slice.

Types

type FuncObservable

type FuncObservable[T any] func(context.Context, func(T), func(error))

FuncObservable implements the Observable interface with a function.

func (FuncObservable[T]) Observe

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error))
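As an illustration of how a function can serve as an Observable, the sketch below declares local copies of the two documented types and a hypothetical countdown source. The names countdown and collectCountdown are invented for this example, and the source emits synchronously for simplicity.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

// Observe simply calls the underlying function.
func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// countdown is a hypothetical source that emits n, n-1, ..., 1 and then
// completes. It stops early with the context error if ctx is cancelled.
func countdown(n int) Observable[int] {
	return FuncObservable[int](func(ctx context.Context, next func(int), complete func(error)) {
		for i := n; i > 0; i-- {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(i)
		}
		complete(nil)
	})
}

// collectCountdown observes countdown(n) and gathers the emitted items.
func collectCountdown(n int) []int {
	var out []int
	countdown(n).Observe(context.Background(),
		func(i int) { out = append(out, i) },
		func(error) {})
	return out
}

func main() {
	fmt.Println(collectCountdown(3)) // [3 2 1]
}
```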

type MulticastOpt

type MulticastOpt func(o *mcastOpts)

type Observable

type Observable[T any] interface {
	// Observe a stream of values as long as the given context is valid.
	// 'next' is called for each item, and finally 'complete' is called
	// when the stream is complete, or an error has occurred.
	//
	// Observable implementations are allowed to call 'next' and 'complete'
	// from any goroutine, but never concurrently.
	Observe(ctx context.Context, next func(T), complete func(error))
}

Observable defines the Observe method for observing a stream of values.

func Debounce

func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]

Debounce emits an item only after the specified duration has lapsed since the previous item was emitted. Only the latest item is emitted.

In:  a b c d e |->
Out: a     d e |->

func Distinct

func Distinct[T comparable](src Observable[T]) Observable[T]

Distinct skips adjacent equal values.
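"Adjacent" matters here: a value that reappears after a different value is emitted again. The sketch below shows that behaviour with hypothetical local stand-ins (fromSlice, collect, distinct, distinctDemo); it is not the package's own code.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical, synchronous stand-in for FromSlice.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		for _, x := range items {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(x)
		}
		complete(nil)
	})
}

// collect gathers all items and waits for completion.
func collect[T any](src Observable[T]) (items []T, err error) {
	done := make(chan error, 1)
	src.Observe(context.Background(),
		func(x T) { items = append(items, x) },
		func(e error) { done <- e })
	err = <-done
	return items, err
}

// distinct drops an item only when it equals the item just before it.
// The comparison state is created per subscription.
func distinct[T comparable](src Observable[T]) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		var prev T
		first := true
		src.Observe(ctx, func(x T) {
			if first || x != prev {
				next(x)
			}
			prev, first = x, false
		}, complete)
	})
}

func distinctDemo() []int {
	items, _ := collect(distinct(fromSlice([]int{1, 1, 2, 2, 1})))
	return items
}

func main() {
	fmt.Println(distinctDemo()) // [1 2 1]
}
```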

func Empty

func Empty[T any]() Observable[T]

Empty creates an empty observable that completes immediately.

func Error

func Error[T any](err error) Observable[T]

Error creates an observable that fails immediately with the given error.

func Filter

func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]

Filter only emits the values for which the provided predicate returns true.

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an observable from a channel. The channel is consumed by the first observer.

func FromSlice

func FromSlice[T any](items []T) Observable[T]

FromSlice converts a slice into an Observable.

func Just

func Just[T any](item T) Observable[T]

Just creates an observable with a single item.

func Map

func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]

Map applies a function to each value of an observable and emits the resulting values.
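Operators such as Filter and Map wrap a source observable and return a new one, so they compose by nesting. The following sketch shows one plausible way to write and chain them. The names fromSlice, collect, filter, mapTo and pipeline are hypothetical local stand-ins, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical, synchronous stand-in for FromSlice.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		for _, x := range items {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(x)
		}
		complete(nil)
	})
}

// collect gathers all items and waits for completion.
func collect[T any](src Observable[T]) (items []T, err error) {
	done := make(chan error, 1)
	src.Observe(context.Background(),
		func(x T) { items = append(items, x) },
		func(e error) { done <- e })
	err = <-done
	return items, err
}

// filter forwards only the items for which pred returns true.
func filter[T any](src Observable[T], pred func(T) bool) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		src.Observe(ctx, func(x T) {
			if pred(x) {
				next(x)
			}
		}, complete)
	})
}

// mapTo applies 'apply' to every item; the item type may change.
func mapTo[A, B any](src Observable[A], apply func(A) B) Observable[B] {
	return FuncObservable[B](func(ctx context.Context, next func(B), complete func(error)) {
		src.Observe(ctx, func(a A) { next(apply(a)) }, complete)
	})
}

// pipeline keeps the even numbers and renders them as labels.
func pipeline() []string {
	evens := filter(fromSlice([]int{1, 2, 3, 4, 5}), func(x int) bool { return x%2 == 0 })
	labels := mapTo(evens, func(x int) string { return fmt.Sprintf("#%d", x) })
	out, _ := collect(labels)
	return out
}

func main() {
	fmt.Println(pipeline()) // [#2 #4]
}
```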

func Multicast

func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))

Multicast creates an observable that "multicasts" the emitted items to all observers.
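The returned triple can be read as: an observable to subscribe to, plus the two functions used to feed it. The sketch below models that shape with a mutex-protected subscriber set. It is a simplification under stated assumptions: multicast, subscriber and multicastDemo are hypothetical names, context cancellation is ignored, and options such as EmitLatest are not modelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// subscriber holds the callbacks of one observer.
type subscriber[T any] struct {
	next     func(T)
	complete func(error)
}

// multicast returns an observable plus the functions that emit items to,
// and complete, every current subscriber.
// Simplification: the subscription context is ignored in this sketch.
func multicast[T any]() (mcast Observable[T], emit func(T), finish func(error)) {
	var (
		mu   sync.Mutex
		subs = map[int]subscriber[T]{}
		id   int
	)
	mcast = FuncObservable[T](func(_ context.Context, next func(T), complete func(error)) {
		mu.Lock()
		defer mu.Unlock()
		id++
		subs[id] = subscriber[T]{next: next, complete: complete}
	})
	emit = func(x T) {
		mu.Lock()
		defer mu.Unlock()
		for _, s := range subs {
			s.next(x)
		}
	}
	finish = func(err error) {
		mu.Lock()
		defer mu.Unlock()
		for k, s := range subs {
			s.complete(err)
			delete(subs, k)
		}
	}
	return mcast, emit, finish
}

// multicastDemo subscribes one observer before the first item and one after.
func multicastDemo() (a, b []int) {
	mcast, emit, finish := multicast[int]()
	ctx := context.Background()

	mcast.Observe(ctx, func(x int) { a = append(a, x) }, func(error) {})
	emit(1)
	mcast.Observe(ctx, func(x int) { b = append(b, x) }, func(error) {})
	emit(2)
	finish(nil)
	return a, b
}

func main() {
	a, b := multicastDemo()
	fmt.Println(a, b) // [1 2] [2]
}
```

In this sketch the late subscriber misses the item emitted before it subscribed, which is the situation the EmitLatest option is documented to address.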

func Range

func Range(from, to int) Observable[int]

Range creates an observable that emits the integers from 'from' up to and including 'to'-1.

func Reduce

func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]

Reduce takes an initial state and a function 'reduce' that is called on each element along with the current state. It returns an observable with a single item: the state produced by the last call to 'reduce'.
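A fold over a stream can be sketched as below. The names fromSlice, collect, reduce and sum are hypothetical local stand-ins. The sketch assumes the accumulated state is emitted only when the source completes without an error; the documentation above does not state what happens on failure.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types so the sketch is self-contained.
type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

type FuncObservable[T any] func(context.Context, func(T), func(error))

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	f(ctx, next, complete)
}

// fromSlice is a hypothetical, synchronous stand-in for FromSlice.
func fromSlice[T any](items []T) Observable[T] {
	return FuncObservable[T](func(ctx context.Context, next func(T), complete func(error)) {
		for _, x := range items {
			if err := ctx.Err(); err != nil {
				complete(err)
				return
			}
			next(x)
		}
		complete(nil)
	})
}

// collect gathers all items and waits for completion.
func collect[T any](src Observable[T]) (items []T, err error) {
	done := make(chan error, 1)
	src.Observe(context.Background(),
		func(x T) { items = append(items, x) },
		func(e error) { done <- e })
	err = <-done
	return items, err
}

// reduce folds every item into a state and emits the final state once.
// Assumption: nothing is emitted if the source fails.
func reduce[Item, Result any](src Observable[Item], init Result, f func(Result, Item) Result) Observable[Result] {
	return FuncObservable[Result](func(ctx context.Context, next func(Result), complete func(error)) {
		state := init // fresh state for each subscription
		src.Observe(ctx,
			func(x Item) { state = f(state, x) },
			func(err error) {
				if err == nil {
					next(state)
				}
				complete(err)
			})
	})
}

// sum adds up the items of a slice through the reduce operator.
func sum(items []int) []int {
	total := reduce(fromSlice(items), 0, func(acc, x int) int { return acc + x })
	out, _ := collect(total)
	return out
}

func main() {
	fmt.Println(sum([]int{1, 2, 3, 4})) // [10]
}
```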

func Retry

func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]

Retry resubscribes to the observable if it completes with an error.

func Stuck

func Stuck[T any]() Observable[T]

Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.

func Throttle

func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]

Throttle limits the rate at which items are emitted.

func ToMulticast

func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))

ToMulticast makes 'src' a multicast observable, i.e. each observer will observe the same sequence.

type RetryFunc

type RetryFunc func(err error) bool

RetryFunc decides whether the processing should be retried given the error.

func BackoffRetry

func BackoffRetry(shouldRetry RetryFunc, minBackoff, maxBackoff time.Duration) RetryFunc

BackoffRetry retries with an exponential backoff.

func LimitRetries

func LimitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc

LimitRetries limits the number of retries with the given retry method, e.g. LimitRetries(BackoffRetry(AlwaysRetry, time.Millisecond, time.Second), 5).
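Because each combinator takes a RetryFunc and returns a RetryFunc, retry policies compose by wrapping. The sketch below models a retry limit with local stand-ins. The names alwaysRetry, limitRetries, attempts and demo are hypothetical, the backoff delay is not modelled, and the counting logic is an assumption rather than the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// RetryFunc is a local copy of the documented type.
type RetryFunc func(err error) bool

// alwaysRetry asks for a retry regardless of the error.
func alwaysRetry(error) bool { return true }

// limitRetries wraps shouldRetry and declines once numRetries retries
// have been granted. The counter lives in the returned closure.
func limitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc {
	return func(err error) bool {
		if numRetries <= 0 {
			return false
		}
		numRetries--
		return shouldRetry(err)
	}
}

// attempts runs op until it succeeds or shouldRetry declines, and
// reports how many times op was called.
func attempts(op func() error, shouldRetry RetryFunc) (calls int, err error) {
	for {
		calls++
		if err = op(); err == nil || !shouldRetry(err) {
			return calls, err
		}
	}
}

// demo runs an operation that always fails under a limit of 3 retries.
func demo() (int, error) {
	failing := func() error { return errors.New("boom") }
	return attempts(failing, limitRetries(alwaysRetry, 3))
}

func main() {
	fmt.Println(demo()) // 4 boom
}
```

With a limit of 3, the operation runs four times in total: the initial attempt plus three retries.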
