rx

package
v1.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Of

func Of(i interface{}) rxgo.Item

Types

type Handler added in v1.1.0

type Handler func(interface{}) error

Handler defines a function that handle the input value.

type RxStream

type RxStream interface {
	rxgo.Iterable
	MergeReadWriterWithFunc(rwf serverless.GetFlowFunc, opts ...rxgo.Option) RxStream
	Subscribe(key byte) RxStream
	Encode(key byte, opts ...rxgo.Option) RxStream
	OnObserve(function func(v []byte) (interface{}, error)) RxStream
	StdOut(opts ...rxgo.Option) RxStream
	AuditTime(milliseconds uint32, opts ...rxgo.Option) RxStream
	DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) RxStream
	All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream
	AverageFloat32(opts ...rxgo.Option) RxStream
	AverageFloat64(opts ...rxgo.Option) RxStream
	AverageInt(opts ...rxgo.Option) RxStream
	AverageInt8(opts ...rxgo.Option) RxStream
	AverageInt16(opts ...rxgo.Option) RxStream
	AverageInt32(opts ...rxgo.Option) RxStream
	AverageInt64(opts ...rxgo.Option) RxStream
	BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) RxStream
	BufferWithCount(count int, opts ...rxgo.Option) RxStream
	BufferWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
	BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
	Connect(ctx context.Context) (context.Context, rxgo.Disposable)
	Contains(equal rxgo.Predicate, opts ...rxgo.Option) RxStream
	Count(opts ...rxgo.Option) RxStream
	Debounce(milliseconds uint32, opts ...rxgo.Option) RxStream
	DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) RxStream
	Distinct(apply rxgo.Func, opts ...rxgo.Option) RxStream
	DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) RxStream
	DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed
	DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed
	DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed
	ElementAt(index uint, opts ...rxgo.Option) RxStream
	Error(opts ...rxgo.Option) error
	Errors(opts ...rxgo.Option) []error
	Filter(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
	Find(find rxgo.Predicate, opts ...rxgo.Option) RxStream
	First(opts ...rxgo.Option) RxStream
	FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
	FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) RxStream
	ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed
	GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) RxStream
	GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) RxStream
	IgnoreElements(opts ...rxgo.Option) RxStream
	Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) RxStream
	Last(opts ...rxgo.Option) RxStream
	LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream
	Map(apply rxgo.Func, opts ...rxgo.Option) RxStream
	// Marshal transforms the items emitted by an Observable by applying a marshalling to each item.
	Marshal(marshaller decoder.Marshaller, opts ...rxgo.Option) RxStream
	Max(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
	Min(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream
	OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) RxStream
	OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) RxStream
	OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) RxStream
	Reduce(apply rxgo.Func2, opts ...rxgo.Option) RxStream
	Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) RxStream
	Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) RxStream
	Run(opts ...rxgo.Option) rxgo.Disposed
	Sample(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
	Scan(apply rxgo.Func2, opts ...rxgo.Option) RxStream
	SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
	Send(output chan<- rxgo.Item, opts ...rxgo.Option)
	Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) RxStream
	Skip(nth uint, opts ...rxgo.Option) RxStream
	SkipLast(nth uint, opts ...rxgo.Option) RxStream
	SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
	StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream
	SumFloat32(opts ...rxgo.Option) RxStream
	SumFloat64(opts ...rxgo.Option) RxStream
	SumInt64(opts ...rxgo.Option) RxStream
	Take(nth uint, opts ...rxgo.Option) RxStream
	TakeLast(nth uint, opts ...rxgo.Option) RxStream
	TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
	TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream
	TimeInterval(opts ...rxgo.Option) RxStream
	Timestamp(opts ...rxgo.Option) RxStream
	ToMap(keySelector rxgo.Func, opts ...rxgo.Option) RxStream
	ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) RxStream
	ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)
	// Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.
	Unmarshal(unmarshaller decoder.Unmarshaller, factory func() interface{}, opts ...rxgo.Option) RxStream
	WindowWithCount(count int, opts ...rxgo.Option) RxStream
	WindowWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream
	WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream
	ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream

	// SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func.
	// It returns the orginal data to RxStream, not the buffered slice.
	SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream

	// SlidingWindowWithTime buffers the data in the specified sliding window time in milliseconds, the buffered data can be processed in the handler func.
	// It returns the orginal data to RxStream, not the buffered slice.
	SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) RxStream

	// ZipMultiObservers subscribes multi Y3 observers, zips the values into a slice and calls the zipper callback when all keys are observed.
	ZipMultiObservers(observers []decoder.KeyObserveFunc, zipper func(items []interface{}) (interface{}, error)) RxStream
}

func ConvertObservable

func ConvertObservable(observable rxgo.Observable) RxStream

func CreateObservable

func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream

func FromChannel added in v0.6.0

func FromChannel(channel chan []byte) RxStream

func FromReader

func FromReader(reader io.Reader) RxStream

func FromReaderWithDecoder added in v1.2.520

func FromReaderWithDecoder(readers chan io.Reader) RxStream

FromReaderWithDecoder creates a RxStream with decoder

func FromReaderWithFunc added in v0.6.0

func FromReaderWithFunc(f func() io.Reader) RxStream

type RxStreamImpl

type RxStreamImpl struct {
	// contains filtered or unexported fields
}

func (*RxStreamImpl) All

func (s *RxStreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AuditTime

func (s *RxStreamImpl) AuditTime(milliseconds uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageFloat32

func (s *RxStreamImpl) AverageFloat32(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageFloat64

func (s *RxStreamImpl) AverageFloat64(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageInt

func (s *RxStreamImpl) AverageInt(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageInt16

func (s *RxStreamImpl) AverageInt16(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageInt32

func (s *RxStreamImpl) AverageInt32(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageInt64

func (s *RxStreamImpl) AverageInt64(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) AverageInt8

func (s *RxStreamImpl) AverageInt8(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) BackOffRetry

func (s *RxStreamImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) BufferWithCount

func (s *RxStreamImpl) BufferWithCount(count int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) BufferWithTime

func (s *RxStreamImpl) BufferWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) BufferWithTimeOrCount

func (s *RxStreamImpl) BufferWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Connect

func (s *RxStreamImpl) Connect(ctx context.Context) (context.Context, rxgo.Disposable)

func (*RxStreamImpl) Contains

func (s *RxStreamImpl) Contains(equal rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Count

func (s *RxStreamImpl) Count(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Debounce

func (s *RxStreamImpl) Debounce(milliseconds uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) DefaultIfEmpty

func (s *RxStreamImpl) DefaultIfEmpty(defaultValue interface{}, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) DefaultIfEmptyWithTime

func (s *RxStreamImpl) DefaultIfEmptyWithTime(milliseconds uint32, defaultValue interface{}, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Distinct

func (s *RxStreamImpl) Distinct(apply rxgo.Func, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) DistinctUntilChanged

func (s *RxStreamImpl) DistinctUntilChanged(apply rxgo.Func, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) DoOnCompleted

func (s *RxStreamImpl) DoOnCompleted(completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

func (*RxStreamImpl) DoOnError

func (s *RxStreamImpl) DoOnError(errFunc rxgo.ErrFunc, opts ...rxgo.Option) rxgo.Disposed

func (*RxStreamImpl) DoOnNext

func (s *RxStreamImpl) DoOnNext(nextFunc rxgo.NextFunc, opts ...rxgo.Option) rxgo.Disposed

func (*RxStreamImpl) ElementAt

func (s *RxStreamImpl) ElementAt(index uint, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Encode added in v0.7.7

func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Error

func (s *RxStreamImpl) Error(opts ...rxgo.Option) error

func (*RxStreamImpl) Errors

func (s *RxStreamImpl) Errors(opts ...rxgo.Option) []error

func (*RxStreamImpl) Filter

func (s *RxStreamImpl) Filter(apply rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Find

func (s *RxStreamImpl) Find(find rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) First

func (s *RxStreamImpl) First(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) FirstOrDefault

func (s *RxStreamImpl) FirstOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) FlatMap

func (s *RxStreamImpl) FlatMap(apply rxgo.ItemToObservable, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ForEach

func (s *RxStreamImpl) ForEach(nextFunc rxgo.NextFunc, errFunc rxgo.ErrFunc, completedFunc rxgo.CompletedFunc, opts ...rxgo.Option) rxgo.Disposed

func (*RxStreamImpl) GroupBy

func (s *RxStreamImpl) GroupBy(length int, distribution func(rxgo.Item) int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) GroupByDynamic

func (s *RxStreamImpl) GroupByDynamic(distribution func(rxgo.Item) string, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) IgnoreElements

func (s *RxStreamImpl) IgnoreElements(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Join

func (s *RxStreamImpl) Join(joiner rxgo.Func2, right rxgo.Observable, timeExtractor func(interface{}) time.Time, windowInMS uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Last

func (s *RxStreamImpl) Last(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) LastOrDefault

func (s *RxStreamImpl) LastOrDefault(defaultValue interface{}, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Map

func (s *RxStreamImpl) Map(apply rxgo.Func, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Marshal

func (s *RxStreamImpl) Marshal(marshaller decoder.Marshaller, opts ...rxgo.Option) RxStream

Marshal transforms the items emitted by an Observable by applying a marshalling to each item.

func (*RxStreamImpl) Max

func (s *RxStreamImpl) Max(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) MergeReadWriterWithFunc added in v0.6.0

func (s *RxStreamImpl) MergeReadWriterWithFunc(rwf serverless.GetFlowFunc, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Min

func (s *RxStreamImpl) Min(comparator rxgo.Comparator, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Observe

func (s *RxStreamImpl) Observe(opts ...rxgo.Option) <-chan rxgo.Item

func (*RxStreamImpl) OnErrorResumeNext

func (s *RxStreamImpl) OnErrorResumeNext(resumeSequence rxgo.ErrorToObservable, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) OnErrorReturn

func (s *RxStreamImpl) OnErrorReturn(resumeFunc rxgo.ErrorFunc, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) OnErrorReturnItem

func (s *RxStreamImpl) OnErrorReturnItem(resume interface{}, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) OnObserve added in v0.7.2

func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) RxStream

func (*RxStreamImpl) Reduce

func (s *RxStreamImpl) Reduce(apply rxgo.Func2, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Repeat

func (s *RxStreamImpl) Repeat(count int64, milliseconds uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Retry

func (s *RxStreamImpl) Retry(count int, shouldRetry func(error) bool, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Run

func (s *RxStreamImpl) Run(opts ...rxgo.Option) rxgo.Disposed

func (*RxStreamImpl) Sample

func (s *RxStreamImpl) Sample(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Scan

func (s *RxStreamImpl) Scan(apply rxgo.Func2, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Send

func (s *RxStreamImpl) Send(output chan<- rxgo.Item, opts ...rxgo.Option)

func (*RxStreamImpl) SequenceEqual

func (s *RxStreamImpl) SequenceEqual(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Serialize

func (s *RxStreamImpl) Serialize(from int, identifier func(interface{}) int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Skip

func (s *RxStreamImpl) Skip(nth uint, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) SkipLast

func (s *RxStreamImpl) SkipLast(nth uint, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) SkipWhile

func (s *RxStreamImpl) SkipWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) SlidingWindowWithCount added in v1.1.0

func (s *RxStreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream

SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. It returns the orginal data to RxStream, not the buffered slice.

func (*RxStreamImpl) SlidingWindowWithTime added in v1.1.0

func (s *RxStreamImpl) SlidingWindowWithTime(windowTimeInMS uint32, slideTimeInMS uint32, handler Handler, opts ...rxgo.Option) RxStream

SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func. It returns the orginal data to RxStream, not the buffered slice.

func (*RxStreamImpl) StartWith

func (s *RxStreamImpl) StartWith(iterable rxgo.Iterable, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) StdOut

func (s *RxStreamImpl) StdOut(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Subscribe added in v0.7.2

func (s *RxStreamImpl) Subscribe(key byte) RxStream

func (*RxStreamImpl) SumFloat32

func (s *RxStreamImpl) SumFloat32(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) SumFloat64

func (s *RxStreamImpl) SumFloat64(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) SumInt64

func (s *RxStreamImpl) SumInt64(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Take

func (s *RxStreamImpl) Take(nth uint, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) TakeLast

func (s *RxStreamImpl) TakeLast(nth uint, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) TakeUntil

func (s *RxStreamImpl) TakeUntil(apply rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) TakeWhile

func (s *RxStreamImpl) TakeWhile(apply rxgo.Predicate, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) TimeInterval

func (s *RxStreamImpl) TimeInterval(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) Timestamp

func (s *RxStreamImpl) Timestamp(opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ToMap

func (s *RxStreamImpl) ToMap(keySelector rxgo.Func, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ToMapWithValueSelector

func (s *RxStreamImpl) ToMapWithValueSelector(keySelector, valueSelector rxgo.Func, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ToSlice

func (s *RxStreamImpl) ToSlice(initialCapacity int, opts ...rxgo.Option) ([]interface{}, error)

func (*RxStreamImpl) Unmarshal

func (s *RxStreamImpl) Unmarshal(unmarshaller decoder.Unmarshaller, factory func() interface{}, opts ...rxgo.Option) RxStream

Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.

func (*RxStreamImpl) WindowWithCount

func (s *RxStreamImpl) WindowWithCount(count int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) WindowWithTime

func (s *RxStreamImpl) WindowWithTime(milliseconds uint32, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) WindowWithTimeOrCount

func (s *RxStreamImpl) WindowWithTimeOrCount(milliseconds uint32, count int, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ZipFromIterable

func (s *RxStreamImpl) ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream

func (*RxStreamImpl) ZipMultiObservers added in v1.1.0

func (s *RxStreamImpl) ZipMultiObservers(observers []decoder.KeyObserveFunc, zipper func(items []interface{}) (interface{}, error)) RxStream

ZipMultiObservers subscribes multi Y3 observers, zips the values into a slice and calls the zipper callback when all keys are observed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL