Documentation
¶
Overview ¶
// Package rxgo is the main RxGo package.
Index ¶
- Constants
- Variables
- func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ...RxAssert)
- func NewSafeSubscriber[T any](onNext OnNextFunc[T], onError OnErrorFunc, onComplete OnCompleteFunc) *safeSubscriber[T]
- func NewSubscriber[T any](bufferCount ...uint) *subscriber[T]
- func Partition[T any](source Observable[T], predicate PredicateFunc[T])
- func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, ...)
- func SwitchScan()
- type AccumulatorFunc
- type AssertPredicate
- type BackpressureStrategy
- type CloseChannelStrategy
- type Comparator
- type ComparatorFunc
- type ComparerFunc
- type CompletedFunc
- type Disposable
- type Disposed
- type Duration
- type DurationFunc
- type Either
- type ErrFunc
- type ErrorFunc
- type FinalizerFunc
- type Func
- type Func2
- type FuncN
- type GroupedObservable
- type IllegalInputError
- type IndexOutOfBoundError
- type Item
- type Iterable
- type Marshaller
- type NextFunc
- type Notification
- type NotificationKind
- type Observable
- func Defer[T any](factory func() Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func ForkJoin[T any](sources ...Observable[T]) Observable[[]T]
- func Iif[T any](condition func() bool, trueObservable Observable[T], ...) Observable[T]
- func Interval(duration time.Duration) Observable[uint]
- func Never[T any]() Observable[T]
- func Of2[T any](item T, items ...T) Observable[T]
- func Pipe[S any, O1 any](stream Observable[S], f1 OperatorFunc[S, any], f ...OperatorFunc[any, any]) Observable[any]
- func Pipe1[S any, O1 any](stream Observable[S], f1 OperatorFunc[S, O1]) Observable[O1]
- func Pipe10[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any, ...](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O10]
- func Pipe2[S any, O1 any, O2 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2]) Observable[O2]
- func Pipe3[S any, O1 any, O2 any, O3 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O3]
- func Pipe4[S any, O1 any, O2 any, O3 any, O4 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O4]
- func Pipe5[S any, O1 any, O2 any, O3 any, O4 any, O5 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O5]
- func Pipe6[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O6]
- func Pipe7[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O7]
- func Pipe8[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O8]
- func Pipe9[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any](stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ...) Observable[O9]
- func Range[T constraints.Unsigned](start, count T) Observable[T]
- func Scheduled[T any](item T, items ...T) Observable[T]
- func Throw[T any](factory ErrorFunc) Observable[T]
- func Timer[N constraints.Unsigned](startDue time.Duration, intervalDuration ...time.Duration) Observable[N]
- type ObservableFunc
- type ObservableNotification
- type ObservationStrategy
- type Observer
- type OnCompleteFunc
- type OnErrorFunc
- type OnErrorStrategy
- type OnNextFunc
- type OperatorFunc
- func Audit[T any, R any](durationSelector DurationFunc[T, R]) OperatorFunc[T, T]
- func AuditTime[T any, R any](duration time.Duration) OperatorFunc[T, T]
- func Buffer[T any, R any](closingNotifier Observable[R]) OperatorFunc[T, []T]
- func BufferCount[T any](bufferSize uint, startBufferEvery ...uint) OperatorFunc[T, []T]
- func BufferTime[T any](bufferTimeSpan time.Duration) OperatorFunc[T, []T]
- func BufferToggle[T any, O any](openings Observable[O], closingSelector func(value O) Observable[O]) OperatorFunc[T, []T]
- func BufferWhen[T any, R any](closingSelector func() Observable[R]) OperatorFunc[T, []T]
- func Catch[T any](catch func(err error, caught Observable[T]) Observable[T]) OperatorFunc[T, T]
- func CombineLatestAll[T any, R any](project func(values []T) R) OperatorFunc[Observable[T], R]
- func CombineLatestWith[T any](sources ...Observable[T]) OperatorFunc[T, []T]
- func ConcatAll[T any]() OperatorFunc[Observable[T], T]
- func ConcatMap[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, R]
- func ConcatWith[T any](sources ...Observable[T]) OperatorFunc[T, T]
- func Count[T any](predicate ...PredicateFunc[T]) OperatorFunc[T, uint]
- func Debounce[T any, R any](durationSelector DurationFunc[T, R]) OperatorFunc[T, T]
- func DebounceTime[T any](duration time.Duration) OperatorFunc[T, T]
- func DefaultIfEmpty[T any](defaultValue T) OperatorFunc[T, T]
- func Delay[T any](duration time.Duration) OperatorFunc[T, T]
- func DelayWhen[T any, R any](delayDurationSelector ProjectionFunc[T, R]) OperatorFunc[T, T]
- func Dematerialize[T any]() OperatorFunc[ObservableNotification[T], T]
- func Distinct[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, T]
- func DistinctUntilChanged[T any](comparator ...ComparatorFunc[T, T]) OperatorFunc[T, T]
- func Do[T any](cb Observer[T]) OperatorFunc[T, T]
- func ElementAt[T any](pos uint, defaultValue ...T) OperatorFunc[T, T]
- func Every[T any](predicate PredicateFunc[T]) OperatorFunc[T, bool]
- func ExhaustAll[T any]() OperatorFunc[Observable[T], T]
- func ExhaustMap[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, R]
- func Expand[T any, R any](project ProjectionFunc[T, R]) OperatorFunc[T, Either[T, R]]
- func Filter[T any](predicate PredicateFunc[T]) OperatorFunc[T, T]
- func Find[T any](predicate PredicateFunc[T]) OperatorFunc[T, Optional[T]]
- func FindIndex[T any](predicate PredicateFunc[T]) OperatorFunc[T, int]
- func First[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]
- func GroupBy[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, GroupedObservable[K, T]]
- func IgnoreElements[T any]() OperatorFunc[T, T]
- func IsEmpty[T any]() OperatorFunc[T, bool]
- func Last[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]
- func Map[T any, R any](mapper func(T, uint) (R, error)) OperatorFunc[T, R]
- func Materialize[T any]() OperatorFunc[T, ObservableNotification[T]]
- func Max[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]
- func MergeMap[T any, R any](project ProjectionFunc[T, R], concurrent ...uint) OperatorFunc[T, R]
- func MergeScan[V any, A any](accumulator func(acc A, value V, index uint) Observable[A], seed A, ...) OperatorFunc[V, A]
- func MergeWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, T]
- func Min[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]
- func PairWise[T any]() OperatorFunc[T, Tuple[T, T]]
- func RaceWith[T any](sources ...Observable[T]) OperatorFunc[T, T]
- func Reduce[V any, A any](accumulator AccumulatorFunc[A, V], seed A) OperatorFunc[V, A]
- func Repeat[T any, C repeatConfig](config ...C) OperatorFunc[T, T]
- func Retry[T any, C retryConfig](config ...C) OperatorFunc[T, T]
- func Sample[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
- func SampleTime[T any](duration time.Duration) OperatorFunc[T, T]
- func Scan[V any, A any](accumulator AccumulatorFunc[A, V], seed A) OperatorFunc[V, A]
- func SequenceEqual[T any](compareTo Observable[T], comparator ...ComparatorFunc[T, T]) OperatorFunc[T, bool]
- func Single[T any](predicate ...func(value T, index uint, source Observable[T]) bool) OperatorFunc[T, T]
- func Skip[T any](count uint) OperatorFunc[T, T]
- func SkipLast[T any](skipCount uint) OperatorFunc[T, T]
- func SkipUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
- func SkipWhile[T any](predicate func(v T, index uint) bool) OperatorFunc[T, T]
- func SwitchAll[T any]() OperatorFunc[Observable[T], T]
- func SwitchMap[T any, R any](project func(value T, index uint) Observable[R]) OperatorFunc[T, R]
- func Take[T any](count uint) OperatorFunc[T, T]
- func TakeLast[T any](count uint) OperatorFunc[T, T]
- func TakeUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
- func TakeWhile[T any](predicate func(value T, index uint) bool) OperatorFunc[T, T]
- func Throttle[T any, R any](durationSelector func(value T) Observable[R]) OperatorFunc[T, T]
- func ThrottleTime[T any](duration time.Duration) OperatorFunc[T, T]
- func ThrowIfEmpty[T any](errorFactory ...ErrorFunc) OperatorFunc[T, T]
- func Timeout[T any, C timeoutConfig[T]](config C) OperatorFunc[T, T]
- func ToSlice[T any]() OperatorFunc[T, []T]
- func WithLatestFrom[A any, B any](input Observable[B]) OperatorFunc[A, Tuple[A, B]]
- func WithTimeInterval[T any]() OperatorFunc[T, TimeInterval[T]]
- func WithTimestamp[T any]() OperatorFunc[T, Timestamp[T]]
- func ZipAll[T any]() OperatorFunc[Observable[T], []T]
- func ZipWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, []T]
- type Option
- type Optional
- type Predicate
- type PredicateFunc
- type Producer
- type ProjectionFunc
- type RepeatConfig
- type RetryConfig
- type RxAssert
- func CustomPredicate(predicate AssertPredicate) RxAssert
- func HasAnError() RxAssert
- func HasError(err error) RxAssert
- func HasErrors(errs ...error) RxAssert
- func HasItem(i interface{}) RxAssert
- func HasItems(items ...interface{}) RxAssert
- func HasItemsNoOrder(items ...interface{}) RxAssert
- func HasNoError() RxAssert
- func IsNotEmpty() RxAssert
- type Subject
- type Subscriber
- type Subscription
- type Supplier
- type TimeInterval
- type TimeoutConfig
- type Timestamp
- type TimestampItem
- type Tuple
- type Unmarshaller
Constants ¶
const Infinite int64 = -1
Infinite represents an infinite wait time
Variables ¶
var ( // An error thrown when an Observable or a sequence was queried but has no elements. ErrEmpty = errors.New("rxgo: empty value") // An error thrown when a value or values are missing from an observable sequence. ErrNotFound = errors.New("rxgo: no values match") ErrSequence = errors.New("rxgo: too many values match") ErrArgumentOutOfRange = errors.New("rxgo: argument out of range") // An error thrown by the timeout operator. ErrTimeout = errors.New("rxgo: timeout") )
Functions ¶
func NewSafeSubscriber ¶
func NewSafeSubscriber[T any](onNext OnNextFunc[T], onError OnErrorFunc, onComplete OnCompleteFunc) *safeSubscriber[T]
func NewSubscriber ¶
func Partition ¶
func Partition[T any](source Observable[T], predicate PredicateFunc[T])
Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate. FIXME: redesign the API
func SendItems ¶
func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{})
SendItems is a utility function that sends a list of interface{} values and takes a strategy indicating whether to close the channel once the function completes.
func SwitchScan ¶
func SwitchScan()
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, emitting values only from the most recently returned Observable.
Types ¶
type AccumulatorFunc ¶
type AssertPredicate ¶
type AssertPredicate func(items []interface{}) error
AssertPredicate is a custom predicate based on the items.
type BackpressureStrategy ¶
type BackpressureStrategy uint32
BackpressureStrategy is the backpressure strategy type.
const ( // Block blocks until the channel is available. Block BackpressureStrategy = iota // Drop drops the message. Drop )
type CloseChannelStrategy ¶
type CloseChannelStrategy uint32
CloseChannelStrategy indicates a strategy on whether to close a channel.
const ( // LeaveChannelOpen indicates to leave the channel open after completion. LeaveChannelOpen CloseChannelStrategy = iota // CloseChannel indicates to close the channel after completion. CloseChannel )
type Comparator ¶
type Comparator func(interface{}, interface{}) int
Comparator defines a func that returns an int: - 0 if the two elements are equal - A negative value if the first argument is less than the second - A positive value if the first argument is greater than the second
type ComparatorFunc ¶
type ComparerFunc ¶
type Disposable ¶
type Disposable context.CancelFunc
Disposable is a function to be called in order to dispose a subscription.
type Disposed ¶
type Disposed <-chan struct{}
Disposed is a notification channel indicating when an Observable is closed.
type Duration ¶
type Duration interface {
// contains filtered or unexported methods
}
Duration represents a duration
type DurationFunc ¶
type DurationFunc[T any, R any] func(value T) Observable[R]
type FinalizerFunc ¶
type FinalizerFunc func()
type Func ¶
ItemToObservable defines a function that computes an observable from an item. ItemToObservable func(Item) Observable ErrorToObservable defines a function that transforms an observable from an error. ErrorToObservable func(error) Observable Func defines a function that computes a value from an input value.
type FuncN ¶
type FuncN func(...interface{}) interface{}
FuncN defines a function that computes a value from N input values.
type GroupedObservable ¶
type GroupedObservable[K comparable, R any] interface { Observable[R] // Inherit from observable Key() K }
func NewGroupedObservable ¶
func NewGroupedObservable[K comparable, T any](key K, connector func() Subject[T]) GroupedObservable[K, T]
type IllegalInputError ¶
type IllegalInputError struct {
// contains filtered or unexported fields
}
IllegalInputError is triggered when the observable receives an illegal input.
func (IllegalInputError) Error ¶
func (e IllegalInputError) Error() string
type IndexOutOfBoundError ¶
type IndexOutOfBoundError struct {
// contains filtered or unexported fields
}
IndexOutOfBoundError is triggered when the observable cannot access the specified index.
func (IndexOutOfBoundError) Error ¶
func (e IndexOutOfBoundError) Error() string
type Item ¶
type Item struct { V interface{} E error }
Item is a wrapper having either a value or an error.
func (Item) SendBlocking ¶
SendBlocking sends an item and blocks until it is sent.
func (Item) SendContext ¶
SendContext sends an item and blocks until it is sent or the context is canceled. It returns a boolean to indicate whether the item was sent.
func (Item) SendNonBlocking ¶
SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.
type Marshaller ¶
Marshaller defines a marshaller type (interface{} to []byte).
type Notification ¶
type Notification[T any] interface { ObservableNotification[T] Send(Subscriber[T]) bool Done() bool }
func Complete ¶
func Complete[T any]() Notification[T]
func Error ¶
func Error[T any](err error) Notification[T]
func Next ¶
func Next[T any](v T) Notification[T]
type NotificationKind ¶
type NotificationKind int
NotificationKind identifies the kind of a notification: next, error, or complete.
const ( NextKind NotificationKind = iota ErrorKind CompleteKind )
type Observable ¶
type Observable[T any] interface { SubscribeWith(subscriber Subscriber[T]) SubscribeOn(finalizer ...func()) Subscriber[T] SubscribeSync(onNext func(v T), onError func(err error), onComplete func()) }
func Defer ¶
func Defer[T any](factory func() Observable[T]) Observable[T]
Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.
func Empty ¶
func Empty[T any]() Observable[T]
A simple Observable that emits no items to the Observer and immediately emits a complete notification.
func ForkJoin ¶
func ForkJoin[T any](sources ...Observable[T]) Observable[[]T]
Accepts a variadic list of source Observables and returns an Observable that emits a slice of values in the exact same order as the passed sources.
func Iif ¶
func Iif[T any](condition func() bool, trueObservable Observable[T], falseObservable Observable[T]) Observable[T]
Checks a boolean at subscription time, and chooses between one of two observable sources.
func Interval ¶
func Interval(duration time.Duration) Observable[uint]
Interval creates an Observable emitting incremental integers infinitely between each given time interval.
func Never ¶
func Never[T any]() Observable[T]
An Observable that emits no items to the Observer and never completes.
func Pipe ¶
func Pipe[S any, O1 any]( stream Observable[S], f1 OperatorFunc[S, any], f ...OperatorFunc[any, any], ) Observable[any]
If there is a commonly used sequence of operators in your code, use the `Pipe` function to extract the sequence into a new operator. Even if a sequence is not that common, breaking it out into a single operator can improve readability.
func Pipe1 ¶
func Pipe1[S any, O1 any]( stream Observable[S], f1 OperatorFunc[S, O1], ) Observable[O1]
func Pipe10 ¶
func Pipe10[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any, O10 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], f6 OperatorFunc[O5, O6], f7 OperatorFunc[O6, O7], f8 OperatorFunc[O7, O8], f9 OperatorFunc[O8, O9], f10 OperatorFunc[O9, O10], ) Observable[O10]
func Pipe2 ¶
func Pipe2[S any, O1 any, O2 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], ) Observable[O2]
func Pipe3 ¶
func Pipe3[S any, O1 any, O2 any, O3 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], ) Observable[O3]
func Pipe4 ¶
func Pipe4[S any, O1 any, O2 any, O3 any, O4 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], ) Observable[O4]
func Pipe5 ¶
func Pipe5[S any, O1 any, O2 any, O3 any, O4 any, O5 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], ) Observable[O5]
func Pipe6 ¶
func Pipe6[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], f6 OperatorFunc[O5, O6], ) Observable[O6]
func Pipe7 ¶
func Pipe7[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], f6 OperatorFunc[O5, O6], f7 OperatorFunc[O6, O7], ) Observable[O7]
func Pipe8 ¶
func Pipe8[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], f6 OperatorFunc[O5, O6], f7 OperatorFunc[O6, O7], f8 OperatorFunc[O7, O8], ) Observable[O8]
func Pipe9 ¶
func Pipe9[S any, O1 any, O2 any, O3 any, O4 any, O5 any, O6 any, O7 any, O8 any, O9 any]( stream Observable[S], f1 OperatorFunc[S, O1], f2 OperatorFunc[O1, O2], f3 OperatorFunc[O2, O3], f4 OperatorFunc[O3, O4], f5 OperatorFunc[O4, O5], f6 OperatorFunc[O5, O6], f7 OperatorFunc[O6, O7], f8 OperatorFunc[O7, O8], f9 OperatorFunc[O8, O9], ) Observable[O9]
func Range ¶
func Range[T constraints.Unsigned](start, count T) Observable[T]
Creates an Observable that emits a sequence of numbers within a specified range.
func Scheduled ¶
func Scheduled[T any](item T, items ...T) Observable[T]
func Throw ¶
func Throw[T any](factory ErrorFunc) Observable[T]
Creates an observable that will create an error instance and push it to the consumer as an error immediately upon subscription. This creation function is useful for creating an observable that will create an error and error every time it is subscribed to. Generally, inside of most operators when you might want to return an errored observable, this is unnecessary. In most cases, such as in the inner return of `ConcatMap`, `MergeMap`, `Defer`, and many others, you can simply throw the error, and RxGo will pick that up and notify the consumer of the error.
func Timer ¶
func Timer[N constraints.Unsigned](startDue time.Duration, intervalDuration ...time.Duration) Observable[N]
Creates an observable that will wait for a specified time period before emitting the number 0.
type ObservableFunc ¶
type ObservableFunc[T any] func(subscriber Subscriber[T])
type ObservableNotification ¶
type ObservableNotification[T any] interface { Kind() NotificationKind Value() T // returns the underlying value if it's a "Next" notification Err() error IsEnd() bool }
type ObservationStrategy ¶
type ObservationStrategy uint32
ObservationStrategy defines the strategy to consume from an Observable.
const ( // Lazy is the default observation strategy, when an Observer subscribes. Lazy ObservationStrategy = iota // Eager means consuming as soon as the Observable is created. Eager )
type Observer ¶
func NewObserver ¶
func NewObserver[T any](onNext OnNextFunc[T], onError OnErrorFunc, onComplete OnCompleteFunc) Observer[T]
type OnCompleteFunc ¶
type OnCompleteFunc func()
type OnErrorFunc ¶
type OnErrorFunc func(error)
OnErrorFunc defines a function that receives an error and returns nothing.
type OnErrorStrategy ¶
type OnErrorStrategy uint32
OnErrorStrategy is the Observable error strategy.
const ( // StopOnError is the default error strategy. // An operator will stop processing items on error. StopOnError OnErrorStrategy = iota // ContinueOnError means an operator will continue processing items after an error. ContinueOnError )
type OnNextFunc ¶
type OnNextFunc[T any] func(T)
type OperatorFunc ¶
type OperatorFunc[I any, O any] func(source Observable[I]) Observable[O]
func Audit ¶
Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.
func AuditTime ¶
Ignores source values for the given duration, then emits the most recent value from the source Observable, then repeats this process.
func Buffer ¶
func Buffer[T any, R any](closingNotifier Observable[R]) OperatorFunc[T, []T]
Buffers the source Observable values until closingNotifier emits.
func BufferCount ¶
Buffers the source Observable values until the size hits the maximum bufferSize given.
func BufferTime ¶
Buffers the source Observable values for a specific time period.
func BufferToggle ¶
func BufferToggle[T any, O any](openings Observable[O], closingSelector func(value O) Observable[O]) OperatorFunc[T, []T]
Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.
func BufferWhen ¶
func BufferWhen[T any, R any](closingSelector func() Observable[R]) OperatorFunc[T, []T]
Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.
func Catch ¶
func Catch[T any](catch func(err error, caught Observable[T]) Observable[T]) OperatorFunc[T, T]
Catches errors on the observable to be handled by returning a new observable or throwing an error.
func CombineLatestAll ¶
func CombineLatestAll[T any, R any](project func(values []T) R) OperatorFunc[Observable[T], R]
Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.
func CombineLatestWith ¶
func CombineLatestWith[T any](sources ...Observable[T]) OperatorFunc[T, []T]
Creates an observable that combines the latest values from all passed observables and the source into arrays and emits them.
func ConcatAll ¶
func ConcatAll[T any]() OperatorFunc[Observable[T], T]
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
func ConcatMap ¶
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
func ConcatWith ¶
func ConcatWith[T any](sources ...Observable[T]) OperatorFunc[T, T]
Emits all of the values from the source observable, then, once it completes, subscribes to each observable source provided, one at a time, emitting all of their values, and not subscribing to the next one until it completes.
func Count ¶
Counts the number of emissions on the source and emits that number when the source completes.
func Debounce ¶
Emits a notification from the source Observable only after a particular time span determined by another Observable has passed without another source emission.
func DebounceTime ¶
Emits a notification from the source Observable only after a particular time span has passed without another source emission.
func DefaultIfEmpty ¶
func DefaultIfEmpty[T any](defaultValue T) OperatorFunc[T, T]
Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.
func DelayWhen ¶
Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.
func Dematerialize ¶
func Dematerialize[T any]() OperatorFunc[ObservableNotification[T], T]
Converts an Observable of ObservableNotification objects into the emissions that they represent.
func Distinct ¶
func Distinct[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, T]
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
func DistinctUntilChanged ¶
func DistinctUntilChanged[T any](comparator ...ComparatorFunc[T, T]) OperatorFunc[T, T]
Returns a result Observable that emits all values pushed by the source observable if they are distinct in comparison to the last value the result observable emitted.
func ElementAt ¶
Emits the single value at the specified index in a sequence of emissions from the source Observable.
func Every ¶
Returns an Observable that emits whether or not every item of the source satisfies the condition specified.
func ExhaustAll ¶
func ExhaustAll[T any]() OperatorFunc[Observable[T], T]
Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
func ExhaustMap ¶
Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.
func Expand ¶
Recursively projects each source value to an Observable which is merged in the output Observable.
func Filter ¶
func Filter[T any](predicate PredicateFunc[T]) OperatorFunc[T, T]
Filter emits only those items from an Observable that pass a predicate test.
func FindIndex ¶
Emits only the index of the first value emitted by the source Observable that meets some condition.
func First ¶
func First[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]
Emits only the first value (or the first value that meets some condition) emitted by the source Observable.
func GroupBy ¶
func GroupBy[T any, K comparable](keySelector func(value T) K) OperatorFunc[T, GroupedObservable[K, T]]
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group. FIXME: maybe we should have a buffer channel
func IgnoreElements ¶
func IgnoreElements[T any]() OperatorFunc[T, T]
Ignores all items emitted by the source Observable and only passes calls of complete or error.
func IsEmpty ¶
Emits false if the input Observable emits any values, or emits true if the input Observable completes without emitting any values.
func Last ¶
func Last[T any](predicate PredicateFunc[T], defaultValue ...T) OperatorFunc[T, T]
Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.
func Materialize ¶
func Materialize[T any]() OperatorFunc[T, ObservableNotification[T]]
Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.
func Max ¶
func Max[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]
The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.
func MergeMap ¶
Projects each source value to an Observable which is merged in the output Observable.
func MergeScan ¶
func MergeScan[V any, A any](accumulator func(acc A, value V, index uint) Observable[A], seed A, concurrent ...uint) OperatorFunc[V, A]
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.
func MergeWith ¶
func MergeWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, T]
FIXME: Merge the values from all observables to a single observable result.
func Min ¶
func Min[T any](comparer ...ComparerFunc[T, T]) OperatorFunc[T, T]
The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.
func PairWise ¶
Groups pairs of consecutive emissions together and emits them as an array of two values.
func RaceWith ¶
func RaceWith[T any](sources ...Observable[T]) OperatorFunc[T, T]
Creates an Observable that mirrors the first source Observable to emit a next, error or complete notification from the combination of the Observable to which the operator is applied and supplied Observables.
func Reduce ¶
Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.
func Repeat ¶
func Repeat[T any, C repeatConfig](config ...C) OperatorFunc[T, T]
Returns an Observable that will resubscribe to the source stream when the source stream completes.
func Retry ¶
func Retry[T any, C retryConfig](config ...C) OperatorFunc[T, T]
Returns an Observable that mirrors the source Observable with the exception of an error.
func Sample ¶
func Sample[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.
func SampleTime ¶
Emits the most recently emitted value from the source Observable within periodic time intervals.
func Scan ¶
Useful for encapsulating and managing state. Applies an accumulator (or "reducer function") to each value from the source after an initial state is established -- either via a seed value (second argument), or from the first value from the source.
func SequenceEqual ¶
func SequenceEqual[T any](compareTo Observable[T], comparator ...ComparatorFunc[T, T]) OperatorFunc[T, bool]
Compares all values of two observables in sequence using an optional comparator function and returns an observable of a single boolean value representing whether or not the two sequences are equal.
func Single ¶
func Single[T any](predicate ...func(value T, index uint, source Observable[T]) bool) OperatorFunc[T, T]
Returns an observable that asserts that only one value is emitted from the observable that matches the predicate. If no predicate is provided, then it will assert that the observable only emits one value.
func Skip ¶
Returns an Observable that skips the first count items emitted by the source Observable.
func SkipUntil ¶
func SkipUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
func SkipWhile ¶
Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.
func SwitchAll ¶
func SwitchAll[T any]() OperatorFunc[Observable[T], T]
Converts a higher-order Observable into a first-order Observable producing values only from the most recent observable sequence.
func SwitchMap ¶
func SwitchMap[T any, R any](project func(value T, index uint) Observable[R]) OperatorFunc[T, R]
Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.
func TakeLast ¶
Waits for the source to complete, then emits the last N values from the source, as specified by the count argument.
func TakeUntil ¶
func TakeUntil[T any, R any](notifier Observable[R]) OperatorFunc[T, T]
Emits the values emitted by the source Observable until a notifier Observable emits a value.
func TakeWhile ¶
Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.
func Throttle ¶
func Throttle[T any, R any](durationSelector func(value T) Observable[R]) OperatorFunc[T, T]
Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.
func ThrottleTime ¶
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
func ThrowIfEmpty ¶
func ThrowIfEmpty[T any](errorFactory ...ErrorFunc) OperatorFunc[T, T]
If the source observable completes without emitting a value, it will emit an error. The error will be created at that time by the optional errorFactory argument, otherwise, the error will be `ErrEmpty`.
func Timeout ¶
func Timeout[T any, C timeoutConfig[T]](config C) OperatorFunc[T, T]
Errors if the Observable does not emit a value within the given time span. FIXME: DATA RACE and send on closed channel.
func ToSlice ¶
func ToSlice[T any]() OperatorFunc[T, []T]
Collects all source emissions and emits them as a slice when the source completes.
func WithLatestFrom ¶
func WithLatestFrom[A any, B any](input Observable[B]) OperatorFunc[A, Tuple[A, B]]
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
func WithTimeInterval ¶
func WithTimeInterval[T any]() OperatorFunc[T, TimeInterval[T]]
Emits an object containing the current value, and the time that has passed between emitting the current value and the previous value, which is calculated by using the provided scheduler's now() method to retrieve the current time at each emission, then calculating the difference.
func WithTimestamp ¶
Attaches a UTC timestamp to each item emitted by an observable, indicating when it was emitted.
func ZipAll ¶
func ZipAll[T any]() OperatorFunc[Observable[T], []T]
Collects all observable inner sources from the source. Once the source completes, it will subscribe to all inner sources, combining their values by index and emitting them.
func ZipWith ¶
func ZipWith[T any](input Observable[T], inputs ...Observable[T]) OperatorFunc[T, []T]
Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option handles configurable options.
func WithBufferedChannel ¶
WithBufferedChannel allows configuring the capacity of a buffered channel.
func WithCPUPool ¶
func WithCPUPool() Option
WithCPUPool allows specifying an execution pool based on the number of logical CPUs.
func WithContext ¶
WithContext allows passing a context.
func WithPublishStrategy ¶
func WithPublishStrategy() Option
WithPublishStrategy converts an ordinary Observable into a connectable Observable.
type Predicate ¶
type Predicate func(interface{}) bool
ErrorFunc defines a function that computes a value from an error: `ErrorFunc func(error) interface{}`. Predicate defines a func that returns a bool from an input value.
type PredicateFunc ¶
type ProjectionFunc ¶
type ProjectionFunc[T any, R any] func(value T, index uint) Observable[R]
type RepeatConfig ¶
type RxAssert ¶
type RxAssert interface {
// contains filtered or unexported methods
}
RxAssert lists the Observable assertions.
func CustomPredicate ¶
func CustomPredicate(predicate AssertPredicate) RxAssert
CustomPredicate checks a custom predicate.
func HasAnError ¶
func HasAnError() RxAssert
HasAnError checks that the observable has produced an error.
func HasItem ¶
func HasItem(i interface{}) RxAssert
HasItem checks if a single or optional single has a specific item.
func HasItems ¶
func HasItems(items ...interface{}) RxAssert
HasItems checks that the observable produces the corresponding items.
func HasItemsNoOrder ¶
func HasItemsNoOrder(items ...interface{}) RxAssert
HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order.
func HasNoError ¶
func HasNoError() RxAssert
HasNoError checks that the observable has not raised any error.
func IsNotEmpty ¶
func IsNotEmpty() RxAssert
IsNotEmpty checks that the observable produces some items.
type Subject ¶
type Subject[T any] interface { Subscriber[T] Subscription }
type Subscriber ¶
type Subscriber[T any] interface { Stop() Send() chan<- Notification[T] ForEach() <-chan Notification[T] Closed() <-chan struct{} }
type Subscription ¶
type Subscription interface {
// allow user to unsubscribe the stream manually
Unsubscribe()
}
type TimeInterval ¶
func NewTimeInterval ¶
func NewTimeInterval[T any](value T, elasped time.Duration) TimeInterval[T]
type TimeoutConfig ¶
type TimeoutConfig[T any] struct { With func() Observable[T] Each time.Duration }
type Timestamp ¶
func NewTimestamp ¶
type TimestampItem ¶
TimestampItem attaches a timestamp to an item.
type Unmarshaller ¶
Unmarshaller defines an unmarshaller type ([]byte to interface).
Source Files
¶
- assert.go
- conditional.go
- duration.go
- either.go
- error.go
- errors.go
- factory.go
- filter.go
- group.go
- item.go
- iterable.go
- iterable_channel.go
- iterable_create.go
- iterable_defer.go
- iterable_eventsource.go
- iterable_factory.go
- iterable_just.go
- iterable_range.go
- iterable_slice.go
- join.go
- mathematical.go
- notification.go
- observable.go
- observable_old.go
- observable_operator.go
- operator.go
- optional.go
- optionalsingle.go
- options.go
- pipe.go
- rxgo.go
- single.go
- subscriber.go
- time.go
- transformation.go
- tuple.go
- types.go
- util.go