chans

package
v0.0.0-...-a5b82e5 Latest
Published: Apr 3, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package chans provides functions for operating on receive channels in parallel. This is useful when processing large amounts of data with complex transformations. The examples here are deliberately simple; in practice, work this trivial would probably be better served by a standard for loop in a single goroutine.

The first function, Access(), uses an Accessor to process data from a channel in parallel. You can supply a goroutine pool from the goroutines package or rely on the default parallelism.

Let's say you want to simply read numbers and print the number multiplied by 5 as they come in from a channel:

input := make(chan int, 1)
go func() {
	defer close(input)
	for i := 0; i < 1000; i++ {
		input <- i
	}
}()

accessor := func(ctx context.Context, i int) error {
	fmt.Println(i * 5)
	return nil
}

err := chans.Access(ctx, input, accessor)
if err != nil {
	// Do something
}

Because no options are provided, this processes until either input is closed or the Context is cancelled. To stop on an error instead, pass WithStopOnErr().

With the defaults used here, Access() creates a new goroutine pool with runtime.NumCPU() goroutines.

If there are multiple errors, they are each wrapped in the final error.

The second function, Modify(), works in a similar fashion, as it is a fancy wrapper around Access().

Modify() takes in a stream from input and writes to an output stream. The input and output types do not have to be the same. You also get the additional benefit of knowing which entries had an error, because each one is reported in a StreamResult.

Let's say we simply want to take in a channel of ints and turn them into floats. Here's how we can do that:

input := make(chan int, 1)

go func() {
	defer close(input)
	for i := 0; i < 1000; i++ {
		input <- i
	}
}()

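modifier := func(ctx context.Context, i int) (float64, error) {
	return float64(i), nil
}

output := make(chan chans.StreamResult[float64], 1)

// Modify closes output when it finishes. The buffered errCh carries its
// return value back without racing against the read loop below.
errCh := make(chan error, 1)

go func() {
	errCh <- chans.Modify(ctx, input, output, modifier)
}()

for sr := range output {
	fmt.Println(sr.Value)
}

if err := <-errCh; err != nil {
	// handle error
}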

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Access

func Access[T any](ctx context.Context, input <-chan T, access Accessor[T], options ...Option) error

Access applies the Accessor to each element in "input" using the goroutines Pool. If WithPoolOptions() isn't provided, we use a limited.Pool using up to runtime.NumCPU() goroutines. If the Accessor has an error, the error will be returned, but this will not stop processing unless WithStopOnErr() is provided. You can cancel the context to stop processing early.
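The behavior described above can be approximated with the standard library alone. The sketch below is not this package's implementation: accessAll and sumTimesFive are hypothetical names, plain goroutines stand in for the goroutines Pool, and errors.Join is an assumption about how errors are combined.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// accessAll is a hypothetical stand-in for the described behavior: it runs fn
// on every value from input using runtime.NumCPU() workers, keeps going after
// errors, and returns all of them joined together.
func accessAll[T any](ctx context.Context, input <-chan T, fn func(context.Context, T) error) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // context cancelled: stop early
				case v, ok := <-input:
					if !ok {
						return // input closed: no more work
					}
					if err := fn(ctx, v); err != nil {
						mu.Lock()
						errs = append(errs, err)
						mu.Unlock()
					}
				}
			}
		}()
	}
	wg.Wait()
	return errors.Join(errs...)
}

// sumTimesFive feeds 0..n-1 through accessAll and sums i*5 atomically,
// since the accessor runs on several goroutines at once.
func sumTimesFive(n int) (int64, error) {
	input := make(chan int, 1)
	go func() {
		defer close(input)
		for i := 0; i < n; i++ {
			input <- i
		}
	}()

	var sum atomic.Int64
	err := accessAll(context.Background(), input, func(_ context.Context, i int) error {
		sum.Add(int64(i * 5))
		return nil
	})
	return sum.Load(), err
}

func main() {
	sum, err := sumTimesFive(1000)
	fmt.Println(sum, err) // 2497500 <nil>
}
```

Because the accessor runs concurrently, any state it shares (the sum here) needs synchronization such as an atomic or a mutex.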

func Modify

func Modify[T, R any](ctx context.Context, input <-chan T, output chan<- StreamResult[R], mod Modifier[T, R], options ...Option) error

Modify applies the Modifier to each element in "input" in parallel using the goroutines Pool. If WithPoolOptions() isn't provided, we use a limited.Pool using up to runtime.NumCPU() goroutines. If the Modifier has an error, the error will be returned, but this will not stop processing unless WithStopOnErr() is provided. The output channel will be closed when processing is complete. You can cancel the context to stop processing early.
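To make the close-on-completion contract concrete, here is a standard-library sketch of a Modify-like function. It is an illustration under assumptions, not the package's code: modifyAll, result, and toFloats are hypothetical names, and plain goroutines replace the goroutines Pool.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sort"
	"sync"
)

// result mirrors the shape of StreamResult for this sketch only.
type result[T any] struct {
	Value T
	Err   error
}

// modifyAll is a hypothetical stand-in: it applies fn to each input value on
// runtime.NumCPU() workers, sends every outcome to output, and closes output
// once all workers have finished.
func modifyAll[T, R any](ctx context.Context, input <-chan T, output chan<- result[R], fn func(context.Context, T) (R, error)) {
	defer close(output) // runs after wg.Wait returns
	var wg sync.WaitGroup
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range input {
				r, err := fn(ctx, v)
				select {
				case output <- result[R]{Value: r, Err: err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	wg.Wait()
}

// toFloats converts 0..n-1 to float64. The values are sorted before being
// returned because parallel workers may emit results in any order.
func toFloats(n int) []float64 {
	input := make(chan int, 1)
	go func() {
		defer close(input)
		for i := 0; i < n; i++ {
			input <- i
		}
	}()

	output := make(chan result[float64], 1)
	go modifyAll(context.Background(), input, output, func(_ context.Context, i int) (float64, error) {
		return float64(i), nil
	})

	var out []float64
	for r := range output { // ends when modifyAll closes output
		if r.Err != nil {
			continue
		}
		out = append(out, r.Value)
	}
	sort.Float64s(out)
	return out
}

func main() {
	fmt.Println(toFloats(5)) // [0 1 2 3 4]
}
```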

func WithPoolOptions

func WithPoolOptions(pool goroutines.Pool, options ...goroutines.SubmitOption) interface {
	Option
	calloptions.CallOption
}

WithPoolOptions sets a goroutines.Pool and its submit options for use in a function call. It can be used as an Option.

func WithStopOnErr

func WithStopOnErr() interface {
	Option
	calloptions.CallOption
}

WithStopOnErr causes the operation to stop if an error occurs. Since operations are parallel, this may not stop all operations. It can be used as an Option.

Types

type Accessor

type Accessor[V any] func(context.Context, V) error

Accessor is called for all values in a channel.

type Modifier

type Modifier[T, R any] func(context.Context, T) (R, error)

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is an option for Access() and Modify().
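An interface with only unexported methods is a common way in Go to restrict option values to those the package itself constructs. The sketch below shows that general pattern; the unexported methods of the real Option are not documented here, so config, option, optionFunc, withStopOnErr, withWorkers, and newConfig are all hypothetical.

```go
package main

import "fmt"

// config holds the settings that options adjust. All names are hypothetical.
type config struct {
	stopOnErr bool
	workers   int
}

// option has an unexported method, so only this package can implement it.
type option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withStopOnErr returns an option that enables stop-on-error.
func withStopOnErr() option {
	return optionFunc(func(c *config) { c.stopOnErr = true })
}

// withWorkers returns an option that overrides the worker count.
func withWorkers(n int) option {
	return optionFunc(func(c *config) { c.workers = n })
}

// newConfig starts from defaults and applies each option in order.
func newConfig(defaultWorkers int, opts ...option) config {
	c := config{workers: defaultWorkers}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig(8))                                  // {stopOnErr:false workers:8}
	fmt.Printf("%+v\n", newConfig(8, withStopOnErr(), withWorkers(2))) // {stopOnErr:true workers:2}
}
```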

type StreamResult

type StreamResult[T any] struct {
	// Value is the value returned in the stream.
	Value T
	// Err is the error returned in the stream.
	Err error
}

StreamResult is a result from a Stream operation.
