Versions in this module Expand all Collapse all v1 v1.0.50 Nov 27, 2020 Changes in this version + var ErrCanceled = context.Canceled + var ErrTimeout = context.DeadlineExceeded + func DoWithRetries(fn func() error, opts ...RetryOption) error + func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) error + func Parallel(fns ...func()) + type FilterFunc func(item interface{}) bool + type ForAllFunc func(pipe <-chan interface{}) + type ForEachFunc func(item interface{}) + type FxOption func() context.Context + func WithContext(ctx context.Context) FxOption + type GenerateFunc func(source chan<- interface{}) + type KeyFunc func(item interface{}) interface{} + type LessFunc func(a, b interface{}) bool + type MapFunc func(item interface{}) interface{} + type Option func(opts *rxOptions) + func UnlimitedWorkers() Option + func WithWorkers(workers int) Option + type ParallelFunc func(item interface{}) + type ReduceFunc func(pipe <-chan interface{}) (interface{}, error) + type RetryOption func(*retryOptions) + func WithRetries(times int) RetryOption + type Stream struct + func From(generate GenerateFunc) Stream + func Just(items ...interface{}) Stream + func Range(source <-chan interface{}) Stream + func (p Stream) Buffer(n int) Stream + func (p Stream) Count() (count int) + func (p Stream) Distinct(fn KeyFunc) Stream + func (p Stream) Done() + func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream + func (p Stream) ForAll(fn ForAllFunc) + func (p Stream) ForEach(fn ForEachFunc) + func (p Stream) Group(fn KeyFunc) Stream + func (p Stream) Head(n int64) Stream + func (p Stream) Map(fn MapFunc, opts ...Option) Stream + func (p Stream) Merge() Stream + func (p Stream) Parallel(fn ParallelFunc, opts ...Option) + func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) + func (p Stream) Reverse() Stream + func (p Stream) Sort(less LessFunc) Stream + func (p Stream) Split(n int) Stream + func (p Stream) Tail(n int64) Stream
+ func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream + type WalkFunc func(item interface{}, pipe chan<- interface{})