Documentation ¶
Index ¶
- Variables
- func DoWithRetries(fn func() error, opts ...RetryOption) error
- func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) error
- func Parallel(fns ...func())
- type FilterFunc
- type ForAllFunc
- type ForEachFunc
- type FxOption
- type GenerateFunc
- type KeyFunc
- type LessFunc
- type MapFunc
- type Option
- type ParallelFunc
- type ReduceFunc
- type RetryOption
- type Stream
- func (p Stream) Buffer(n int) Stream
- func (p Stream) Count() (count int)
- func (p Stream) Distinct(fn KeyFunc) Stream
- func (p Stream) Done()
- func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream
- func (p Stream) ForAll(fn ForAllFunc)
- func (p Stream) ForEach(fn ForEachFunc)
- func (p Stream) Group(fn KeyFunc) Stream
- func (p Stream) Head(n int64) Stream
- func (p Stream) Map(fn MapFunc, opts ...Option) Stream
- func (p Stream) Merge() Stream
- func (p Stream) Parallel(fn ParallelFunc, opts ...Option)
- func (p Stream) Reduce(fn ReduceFunc) (interface{}, error)
- func (p Stream) Reverse() Stream
- func (p Stream) Sort(less LessFunc) Stream
- func (p Stream) Split(n int) Stream
- func (p Stream) Tail(n int64) Stream
- func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream
- type WalkFunc
Constants ¶
This section is empty.
Variables ¶
var ( ErrCanceled = context.Canceled ErrTimeout = context.DeadlineExceeded )
Functions ¶
func DoWithRetries ¶
func DoWithRetries(fn func() error, opts ...RetryOption) error
func DoWithTimeout ¶
Types ¶
type FilterFunc ¶
type FilterFunc func(item interface{}) bool
type ForAllFunc ¶
type ForAllFunc func(pipe <-chan interface{})
type ForEachFunc ¶
type ForEachFunc func(item interface{})
type FxOption ¶
func WithContext ¶
type GenerateFunc ¶
type GenerateFunc func(source chan<- interface{})
type Option ¶
type Option func(opts *rxOptions)
func UnlimitedWorkers ¶
func UnlimitedWorkers() Option
UnlimitedWorkers lets the caller to use as many workers as the tasks.
func WithWorkers ¶
WithWorkers lets the caller to customize the concurrent workers.
type ParallelFunc ¶
type ParallelFunc func(item interface{})
type ReduceFunc ¶
type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
type RetryOption ¶
type RetryOption func(*retryOptions)
func WithRetries ¶
func WithRetries(times int) RetryOption
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func From ¶
func From(generate GenerateFunc) Stream
From constructs a Stream from the given GenerateFunc.
func Just ¶
func Just(items ...interface{}) Stream
Just converts the given arbitrary items to a Stream.
func Range ¶
func Range(source <-chan interface{}) Stream
Range converts the given channel to a Stream.
func (Stream) Buffer ¶
Buffer buffers the items into a queue with size n. It can balance the producer and the consumer if their processing throughput don't match.
func (Stream) Filter ¶
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream
Filter filters the items by the given FilterFunc.
func (Stream) ForAll ¶
func (p Stream) ForAll(fn ForAllFunc)
ForAll handles the streaming elements from the source and no later streams.
func (Stream) ForEach ¶
func (p Stream) ForEach(fn ForEachFunc)
ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
func (Stream) Map ¶
Maps converts each item to another corresponding item, which means it's a 1:1 model.
func (Stream) Parallel ¶
func (p Stream) Parallel(fn ParallelFunc, opts ...Option)
Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (Stream) Reduce ¶
func (p Stream) Reduce(fn ReduceFunc) (interface{}, error)
Reduce is a utility method to let the caller deal with the underlying channel.