Versions in this module Expand all Collapse all v1 v1.4.5 Dec 28, 2022 v1.4.4 Dec 28, 2022 Changes in this version + var ErrCanceled = context.Canceled + var ErrTimeout = context.DeadlineExceeded + func DoWithRetry(fn func() error, opts ...RetryOption) error + func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error + func Parallel(fns ...func()) + type DoOption func() context.Context + func WithContext(ctx context.Context) DoOption + type FilterFunc func(item interface{}) bool + type ForAllFunc func(pipe <-chan interface{}) + type ForEachFunc func(item interface{}) + type GenerateFunc func(source chan<- interface{}) + type KeyFunc func(item interface{}) interface{} + type LessFunc func(a, b interface{}) bool + type MapFunc func(item interface{}) interface{} + type Option func(opts *rxOptions) + func UnlimitedWorkers() Option + func WithWorkers(workers int) Option + type ParallelFunc func(item interface{}) + type ReduceFunc func(pipe <-chan interface{}) (interface{}, error) + type RetryOption func(*retryOptions) + func WithRetry(times int) RetryOption + type Stream struct + func Concat(s Stream, others ...Stream) Stream + func From(generate GenerateFunc) Stream + func Just(items ...interface{}) Stream + func Range(source <-chan interface{}) Stream + func (s Stream) AllMach(predicate func(item interface{}) bool) bool + func (s Stream) AnyMach(predicate func(item interface{}) bool) bool + func (s Stream) Buffer(n int) Stream + func (s Stream) Concat(others ...Stream) Stream + func (s Stream) Count() (count int) + func (s Stream) Distinct(fn KeyFunc) Stream + func (s Stream) Done() + func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream + func (s Stream) First() interface{} + func (s Stream) ForAll(fn ForAllFunc) + func (s Stream) ForEach(fn ForEachFunc) + func (s Stream) Group(fn KeyFunc) Stream + func (s Stream) Head(n int64) Stream + func (s Stream) Last() (item interface{}) + func (s Stream) Map(fn MapFunc, opts ...Option) Stream + 
func (s Stream) Merge() Stream + func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool + func (s Stream) Parallel(fn ParallelFunc, opts ...Option) + func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) + func (s Stream) Reverse() Stream + func (s Stream) Skip(n int64) Stream + func (s Stream) Sort(less LessFunc) Stream + func (s Stream) Split(n int) Stream + func (s Stream) Tail(n int64) Stream + func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream + type WalkFunc func(item interface{}, pipe chan<- interface{})