Versions in this module Expand all Collapse all v0 v0.0.1 Aug 3, 2024 Changes in this version + var ErrCanceled = context.Canceled + var ErrTimeout = context.DeadlineExceeded + func DoWithRetry(fn func() error, opts ...RetryOption) error + func DoWithRetryCtx(ctx context.Context, fn func(ctx context.Context, retryCount int) error, ...) error + func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error + func Parallel(fns ...func()) + func ParallelErr(fns ...func() error) error + type DoOption func() context.Context + func WithContext(ctx context.Context) DoOption + type FilterFunc func(item any) bool + type ForAllFunc func(pipe <-chan any) + type ForEachFunc func(item any) + type GenerateFunc func(source chan<- any) + type KeyFunc func(item any) any + type LessFunc func(a, b any) bool + type MapFunc func(item any) any + type Option func(opts *rxOptions) + func UnlimitedWorkers() Option + func WithWorkers(workers int) Option + type ParallelFunc func(item any) + type ReduceFunc func(pipe <-chan any) (any, error) + type RetryOption func(*retryOptions) + func WithIgnoreErrors(ignoreErrors []error) RetryOption + func WithInterval(interval time.Duration) RetryOption + func WithRetry(times int) RetryOption + func WithTimeout(timeout time.Duration) RetryOption + type Stream struct + func Concat(s Stream, others ...Stream) Stream + func From(generate GenerateFunc) Stream + func Just(items ...any) Stream + func Range(source <-chan any) Stream + func (s Stream) AllMach(predicate func(item any) bool) bool + func (s Stream) AnyMach(predicate func(item any) bool) bool + func (s Stream) Buffer(n int) Stream + func (s Stream) Concat(others ...Stream) Stream + func (s Stream) Count() (count int) + func (s Stream) Distinct(fn KeyFunc) Stream + func (s Stream) Done() + func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream + func (s Stream) First() any + func (s Stream) ForAll(fn ForAllFunc) + func (s Stream) ForEach(fn ForEachFunc) + func (s Stream) Group(fn KeyFunc) Stream + func (s Stream) Head(n int64) Stream + func (s Stream) Last() (item any) + func (s Stream) Map(fn MapFunc, opts ...Option) Stream + func (s Stream) Max(less LessFunc) any + func (s Stream) Merge() Stream + func (s Stream) Min(less LessFunc) any + func (s Stream) NoneMatch(predicate func(item any) bool) bool + func (s Stream) Parallel(fn ParallelFunc, opts ...Option) + func (s Stream) Reduce(fn ReduceFunc) (any, error) + func (s Stream) Reverse() Stream + func (s Stream) Skip(n int64) Stream + func (s Stream) Sort(less LessFunc) Stream + func (s Stream) Split(n int) Stream + func (s Stream) Tail(n int64) Stream + func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream + type WalkFunc func(item any, pipe chan<- any)