ustream

package
v1.24.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Collector

type Collector[T any] interface {
	Collect() []T
	CollectToMap(func(*T) (any, any)) map[any]any
}

Collector defines the interface for collecting elements from a stream.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

func Of added in v1.12.0

func Of[T any](values []T) *Stream[T]

Of creates a new Stream from the given slice.

func (*Stream[T]) Collect

func (s *Stream[T]) Collect() []T

func (*Stream[T]) CollectToMap

func (s *Stream[T]) CollectToMap(mapper func(*T) (any, any)) map[any]any

CollectToMap uses the uarray.ToMultiMap function to collect elements of the Stream into a map.

func (*Stream[T]) Filter

func (s *Stream[T]) Filter(predicate func(*T) bool) *Stream[T]

Filter wraps the uarray.Filter function in the stream API.

func (*Stream[T]) FilterOut

func (s *Stream[T]) FilterOut(predicate func(*T) bool) *Stream[T]

FilterOut wraps the uarray.FilterOut function in the stream API.

func (*Stream[T]) Map

func (s *Stream[T]) Map(mapper func(*T) any) *TerminalStream[any]

Map wraps the uarray.Map function in the stream API. This operation can only be the last in a pipeline.

func (*Stream[T]) ToTerminal

func (s *Stream[T]) ToTerminal() *TerminalStream[T]

type TerminalStream

type TerminalStream[T any] struct {
	// contains filtered or unexported fields
}

TerminalStream represents a stream that can only be collected.

func NewTerminalStream

func NewTerminalStream[T any](values []T) *TerminalStream[T]

func (*TerminalStream[T]) Collect

func (s *TerminalStream[T]) Collect() []T

func (*TerminalStream[T]) CollectToMap

func (s *TerminalStream[T]) CollectToMap(mapper func(*T) (any, T)) map[any][]T

CollectToMap uses the uarray.ToMultiMap function to collect elements of the Stream into a map.

func (*TerminalStream[T]) ParallelExecute

func (s *TerminalStream[T]) ParallelExecute(fn func(int, *T), parallelism int)

ParallelExecute executes the given function concurrently on each element of the stream's values using the specified level of parallelism.

func (*TerminalStream[T]) ParallelExecuteWithTimeout added in v1.11.0

func (s *TerminalStream[T]) ParallelExecuteWithTimeout(fn func(int, *T), cancel func(int, *T), timeout time.Duration, parallelism int)

ParallelExecuteWithTimeout executes a batch of tasks in parallel with a specified level of parallelism and timeout. This method divides the work into batches according to the parallelism parameter and executes each task asynchronously. Each task will be cancelled if it does not complete within the specified timeout duration.

Parameters: - fn: The function to execute for each item in the TerminalStream. This function receives an index and a pointer to the item. - cancel: The cancel function to call for each item if it exceeds the timeout. This function receives an index and a pointer to the item. - timeout: The maximum duration to wait for each task to complete before cancelling it. If a task exceeds this duration, it is considered failed. - parallelism: The maximum number of tasks to execute concurrently. This controls the level of parallelism and helps manage resource utilization.

Panics: This method panics if the parallelism parameter is less than or equal to zero, as it indicates an invalid configuration.

Usage Example: Assuming a TerminalStream of some data type, you can process each item in parallel, with a specific timeout and level of parallelism:

s := NewTerminalStream[MyType](...your data...)
s.ParallelExecuteWithTimeout(func(i int, item *MyType) {
    // Process item
}, func(i int, item *MyType) {
    // Cancel item processing
}, 5*time.Second, 10) // Timeout of 5 seconds, with a parallelism of 10

Note: The actual processing function (fn) does not return a value. If you need to collect results or errors from each task, you might need to use a different approach or modify the method accordingly.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL