Documentation ¶
Index ¶
- type Collector
- type Stream
- func (s *Stream[T]) Collect() []T
- func (s *Stream[T]) CollectToMap(mapper func(*T) (any, any)) map[any]any
- func (s *Stream[T]) Filter(predicate func(*T) bool) *Stream[T]
- func (s *Stream[T]) FilterOut(predicate func(*T) bool) *Stream[T]
- func (s *Stream[T]) Map(mapper func(*T) any) *TerminalStream[any]
- func (s *Stream[T]) ToTerminal() *TerminalStream[T]
- type TerminalStream
- func (s *TerminalStream[T]) Collect() []T
- func (s *TerminalStream[T]) CollectToMap(mapper func(*T) (any, T)) map[any][]T
- func (s *TerminalStream[T]) ParallelExecute(fn func(int, *T), parallelism int)
- func (s *TerminalStream[T]) ParallelExecuteWithTimeout(fn func(int, *T), cancel func(int, *T), timeout time.Duration, parallelism int)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Stream ¶
type Stream[T any] struct { // contains filtered or unexported fields }
func (*Stream[T]) CollectToMap ¶
CollectToMap uses the uarray.ToMultiMap function to collect elements of the Stream into a map.
func (*Stream[T]) Map ¶
func (s *Stream[T]) Map(mapper func(*T) any) *TerminalStream[any]
Map wraps the uarray.Map function in the stream API. This operation can only be the last in a pipeline.
func (*Stream[T]) ToTerminal ¶
func (s *Stream[T]) ToTerminal() *TerminalStream[T]
type TerminalStream ¶
type TerminalStream[T any] struct { // contains filtered or unexported fields }
TerminalStream represents a stream that can only be collected.
func NewTerminalStream ¶
func NewTerminalStream[T any](values []T) *TerminalStream[T]
func (*TerminalStream[T]) Collect ¶
func (s *TerminalStream[T]) Collect() []T
func (*TerminalStream[T]) CollectToMap ¶
func (s *TerminalStream[T]) CollectToMap(mapper func(*T) (any, T)) map[any][]T
CollectToMap uses the uarray.ToMultiMap function to collect elements of the Stream into a map.
func (*TerminalStream[T]) ParallelExecute ¶
func (s *TerminalStream[T]) ParallelExecute(fn func(int, *T), parallelism int)
ParallelExecute executes the given function concurrently on each element of the stream's values using the specified level of parallelism.
func (*TerminalStream[T]) ParallelExecuteWithTimeout ¶ added in v1.11.0
func (s *TerminalStream[T]) ParallelExecuteWithTimeout(fn func(int, *T), cancel func(int, *T), timeout time.Duration, parallelism int)
ParallelExecuteWithTimeout executes a batch of tasks in parallel with a specified level of parallelism and timeout. This method divides the work into batches according to the parallelism parameter and executes each task asynchronously. Each task will be cancelled if it does not complete within the specified timeout duration.
Parameters: - fn: The function to execute for each item in the TerminalStream. This function receives an index and a pointer to the item. - cancel: The cancel function to call for each item if it exceeds the timeout. This function receives an index and a pointer to the item. - timeout: The maximum duration to wait for each task to complete before cancelling it. If a task exceeds this duration, it is considered failed. - parallelism: The maximum number of tasks to execute concurrently. This controls the level of parallelism and helps manage resource utilization.
Panics: This method panics if the parallelism parameter is less than or equal to zero, as it indicates an invalid configuration.
Usage Example: Assuming a TerminalStream of some data type, you can process each item in parallel, with a specific timeout and level of parallelism:
s := NewTerminalStream[MyType](...your data...) s.ParallelExecuteWithTimeout(func(i int, item *MyType) { // Process item }, func(i int, item *MyType) { // Cancel item processing }, 5*time.Second, 10) // Timeout of 5 seconds, with a parallelism of 10
Note: The actual processing function (fn) does not return a value. If you need to collect results or errors from each task, you might need to use a different approach or modify the method accordingly.