Documentation
¶
Overview ¶
Package rxgo is the main RxGo package.
Index ¶
- Variables
- func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ...RxAssert)
- func Just(items ...interface{}) func(opts ...Option) Observable
- func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, ...)
- type AssertPredicate
- type BackpressureStrategy
- type CloseChannelStrategy
- type Comparator
- type CompletedFunc
- type Disposable
- type Disposed
- type Duration
- type ErrFunc
- type ErrorFunc
- type ErrorToObservable
- type Func
- type Func2
- type FuncN
- type GroupedObservable
- type IllegalInputError
- type IndexOutOfBoundError
- type Item
- type ItemToObservable
- type Iterable
- type Marshaller
- type NextFunc
- type Observable
- func Amb(observables []Observable, opts ...Option) Observable
- func CombineLatest(f FuncN, observables []Observable, opts ...Option) Observable
- func Concat(observables []Observable, opts ...Option) Observable
- func Create(f []Producer, opts ...Option) Observable
- func Defer(f []Producer, opts ...Option) Observable
- func Empty() Observable
- func FromChannel(next <-chan Item, opts ...Option) Observable
- func FromEventSource(next <-chan Item, opts ...Option) Observable
- func Interval(interval Duration, opts ...Option) Observable
- func Merge(observables []Observable, opts ...Option) Observable
- func Never() Observable
- func Range(start, count int, opts ...Option) Observable
- func Start(fs []Supplier, opts ...Option) Observable
- func Thrown(err error) Observable
- func Timer(d Duration, opts ...Option) Observable
- type ObservableImpl
- func (o *ObservableImpl) All(predicate Predicate, opts ...Option) Single
- func (o *ObservableImpl) AverageFloat32(opts ...Option) Single
- func (o *ObservableImpl) AverageFloat64(opts ...Option) Single
- func (o *ObservableImpl) AverageInt(opts ...Option) Single
- func (o *ObservableImpl) AverageInt16(opts ...Option) Single
- func (o *ObservableImpl) AverageInt32(opts ...Option) Single
- func (o *ObservableImpl) AverageInt64(opts ...Option) Single
- func (o *ObservableImpl) AverageInt8(opts ...Option) Single
- func (o *ObservableImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option) Observable
- func (o *ObservableImpl) BufferWithCount(count int, opts ...Option) Observable
- func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Observable
- func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
- func (o *ObservableImpl) Connect(ctx context.Context) (context.Context, Disposable)
- func (o *ObservableImpl) Contains(equal Predicate, opts ...Option) Single
- func (o *ObservableImpl) Count(opts ...Option) Single
- func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable
- func (o *ObservableImpl) DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable
- func (o *ObservableImpl) Distinct(apply Func, opts ...Option) Observable
- func (o *ObservableImpl) DistinctUntilChanged(apply Func, opts ...Option) Observable
- func (o *ObservableImpl) DoOnCompleted(completedFunc CompletedFunc, opts ...Option) Disposed
- func (o *ObservableImpl) DoOnError(errFunc ErrFunc, opts ...Option) Disposed
- func (o *ObservableImpl) DoOnNext(nextFunc NextFunc, opts ...Option) Disposed
- func (o *ObservableImpl) ElementAt(index uint, opts ...Option) Single
- func (o *ObservableImpl) Error(opts ...Option) error
- func (o *ObservableImpl) Errors(opts ...Option) []error
- func (o *ObservableImpl) Filter(apply Predicate, opts ...Option) Observable
- func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle
- func (o *ObservableImpl) First(opts ...Option) OptionalSingle
- func (o *ObservableImpl) FirstOrDefault(defaultValue interface{}, opts ...Option) Single
- func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observable
- func (o *ObservableImpl) ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, ...) Disposed
- func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
- func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
- func (o *ObservableImpl) IgnoreElements(opts ...Option) Observable
- func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, ...) Observable
- func (o *ObservableImpl) Last(opts ...Option) OptionalSingle
- func (o *ObservableImpl) LastOrDefault(defaultValue interface{}, opts ...Option) Single
- func (o *ObservableImpl) Map(apply Func, opts ...Option) Observable
- func (o *ObservableImpl) Marshal(marshaller Marshaller, opts ...Option) Observable
- func (o *ObservableImpl) Max(comparator Comparator, opts ...Option) OptionalSingle
- func (o *ObservableImpl) Min(comparator Comparator, opts ...Option) OptionalSingle
- func (o *ObservableImpl) Observe(opts ...Option) <-chan Item
- func (o *ObservableImpl) OnErrorResumeNext(resumeSequence ErrorToObservable, opts ...Option) Observable
- func (o *ObservableImpl) OnErrorReturn(resumeFunc ErrorFunc, opts ...Option) Observable
- func (o *ObservableImpl) OnErrorReturnItem(resume interface{}, opts ...Option) Observable
- func (o *ObservableImpl) Reduce(apply Func2, opts ...Option) OptionalSingle
- func (o *ObservableImpl) Repeat(count int64, frequency Duration, opts ...Option) Observable
- func (o *ObservableImpl) Retry(count int, shouldRetry func(error) bool, opts ...Option) Observable
- func (o *ObservableImpl) Run(opts ...Option) Disposed
- func (o *ObservableImpl) Sample(iterable Iterable, opts ...Option) Observable
- func (o *ObservableImpl) Scan(apply Func2, opts ...Option) Observable
- func (o *ObservableImpl) Send(output chan<- Item, opts ...Option)
- func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single
- func (o *ObservableImpl) Serialize(from int, identifier func(interface{}) int, opts ...Option) Observable
- func (o *ObservableImpl) Skip(nth uint, opts ...Option) Observable
- func (o *ObservableImpl) SkipLast(nth uint, opts ...Option) Observable
- func (o *ObservableImpl) SkipWhile(apply Predicate, opts ...Option) Observable
- func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable
- func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle
- func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle
- func (o *ObservableImpl) SumInt64(opts ...Option) OptionalSingle
- func (o *ObservableImpl) Take(nth uint, opts ...Option) Observable
- func (o *ObservableImpl) TakeLast(nth uint, opts ...Option) Observable
- func (o *ObservableImpl) TakeUntil(apply Predicate, opts ...Option) Observable
- func (o *ObservableImpl) TakeWhile(apply Predicate, opts ...Option) Observable
- func (o *ObservableImpl) TimeInterval(opts ...Option) Observable
- func (o *ObservableImpl) Timestamp(opts ...Option) Observable
- func (o *ObservableImpl) ToMap(keySelector Func, opts ...Option) Single
- func (o *ObservableImpl) ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single
- func (o *ObservableImpl) ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error)
- func (o *ObservableImpl) Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable
- func (o *ObservableImpl) WindowWithCount(count int, opts ...Option) Observable
- func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable
- func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
- func (o *ObservableImpl) ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable
- type ObservationStrategy
- type OnErrorStrategy
- type Option
- func Serialize(identifier func(interface{}) int) Option
- func WithBackPressureStrategy(strategy BackpressureStrategy) Option
- func WithBufferedChannel(capacity int) Option
- func WithCPUPool() Option
- func WithContext(ctx context.Context) Option
- func WithErrorStrategy(strategy OnErrorStrategy) Option
- func WithObservationStrategy(strategy ObservationStrategy) Option
- func WithPool(pool int) Option
- func WithPublishStrategy() Option
- type OptionalSingle
- type OptionalSingleImpl
- type Predicate
- type Producer
- type RxAssert
- func CustomPredicate(predicate AssertPredicate) RxAssert
- func HasAnError() RxAssert
- func HasError(err error) RxAssert
- func HasErrors(errs ...error) RxAssert
- func HasItem(i interface{}) RxAssert
- func HasItems(items ...interface{}) RxAssert
- func HasItemsNoOrder(items ...interface{}) RxAssert
- func HasNoError() RxAssert
- func IsEmpty() RxAssert
- func IsNotEmpty() RxAssert
- type Single
- type SingleImpl
- type Supplier
- type TimestampItem
- type Unmarshaller
Constants ¶
This section is empty.
Variables ¶
var Infinite int64 = -1
Infinite represents an infinite wait time.
var OptionalSingleEmpty = Item{}
OptionalSingleEmpty is the constant returned when an OptionalSingle is empty.
Functions ¶
func Just ¶
func Just(items ...interface{}) func(opts ...Option) Observable
Just creates an Observable with the provided items.
Types ¶
type AssertPredicate ¶
type AssertPredicate func(items []interface{}) error
AssertPredicate is a custom predicate based on the items.
type BackpressureStrategy ¶
type BackpressureStrategy uint32
BackpressureStrategy is the backpressure strategy type.
const ( // Block blocks until the channel is available. Block BackpressureStrategy = iota // Drop drops the message. Drop )
type CloseChannelStrategy ¶
type CloseChannelStrategy uint32
CloseChannelStrategy indicates a strategy on whether to close a channel.
const ( // LeaveChannelOpen indicates to leave the channel open after completion. LeaveChannelOpen CloseChannelStrategy = iota // CloseChannel indicates to close the channel after completion. CloseChannel )
type Comparator ¶
type Comparator func(interface{}, interface{}) int
Comparator defines a func that returns an int: - 0 if the two elements are equal - A negative value if the first argument is less than the second - A positive value if the first argument is greater than the second
type Disposable ¶
type Disposable context.CancelFunc
Disposable is a function to be called in order to dispose a subscription.
type Disposed ¶
type Disposed <-chan struct{}
Disposed is a notification channel indicating when an Observable is closed.
type Duration ¶
type Duration interface {
// contains filtered or unexported methods
}
Duration represents a duration
type ErrorFunc ¶
type ErrorFunc func(error) interface{}
ErrorFunc defines a function that computes a value from an error.
type ErrorToObservable ¶
type ErrorToObservable func(error) Observable
ErrorToObservable defines a function that computes an observable from an error.
type FuncN ¶
type FuncN func(...interface{}) interface{}
FuncN defines a function that computes a value from N input values.
type GroupedObservable ¶ added in v2.2.0
type GroupedObservable struct { Observable // Key is the distribution key Key string }
GroupedObservable is the observable type emitted by the GroupByDynamic operator.
type IllegalInputError ¶
type IllegalInputError struct {
// contains filtered or unexported fields
}
IllegalInputError is triggered when the observable receives an illegal input.
func (IllegalInputError) Error ¶
func (e IllegalInputError) Error() string
type IndexOutOfBoundError ¶
type IndexOutOfBoundError struct {
// contains filtered or unexported fields
}
IndexOutOfBoundError is triggered when the observable cannot access the specified index.
func (IndexOutOfBoundError) Error ¶
func (e IndexOutOfBoundError) Error() string
type Item ¶
type Item struct { V interface{} E error }
Item is a wrapper having either a value or an error.
func (Item) SendBlocking ¶
SendBlocking sends an item and blocks until it is sent.
func (Item) SendContext ¶
SendContext sends an item and blocks until it is sent or a context canceled. It returns a boolean to indicate whether the item was sent.
func (Item) SendNonBlocking ¶
SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.
type ItemToObservable ¶
type ItemToObservable func(Item) Observable
ItemToObservable defines a function that computes an observable from an item.
type Marshaller ¶
Marshaller defines a marshaller type (interface{} to []byte).
type Observable ¶
type Observable interface { Iterable All(predicate Predicate, opts ...Option) Single AverageFloat32(opts ...Option) Single AverageFloat64(opts ...Option) Single AverageInt(opts ...Option) Single AverageInt8(opts ...Option) Single AverageInt16(opts ...Option) Single AverageInt32(opts ...Option) Single AverageInt64(opts ...Option) Single BackOffRetry(backOffCfg backoff.BackOff, opts ...Option) Observable BufferWithCount(count int, opts ...Option) Observable BufferWithTime(timespan Duration, opts ...Option) Observable BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable Connect(ctx context.Context) (context.Context, Disposable) Contains(equal Predicate, opts ...Option) Single Count(opts ...Option) Single Debounce(timespan Duration, opts ...Option) Observable DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable Distinct(apply Func, opts ...Option) Observable DistinctUntilChanged(apply Func, opts ...Option) Observable DoOnCompleted(completedFunc CompletedFunc, opts ...Option) Disposed DoOnError(errFunc ErrFunc, opts ...Option) Disposed DoOnNext(nextFunc NextFunc, opts ...Option) Disposed ElementAt(index uint, opts ...Option) Single Error(opts ...Option) error Errors(opts ...Option) []error Filter(apply Predicate, opts ...Option) Observable Find(find Predicate, opts ...Option) OptionalSingle First(opts ...Option) OptionalSingle FirstOrDefault(defaultValue interface{}, opts ...Option) Single FlatMap(apply ItemToObservable, opts ...Option) Observable ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed GroupBy(length int, distribution func(Item) int, opts ...Option) Observable GroupByDynamic(distribution func(Item) string, opts ...Option) Observable IgnoreElements(opts ...Option) Observable Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable Last(opts ...Option) OptionalSingle LastOrDefault(defaultValue interface{}, 
opts ...Option) Single Map(apply Func, opts ...Option) Observable Marshal(marshaller Marshaller, opts ...Option) Observable Max(comparator Comparator, opts ...Option) OptionalSingle Min(comparator Comparator, opts ...Option) OptionalSingle OnErrorResumeNext(resumeSequence ErrorToObservable, opts ...Option) Observable OnErrorReturn(resumeFunc ErrorFunc, opts ...Option) Observable OnErrorReturnItem(resume interface{}, opts ...Option) Observable Reduce(apply Func2, opts ...Option) OptionalSingle Repeat(count int64, frequency Duration, opts ...Option) Observable Retry(count int, shouldRetry func(error) bool, opts ...Option) Observable Run(opts ...Option) Disposed Sample(iterable Iterable, opts ...Option) Observable Scan(apply Func2, opts ...Option) Observable SequenceEqual(iterable Iterable, opts ...Option) Single Send(output chan<- Item, opts ...Option) Serialize(from int, identifier func(interface{}) int, opts ...Option) Observable Skip(nth uint, opts ...Option) Observable SkipLast(nth uint, opts ...Option) Observable SkipWhile(apply Predicate, opts ...Option) Observable StartWith(iterable Iterable, opts ...Option) Observable SumFloat32(opts ...Option) OptionalSingle SumFloat64(opts ...Option) OptionalSingle SumInt64(opts ...Option) OptionalSingle Take(nth uint, opts ...Option) Observable TakeLast(nth uint, opts ...Option) Observable TakeUntil(apply Predicate, opts ...Option) Observable TakeWhile(apply Predicate, opts ...Option) Observable TimeInterval(opts ...Option) Observable Timestamp(opts ...Option) Observable ToMap(keySelector Func, opts ...Option) Single ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error) Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable WindowWithCount(count int, opts ...Option) Observable WindowWithTime(timespan Duration, opts ...Option) Observable WindowWithTimeOrCount(timespan Duration, count int, opts 
...Option) Observable ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable }
Observable is the standard interface for Observables.
func Amb ¶
func Amb(observables []Observable, opts ...Option) Observable
Amb takes several Observables and emits all of the items from only the first of these Observables to emit an item or notification.
func CombineLatest ¶
func CombineLatest(f FuncN, observables []Observable, opts ...Option) Observable
CombineLatest combines the latest item emitted by each Observable via a specified function and emits items based on the results of this function.
func Concat ¶
func Concat(observables []Observable, opts ...Option) Observable
Concat emits the emissions from two or more Observables without interleaving them.
func Create ¶
func Create(f []Producer, opts ...Option) Observable
Create creates an Observable from scratch by calling observer methods programmatically.
func Defer ¶
func Defer(f []Producer, opts ...Option) Observable
Defer does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer.
func Empty ¶
func Empty() Observable
Empty creates an Observable that emits no items and terminates immediately.
func FromChannel ¶
func FromChannel(next <-chan Item, opts ...Option) Observable
FromChannel creates a cold observable from a channel.
func FromEventSource ¶
func FromEventSource(next <-chan Item, opts ...Option) Observable
FromEventSource creates a hot observable from a channel.
func Interval ¶
func Interval(interval Duration, opts ...Option) Observable
Interval creates an Observable emitting incremental integers infinitely between each given time interval.
func Merge ¶
func Merge(observables []Observable, opts ...Option) Observable
Merge combines multiple Observables into one by merging their emissions.
func Never ¶
func Never() Observable
Never creates an Observable that emits no items and does not terminate.
func Range ¶
func Range(start, count int, opts ...Option) Observable
Range creates an Observable that emits count sequential integers beginning at start.
func Start ¶
func Start(fs []Supplier, opts ...Option) Observable
Start creates an Observable from one or more directive-like Suppliers and emits the result of each operation asynchronously on a new Observable.
func Thrown ¶
func Thrown(err error) Observable
Thrown creates an Observable that emits no items and terminates with an error.
func Timer ¶
func Timer(d Duration, opts ...Option) Observable
Timer returns an Observable that completes after a specified delay.
type ObservableImpl ¶
type ObservableImpl struct {
// contains filtered or unexported fields
}
ObservableImpl implements Observable.
func (*ObservableImpl) All ¶
func (o *ObservableImpl) All(predicate Predicate, opts ...Option) Single
All determines whether all items emitted by an Observable meet some criteria.
func (*ObservableImpl) AverageFloat32 ¶
func (o *ObservableImpl) AverageFloat32(opts ...Option) Single
AverageFloat32 calculates the average of numbers emitted by an Observable and emits the average float32.
func (*ObservableImpl) AverageFloat64 ¶
func (o *ObservableImpl) AverageFloat64(opts ...Option) Single
AverageFloat64 calculates the average of numbers emitted by an Observable and emits the average float64.
func (*ObservableImpl) AverageInt ¶
func (o *ObservableImpl) AverageInt(opts ...Option) Single
AverageInt calculates the average of numbers emitted by an Observable and emits the average int.
func (*ObservableImpl) AverageInt16 ¶
func (o *ObservableImpl) AverageInt16(opts ...Option) Single
AverageInt16 calculates the average of numbers emitted by an Observable and emits the average int16.
func (*ObservableImpl) AverageInt32 ¶
func (o *ObservableImpl) AverageInt32(opts ...Option) Single
AverageInt32 calculates the average of numbers emitted by an Observable and emits the average int32.
func (*ObservableImpl) AverageInt64 ¶
func (o *ObservableImpl) AverageInt64(opts ...Option) Single
AverageInt64 calculates the average of numbers emitted by an Observable and emits the average int64.
func (*ObservableImpl) AverageInt8 ¶
func (o *ObservableImpl) AverageInt8(opts ...Option) Single
AverageInt8 calculates the average of numbers emitted by an Observable and emits the average int8.
func (*ObservableImpl) BackOffRetry ¶
func (o *ObservableImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option) Observable
BackOffRetry implements a backoff retry: if a source Observable sends an error, it resubscribes to it in the hopes that it will complete without error. Cannot be run in parallel.
func (*ObservableImpl) BufferWithCount ¶
func (o *ObservableImpl) BufferWithCount(count int, opts ...Option) Observable
BufferWithCount returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits a buffer each time count items have been collected, each containing a slice of count items. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.
func (*ObservableImpl) BufferWithTime ¶
func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Observable
BufferWithTime returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable starts a new buffer periodically and emits each buffer after a fixed timespan, specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.
func (*ObservableImpl) BufferWithTimeOrCount ¶
func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
BufferWithTimeOrCount returns an Observable that emits buffers of items it collects from the source Observable either from a given count or at a given time interval.
func (*ObservableImpl) Connect ¶
func (o *ObservableImpl) Connect(ctx context.Context) (context.Context, Disposable)
Connect instructs a connectable Observable to begin emitting items to its subscribers.
func (*ObservableImpl) Contains ¶
func (o *ObservableImpl) Contains(equal Predicate, opts ...Option) Single
Contains determines whether an Observable emits a particular item or not.
func (*ObservableImpl) Count ¶
func (o *ObservableImpl) Count(opts ...Option) Single
Count counts the number of items emitted by the source Observable and emits only this value.
func (*ObservableImpl) Debounce ¶
func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable
Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.
func (*ObservableImpl) DefaultIfEmpty ¶
func (o *ObservableImpl) DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable
DefaultIfEmpty returns an Observable that emits the items emitted by the source Observable or a specified default item if the source Observable is empty.
func (*ObservableImpl) Distinct ¶
func (o *ObservableImpl) Distinct(apply Func, opts ...Option) Observable
Distinct suppresses duplicate items in the original Observable and returns a new Observable.
func (*ObservableImpl) DistinctUntilChanged ¶
func (o *ObservableImpl) DistinctUntilChanged(apply Func, opts ...Option) Observable
DistinctUntilChanged suppresses consecutive duplicate items in the original Observable. Cannot be run in parallel.
func (*ObservableImpl) DoOnCompleted ¶
func (o *ObservableImpl) DoOnCompleted(completedFunc CompletedFunc, opts ...Option) Disposed
DoOnCompleted registers a callback action that will be called once the Observable terminates.
func (*ObservableImpl) DoOnError ¶
func (o *ObservableImpl) DoOnError(errFunc ErrFunc, opts ...Option) Disposed
DoOnError registers a callback action that will be called if the Observable terminates abnormally.
func (*ObservableImpl) DoOnNext ¶
func (o *ObservableImpl) DoOnNext(nextFunc NextFunc, opts ...Option) Disposed
DoOnNext registers a callback action that will be called on each item emitted by the Observable.
func (*ObservableImpl) ElementAt ¶
func (o *ObservableImpl) ElementAt(index uint, opts ...Option) Single
ElementAt emits only item n emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl) Error ¶
func (o *ObservableImpl) Error(opts ...Option) error
Error returns the eventual Observable error. This method is blocking.
func (*ObservableImpl) Errors ¶
func (o *ObservableImpl) Errors(opts ...Option) []error
Errors returns an eventual list of Observable errors. This method is blocking.
func (*ObservableImpl) Filter ¶
func (o *ObservableImpl) Filter(apply Predicate, opts ...Option) Observable
Filter emits only those items from an Observable that pass a predicate test.
func (*ObservableImpl) Find ¶ added in v2.3.0
func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle
Find emits the first item passing a predicate, then completes.
func (*ObservableImpl) First ¶
func (o *ObservableImpl) First(opts ...Option) OptionalSingle
First returns a new Observable which emits only the first item. Cannot be run in parallel.
func (*ObservableImpl) FirstOrDefault ¶
func (o *ObservableImpl) FirstOrDefault(defaultValue interface{}, opts ...Option) Single
FirstOrDefault returns a new Observable which emits only the first item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.
func (*ObservableImpl) FlatMap ¶
func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observable
FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable.
func (*ObservableImpl) ForEach ¶
func (o *ObservableImpl) ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
ForEach subscribes to the Observable and receives notifications for each element.
func (*ObservableImpl) GroupBy ¶
func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.
func (*ObservableImpl) GroupByDynamic ¶ added in v2.2.0
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
func (*ObservableImpl) IgnoreElements ¶
func (o *ObservableImpl) IgnoreElements(opts ...Option) Observable
IgnoreElements ignores all items emitted by the source Observable except for the errors. Cannot be run in parallel.
func (*ObservableImpl) Join ¶
func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
Join combines items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable. The time is extracted using a timeExtractor function.
func (*ObservableImpl) Last ¶
func (o *ObservableImpl) Last(opts ...Option) OptionalSingle
Last returns a new Observable which emits only the last item. Cannot be run in parallel.
func (*ObservableImpl) LastOrDefault ¶
func (o *ObservableImpl) LastOrDefault(defaultValue interface{}, opts ...Option) Single
LastOrDefault returns a new Observable which emits only the last item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.
func (*ObservableImpl) Map ¶
func (o *ObservableImpl) Map(apply Func, opts ...Option) Observable
Map transforms the items emitted by an Observable by applying a function to each item.
func (*ObservableImpl) Marshal ¶
func (o *ObservableImpl) Marshal(marshaller Marshaller, opts ...Option) Observable
Marshal transforms the items emitted by an Observable by applying a marshalling to each item.
func (*ObservableImpl) Max ¶
func (o *ObservableImpl) Max(comparator Comparator, opts ...Option) OptionalSingle
Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
func (*ObservableImpl) Min ¶
func (o *ObservableImpl) Min(comparator Comparator, opts ...Option) OptionalSingle
Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.
func (*ObservableImpl) Observe ¶
func (o *ObservableImpl) Observe(opts ...Option) <-chan Item
Observe observes an Observable by returning its channel.
func (*ObservableImpl) OnErrorResumeNext ¶
func (o *ObservableImpl) OnErrorResumeNext(resumeSequence ErrorToObservable, opts ...Option) Observable
OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking onError if it encounters an error.
func (*ObservableImpl) OnErrorReturn ¶
func (o *ObservableImpl) OnErrorReturn(resumeFunc ErrorFunc, opts ...Option) Observable
OnErrorReturn instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.
func (*ObservableImpl) OnErrorReturnItem ¶
func (o *ObservableImpl) OnErrorReturnItem(resume interface{}, opts ...Option) Observable
OnErrorReturnItem instructs an Observable to emit an item if it encounters an error.
func (*ObservableImpl) Reduce ¶
func (o *ObservableImpl) Reduce(apply Func2, opts ...Option) OptionalSingle
Reduce applies a function to each item emitted by an Observable, sequentially, and emits the final value.
func (*ObservableImpl) Repeat ¶
func (o *ObservableImpl) Repeat(count int64, frequency Duration, opts ...Option) Observable
Repeat returns an Observable that repeats the sequence of items emitted by the source Observable at most count times, at a particular frequency. Cannot be run in parallel.
func (*ObservableImpl) Retry ¶
func (o *ObservableImpl) Retry(count int, shouldRetry func(error) bool, opts ...Option) Observable
Retry retries if a source Observable sends an error, resubscribing to it in the hopes that it will complete without error. Cannot be run in parallel.
func (*ObservableImpl) Run ¶
func (o *ObservableImpl) Run(opts ...Option) Disposed
Run creates an Observer without consuming the emitted items.
func (*ObservableImpl) Sample ¶
func (o *ObservableImpl) Sample(iterable Iterable, opts ...Option) Observable
Sample returns an Observable that emits the most recent items emitted by the source Iterable whenever the input Iterable emits an item.
func (*ObservableImpl) Scan ¶
func (o *ObservableImpl) Scan(apply Func2, opts ...Option) Observable
Scan applies a Func2 to each item emitted by an Observable, sequentially, and emits each successive value. Cannot be run in parallel.
func (*ObservableImpl) Send ¶
func (o *ObservableImpl) Send(output chan<- Item, opts ...Option)
Send sends the items to a given channel.
func (*ObservableImpl) SequenceEqual ¶
func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single
SequenceEqual emits true if an Observable and the input Observable emit the same items, in the same order, with the same termination state. Otherwise, it emits false.
func (*ObservableImpl) Serialize ¶
func (o *ObservableImpl) Serialize(from int, identifier func(interface{}) int, opts ...Option) Observable
Serialize forces an Observable to make serialized calls and to be well-behaved.
func (*ObservableImpl) Skip ¶
func (o *ObservableImpl) Skip(nth uint, opts ...Option) Observable
Skip suppresses the first n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.
func (*ObservableImpl) SkipLast ¶
func (o *ObservableImpl) SkipLast(nth uint, opts ...Option) Observable
SkipLast suppresses the last n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.
func (*ObservableImpl) SkipWhile ¶
func (o *ObservableImpl) SkipWhile(apply Predicate, opts ...Option) Observable
SkipWhile discards items emitted by an Observable until a specified condition becomes false. Cannot be run in parallel.
func (*ObservableImpl) StartWith ¶
func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable
StartWith emits a specified Iterable before beginning to emit the items from the source Observable.
func (*ObservableImpl) SumFloat32 ¶
func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle
SumFloat32 calculates the sum of float32 emitted by an Observable and emits a float32.
func (*ObservableImpl) SumFloat64 ¶
func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle
SumFloat64 calculates the sum of float64 emitted by an Observable and emits a float64.
func (*ObservableImpl) SumInt64 ¶
func (o *ObservableImpl) SumInt64(opts ...Option) OptionalSingle
SumInt64 calculates the sum of integers emitted by an Observable and emits an int64.
func (*ObservableImpl) Take ¶
func (o *ObservableImpl) Take(nth uint, opts ...Option) Observable
Take emits only the first n items emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl) TakeLast ¶
func (o *ObservableImpl) TakeLast(nth uint, opts ...Option) Observable
TakeLast emits only the last n items emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl) TakeUntil ¶
func (o *ObservableImpl) TakeUntil(apply Predicate, opts ...Option) Observable
TakeUntil returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. Cannot be run in parallel.
func (*ObservableImpl) TakeWhile ¶
func (o *ObservableImpl) TakeWhile(apply Predicate, opts ...Option) Observable
TakeWhile returns an Observable that emits items emitted by the source Observable so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied. Cannot be run in parallel.
func (*ObservableImpl) TimeInterval ¶
func (o *ObservableImpl) TimeInterval(opts ...Option) Observable
TimeInterval converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.
func (*ObservableImpl) Timestamp ¶
func (o *ObservableImpl) Timestamp(opts ...Option) Observable
Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted.
func (*ObservableImpl) ToMap ¶
func (o *ObservableImpl) ToMap(keySelector Func, opts ...Option) Single
ToMap converts the sequence of items emitted by an Observable into a map keyed by a specified key function. Cannot be run in parallel.
func (*ObservableImpl) ToMapWithValueSelector ¶
func (o *ObservableImpl) ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single
ToMapWithValueSelector converts the sequence of items emitted by an Observable into a map keyed by a specified key function and valued by another value function. Cannot be run in parallel.
func (*ObservableImpl) ToSlice ¶
func (o *ObservableImpl) ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error)
ToSlice collects all items from an Observable and returns them in a slice, along with an optional error. Cannot be run in parallel.
func (*ObservableImpl) Unmarshal ¶
func (o *ObservableImpl) Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable
Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.
func (*ObservableImpl) WindowWithCount ¶
func (o *ObservableImpl) WindowWithCount(count int, opts ...Option) Observable
WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emits these windows rather than emitting the items one at a time.
func (*ObservableImpl) WindowWithTime ¶
func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable
WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emits them rather than emitting the items one at a time.
func (*ObservableImpl) WindowWithTimeOrCount ¶
func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size and emits them rather than emitting the items one at a time.
func (*ObservableImpl) ZipFromIterable ¶
func (o *ObservableImpl) ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable
ZipFromIterable merges the emissions of an Iterable via a specified function and emits single items for each combination based on the results of this function.
type ObservationStrategy ¶
type ObservationStrategy uint32
ObservationStrategy defines the strategy to consume from an Observable.
const ( // Lazy is the default observation strategy, when an Observer subscribes. Lazy ObservationStrategy = iota // Eager means consuming as soon as the Observable is created. Eager )
type OnErrorStrategy ¶
type OnErrorStrategy uint32
OnErrorStrategy is the Observable error strategy.
const ( // StopOnError is the default error strategy. // An operator will stop processing items on error. StopOnError OnErrorStrategy = iota // ContinueOnError means an operator will continue processing items after an error. ContinueOnError )
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option handles configurable options.
func WithBackPressureStrategy ¶
func WithBackPressureStrategy(strategy BackpressureStrategy) Option
WithBackPressureStrategy sets the back pressure strategy: drop or block.
func WithBufferedChannel ¶
WithBufferedChannel allows to configure the capacity of a buffered channel.
func WithCPUPool ¶
func WithCPUPool() Option
WithCPUPool allows to specify an execution pool based on the number of logical CPUs.
func WithContext ¶
WithContext allows to pass a context.
func WithErrorStrategy ¶
func WithErrorStrategy(strategy OnErrorStrategy) Option
WithErrorStrategy defines how an observable should deal with error. This strategy is propagated to the parent observable.
func WithObservationStrategy ¶
func WithObservationStrategy(strategy ObservationStrategy) Option
WithObservationStrategy uses the eager observation mode meaning consuming the items even without subscription.
func WithPublishStrategy ¶
func WithPublishStrategy() Option
WithPublishStrategy converts an ordinary Observable into a connectable Observable.
type OptionalSingle ¶
type OptionalSingle interface { Iterable Get(opts ...Option) (Item, error) Map(apply Func, opts ...Option) OptionalSingle Run(opts ...Option) Disposed }
OptionalSingle is an optional single.
type OptionalSingleImpl ¶
type OptionalSingleImpl struct {
// contains filtered or unexported fields
}
OptionalSingleImpl implements OptionalSingle.
func (*OptionalSingleImpl) Get ¶
func (o *OptionalSingleImpl) Get(opts ...Option) (Item, error)
Get returns the item or rxgo.OptionalEmpty. An error is returned if the context has been cancelled. This method is blocking.
func (*OptionalSingleImpl) Map ¶
func (o *OptionalSingleImpl) Map(apply Func, opts ...Option) OptionalSingle
Map transforms the items emitted by an OptionalSingle by applying a function to each item.
func (*OptionalSingleImpl) Observe ¶
func (o *OptionalSingleImpl) Observe(opts ...Option) <-chan Item
Observe observes an OptionalSingle by returning its channel.
func (*OptionalSingleImpl) Run ¶
func (o *OptionalSingleImpl) Run(opts ...Option) Disposed
Run creates an observer without consuming the emitted items.
type Predicate ¶
type Predicate func(interface{}) bool
Predicate defines a func that returns a bool from an input value.
type RxAssert ¶
type RxAssert interface {
// contains filtered or unexported methods
}
RxAssert lists the Observable assertions.
func CustomPredicate ¶
func CustomPredicate(predicate AssertPredicate) RxAssert
CustomPredicate checks a custom predicate.
func HasAnError ¶
func HasAnError() RxAssert
HasAnError checks that the observable has produced an error.
func HasItem ¶
func HasItem(i interface{}) RxAssert
HasItem checks if a single or optional single has a specific item.
func HasItems ¶
func HasItems(items ...interface{}) RxAssert
HasItems checks that the observable produces the corresponding items.
func HasItemsNoOrder ¶
func HasItemsNoOrder(items ...interface{}) RxAssert
HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order.
func HasNoError ¶
func HasNoError() RxAssert
HasNoError checks that the observable has not raised any error.
func IsNotEmpty ¶
func IsNotEmpty() RxAssert
IsNotEmpty checks that the observable produces some items.
type Single ¶
type Single interface { Iterable Filter(apply Predicate, opts ...Option) OptionalSingle Get(opts ...Option) (Item, error) Map(apply Func, opts ...Option) Single Run(opts ...Option) Disposed }
Single is an observable with a single element.
type SingleImpl ¶
type SingleImpl struct {
// contains filtered or unexported fields
}
SingleImpl implements Single.
func (*SingleImpl) Filter ¶
func (s *SingleImpl) Filter(apply Predicate, opts ...Option) OptionalSingle
Filter emits only those items from an Observable that pass a predicate test.
func (*SingleImpl) Get ¶
func (s *SingleImpl) Get(opts ...Option) (Item, error)
Get returns the item. An error is returned if the context has been cancelled. This method is blocking.
func (*SingleImpl) Map ¶
func (s *SingleImpl) Map(apply Func, opts ...Option) Single
Map transforms the items emitted by a Single by applying a function to each item.
func (*SingleImpl) Observe ¶
func (s *SingleImpl) Observe(opts ...Option) <-chan Item
Observe observes a Single by returning its channel.
func (*SingleImpl) Run ¶
func (s *SingleImpl) Run(opts ...Option) Disposed
Run creates an observer without consuming the emitted items.
type TimestampItem ¶
TimestampItem attaches a timestamp to an item.
type Unmarshaller ¶
Unmarshaller defines an unmarshaller type ([]byte to interface).
Source Files
¶
- assert.go
- duration.go
- errors.go
- factory.go
- item.go
- iterable.go
- iterable_channel.go
- iterable_create.go
- iterable_defer.go
- iterable_eventsource.go
- iterable_factory.go
- iterable_just.go
- iterable_range.go
- iterable_slice.go
- observable.go
- observable_operator.go
- optionalsingle.go
- options.go
- single.go
- types.go