itertool

package
v0.11.0
Published: Jan 21, 2025 License: Apache-2.0 Imports: 10 Imported by: 4

Documentation

Overview

Package itertool provides a set of functional helpers for managing and using fun.Iterators, including parallel processing, generators, Map/Reduce, Merge, and other convenient tools.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Chain added in v0.10.0

func Chain[T any](iters ...*fun.Iterator[T]) *fun.Iterator[T]

Chain, like Merge, takes a sequence of iterators and produces a combined iterator. Chain processes each iterator provided in sequence, where Merge reads from all iterators at once.
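The sequential ordering can be sketched with a stdlib-only slice version (chain is a hypothetical helper, not part of this package):

```go
package main

import "fmt"

// chain yields the elements of each input in order: all of the first,
// then all of the second, and so on. This mirrors Chain's sequential
// semantics, in contrast to Merge's interleaved reads.
func chain[T any](slices ...[]T) []T {
	var out []T
	for _, s := range slices {
		out = append(out, s...)
	}
	return out
}

func main() {
	fmt.Println(chain([]int{1, 2}, []int{3, 4})) // [1 2 3 4]
}
```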

func Contains added in v0.9.1

func Contains[T comparable](ctx context.Context, item T, iter *fun.Iterator[T]) bool

Contains processes an iterator of comparable type, returning true after the first element that equals item, and false otherwise.

func DropZeroValues added in v0.10.0

func DropZeroValues[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]

DropZeroValues processes an iterator removing all zero values.
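The zero-value filtering relies on the comparable constraint, which is what makes comparison against the type's zero value legal. A stdlib-only slice sketch of the same idea (dropZeroValues is a hypothetical helper):

```go
package main

import "fmt"

// dropZeroValues filters out elements equal to the zero value of T.
// The comparable constraint makes the v != zero comparison legal.
func dropZeroValues[T comparable](in []T) []T {
	var zero T
	out := make([]T, 0, len(in))
	for _, v := range in {
		if v != zero {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	fmt.Println(dropZeroValues([]string{"a", "", "b"})) // [a b]
}
```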

func Generate added in v0.2.0

func Generate[T any](
	fn fun.Producer[T],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) *fun.Iterator[T]

Generate creates an iterator using a generator pattern which produces items until the generator function returns io.EOF, or the context (passed to the first call to Next()) is canceled. Parallel operation and continue-on-error/continue-on-panic semantics are available and share configuration with the Map and ParallelForEach operations.

func Indexed added in v0.10.4

func Indexed[T any](iter *fun.Iterator[T]) *fun.Iterator[dt.Pair[int, T]]

Indexed produces an iterator that keeps track of and reports the sequence/index id of the item in the iteration sequence.

func JSON added in v0.10.0

func JSON[T any](in io.Reader) *fun.Iterator[T]

JSON takes a stream of line-oriented JSON and marshals those documents into objects in the form of an iterator.

func Map

func Map[T any, O any](
	input *fun.Iterator[T],
	mapFn func(context.Context, T) (O, error),
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) *fun.Iterator[O]

Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous/streaming manner, so the output Iterator must be consumed. The zero values of Options provide reasonable defaults for abort-on-error and single-threaded map operation.

If the mapper function errors, the result isn't included, but the errors are aggregated and propagated to the `Close()` method of the resulting iterator. The mapping operation respects the fun.ErrIterationSkip error. If there is more than one error (as is the case with a panic or with ContinueOnError semantics), the error can be unwrapped or converted to a slice with the ers.Unwind function. Panics in the map function are converted to errors and always collected, but may abort the operation if ContinueOnPanic is not set.
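The skip-and-aggregate error handling can be sketched over slices with stdlib errors.Join; errSkip stands in for fun.ErrIterationSkip and mapSlice is a hypothetical helper:

```go
package main

import (
	"errors"
	"fmt"
)

// errSkip stands in for fun.ErrIterationSkip: a mapper returning it
// drops the item without recording an error.
var errSkip = errors.New("skip")

// mapSlice applies mapFn to each element, skipping errSkip results and
// aggregating other errors (the joined error is what Close() would
// report in the real iterator-based Map).
func mapSlice[T, O any](in []T, mapFn func(T) (O, error)) ([]O, error) {
	var out []O
	var errs []error
	for _, v := range in {
		o, err := mapFn(v)
		switch {
		case errors.Is(err, errSkip):
		case err != nil:
			errs = append(errs, err)
		default:
			out = append(out, o)
		}
	}
	return out, errors.Join(errs...)
}

func main() {
	out, err := mapSlice([]int{1, 2, 3}, func(n int) (int, error) {
		if n == 2 {
			return 0, errSkip
		}
		return n * 10, nil
	})
	fmt.Println(out, err) // [10 30] <nil>
}
```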

func MapReduce added in v0.10.0

func MapReduce[T any, O any, R any](
	input *fun.Iterator[T],
	mapFn func(context.Context, T) (O, error),
	reduceFn func(O, R) (R, error),
	initialReduceValue R,
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) fun.Producer[R]

MapReduce combines the map and reduce operations to process an iterator (in parallel, according to configuration) into an output iterator, and then process that iterator with the reduce function.

MapReduce itself returns a fun.Producer function, which functions as a future: the entire operation does not begin running until the producer function is called.

This works as a pull: the Reduce operation starts and waits for the map operation to produce a value, and the map operation waits for the input iterator to produce values.
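The future semantics can be sketched with a closure over slices (mapReduce is a hypothetical, single-threaded stand-in for the configurable parallel version):

```go
package main

import "fmt"

// mapReduce returns a future: nothing runs until the returned function
// is called, at which point values are pulled through mapFn and folded
// into the accumulator one at a time.
func mapReduce[T, O, R any](
	in []T,
	mapFn func(T) O,
	reduceFn func(O, R) R,
	initial R,
) func() R {
	return func() R {
		acc := initial
		for _, v := range in {
			acc = reduceFn(mapFn(v), acc)
		}
		return acc
	}
}

func main() {
	future := mapReduce([]int{1, 2, 3},
		func(n int) int { return n * n },
		func(o, r int) int { return r + o },
		0)
	fmt.Println(future()) // 14
}
```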

func MergeSliceIterators added in v0.10.5

func MergeSliceIterators[T any](iter *fun.Iterator[[]T]) *fun.Iterator[T]

MergeSliceIterators converts an iterator of slices to a flattened iterator of their elements.

func MergeSlices added in v0.10.5

func MergeSlices[T any](sls ...[]T) *fun.Iterator[T]

MergeSlices takes an arbitrary number of slices and returns a single iterator for their items.

func Monotonic added in v0.10.0

func Monotonic(maxVal int) *fun.Iterator[int]

Monotonic creates an iterator that produces increasing numbers until a specified maximum.

func ParallelForEach added in v0.4.0

func ParallelForEach[T any](
	ctx context.Context,
	iter *fun.Iterator[T],
	fn fun.Processor[T],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) error

ParallelForEach processes the iterator in parallel, and is essentially an iterator-driven worker pool. The input iterator is split dynamically into iterators for every worker (determined by Options.NumWorkers), with the division between workers determined by their processing speed (e.g. workers should not suffer from head-of-line blocking), and input iterators are consumed (safely) as work is processed.

func Process added in v0.10.0

func Process[T any](
	ctx context.Context,
	iter *fun.Iterator[T],
	fn fun.Processor[T],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) error

Process provides a (potentially) more sensible alternate name for ParallelForEach, but is otherwise identical.

func RateLimit added in v0.10.9

func RateLimit[T any](iter *fun.Iterator[T], num int, window time.Duration) *fun.Iterator[T]

RateLimit wraps an iterator with a rate limiter to ensure that the output iterator will produce no more than <num> items in any given <window>.

func Reduce

func Reduce[T any, O any](
	ctx context.Context,
	iter *fun.Iterator[T],
	reducer func(T, O) (O, error),
	initalValue O,
) (value O, err error)

Reduce processes an input iterator with a reduce function and outputs the final value. The initial value may be a zero or nil value.
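The fold itself is a small loop threading the accumulator through the reducer. A stdlib-only slice sketch (reduce is a hypothetical helper, without the real function's context handling):

```go
package main

import (
	"fmt"
	"strconv"
)

// reduce folds the input into a single value, threading the accumulator
// through reducer; the initial value may be a zero value. A reducer
// error stops the fold.
func reduce[T, O any](in []T, reducer func(T, O) (O, error), initial O) (O, error) {
	acc := initial
	for _, v := range in {
		var err error
		if acc, err = reducer(v, acc); err != nil {
			return acc, err
		}
	}
	return acc, nil
}

func main() {
	s, _ := reduce([]int{1, 2, 3}, func(n int, acc string) (string, error) {
		return acc + strconv.Itoa(n), nil
	}, "")
	fmt.Println(s) // 123
}
```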

func Uniq added in v0.9.4

func Uniq[T comparable](iter *fun.Iterator[T]) *fun.Iterator[T]

Uniq iterates over an iterator of comparable items, caching them in a map, returning the first instance of each equivalent object and skipping subsequent items.
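The map-cache approach can be sketched over slices (uniq is a hypothetical helper):

```go
package main

import "fmt"

// uniq keeps the first instance of each value, caching seen values in a
// map keyed by the comparable element type.
func uniq[T comparable](in []T) []T {
	seen := make(map[T]struct{}, len(in))
	out := make([]T, 0, len(in))
	for _, v := range in {
		if _, ok := seen[v]; ok {
			continue
		}
		seen[v] = struct{}{}
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(uniq([]int{1, 2, 1, 3, 2})) // [1 2 3]
}
```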

func Worker added in v0.10.0

func Worker[OP fun.Worker | fun.Operation](
	ctx context.Context,
	iter *fun.Iterator[OP],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) error

Worker takes iterators of fun.Worker or fun.Operation lambdas and processes them according to the configuration.

All operation functions are processed using their respective Safe() methods, which means that the functions themselves will never panic, and the ContinueOnPanic option will not impact the outcome of the operation (unless the iterator returns a nil operation).

This operation is particularly powerful in combination with the iterator for a pubsub.Distributor, an interface which provides synchronized, blocking, and destructive (e.g. completed workloads do not remain in memory) containers.

Worker is implemented using ParallelForEach.

Types

This section is empty.
