Documentation
¶
Index ¶
- func Of(i interface{}) rxgo.Item
- type Handler
- type RxStream
- func ConvertObservable(observable rxgo.Observable) RxStream
- func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream
- func FromChannel(channel chan []byte) RxStream
- func FromReader(reader io.Reader) RxStream
- func FromReaderWithDecoder(readers chan io.Reader) RxStream
- func FromReaderWithFunc(f func() io.Reader) RxStream
- type RxStreamImpl
- func (s *RxStreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AuditTime(milliseconds uint32, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageFloat32(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageFloat64(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageInt(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageInt16(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageInt32(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageInt64(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) AverageInt8(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) BufferWithCount(count int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) BufferWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Connect(ctx context.Context) (context.Context, rxgo.Disposable)
- func (s *RxStreamImpl) Contains(equal rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Count(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Debounce(milliseconds uint32, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Distinct(apply rxgo.Func, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed
- func (s *RxStreamImpl) DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed
- func (s *RxStreamImpl) DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed
- func (s *RxStreamImpl) ElementAt(index uint, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Error(opts ...rxgo.Option) error
- func (s *RxStreamImpl) Errors(opts ...rxgo.Option) []error
- func (s *RxStreamImpl) Filter(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Find(find rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) First(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, ...) rxgo.Disposed
- func (s *RxStreamImpl) GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) IgnoreElements(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Join(joiner rxgo.Func2, right rxgo.Observable, ...) RxStream
- func (s *RxStreamImpl) Last(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Map(apply rxgo.Func, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Marshal(marshaller decoder.Marshaller, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Max(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) MergeReadWriterWithFunc(rwf serverless.GetFlowFunc, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Min(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Observe(opts ...rxgo.Option) <-chan rxgo.Item
- func (s *RxStreamImpl) OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) RxStream
- func (s *RxStreamImpl) Reduce(apply rxgo.Func2, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Run(opts ...rxgo.Option) rxgo.Disposed
- func (s *RxStreamImpl) Sample(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Scan(apply rxgo.Func2, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Send(output chan<- rxgo.Item, opts ...rxgo.Option)
- func (s *RxStreamImpl) SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Skip(nth uint, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SkipLast(nth uint, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, ...) RxStream
- func (s *RxStreamImpl) StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) StdOut(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Subscribe(key byte) RxStream
- func (s *RxStreamImpl) SumFloat32(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SumFloat64(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) SumInt64(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Take(nth uint, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) TakeLast(nth uint, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) TimeInterval(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) Timestamp(opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ToMap(keySelector rxgo.Func, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)
- func (s *RxStreamImpl) Unmarshal(unmarshaller decoder.Unmarshaller, factory func() interface{}, ...) RxStream
- func (s *RxStreamImpl) WindowWithCount(count int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) WindowWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream
- func (s *RxStreamImpl) ZipMultiObservers(observers []decoder.KeyObserveFunc, ...) RxStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Handler ¶ added in v1.1.0
type Handler func(interface{}) error
Handler defines a function that handle the input value.
type RxStream ¶
type RxStream interface { rxgo.Iterable MergeReadWriterWithFunc(rwf serverless.GetFlowFunc, opts ...rxgo.Option) RxStream Subscribe(key byte) RxStream Encode(key byte, opts ...rxgo.Option) RxStream OnObserve(function func(v []byte) (interface{}, error)) RxStream StdOut(opts ...rxgo.Option) RxStream AuditTime(milliseconds uint32, opts ...rxgo.Option) RxStream DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) RxStream All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream AverageFloat32(opts ...rxgo.Option) RxStream AverageFloat64(opts ...rxgo.Option) RxStream AverageInt(opts ...rxgo.Option) RxStream AverageInt8(opts ...rxgo.Option) RxStream AverageInt16(opts ...rxgo.Option) RxStream AverageInt32(opts ...rxgo.Option) RxStream AverageInt64(opts ...rxgo.Option) RxStream BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) RxStream BufferWithCount(count int, opts ...rxgo.Option) RxStream BufferWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream Connect(ctx context.Context) (context.Context, rxgo.Disposable) Contains(equal rxgo.Predicate, opts ...rxgo.Option) RxStream Count(opts ...rxgo.Option) RxStream Debounce(milliseconds uint32, opts ...rxgo.Option) RxStream DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) RxStream Distinct(apply rxgo.Func, opts ...rxgo.Option) RxStream DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) RxStream DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed ElementAt(index uint, opts ...rxgo.Option) RxStream Error(opts ...rxgo.Option) error Errors(opts ...rxgo.Option) []error Filter(apply rxgo.Predicate, opts ...rxgo.Option) RxStream Find(find rxgo.Predicate, opts ...rxgo.Option) RxStream First(opts ...rxgo.Option) RxStream FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) RxStream ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) RxStream GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) RxStream IgnoreElements(opts ...rxgo.Option) RxStream Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) RxStream Last(opts ...rxgo.Option) RxStream LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream Map(apply rxgo.Func, opts ...rxgo.Option) RxStream // Marshal transforms the items emitted by an Observable by applying a marshalling to each item. Marshal(marshaller decoder.Marshaller, opts ...rxgo.Option) RxStream Max(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream Min(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) RxStream OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) RxStream OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) RxStream Reduce(apply rxgo.Func2, opts ...rxgo.Option) RxStream Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) RxStream Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) RxStream Run(opts ...rxgo.Option) rxgo.Disposed Sample(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream Scan(apply rxgo.Func2, opts ...rxgo.Option) RxStream SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream Send(output chan<- rxgo.Item, opts ...rxgo.Option) Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) RxStream Skip(nth uint, opts ...rxgo.Option) RxStream SkipLast(nth uint, opts ...rxgo.Option) RxStream SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream SumFloat32(opts ...rxgo.Option) RxStream SumFloat64(opts ...rxgo.Option) RxStream SumInt64(opts ...rxgo.Option) RxStream Take(nth uint, opts ...rxgo.Option) RxStream TakeLast(nth uint, opts ...rxgo.Option) RxStream TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) RxStream TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream TimeInterval(opts ...rxgo.Option) RxStream Timestamp(opts ...rxgo.Option) RxStream ToMap(keySelector rxgo.Func, opts ...rxgo.Option) RxStream ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) RxStream ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error) // Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item. Unmarshal(unmarshaller decoder.Unmarshaller, factory func() interface{}, opts ...rxgo.Option) RxStream WindowWithCount(count int, opts ...rxgo.Option) RxStream WindowWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream // SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. // It returns the orginal data to RxStream, not the buffered slice. SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream // SlidingWindowWithTime buffers the data in the specified sliding window time in milliseconds, the buffered data can be processed in the handler func. // It returns the orginal data to RxStream, not the buffered slice. SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) RxStream // ZipMultiObservers subscribes multi Y3 observers, zips the values into a slice and calls the zipper callback when all keys are observed. ZipMultiObservers(observers []decoder.KeyObserveFunc, zipper func(items []interface{}) (interface{}, error)) RxStream }
func ConvertObservable ¶
func ConvertObservable(observable rxgo.Observable) RxStream
func CreateObservable ¶
func FromChannel ¶ added in v0.6.0
func FromReader ¶
func FromReaderWithDecoder ¶ added in v1.2.520
FromReaderWithDecoder creates a RxStream with decoder
func FromReaderWithFunc ¶ added in v0.6.0
type RxStreamImpl ¶
type RxStreamImpl struct {
// contains filtered or unexported fields
}
func (*RxStreamImpl) All ¶
func (s *RxStreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AuditTime ¶
func (s *RxStreamImpl) AuditTime(milliseconds uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageFloat32 ¶
func (s *RxStreamImpl) AverageFloat32(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageFloat64 ¶
func (s *RxStreamImpl) AverageFloat64(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageInt ¶
func (s *RxStreamImpl) AverageInt(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageInt16 ¶
func (s *RxStreamImpl) AverageInt16(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageInt32 ¶
func (s *RxStreamImpl) AverageInt32(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageInt64 ¶
func (s *RxStreamImpl) AverageInt64(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) AverageInt8 ¶
func (s *RxStreamImpl) AverageInt8(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) BackOffRetry ¶
func (s *RxStreamImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) BufferWithCount ¶
func (s *RxStreamImpl) BufferWithCount(count int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) BufferWithTime ¶
func (s *RxStreamImpl) BufferWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) BufferWithTimeOrCount ¶
func (s *RxStreamImpl) BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Connect ¶
func (s *RxStreamImpl) Connect(ctx context.Context) (context.Context, rxgo.Disposable)
func (*RxStreamImpl) Contains ¶
func (s *RxStreamImpl) Contains(equal rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Count ¶
func (s *RxStreamImpl) Count(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Debounce ¶
func (s *RxStreamImpl) Debounce(milliseconds uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) DefaultIfEmpty ¶
func (s *RxStreamImpl) DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) DefaultIfEmptyWithTime ¶
func (s *RxStreamImpl) DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Distinct ¶
func (s *RxStreamImpl) Distinct(apply rxgo.Func, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) DistinctUntilChanged ¶
func (s *RxStreamImpl) DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) DoOnCompleted ¶
func (s *RxStreamImpl) DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed
func (*RxStreamImpl) DoOnError ¶
func (s *RxStreamImpl) DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed
func (*RxStreamImpl) DoOnNext ¶
func (s *RxStreamImpl) DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed
func (*RxStreamImpl) ElementAt ¶
func (s *RxStreamImpl) ElementAt(index uint, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Encode ¶ added in v0.7.7
func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Error ¶
func (s *RxStreamImpl) Error(opts ...rxgo.Option) error
func (*RxStreamImpl) Errors ¶
func (s *RxStreamImpl) Errors(opts ...rxgo.Option) []error
func (*RxStreamImpl) Filter ¶
func (s *RxStreamImpl) Filter(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Find ¶
func (s *RxStreamImpl) Find(find rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) First ¶
func (s *RxStreamImpl) First(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) FirstOrDefault ¶
func (s *RxStreamImpl) FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) FlatMap ¶
func (s *RxStreamImpl) FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ForEach ¶
func (s *RxStreamImpl) ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed
func (*RxStreamImpl) GroupBy ¶
func (s *RxStreamImpl) GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) GroupByDynamic ¶
func (s *RxStreamImpl) GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) IgnoreElements ¶
func (s *RxStreamImpl) IgnoreElements(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Join ¶
func (s *RxStreamImpl) Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Last ¶
func (s *RxStreamImpl) Last(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) LastOrDefault ¶
func (s *RxStreamImpl) LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Map ¶
func (s *RxStreamImpl) Map(apply rxgo.Func, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Marshal ¶
func (s *RxStreamImpl) Marshal(marshaller decoder.Marshaller, opts ...rxgo.Option) RxStream
Marshal transforms the items emitted by an Observable by applying a marshalling to each item.
func (*RxStreamImpl) Max ¶
func (s *RxStreamImpl) Max(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) MergeReadWriterWithFunc ¶ added in v0.6.0
func (s *RxStreamImpl) MergeReadWriterWithFunc(rwf serverless.GetFlowFunc, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Min ¶
func (s *RxStreamImpl) Min(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Observe ¶
func (s *RxStreamImpl) Observe(opts ...rxgo.Option) <-chan rxgo.Item
func (*RxStreamImpl) OnErrorResumeNext ¶
func (s *RxStreamImpl) OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) OnErrorReturn ¶
func (s *RxStreamImpl) OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) OnErrorReturnItem ¶
func (s *RxStreamImpl) OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) OnObserve ¶ added in v0.7.2
func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) RxStream
func (*RxStreamImpl) Reduce ¶
func (s *RxStreamImpl) Reduce(apply rxgo.Func2, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Repeat ¶
func (s *RxStreamImpl) Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Retry ¶
func (s *RxStreamImpl) Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Run ¶
func (s *RxStreamImpl) Run(opts ...rxgo.Option) rxgo.Disposed
func (*RxStreamImpl) Sample ¶
func (s *RxStreamImpl) Sample(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Scan ¶
func (s *RxStreamImpl) Scan(apply rxgo.Func2, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Send ¶
func (s *RxStreamImpl) Send(output chan<- rxgo.Item, opts ...rxgo.Option)
func (*RxStreamImpl) SequenceEqual ¶
func (s *RxStreamImpl) SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Serialize ¶
func (s *RxStreamImpl) Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Skip ¶
func (s *RxStreamImpl) Skip(nth uint, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) SkipLast ¶
func (s *RxStreamImpl) SkipLast(nth uint, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) SkipWhile ¶
func (s *RxStreamImpl) SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) SlidingWindowWithCount ¶ added in v1.1.0
func (s *RxStreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream
SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. It returns the orginal data to RxStream, not the buffered slice.
func (*RxStreamImpl) SlidingWindowWithTime ¶ added in v1.1.0
func (s *RxStreamImpl) SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) RxStream
SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func. It returns the orginal data to RxStream, not the buffered slice.
func (*RxStreamImpl) StartWith ¶
func (s *RxStreamImpl) StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) StdOut ¶
func (s *RxStreamImpl) StdOut(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Subscribe ¶ added in v0.7.2
func (s *RxStreamImpl) Subscribe(key byte) RxStream
func (*RxStreamImpl) SumFloat32 ¶
func (s *RxStreamImpl) SumFloat32(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) SumFloat64 ¶
func (s *RxStreamImpl) SumFloat64(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) SumInt64 ¶
func (s *RxStreamImpl) SumInt64(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Take ¶
func (s *RxStreamImpl) Take(nth uint, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) TakeLast ¶
func (s *RxStreamImpl) TakeLast(nth uint, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) TakeUntil ¶
func (s *RxStreamImpl) TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) TakeWhile ¶
func (s *RxStreamImpl) TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) TimeInterval ¶
func (s *RxStreamImpl) TimeInterval(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) Timestamp ¶
func (s *RxStreamImpl) Timestamp(opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ToMap ¶
func (s *RxStreamImpl) ToMap(keySelector rxgo.Func, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ToMapWithValueSelector ¶
func (s *RxStreamImpl) ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ToSlice ¶
func (s *RxStreamImpl) ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)
func (*RxStreamImpl) Unmarshal ¶
func (s *RxStreamImpl) Unmarshal(unmarshaller decoder.Unmarshaller, factory func() interface{}, opts ...rxgo.Option) RxStream
Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.
func (*RxStreamImpl) WindowWithCount ¶
func (s *RxStreamImpl) WindowWithCount(count int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) WindowWithTime ¶
func (s *RxStreamImpl) WindowWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) WindowWithTimeOrCount ¶
func (s *RxStreamImpl) WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ZipFromIterable ¶
func (s *RxStreamImpl) ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream
func (*RxStreamImpl) ZipMultiObservers ¶ added in v1.1.0
func (s *RxStreamImpl) ZipMultiObservers(observers []decoder.KeyObserveFunc, zipper func(items []interface{}) (interface{}, error)) RxStream
ZipMultiObservers subscribes multi Y3 observers, zips the values into a slice and calls the zipper callback when all keys are observed.