Documentation ¶
Index ¶
- Variables
- func DoWithRetry(fn func() error, opts ...RetryOption) error
- func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error
- func Parallel(fns ...func())
- type DoOption
- type FilterFunc
- type ForAllFunc
- type ForEachFunc
- type GenerateFunc
- type KeyFunc
- type LessFunc
- type MapFunc
- type Option
- type ParallelFunc
- type ReduceFunc
- type RetryOption
- type Stream
- func (p Stream) Buffer(n int) Stream
- func (p Stream) Count() (count int)
- func (p Stream) Distinct(fn KeyFunc) Stream
- func (p Stream) Done()
- func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream
- func (p Stream) ForAll(fn ForAllFunc)
- func (p Stream) ForEach(fn ForEachFunc)
- func (p Stream) Group(fn KeyFunc) Stream
- func (p Stream) Head(n int64) Stream
- func (p Stream) Map(fn MapFunc, opts ...Option) Stream
- func (p Stream) Merge() Stream
- func (p Stream) Parallel(fn ParallelFunc, opts ...Option)
- func (p Stream) Reduce(fn ReduceFunc) (interface{}, error)
- func (p Stream) Reverse() Stream
- func (p Stream) Sort(less LessFunc) Stream
- func (p Stream) Split(n int) Stream
- func (p Stream) Tail(n int64) Stream
- func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream
- type WalkFunc
Constants ¶
This section is empty.
Variables ¶
var ( // ErrCanceled is the error returned when the context is canceled. ErrCanceled = context.Canceled // ErrTimeout is the error returned when the context's deadline passes. ErrTimeout = context.DeadlineExceeded )
Functions ¶
func DoWithRetry ¶ added in v1.1.5
func DoWithRetry(fn func() error, opts ...RetryOption) error
DoWithRetry runs fn, and retries if failed. Default to retry 3 times.
func DoWithTimeout ¶
DoWithTimeout runs fn with timeout control.
Types ¶
type DoOption ¶ added in v1.1.5
DoOption defines the method to customize a DoWithTimeout call.
func WithContext ¶
WithContext customizes a DoWithTimeout call with given ctx.
type FilterFunc ¶
type FilterFunc func(item interface{}) bool
FilterFunc defines the method to filter a Stream.
type ForAllFunc ¶
type ForAllFunc func(pipe <-chan interface{})
ForAllFunc defines the method to handle all elements in a Stream.
type ForEachFunc ¶
type ForEachFunc func(item interface{})
ForEachFunc defines the method to handle each element in a Stream.
type GenerateFunc ¶
type GenerateFunc func(source chan<- interface{})
GenerateFunc defines the method to send elements into a Stream.
type KeyFunc ¶
type KeyFunc func(item interface{}) interface{}
KeyFunc defines the method to generate keys for the elements in a Stream.
type LessFunc ¶
type LessFunc func(a, b interface{}) bool
LessFunc defines the method to compare the elements in a Stream.
type MapFunc ¶
type MapFunc func(item interface{}) interface{}
MapFunc defines the method to map each element to another object in a Stream.
type Option ¶
type Option func(opts *rxOptions)
Option defines the method to customize a Stream.
func UnlimitedWorkers ¶
func UnlimitedWorkers() Option
UnlimitedWorkers lets the caller to use as many workers as the tasks.
func WithWorkers ¶
WithWorkers lets the caller to customize the concurrent workers.
type ParallelFunc ¶
type ParallelFunc func(item interface{})
ParallelFunc defines the method to handle elements parallelly.
type ReduceFunc ¶
type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
ReduceFunc defines the method to reduce all the elements in a Stream.
type RetryOption ¶
type RetryOption func(*retryOptions)
RetryOption defines the method to customize DoWithRetry.
func WithRetry ¶ added in v1.1.5
func WithRetry(times int) RetryOption
WithRetry customize a DoWithRetry call with given retry times.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
A Stream is a stream that can be used to do stream processing.
func From ¶
func From(generate GenerateFunc) Stream
From constructs a Stream from the given GenerateFunc.
func Just ¶
func Just(items ...interface{}) Stream
Just converts the given arbitrary items to a Stream.
func Range ¶
func Range(source <-chan interface{}) Stream
Range converts the given channel to a Stream.
func (Stream) Buffer ¶
Buffer buffers the items into a queue with size n. It can balance the producer and the consumer if their processing throughput don't match.
func (Stream) Filter ¶
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream
Filter filters the items by the given FilterFunc.
func (Stream) ForAll ¶
func (p Stream) ForAll(fn ForAllFunc)
ForAll handles the streaming elements from the source and no later streams.
func (Stream) ForEach ¶
func (p Stream) ForEach(fn ForEachFunc)
ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
func (Stream) Map ¶
Map converts each item to another corresponding item, which means it's a 1:1 model.
func (Stream) Parallel ¶
func (p Stream) Parallel(fn ParallelFunc, opts ...Option)
Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (Stream) Reduce ¶
func (p Stream) Reduce(fn ReduceFunc) (interface{}, error)
Reduce is a utility method to let the caller deal with the underlying channel.