Documentation
Overview
Package itertool provides a set of functional helpers for managing and using fun.Iterators, including parallel processing, generators, Map/Reduce, Merge, and other convenient tools.
Index
- func Chain[T any](iters ...*fun.Iterator[T]) *fun.Iterator[T]
- func Contains[T comparable](ctx context.Context, item T, iter *fun.Iterator[T]) bool
- func DropZeroValues[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]
- func Generate[T any](fn fun.Producer[T], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) *fun.Iterator[T]
- func Indexed[T any](iter *fun.Iterator[T]) *fun.Iterator[dt.Pair[int, T]]
- func JSON[T any](in io.Reader) *fun.Iterator[T]
- func Map[T any, O any](input *fun.Iterator[T], mapFn func(context.Context, T) (O, error), ...) *fun.Iterator[O]
- func MapReduce[T any, O any, R any](input *fun.Iterator[T], mapFn func(context.Context, T) (O, error), ...) fun.Producer[R]
- func MergeSliceIterators[T any](iter *fun.Iterator[[]T]) *fun.Iterator[T]
- func MergeSlices[T any](sls ...[]T) *fun.Iterator[T]
- func Monotonic(maxVal int) *fun.Iterator[int]
- func ParallelForEach[T any](ctx context.Context, iter *fun.Iterator[T], fn fun.Processor[T], ...) error
- func Process[T any](ctx context.Context, iter *fun.Iterator[T], fn fun.Processor[T], ...) error
- func RateLimit[T any](iter *fun.Iterator[T], num int, window time.Duration) *fun.Iterator[T]
- func Reduce[T any, O any](ctx context.Context, iter *fun.Iterator[T], reducer func(T, O) (O, error), ...) (value O, err error)
- func Uniq[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]
- func Worker[OP fun.Worker | fun.Operation](ctx context.Context, iter *fun.Iterator[OP], ...) error
Constants
This section is empty.
Variables
This section is empty.
Functions
func Chain added in v0.10.0
func Chain[T any](iters ...*fun.Iterator[T]) *fun.Iterator[T]
Chain, like merge, takes a sequence of iterators and produces a combined iterator. However, Chain processes the provided iterators one after another, in order, whereas merge reads from all of them at once.
func Contains added in v0.9.1
func Contains[T comparable](ctx context.Context, item T, iter *fun.Iterator[T]) bool
Contains processes an iterator of a comparable type, returning true as soon as it encounters an element equal to item, and false otherwise.
func DropZeroValues added in v0.10.0
func DropZeroValues[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]
DropZeroValues processes an iterator, removing all zero values.
func Generate added in v0.2.0
func Generate[T any](fn fun.Producer[T], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) *fun.Iterator[T]
Generate creates an iterator using a generator pattern, producing items until the generator function returns io.EOF or the context (passed to the first call to Next()) is canceled. Parallel operation and continue-on-error/continue-on-panic semantics are available, and share configuration with the Map and ParallelForEach operations.
func Indexed added in v0.10.4
func Indexed[T any](iter *fun.Iterator[T]) *fun.Iterator[dt.Pair[int, T]]
Indexed produces an iterator that tracks and reports the index (position) of each item in the iteration sequence.
func JSON added in v0.10.0
func JSON[T any](in io.Reader) *fun.Iterator[T]
JSON takes a stream of line-oriented JSON and decodes (unmarshals) each document into a value of type T, producing an iterator of those values.
func Map
func Map[T any, O any](input *fun.Iterator[T], mapFn func(context.Context, T) (O, error), optp ...fun.OptionProvider[*fun.WorkerGroupConf]) *fun.Iterator[O]
Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous, streaming manner, so the output iterator must be consumed. The zero values of Options provide reasonable defaults: abort-on-error and single-threaded map operation.
If the mapper function returns an error, that item's result is not included in the output; the errors are aggregated and propagated to the `Close()` method of the resulting iterator. The mapping operation respects the fun.ErrIterationSkip error. If there is more than one error (as is the case with a panic or with ContinueOnError semantics), the error can be unwrapped or converted to a slice with the ers.Unwind function. Panics in the map function are converted to errors and always collected, but may abort the operation if ContinueOnPanic is not set.
func MapReduce added in v0.10.0
func MapReduce[T any, O any, R any](input *fun.Iterator[T], mapFn func(context.Context, T) (O, error), reduceFn func(O, R) (R, error), initialReduceValue R, optp ...fun.OptionProvider[*fun.WorkerGroupConf]) fun.Producer[R]
MapReduce combines the map and reduce operations: it processes an input iterator (in parallel, according to the configuration) into an output iterator, and then processes that iterator with the reduce function.
MapReduce itself returns a fun.Producer function, which acts as a future: the entire operation does not begin running until the producer function is called, and the …
This works as a pull model: the reduce operation starts and waits for the map operation to produce a value, and the map operation waits for the input iterator to produce values.
func MergeSliceIterators added in v0.10.5
func MergeSliceIterators[T any](iter *fun.Iterator[[]T]) *fun.Iterator[T]
MergeSliceIterators converts an iterator of slices into a flattened iterator of their elements.
func MergeSlices added in v0.10.5
func MergeSlices[T any](sls ...[]T) *fun.Iterator[T]
MergeSlices takes an arbitrary number of slices and returns a single iterator over their items.
func Monotonic added in v0.10.0
func Monotonic(maxVal int) *fun.Iterator[int]
Monotonic creates an iterator that produces increasing numbers until a specified maximum.
func ParallelForEach added in v0.4.0
func ParallelForEach[T any](ctx context.Context, iter *fun.Iterator[T], fn fun.Processor[T], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) error
ParallelForEach processes the iterator in parallel and is essentially an iterator-driven worker pool. The input iterator is split dynamically into iterators for every worker (the number of workers is determined by Options.NumWorkers), with the division between workers determined by their processing speed (i.e., workers should not suffer from head-of-line blocking), and the input iterator is consumed (safely) as work is processed.
func Process added in v0.10.0
func Process[T any](ctx context.Context, iter *fun.Iterator[T], fn fun.Processor[T], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) error
Process provides a (potentially) more sensible alternative name for ParallelForEach, but is otherwise identical.
func RateLimit added in v0.10.9
func RateLimit[T any](iter *fun.Iterator[T], num int, window time.Duration) *fun.Iterator[T]
RateLimit wraps an iterator with a rate limiter to ensure that the output iterator will produce no more than <num> items in any given <window>. Does not guarantee …
func Reduce
func Reduce[T any, O any](ctx context.Context, iter *fun.Iterator[T], reducer func(T, O) (O, error), initalValue O) (value O, err error)
Reduce processes an input iterator with a reduce function and outputs the final value. The initial value may be a zero or nil value.
func Uniq added in v0.9.4
func Uniq[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]
Uniq iterates over an iterator of comparable items and caches them in a map, returning the first instance of each equivalent item and skipping subsequent ones.
func Worker added in v0.10.0
func Worker[OP fun.Worker | fun.Operation](ctx context.Context, iter *fun.Iterator[OP], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) error
Worker takes an iterator of fun.Worker or fun.Operation lambdas and processes them according to the configuration.
All operation functions are processed using their respective Safe() methods, which means that the functions themselves will never panic, and the ContinueOnPanic option will not affect the outcome of the operation (unless the iterator returns a nil operation).
This operation is particularly powerful in combination with the iterator for a pubsub.Distributor, an interface that provides synchronized, blocking, and destructive containers (i.e., completed workloads do not remain in memory).
Worker is implemented using ParallelForEach.
Types
This section is empty.