rx

package
v1.18.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: Apache-2.0 Imports: 11 Imported by: 3

Documentation

Overview

Package rx provides the Rx interfaces.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Of

func Of(i interface{}) rxgo.Item

Of creates an item from a value.

Types

type Factory

type Factory interface {
	// FromChannel creates a new Stream from a channel.
	FromChannel(ctx context.Context, channel chan interface{}) Stream

	// FromItems creates a new Stream from items.
	FromItems(ctx context.Context, items []interface{}) Stream
}

Factory creates the rx.Stream from several sources.

func NewFactory

func NewFactory() Factory

NewFactory creates a new Rx factory.

type Handler

type Handler func(interface{}) error

Handler defines a function that handle the input value.

type Marshaller

type Marshaller func(interface{}) ([]byte, error)

Marshaller defines a marshaller type (interface{} to []byte).

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime is the Stream Serverless Runtime for RxStream.

func NewRuntime

func NewRuntime(sfn yomo.StreamFunction) *Runtime

NewRuntime creates a new Rx Stream Serverless Runtime.

func (*Runtime) Pipe

func (r *Runtime) Pipe(rxHandler func(rxstream Stream) Stream)

Pipe the RxHandler with RxStream.

func (*Runtime) PipeHandler added in v1.5.4

func (r *Runtime) PipeHandler(in <-chan []byte, out chan<- *frame.DataFrame)

PipeHandler processes data sequentially.

func (*Runtime) RawByteHandler

func (r *Runtime) RawByteHandler(ctx serverless.Context)

RawByteHandler is the Handler for RawBytes.

type Stream

type Stream interface {
	rxgo.Iterable

	// PipeBackToZipper write the DataFrame with a specified DataID.
	PipeBackToZipper(dataTag frame.Tag) Stream

	// RawBytes get the raw bytes in Stream which receives from YoMo-Zipper.
	RawBytes() Stream

	// StdOut writes the value as standard output.
	StdOut(opts ...rxgo.Option) Stream

	// AuditTime ignores values for duration milliseconds, then only emits the most recent value.
	AuditTime(milliseconds uint32, opts ...rxgo.Option) Stream

	// DefaultIfEmptyWithTime emits a default value if didn't receive any values for duration milliseconds.
	DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) Stream

	// All determines whether all items emitted by an Observable meet some criteria
	All(predicate rxgo.Predicate, opts ...rxgo.Option) Stream

	// AverageFloat32 calculates the average of numbers emitted by an Observable and emits the average float32.
	AverageFloat32(opts ...rxgo.Option) Stream

	// AverageFloat64 calculates the average of numbers emitted by an Observable and emits the average float64.
	AverageFloat64(opts ...rxgo.Option) Stream

	// AverageInt calculates the average of numbers emitted by an Observable and emits the average int.
	AverageInt(opts ...rxgo.Option) Stream

	// AverageInt8 calculates the average of numbers emitted by an Observable and emits the average int8.
	AverageInt8(opts ...rxgo.Option) Stream

	// AverageInt16 calculates the average of numbers emitted by an Observable and emits the average int16.
	AverageInt16(opts ...rxgo.Option) Stream

	// AverageInt32 calculates the average of numbers emitted by an Observable and emits the average int32.
	AverageInt32(opts ...rxgo.Option) Stream

	// AverageInt64 calculates the average of numbers emitted by an Observable and emits the average int64.
	AverageInt64(opts ...rxgo.Option) Stream

	// BackOffRetry implements a backoff retry if a source Observable sends an error, resubscribe to it in the hopes that it will complete without error.
	// Cannot be run in parallel.
	BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) Stream

	// BufferWithCount returns an Observable that emits buffers of items it collects
	// from the source Observable.
	// The resulting Observable emits buffers every skip items, each containing a slice of count items.
	// When the source Observable completes or encounters an error,
	// the resulting Observable emits the current buffer and propagates
	// the notification from the source Observable.
	BufferWithCount(count int, opts ...rxgo.Option) Stream

	// BufferWithTime returns an Observable that emits buffers of items it collects from the source
	// Observable. The resulting Observable starts a new buffer periodically, as determined by the
	// timeshift argument. It emits each buffer after a fixed timespan, specified by the timespan argument.
	// When the source Observable completes or encounters an error, the resulting Observable emits
	// the current buffer and propagates the notification from the source Observable.
	BufferWithTime(milliseconds uint32, opts ...rxgo.Option) Stream

	// BufferWithTimeOrCount returns an Observable that emits buffers of items it collects from the source
	// Observable either from a given count or at a given time interval.
	BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) Stream

	// Connect instructs a connectable Observable to begin emitting items to its subscribers.
	Connect(ctx context.Context) (context.Context, rxgo.Disposable)

	// Contains determines whether an Observable emits a particular item or not.
	Contains(equal rxgo.Predicate, opts ...rxgo.Option) Stream

	// Count counts the number of items emitted by the source Observable and emit only this value.
	Count(opts ...rxgo.Option) Stream

	// Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.
	Debounce(milliseconds uint32, opts ...rxgo.Option) Stream

	// DefaultIfEmpty returns an Observable that emits the items emitted by the source
	// Observable or a specified default item if the source Observable is empty.
	DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) Stream

	// Distinct suppresses duplicate items in the original Observable and returns
	// a new Observable.
	Distinct(apply rxgo.Func, opts ...rxgo.Option) Stream

	// DistinctUntilChanged suppresses consecutive duplicate items in the original Observable.
	// Cannot be run in parallel.
	DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) Stream

	// DoOnCompleted registers a callback action that will be called once the Observable terminates.
	DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

	// DoOnError registers a callback action that will be called if the Observable terminates abnormally.
	DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed

	// DoOnNext registers a callback action that will be called on each item emitted by the Observable.
	DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed

	// ElementAt emits only item n emitted by an Observable.
	// Cannot be run in parallel.
	ElementAt(index uint, opts ...rxgo.Option) Stream

	// Error returns the eventual Observable error.
	// This method is blocking.
	Error(opts ...rxgo.Option) error

	// Errors returns an eventual list of Observable errors.
	// This method is blocking
	Errors(opts ...rxgo.Option) []error

	// Filter emits only those items from an Observable that pass a predicate test.
	Filter(apply rxgo.Predicate, opts ...rxgo.Option) Stream

	// Find emits the first item passing a predicate then complete.
	Find(find rxgo.Predicate, opts ...rxgo.Option) Stream

	// First returns new Observable which emit only first item.
	// Cannot be run in parallel.
	First(opts ...rxgo.Option) Stream

	// FirstOrDefault returns new Observable which emit only first item.
	// If the observable fails to emit any items, it emits a default value.
	// Cannot be run in parallel.
	FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) Stream

	// FlatMap transforms the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.
	FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) Stream

	// ForEach subscribes to the Observable and receives notifications for each element.
	ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

	// GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.
	GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) Stream

	// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
	GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) Stream

	// IgnoreElements ignores all items emitted by the source ObservableSource except for the errors.
	// Cannot be run in parallel.
	IgnoreElements(opts ...rxgo.Option) Stream

	// Join combines items emitted by two Observables whenever an item from one Observable is emitted during
	// a time window defined according to an item emitted by the other Observable.
	// The time is extracted using a timeExtractor function.
	Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) Stream

	// Last returns a new Observable which emit only last item.
	// Cannot be run in parallel.
	Last(opts ...rxgo.Option) Stream

	// LastOrDefault returns a new Observable which emit only last item.
	// If the observable fails to emit any items, it emits a default value.
	// Cannot be run in parallel.
	LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) Stream

	// Map transforms the items emitted by an Observable by applying a function to each item.
	Map(apply rxgo.Func, opts ...rxgo.Option) Stream

	// Marshal transforms the items emitted by an Observable by applying a marshalling to each item.
	Marshal(marshaller Marshaller, opts ...rxgo.Option) Stream

	// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
	Max(comparator rxgo.Comparator, opts ...rxgo.Option) Stream

	// Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.
	Min(comparator rxgo.Comparator, opts ...rxgo.Option) Stream

	// OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking
	// onError if it encounters an error.
	OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) Stream

	// OnErrorReturn instructs an Observable to emit an item (returned by a specified function)
	// rather than invoking onError if it encounters an error.
	OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) Stream

	// OnErrorReturnItem instructs on Observable to emit an item if it encounters an error.
	OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) Stream

	// Reduce applies a function to each item emitted by an Observable, sequentially, and emit the final value.
	Reduce(apply rxgo.Func2, opts ...rxgo.Option) Stream

	// Repeat returns an Observable that repeats the sequence of items emitted by the source Observable
	// at most count times, at a particular frequency.
	// Cannot run in parallel.
	Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) Stream

	// Retry retries if a source Observable sends an error, resubscribe to it in the hopes that it will complete without error.
	// Cannot be run in parallel.
	Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) Stream

	// Run creates an Observer without consuming the emitted items.
	Run(opts ...rxgo.Option) rxgo.Disposed

	// Sample returns an Observable that emits the most recent items emitted by the source
	// Iterable whenever the input Iterable emits an item.
	Sample(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

	// Scan apply a Func2 to each item emitted by an Observable, sequentially, and emit each successive value.
	// Cannot be run in parallel.
	Scan(apply rxgo.Func2, opts ...rxgo.Option) Stream

	// SequenceEqual emits true if an Observable and the input Observable emit the same items,
	// in the same order, with the same termination state. Otherwise, it emits false.
	SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

	// Send sends the items to a given channel.
	Send(output chan<- rxgo.Item, opts ...rxgo.Option)

	// Serialize forces an Observable to make serialized calls and to be well-behaved.
	Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) Stream

	// Skip suppresses the first n items in the original Observable and
	// returns a new Observable with the rest items.
	// Cannot be run in parallel.
	Skip(nth uint, opts ...rxgo.Option) Stream

	// SkipLast suppresses the last n items in the original Observable and
	// returns a new Observable with the rest items.
	// Cannot be run in parallel.
	SkipLast(nth uint, opts ...rxgo.Option) Stream

	// SkipWhile discard items emitted by an Observable until a specified condition becomes false.
	// Cannot be run in parallel.
	SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) Stream

	// StartWith emits a specified Iterable before beginning to emit the items from the source Observable.
	StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

	// SumFloat32 calculates the average of float32 emitted by an Observable and emits a float32.
	SumFloat32(opts ...rxgo.Option) Stream

	// SumFloat64 calculates the average of float64 emitted by an Observable and emits a float64.
	SumFloat64(opts ...rxgo.Option) Stream

	// SumInt64 calculates the average of integers emitted by an Observable and emits an int64.
	SumInt64(opts ...rxgo.Option) Stream

	// Take emits only the first n items emitted by an Observable.
	// Cannot be run in parallel.
	Take(nth uint, opts ...rxgo.Option) Stream

	// TakeLast emits only the last n items emitted by an Observable.
	// Cannot be run in parallel.
	TakeLast(nth uint, opts ...rxgo.Option) Stream

	// TakeUntil returns an Observable that emits items emitted by the source Observable,
	// checks the specified predicate for each item, and then completes when the condition is satisfied.
	// Cannot be run in parallel.
	TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) Stream

	// TakeWhile returns an Observable that emits items emitted by the source ObservableSource so long as each
	// item satisfied a specified condition, and then completes as soon as this condition is not satisfied.
	// Cannot be run in parallel.
	TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) Stream

	// TimeInterval converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.
	TimeInterval(opts ...rxgo.Option) Stream

	// Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted.
	Timestamp(opts ...rxgo.Option) Stream

	// ToMap convert the sequence of items emitted by an Observable
	// into a map keyed by a specified key function.
	// Cannot be run in parallel.
	ToMap(keySelector rxgo.Func, opts ...rxgo.Option) Stream

	// ToMapWithValueSelector convert the sequence of items emitted by an Observable
	// into a map keyed by a specified key function and valued by another
	// value function.
	// Cannot be run in parallel.
	ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) Stream

	// ToSlice collects all items from an Observable and emit them in a slice and an optional error.
	// Cannot be run in parallel.
	ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)

	// Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.
	Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...rxgo.Option) Stream

	// WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emit these windows
	// rather than emitting the items one at a time.
	WindowWithCount(count int, opts ...rxgo.Option) Stream

	// WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows
	// and emit them rather than emitting the items one at a time.
	WindowWithTime(milliseconds uint32, opts ...rxgo.Option) Stream

	// WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size
	// and emit them rather than emitting the items one at a time.
	WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) Stream

	// ZipFromIterable merges the emissions of an Iterable via a specified function
	// and emit single items for each combination based on the results of this function.
	ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) Stream

	// SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func.
	// It returns the orginal data to Stream, not the buffered slice.
	SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) Stream

	// SlidingWindowWithTime buffers the data in the specified sliding window time in milliseconds, the buffered data can be processed in the handler func.
	// It returns the orginal data to Stream, not the buffered slice.
	SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) Stream
}

Stream is the interface for RxStream.

func ConvertObservable

func ConvertObservable(ctx context.Context, observable rxgo.Observable) Stream

ConvertObservable converts the observable to RxStream.

func CreateObservable

func CreateObservable(ctx context.Context, f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) Stream

CreateObservable creates a new observable.

func CreateZipperObservable

func CreateZipperObservable(ctx context.Context, f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) Stream

CreateZipperObservable creates a new observable with the capacity 100 for Zipper.

type StreamImpl

type StreamImpl struct {
	// contains filtered or unexported fields
}

StreamImpl is the implementation of Stream interface.

func (*StreamImpl) All

func (s *StreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) Stream

All determines whether all items emitted by an Observable meet some criteria

func (*StreamImpl) AuditTime

func (s *StreamImpl) AuditTime(milliseconds uint32, opts ...rxgo.Option) Stream

AuditTime ignores values for duration milliseconds, then only emits the most recent value.

func (*StreamImpl) AverageFloat32

func (s *StreamImpl) AverageFloat32(opts ...rxgo.Option) Stream

AverageFloat32 calculates the average of numbers emitted by an Observable and emits the average float32.

func (*StreamImpl) AverageFloat64

func (s *StreamImpl) AverageFloat64(opts ...rxgo.Option) Stream

AverageFloat64 calculates the average of numbers emitted by an Observable and emits the average float64.

func (*StreamImpl) AverageInt

func (s *StreamImpl) AverageInt(opts ...rxgo.Option) Stream

AverageInt calculates the average of numbers emitted by an Observable and emits the average int.

func (*StreamImpl) AverageInt16

func (s *StreamImpl) AverageInt16(opts ...rxgo.Option) Stream

AverageInt16 calculates the average of numbers emitted by an Observable and emits the average int16.

func (*StreamImpl) AverageInt32

func (s *StreamImpl) AverageInt32(opts ...rxgo.Option) Stream

AverageInt32 calculates the average of numbers emitted by an Observable and emits the average int32.

func (*StreamImpl) AverageInt64

func (s *StreamImpl) AverageInt64(opts ...rxgo.Option) Stream

AverageInt64 calculates the average of numbers emitted by an Observable and emits the average int64.

func (*StreamImpl) AverageInt8

func (s *StreamImpl) AverageInt8(opts ...rxgo.Option) Stream

AverageInt8 calculates the average of numbers emitted by an Observable and emits the≤ average int8.

func (*StreamImpl) BackOffRetry

func (s *StreamImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) Stream

BackOffRetry implements a backoff retry if a source Observable sends an error, resubscribe to it in the hopes that it will complete without error. Cannot be run in parallel.

func (*StreamImpl) BufferWithCount

func (s *StreamImpl) BufferWithCount(count int, opts ...rxgo.Option) Stream

BufferWithCount returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits buffers every skip items, each containing a slice of count items. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

func (*StreamImpl) BufferWithTime

func (s *StreamImpl) BufferWithTime(milliseconds uint32, opts ...rxgo.Option) Stream

BufferWithTime returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable starts a new buffer periodically, as determined by the timeshift argument. It emits each buffer after a fixed timespan, specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

func (*StreamImpl) BufferWithTimeOrCount

func (s *StreamImpl) BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) Stream

BufferWithTimeOrCount returns an Observable that emits buffers of items it collects from the source Observable either from a given count or at a given time interval.

func (*StreamImpl) Connect

func (s *StreamImpl) Connect(ctx context.Context) (context.Context, rxgo.Disposable)

Connect instructs a connectable Observable to begin emitting items to its subscribers.

func (*StreamImpl) Contains

func (s *StreamImpl) Contains(equal rxgo.Predicate, opts ...rxgo.Option) Stream

Contains determines whether an Observable emits a particular item or not.

func (*StreamImpl) Count

func (s *StreamImpl) Count(opts ...rxgo.Option) Stream

Count counts the number of items emitted by the source Observable and emit only this value.

func (*StreamImpl) Debounce

func (s *StreamImpl) Debounce(milliseconds uint32, opts ...rxgo.Option) Stream

Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.

func (*StreamImpl) DefaultIfEmpty

func (s *StreamImpl) DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) Stream

DefaultIfEmpty returns an Observable that emits the items emitted by the source Observable or a specified default item if the source Observable is empty.

func (*StreamImpl) DefaultIfEmptyWithTime

func (s *StreamImpl) DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) Stream

DefaultIfEmptyWithTime emits a default value if didn't receive any values for duration milliseconds.

func (*StreamImpl) Distinct

func (s *StreamImpl) Distinct(apply rxgo.Func, opts ...rxgo.Option) Stream

Distinct suppresses duplicate items in the original Observable and returns a new Observable.

func (*StreamImpl) DistinctUntilChanged

func (s *StreamImpl) DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) Stream

DistinctUntilChanged suppresses consecutive duplicate items in the original Observable. Cannot be run in parallel.

func (*StreamImpl) DoOnCompleted

func (s *StreamImpl) DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

DoOnCompleted registers a callback action that will be called once the Observable terminates.

func (*StreamImpl) DoOnError

func (s *StreamImpl) DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed

DoOnError registers a callback action that will be called if the Observable terminates abnormally.

func (*StreamImpl) DoOnNext

func (s *StreamImpl) DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed

DoOnNext registers a callback action that will be called on each item emitted by the Observable.

func (*StreamImpl) ElementAt

func (s *StreamImpl) ElementAt(index uint, opts ...rxgo.Option) Stream

ElementAt emits only item n emitted by an Observable. Cannot be run in parallel.

func (*StreamImpl) Error

func (s *StreamImpl) Error(opts ...rxgo.Option) error

Error returns the eventual Observable error. This method is blocking.

func (*StreamImpl) Errors

func (s *StreamImpl) Errors(opts ...rxgo.Option) []error

Errors returns an eventual list of Observable errors. This method is blocking

func (*StreamImpl) Filter

func (s *StreamImpl) Filter(apply rxgo.Predicate, opts ...rxgo.Option) Stream

Filter emits only those items from an Observable that pass a predicate test.

func (*StreamImpl) Find

func (s *StreamImpl) Find(find rxgo.Predicate, opts ...rxgo.Option) Stream

Find emits the first item passing a predicate then complete.

func (*StreamImpl) First

func (s *StreamImpl) First(opts ...rxgo.Option) Stream

First returns new Observable which emit only first item. Cannot be run in parallel.

func (*StreamImpl) FirstOrDefault

func (s *StreamImpl) FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) Stream

FirstOrDefault returns new Observable which emit only first item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.

func (*StreamImpl) FlatMap

func (s *StreamImpl) FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) Stream

FlatMap transforms the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.

func (*StreamImpl) ForEach

func (s *StreamImpl) ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

ForEach subscribes to the Observable and receives notifications for each element.

func (*StreamImpl) GroupBy

func (s *StreamImpl) GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) Stream

GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.

func (*StreamImpl) GroupByDynamic

func (s *StreamImpl) GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) Stream

GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

func (*StreamImpl) IgnoreElements

func (s *StreamImpl) IgnoreElements(opts ...rxgo.Option) Stream

IgnoreElements ignores all items emitted by the source ObservableSource except for the errors. Cannot be run in parallel.

func (*StreamImpl) Join

func (s *StreamImpl) Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) Stream

Join combines items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable. The time is extracted using a timeExtractor function.

func (*StreamImpl) Last

func (s *StreamImpl) Last(opts ...rxgo.Option) Stream

Last returns a new Observable which emit only last item. Cannot be run in parallel.

func (*StreamImpl) LastOrDefault

func (s *StreamImpl) LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) Stream

LastOrDefault returns a new Observable which emit only last item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.

func (*StreamImpl) Map

func (s *StreamImpl) Map(apply rxgo.Func, opts ...rxgo.Option) Stream

Map transforms the items emitted by an Observable by applying a function to each item.

func (*StreamImpl) Marshal

func (s *StreamImpl) Marshal(marshaller Marshaller, opts ...rxgo.Option) Stream

Marshal transforms the items emitted by an Observable by applying a marshalling to each item.

func (*StreamImpl) Max

func (s *StreamImpl) Max(comparator rxgo.Comparator, opts ...rxgo.Option) Stream

Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.

func (*StreamImpl) Min

func (s *StreamImpl) Min(comparator rxgo.Comparator, opts ...rxgo.Option) Stream

Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.

func (*StreamImpl) Observe

func (s *StreamImpl) Observe(opts ...rxgo.Option) <-chan rxgo.Item

Observe the items in RxStream.

func (*StreamImpl) OnErrorResumeNext

func (s *StreamImpl) OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) Stream

OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking onError if it encounters an error.

func (*StreamImpl) OnErrorReturn

func (s *StreamImpl) OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) Stream

OnErrorReturn instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

func (*StreamImpl) OnErrorReturnItem

func (s *StreamImpl) OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) Stream

OnErrorReturnItem instructs on Observable to emit an item if it encounters an error.

func (*StreamImpl) PipeBackToZipper

func (s *StreamImpl) PipeBackToZipper(dataTag frame.Tag) Stream

PipeBackToZipper sets a specified DataTag to bytes and will pipe it back to zipper.

func (*StreamImpl) RawBytes

func (s *StreamImpl) RawBytes() Stream

RawBytes get the raw bytes in Stream which receives from YoMo-Zipper.

func (*StreamImpl) Reduce

func (s *StreamImpl) Reduce(apply rxgo.Func2, opts ...rxgo.Option) Stream

Reduce applies a function to each item emitted by an Observable, sequentially, and emit the final value.

func (*StreamImpl) Repeat

func (s *StreamImpl) Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) Stream

Repeat returns an Observable that repeats the sequence of items emitted by the source Observable at most count times, at a particular frequency. Cannot run in parallel.

func (*StreamImpl) Retry

func (s *StreamImpl) Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) Stream

Retry retries if a source Observable sends an error, resubscribe to it in the hopes that it will complete without error. Cannot be run in parallel.

func (*StreamImpl) Run

func (s *StreamImpl) Run(opts ...rxgo.Option) rxgo.Disposed

Run creates an Observer without consuming the emitted items.

func (*StreamImpl) Sample

func (s *StreamImpl) Sample(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

Sample returns an Observable that emits the most recent items emitted by the source Iterable whenever the input Iterable emits an item.

func (*StreamImpl) Scan

func (s *StreamImpl) Scan(apply rxgo.Func2, opts ...rxgo.Option) Stream

Scan apply a Func2 to each item emitted by an Observable, sequentially, and emit each successive value. Cannot be run in parallel.

func (*StreamImpl) Send

func (s *StreamImpl) Send(output chan<- rxgo.Item, opts ...rxgo.Option)

Send sends the items to a given channel.

func (*StreamImpl) SequenceEqual

func (s *StreamImpl) SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

SequenceEqual emits true if an Observable and the input Observable emit the same items, in the same order, with the same termination state. Otherwise, it emits false.

func (*StreamImpl) Serialize

func (s *StreamImpl) Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) Stream

Serialize forces an Observable to make serialized calls and to be well-behaved.

func (*StreamImpl) Skip

func (s *StreamImpl) Skip(nth uint, opts ...rxgo.Option) Stream

Skip suppresses the first n items in the original Observable and returns a new Observable with the rest items. Cannot be run in parallel.

func (*StreamImpl) SkipLast

func (s *StreamImpl) SkipLast(nth uint, opts ...rxgo.Option) Stream

SkipLast suppresses the last n items in the original Observable and returns a new Observable with the rest items. Cannot be run in parallel.

func (*StreamImpl) SkipWhile

func (s *StreamImpl) SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) Stream

SkipWhile discard items emitted by an Observable until a specified condition becomes false. Cannot be run in parallel.

func (*StreamImpl) SlidingWindowWithCount

func (s *StreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) Stream

SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. It returns the orginal data to Stream, not the buffered slice.

func (*StreamImpl) SlidingWindowWithTime

func (s *StreamImpl) SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) Stream

SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func. It returns the orginal data to Stream, not the buffered slice.

func (*StreamImpl) StartWith

func (s *StreamImpl) StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) Stream

StartWith emits a specified Iterable before beginning to emit the items from the source Observable.

func (*StreamImpl) StdOut

func (s *StreamImpl) StdOut(opts ...rxgo.Option) Stream

StdOut writes the item as standard output.

func (*StreamImpl) SumFloat32

func (s *StreamImpl) SumFloat32(opts ...rxgo.Option) Stream

SumFloat32 calculates the average of float32 emitted by an Observable and emits a float32.

func (*StreamImpl) SumFloat64

func (s *StreamImpl) SumFloat64(opts ...rxgo.Option) Stream

SumFloat64 calculates the average of float64 emitted by an Observable and emits a float64.

func (*StreamImpl) SumInt64

func (s *StreamImpl) SumInt64(opts ...rxgo.Option) Stream

SumInt64 calculates the average of integers emitted by an Observable and emits an int64.

func (*StreamImpl) Take

func (s *StreamImpl) Take(nth uint, opts ...rxgo.Option) Stream

Take emits only the first n items emitted by an Observable. Cannot be run in parallel.

func (*StreamImpl) TakeLast

func (s *StreamImpl) TakeLast(nth uint, opts ...rxgo.Option) Stream

TakeLast emits only the last n items emitted by an Observable. Cannot be run in parallel.

func (*StreamImpl) TakeUntil

func (s *StreamImpl) TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) Stream

TakeUntil returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. Cannot be run in parallel.

func (*StreamImpl) TakeWhile

func (s *StreamImpl) TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) Stream

TakeWhile returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfied a specified condition, and then completes as soon as this condition is not satisfied. Cannot be run in parallel.

func (*StreamImpl) TimeInterval

func (s *StreamImpl) TimeInterval(opts ...rxgo.Option) Stream

TimeInterval converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.

func (*StreamImpl) Timestamp

func (s *StreamImpl) Timestamp(opts ...rxgo.Option) Stream

Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted.

func (*StreamImpl) ToMap

func (s *StreamImpl) ToMap(keySelector rxgo.Func, opts ...rxgo.Option) Stream

ToMap convert the sequence of items emitted by an Observable into a map keyed by a specified key function. Cannot be run in parallel.

func (*StreamImpl) ToMapWithValueSelector

func (s *StreamImpl) ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) Stream

ToMapWithValueSelector convert the sequence of items emitted by an Observable into a map keyed by a specified key function and valued by another value function. Cannot be run in parallel.

func (*StreamImpl) ToSlice

func (s *StreamImpl) ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)

ToSlice collects all items from an Observable and emit them in a slice and an optional error. Cannot be run in parallel.

func (*StreamImpl) Unmarshal

func (s *StreamImpl) Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...rxgo.Option) Stream

Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.

func (*StreamImpl) WindowWithCount

func (s *StreamImpl) WindowWithCount(count int, opts ...rxgo.Option) Stream

WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emit these windows rather than emitting the items one at a time.

func (*StreamImpl) WindowWithTime

func (s *StreamImpl) WindowWithTime(milliseconds uint32, opts ...rxgo.Option) Stream

WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emit them rather than emitting the items one at a time.

func (*StreamImpl) WindowWithTimeOrCount

func (s *StreamImpl) WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) Stream

WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size and emit them rather than emitting the items one at a time.

func (*StreamImpl) ZipFromIterable

func (s *StreamImpl) ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) Stream

ZipFromIterable merges the emissions of an Iterable via a specified function and emit single items for each combination based on the results of this function.

type Unmarshaller

type Unmarshaller func([]byte, interface{}) error

Unmarshaller defines an unmarshaller type ([]byte to interface).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL