Documentation ¶
Index ¶
- Variables
- func DoWithRetry(fn func() error, opts ...RetryOption) error
- func DoWithRetryCtx(ctx context.Context, fn func(ctx context.Context, retryCount int) error, ...) error
- func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error
- func Parallel(fns ...func())
- type DoOption
- type FilterFunc
- type ForAllFunc
- type ForEachFunc
- type GenerateFunc
- type KeyFunc
- type LessFunc
- type MapFunc
- type Option
- type ParallelFunc
- type ReduceFunc
- type RetryOption
- type Stream
- func (s Stream) AllMach(predicate func(item any) bool) bool
- func (s Stream) AnyMach(predicate func(item any) bool) bool
- func (s Stream) Buffer(n int) Stream
- func (s Stream) Concat(others ...Stream) Stream
- func (s Stream) Count() (count int)
- func (s Stream) Distinct(fn KeyFunc) Stream
- func (s Stream) Done()
- func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream
- func (s Stream) First() any
- func (s Stream) ForAll(fn ForAllFunc)
- func (s Stream) ForEach(fn ForEachFunc)
- func (s Stream) Group(fn KeyFunc) Stream
- func (s Stream) Head(n int64) Stream
- func (s Stream) Last() (item any)
- func (s Stream) Map(fn MapFunc, opts ...Option) Stream
- func (s Stream) Max(less LessFunc) any
- func (s Stream) Merge() Stream
- func (s Stream) Min(less LessFunc) any
- func (s Stream) NoneMatch(predicate func(item any) bool) bool
- func (s Stream) Parallel(fn ParallelFunc, opts ...Option)
- func (s Stream) Reduce(fn ReduceFunc) (any, error)
- func (s Stream) Reverse() Stream
- func (s Stream) Skip(n int64) Stream
- func (s Stream) Sort(less LessFunc) Stream
- func (s Stream) Split(n int) Stream
- func (s Stream) Tail(n int64) Stream
- func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream
- type WalkFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrCanceled is the error returned when the context is canceled.
	ErrCanceled = context.Canceled
	// ErrTimeout is the error returned when the context's deadline passes.
	ErrTimeout = context.DeadlineExceeded
)
Functions ¶
func DoWithRetry ¶
func DoWithRetry(fn func() error, opts ...RetryOption) error
DoWithRetry runs fn and retries it if it fails. By default it retries 3 times. Note that if fn modifies variables declared outside the function, it is best to guard them with a lock; otherwise data races may occur.
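The locking advice can be illustrated with a minimal sketch that uses only the standard library. The `retry` helper and the `counter` type are hypothetical names introduced for this example, and the sketch assumes that "3 times" means three attempts in total; it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// retry is a hypothetical stand-in for a retrying helper: it calls fn up to
// times attempts and returns nil on the first success, or the last error.
func retry(fn func() error, times int) error {
	var err error
	for i := 0; i < times; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

// counter is state declared outside fn, so every modification is guarded
// by a mutex.
type counter struct {
	mu    sync.Mutex
	calls int
}

// flaky returns a function that fails for the first failures calls.
func (c *counter) flaky(failures int) func() error {
	return func() error {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.calls++
		if c.calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	c := &counter{}
	err := retry(c.flaky(2), 3)
	fmt.Println(err, c.calls) // <nil> 3
}
```

The mutex matters as soon as fn can run on more than one goroutine, for example when the same closure is shared by several callers.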
func DoWithRetryCtx ¶
func DoWithRetryCtx(ctx context.Context, fn func(ctx context.Context, retryCount int) error, opts ...RetryOption) error
DoWithRetryCtx runs fn and retries it if it fails. By default it retries 3 times. The retryCount argument passed to fn is the current retry number, starting from 0. Note that if fn modifies variables declared outside the function, it is best to guard them with a lock; otherwise data races may occur.
func DoWithTimeout ¶
DoWithTimeout runs fn with timeout control.
Types ¶
type DoOption ¶
DoOption defines the method to customize a DoWithTimeout call.
func WithContext ¶
WithContext customizes a DoWithTimeout call with the given ctx.
type FilterFunc ¶
FilterFunc defines the method to filter a Stream.
type ForAllFunc ¶
type ForAllFunc func(pipe <-chan any)
ForAllFunc defines the method to handle all elements in a Stream.
type ForEachFunc ¶
type ForEachFunc func(item any)
ForEachFunc defines the method to handle each element in a Stream.
type GenerateFunc ¶
type GenerateFunc func(source chan<- any)
GenerateFunc defines the method to send elements into a Stream.
type Option ¶
type Option func(opts *rxOptions)
Option defines the method to customize a Stream.
func UnlimitedWorkers ¶
func UnlimitedWorkers() Option
UnlimitedWorkers lets the caller use as many workers as there are tasks.
func WithWorkers ¶
WithWorkers lets the caller customize the number of concurrent workers.
type ParallelFunc ¶
type ParallelFunc func(item any)
ParallelFunc defines the method to handle elements in parallel.
type ReduceFunc ¶
ReduceFunc defines the method to reduce all the elements in a Stream.
type RetryOption ¶
type RetryOption func(*retryOptions)
RetryOption defines the method to customize DoWithRetry.
func WithInterval ¶
func WithInterval(interval time.Duration) RetryOption
func WithRetry ¶
func WithRetry(times int) RetryOption
WithRetry customizes a DoWithRetry call with the given number of retries.
func WithTimeout ¶
func WithTimeout(timeout time.Duration) RetryOption
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
A Stream is a stream that can be used to do stream processing.
func From ¶
func From(generate GenerateFunc) Stream
From constructs a Stream from the given GenerateFunc.
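The GenerateFunc shape, a function that sends into a channel, suggests a channel-backed source. The sketch below shows one way such a constructor could work; `generateFunc` and `from` are hypothetical lowercase stand-ins, and closing the channel when the generator returns is an assumption of this sketch.

```go
package main

import "fmt"

// generateFunc mirrors the shape of GenerateFunc: it sends items into source.
type generateFunc func(source chan<- any)

// from is a hypothetical constructor: it runs generate in a goroutine and
// closes the channel when generate returns, so consumers can range over it.
func from(generate generateFunc) <-chan any {
	source := make(chan any)
	go func() {
		defer close(source)
		generate(source)
	}()
	return source
}

func main() {
	items := from(func(source chan<- any) {
		for i := 1; i <= 3; i++ {
			source <- i
		}
	})
	for item := range items {
		fmt.Println(item)
	}
}
```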
func (Stream) AllMach ¶
AllMach returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated.
func (Stream) AnyMach ¶
AnyMach returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated.
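The short-circuit and empty-stream rules described for AllMach and AnyMach can be sketched over a plain channel. The helpers `fromSlice`, `allMatch`, and `anyMatch` are hypothetical names for this example and do not reflect the package's implementation.

```go
package main

import "fmt"

// fromSlice returns a closed, pre-filled channel holding items.
func fromSlice(items ...any) <-chan any {
	ch := make(chan any, len(items))
	for _, item := range items {
		ch <- item
	}
	close(ch)
	return ch
}

// allMatch stops at the first item that fails predicate.
// An empty input yields true without calling predicate.
func allMatch(items <-chan any, predicate func(item any) bool) bool {
	for item := range items {
		if !predicate(item) {
			return false
		}
	}
	return true
}

// anyMatch stops at the first item that satisfies predicate.
// An empty input yields false without calling predicate.
func anyMatch(items <-chan any, predicate func(item any) bool) bool {
	for item := range items {
		if predicate(item) {
			return true
		}
	}
	return false
}

func main() {
	isEven := func(item any) bool { return item.(int)%2 == 0 }
	fmt.Println(allMatch(fromSlice(2, 4, 6), isEven)) // true
	fmt.Println(anyMatch(fromSlice(1, 3, 5), isEven)) // false
	fmt.Println(allMatch(fromSlice(), isEven))        // true
	fmt.Println(anyMatch(fromSlice(), isEven))        // false
}
```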
func (Stream) Buffer ¶
Buffer buffers the items into a queue with size n. It can balance the producer and the consumer if their processing throughput doesn't match.
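The balancing effect of a buffer can be sketched with a buffered channel of capacity n, which lets a fast producer run up to n items ahead of a slow consumer. The `buffer` helper below is a hypothetical illustration, not the package's implementation.

```go
package main

import "fmt"

// buffer copies items from in into a channel with capacity n, preserving
// order. Negative sizes are treated as 0 (an assumption of this sketch).
func buffer(in <-chan any, n int) <-chan any {
	if n < 0 {
		n = 0
	}
	out := make(chan any, n)
	go func() {
		defer close(out)
		for item := range in {
			out <- item
		}
	}()
	return out
}

func main() {
	in := make(chan any)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	out := buffer(in, 2)
	fmt.Println(cap(out)) // 2
	for item := range out {
		fmt.Println(item)
	}
}
```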
func (Stream) Filter ¶
func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream
Filter filters the items by the given FilterFunc.
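A minimal sketch of filtering over a channel. The FilterFunc signature is not shown above, so the predicate shape `func(item any) bool` is an assumption, and `filter` is a hypothetical helper rather than the package's method.

```go
package main

import "fmt"

// filter forwards only the items for which keep returns true.
// The predicate shape func(any) bool is an assumption for this sketch.
func filter(in <-chan any, keep func(item any) bool) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for item := range in {
			if keep(item) {
				out <- item
			}
		}
	}()
	return out
}

func main() {
	in := make(chan any, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	odd := filter(in, func(item any) bool { return item.(int)%2 == 1 })
	for item := range odd {
		fmt.Println(item)
	}
}
```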
func (Stream) ForAll ¶
func (s Stream) ForAll(fn ForAllFunc)
ForAll handles all streaming elements from the source; no further stream operations can follow it.
func (Stream) ForEach ¶
func (s Stream) ForEach(fn ForEachFunc)
ForEach seals the Stream by applying the ForEachFunc to each item; no further operations can follow it.
func (Stream) Map ¶
Map converts each item to another corresponding item, which means it's a 1:1 model.
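The 1:1 model means every input item produces exactly one output item. A minimal sketch over a channel follows; the MapFunc signature is not shown above, so the shape `func(item any) any` is an assumption, and `mapItems` is a hypothetical helper.

```go
package main

import "fmt"

// mapItems emits exactly one output item per input item.
// The fn shape func(any) any is an assumption for this sketch.
func mapItems(in <-chan any, fn func(item any) any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for item := range in {
			out <- fn(item)
		}
	}()
	return out
}

func main() {
	in := make(chan any, 3)
	for _, word := range []string{"a", "bb", "ccc"} {
		in <- word
	}
	close(in)

	lengths := mapItems(in, func(item any) any { return len(item.(string)) })
	for n := range lengths {
		fmt.Println(n)
	}
}
```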
func (Stream) NoneMatch ¶
NoneMatch returns whether no elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated.
func (Stream) Parallel ¶
func (s Stream) Parallel(fn ParallelFunc, opts ...Option)
Parallel applies the given ParallelFunc to each item concurrently with the given number of workers.
func (Stream) Reduce ¶
func (s Stream) Reduce(fn ReduceFunc) (any, error)
Reduce is a utility method to let the caller deal with the underlying channel.
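Handing the underlying channel to the caller can be sketched as follows. The ReduceFunc signature is not shown above, so the shape `func(pipe <-chan any) (any, error)` is an assumption inferred from the return type of Reduce; `reduce` and `sum` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// reduce hands the whole channel to fn and returns whatever fn returns.
// The fn shape func(<-chan any) (any, error) is an assumption for this sketch.
func reduce(in <-chan any, fn func(pipe <-chan any) (any, error)) (any, error) {
	return fn(in)
}

// sum adds up integer items and rejects anything else.
func sum(pipe <-chan any) (any, error) {
	total := 0
	for item := range pipe {
		n, ok := item.(int)
		if !ok {
			return nil, errors.New("non-integer item")
		}
		total += n
	}
	return total, nil
}

func main() {
	in := make(chan any, 4)
	for i := 1; i <= 4; i++ {
		in <- i
	}
	close(in)

	fmt.Println(reduce(in, sum)) // 10 <nil>
}
```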