Documentation ¶
Index ¶
- Variables
- func DoWithRetry(fn func() error, opts ...RetryOption) error
- func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error
- func Parallel(fns ...func())
- type DoOption
- type FilterFunc
- type ForAllFunc
- type ForEachFunc
- type GenerateFunc
- type KeyFunc
- type LessFunc
- type MapFunc
- type Option
- type ParallelFunc
- type ReduceFunc
- type RetryOption
- type Stream
- func (s Stream) AllMach(predicate func(item interface{}) bool) bool
- func (s Stream) AnyMach(predicate func(item interface{}) bool) bool
- func (s Stream) Buffer(n int) Stream
- func (s Stream) Concat(others ...Stream) Stream
- func (s Stream) Count() (count int)
- func (s Stream) Distinct(fn KeyFunc) Stream
- func (s Stream) Done()
- func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream
- func (s Stream) ForAll(fn ForAllFunc)
- func (s Stream) ForEach(fn ForEachFunc)
- func (s Stream) Group(fn KeyFunc) Stream
- func (s Stream) Head(n int64) Stream
- func (s Stream) Map(fn MapFunc, opts ...Option) Stream
- func (s Stream) Merge() Stream
- func (s Stream) Parallel(fn ParallelFunc, opts ...Option)
- func (s Stream) Reduce(fn ReduceFunc) (interface{}, error)
- func (s Stream) Reverse() Stream
- func (s Stream) Skip(n int64) Stream
- func (s Stream) Sort(less LessFunc) Stream
- func (s Stream) Split(n int) Stream
- func (s Stream) Tail(n int64) Stream
- func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream
- type WalkFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrCanceled is the error returned when the context is canceled.
	ErrCanceled = context.Canceled
	// ErrTimeout is the error returned when the context's deadline passes.
	ErrTimeout = context.DeadlineExceeded
)
Functions ¶
func DoWithRetry ¶
func DoWithRetry(fn func() error, opts ...RetryOption) error
DoWithRetry runs fn and retries it if it fails. By default it retries 3 times.
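The retry behavior can be pictured with a small standalone sketch. It is not this package's implementation: doWithRetry, withRetry, the times field, and errTransient are hypothetical names, and the sketch assumes that "3 times" means at most three attempts in total and that the last error is returned.

```go
package main

import (
	"errors"
	"fmt"
)

// retryOptions mirrors the documented functional-option shape.
// The field name is an assumption made for this sketch.
type retryOptions struct {
	times int
}

// RetryOption repeats the documented type so the sketch compiles on its own.
type RetryOption func(*retryOptions)

// errTransient is a hypothetical error used to simulate a flaky call.
var errTransient = errors.New("transient failure")

// withRetry is a hypothetical stand-in for the documented WithRetry.
func withRetry(times int) RetryOption {
	return func(o *retryOptions) { o.times = times }
}

// doWithRetry calls fn until it succeeds or the attempt budget is used up.
// Assumption: the default of 3 is read as at most 3 attempts in total.
func doWithRetry(fn func() error, opts ...RetryOption) error {
	options := retryOptions{times: 3}
	for _, opt := range opts {
		opt(&options)
	}

	var lastErr error
	for i := 0; i < options.times; i++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := doWithRetry(func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

Passing withRetry(2) in this sketch would cap the loop at two attempts, which is the role the documented WithRetry option plays for DoWithRetry.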
func DoWithTimeout ¶
DoWithTimeout runs fn with timeout control.
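One common way to get timeout control in Go is to run the function in a goroutine and select on its result and a context deadline. The sketch below illustrates that pattern only; it is not this package's code. doWithTimeout, runSlow, runFast, and errTimeout are hypothetical names, and the context is passed as a plain parameter instead of through a DoOption to keep the example short.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// errTimeout mirrors the documented ErrTimeout variable for this sketch.
var errTimeout = context.DeadlineExceeded

// doWithTimeout is a hypothetical sketch: it waits for whichever comes
// first, the result of fn or the expiry of the deadline.
func doWithTimeout(ctx context.Context, fn func() error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit,
	// even when the caller has already given up waiting.
	done := make(chan error, 1)
	go func() { done <- fn() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// context.DeadlineExceeded on timeout, context.Canceled on cancel.
		return ctx.Err()
	}
}

// runSlow runs a task that outlives its deadline.
func runSlow() error {
	return doWithTimeout(context.Background(), func() error {
		time.Sleep(200 * time.Millisecond)
		return nil
	}, 20*time.Millisecond)
}

// runFast runs a task that finishes well before its deadline.
func runFast() error {
	return doWithTimeout(context.Background(), func() error {
		return nil
	}, time.Second)
}

func main() {
	fmt.Println(runSlow() == errTimeout, runFast())
}
```

Note that in this pattern the function keeps running after the deadline passes; only the caller stops waiting for it.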
Types ¶
type DoOption ¶
DoOption defines the method to customize a DoWithTimeout call.
func WithContext ¶
WithContext customizes a DoWithTimeout call with the given ctx.
type FilterFunc ¶
type FilterFunc func(item interface{}) bool
FilterFunc defines the method to filter a Stream.
type ForAllFunc ¶
type ForAllFunc func(pipe <-chan interface{})
ForAllFunc defines the method to handle all elements in a Stream.
type ForEachFunc ¶
type ForEachFunc func(item interface{})
ForEachFunc defines the method to handle each element in a Stream.
type GenerateFunc ¶
type GenerateFunc func(source chan<- interface{})
GenerateFunc defines the method to send elements into a Stream.
type KeyFunc ¶
type KeyFunc func(item interface{}) interface{}
KeyFunc defines the method to generate keys for the elements in a Stream.
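A key function is typically what decides whether two items count as "the same" for operations such as Distinct or Group. The following standalone sketch shows deduplication by key over a slice. distinct and modThree are hypothetical helpers, and keeping the first item seen for each key is an assumption of the sketch, not a documented guarantee.

```go
package main

import "fmt"

// KeyFunc repeats the documented signature so the sketch compiles on its own.
type KeyFunc func(item interface{}) interface{}

// distinct is a hypothetical helper that keeps the first item seen for each key.
// Keys are used as map keys, so they must be comparable values.
func distinct(items []interface{}, key KeyFunc) []interface{} {
	seen := make(map[interface{}]struct{})
	var out []interface{}
	for _, item := range items {
		k := key(item)
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, item)
	}
	return out
}

// modThree is a hypothetical KeyFunc that keys integers by their remainder mod 3.
func modThree(item interface{}) interface{} {
	return item.(int) % 3
}

func main() {
	fmt.Println(distinct([]interface{}{1, 2, 3, 4, 5, 6}, modThree))
}
```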
type LessFunc ¶
type LessFunc func(a, b interface{}) bool
LessFunc defines the method to compare the elements in a Stream.
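A LessFunc has the same shape as the comparison that the standard library's sort.Slice expects, apart from receiving items instead of indexes. The sketch below adapts one to the other. sortItems and ascending are hypothetical names, and nothing here is taken from this package's Sort implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// LessFunc repeats the documented signature so the sketch compiles on its own.
type LessFunc func(a, b interface{}) bool

// sortItems is a hypothetical helper that sorts items in place using less.
func sortItems(items []interface{}, less LessFunc) {
	sort.Slice(items, func(i, j int) bool {
		return less(items[i], items[j])
	})
}

// ascending is a hypothetical LessFunc for int items.
func ascending(a, b interface{}) bool {
	return a.(int) < b.(int)
}

func main() {
	items := []interface{}{3, 1, 2}
	sortItems(items, ascending)
	fmt.Println(items)
}
```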
type MapFunc ¶
type MapFunc func(item interface{}) interface{}
MapFunc defines the method to map each element to another object in a Stream.
type Option ¶
type Option func(opts *rxOptions)
Option defines the method to customize a Stream.
func UnlimitedWorkers ¶
func UnlimitedWorkers() Option
UnlimitedWorkers lets the caller use as many workers as there are tasks.
func WithWorkers ¶
WithWorkers lets the caller customize the number of concurrent workers.
type ParallelFunc ¶
type ParallelFunc func(item interface{})
ParallelFunc defines the method to handle elements in parallel.
type ReduceFunc ¶
type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
ReduceFunc defines the method to reduce all the elements in a Stream.
type RetryOption ¶
type RetryOption func(*retryOptions)
RetryOption defines the method to customize DoWithRetry.
func WithRetry ¶
func WithRetry(times int) RetryOption
WithRetry customizes a DoWithRetry call with the given number of retries.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
A Stream represents a flow of items that can be processed by chaining stream operations.
func From ¶
func From(generate GenerateFunc) Stream
From constructs a Stream from the given GenerateFunc.
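A generator-driven source can be sketched with a plain channel: the generator writes into the channel on its own goroutine, and the channel is closed once the generator returns. This is an illustration under assumptions, not the package's code. from and countTo are hypothetical names, and the sketch returns a bare channel instead of a Stream.

```go
package main

import "fmt"

// GenerateFunc repeats the documented signature so the sketch compiles on its own.
type GenerateFunc func(source chan<- interface{})

// from is a hypothetical helper: it runs generate in a goroutine and
// closes the channel when the generator returns, so consumers can range over it.
func from(generate GenerateFunc) <-chan interface{} {
	source := make(chan interface{})
	go func() {
		defer close(source)
		generate(source)
	}()
	return source
}

// countTo is a hypothetical GenerateFunc factory that emits 0, 1, ..., n-1.
func countTo(n int) GenerateFunc {
	return func(source chan<- interface{}) {
		for i := 0; i < n; i++ {
			source <- i
		}
	}
}

func main() {
	for item := range from(countTo(4)) {
		fmt.Println(item)
	}
}
```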
func Just ¶
func Just(items ...interface{}) Stream
Just converts the given arbitrary items to a Stream.
func Range ¶
func Range(source <-chan interface{}) Stream
Range converts the given channel to a Stream.
func (Stream) AllMach ¶
AllMach returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated.
func (Stream) AnyMach ¶
AnyMach returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated.
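The short-circuit and empty-stream rules described for AllMach and AnyMach can be sketched over a plain channel. allMatch, anyMatch, drain, and feed are hypothetical helpers; draining the remaining items in the background after an early return is an assumption of this sketch that keeps the producer from blocking.

```go
package main

import "fmt"

// feed is a hypothetical source that emits the given items and then closes.
func feed(items ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, item := range items {
			ch <- item
		}
	}()
	return ch
}

// drain discards whatever is left so the producer goroutine can finish.
func drain(source <-chan interface{}) {
	for range source {
	}
}

// allMatch stops at the first item that fails the predicate.
// An empty source yields true without calling the predicate.
func allMatch(source <-chan interface{}, predicate func(item interface{}) bool) bool {
	for item := range source {
		if !predicate(item) {
			go drain(source)
			return false
		}
	}
	return true
}

// anyMatch stops at the first item that satisfies the predicate.
// An empty source yields false without calling the predicate.
func anyMatch(source <-chan interface{}, predicate func(item interface{}) bool) bool {
	for item := range source {
		if predicate(item) {
			go drain(source)
			return true
		}
	}
	return false
}

func main() {
	isEven := func(item interface{}) bool { return item.(int)%2 == 0 }
	fmt.Println(allMatch(feed(2, 3, 4), isEven))
	fmt.Println(anyMatch(feed(1, 2, 3), isEven))
	fmt.Println(allMatch(feed(), isEven), anyMatch(feed(), isEven))
}
```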
func (Stream) Buffer ¶
Buffer buffers the items into a queue of size n. It can balance the producer and the consumer when their processing throughput doesn't match.
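In plain Go, this kind of buffering is usually a buffered channel placed between producer and consumer: the producer can run up to n items ahead before it blocks. The sketch below shows that idea only. buffer, feed, and collect are hypothetical helpers, and treating a negative n as zero is an assumption of the sketch.

```go
package main

import "fmt"

// feed is a hypothetical source that emits the given items and then closes.
func feed(items ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, item := range items {
			ch <- item
		}
	}()
	return ch
}

// buffer is a hypothetical helper that copies source into a channel with
// capacity n, letting the producer run up to n items ahead of the consumer.
func buffer(source <-chan interface{}, n int) <-chan interface{} {
	if n < 0 {
		n = 0 // assumption: negative sizes fall back to an unbuffered channel
	}
	out := make(chan interface{}, n)
	go func() {
		defer close(out)
		for item := range source {
			out <- item
		}
	}()
	return out
}

// collect reads every item from source into a slice.
func collect(source <-chan interface{}) []interface{} {
	var items []interface{}
	for item := range source {
		items = append(items, item)
	}
	return items
}

func main() {
	fmt.Println(collect(buffer(feed(1, 2, 3), 2)))
}
```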
func (Stream) Filter ¶
func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream
Filter filters the items by the given FilterFunc.
func (Stream) ForAll ¶
func (s Stream) ForAll(fn ForAllFunc)
ForAll handles all the streaming elements from the source; no further stream operations can follow it.
func (Stream) ForEach ¶
func (s Stream) ForEach(fn ForEachFunc)
ForEach seals the Stream by applying the ForEachFunc to each item; no further operations can follow it.
func (Stream) Map ¶
Map converts each item to another corresponding item, so it is a 1:1 mapping.
func (Stream) Parallel ¶
func (s Stream) Parallel(fn ParallelFunc, opts ...Option)
Parallel applies the given ParallelFunc to each item concurrently with the given number of workers.
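A fixed-size worker pool is one way to apply a function to each item concurrently. The sketch below illustrates that pattern with the standard library only; it is not this package's implementation. parallel, feed, and sumConcurrently are hypothetical names, and the worker count is passed as a plain int instead of through an Option.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// feed is a hypothetical source that emits the given items and then closes.
func feed(items ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, item := range items {
			ch <- item
		}
	}()
	return ch
}

// parallel is a hypothetical helper: workers goroutines pull items from
// source until it is closed, and the call returns once all have finished.
func parallel(source <-chan interface{}, fn func(item interface{}), workers int) {
	if workers < 1 {
		workers = 1 // assumption: always run at least one worker
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range source {
				fn(item)
			}
		}()
	}
	wg.Wait()
}

// sumConcurrently adds up int items using the given number of workers.
// The atomic add keeps the shared total safe across goroutines.
func sumConcurrently(workers int, items ...interface{}) int64 {
	var total int64
	parallel(feed(items...), func(item interface{}) {
		atomic.AddInt64(&total, int64(item.(int)))
	}, workers)
	return total
}

func main() {
	fmt.Println(sumConcurrently(4, 1, 2, 3, 4, 5))
}
```

Because items are handled by several goroutines, the function passed in must be safe for concurrent use, and the order in which items are processed is not defined.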
func (Stream) Reduce ¶
func (s Stream) Reduce(fn ReduceFunc) (interface{}, error)
Reduce is a utility method to let the caller deal with the underlying channel.
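Since a ReduceFunc receives the underlying channel, it can fold the items in whatever way it needs. The sketch below shows a hypothetical ReduceFunc that sums integers. sumInts, feed, and errNotInt are names made up for the example, and the function is called directly on a channel here instead of through Stream.Reduce.

```go
package main

import (
	"errors"
	"fmt"
)

// ReduceFunc repeats the documented signature so the sketch compiles on its own.
type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)

// errNotInt is a hypothetical error for items of an unexpected type.
var errNotInt = errors.New("item is not an int")

// sumInts is a hypothetical ReduceFunc that adds up all int items.
// It keeps reading after a bad item so the producer is never left blocked.
func sumInts(pipe <-chan interface{}) (interface{}, error) {
	total := 0
	var firstErr error
	for item := range pipe {
		n, ok := item.(int)
		if !ok {
			if firstErr == nil {
				firstErr = errNotInt
			}
			continue
		}
		total += n
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return total, nil
}

// feed is a hypothetical source that emits the given items and then closes.
func feed(items ...interface{}) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, item := range items {
			ch <- item
		}
	}()
	return ch
}

func main() {
	var reduce ReduceFunc = sumInts
	result, err := reduce(feed(1, 2, 3, 4))
	fmt.Println(result, err)
}
```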