parexec

package
v0.4.0
Published: Mar 12, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Constants

const (
	// Continue async execution (zero-value).
	CmdProceed = Command(iota)
	// Try to stop as soon as possible (cancel new tasks and stop scheduling already submitted ones).
	CmdStop
)

Variables

var CPULimiter = NewLimiter(2 * runtime.NumCPU())

CPULimiter is a shared limiter for all CPU-bound tasks.

var DiskIOLimiter = NewLimiter(4)

DiskIOLimiter is a shared limiter for all Disk IO tasks.

var NoLimit = (*Limiter)(nil)

NoLimit doesn't limit the degree of parallel execution.

Functions

func GoWithArg

func GoWithArg[I, T any](pe *Executor[T], fn func(I) T) func(I)

Transforms fn (a func from I to T) into a func(I) that runs fn in the Executor and passes the result T to the executor's "processor" function.

func GoWithArgs

func GoWithArgs[I1, I2, T any](pe *Executor[T], fn func(I1, I2) T) func(I1, I2)

Transforms fn (a func from (I1, I2) to T) into a func(I1, I2) that runs fn in the Executor and passes the result T to the executor's "processor" function.

func Map

func Map[I, T any](pe *Executor[T], input []I, fn func(I) T)

Applies fn to copies of the elements of input in the Executor and passes the results T to the executor's "processor" function. Note: when the results are collected by the executor's processor func, the indexes of the slice items are contiguous and ordered.

func MapChan

func MapChan[I, T any](pe *Executor[T], inputC <-chan I, fn func(I) T) (consumedFully bool)

Applies fn to the values received from inputC in the Executor and passes the results T to the executor's "processor" function. The function returns when inputC is closed or the Executor is stopped. Note: when the results are collected by the executor's processor func, the indexes of the channel values are ordered, but __not__ contiguous.

func MapRef

func MapRef[I, T any](pe *Executor[T], slice []I, fn func(*I) T)

Applies fn to references to the elements of slice in the Executor and passes the results T to the executor's "processor" function. Note: when the results are collected by the executor's processor func, the indexes of the slice items are contiguous and ordered. Safety: the slice must not be modified until the executor is done!
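The difference between working on copies (as Map is described) and on references (as MapRef is described) comes down to ordinary Go slice semantics. The sketch below is sequential and uses only hypothetical helper names (counter, bumpCopies, bumpRefs); it does not use the package and only illustrates why the slice must stay untouched while tasks hold pointers into it.

```go
package main

import "fmt"

// counter is a hypothetical element type used only for this illustration.
type counter struct{ hits int }

// bumpCopies calls fn with a copy of each element, so the slice is unchanged.
func bumpCopies(items []counter, fn func(counter)) {
	for _, it := range items {
		fn(it)
	}
}

// bumpRefs calls fn with a pointer to each element, so fn can modify the
// slice in place. The pointers alias the slice's backing array.
func bumpRefs(items []counter, fn func(*counter)) {
	for i := range items {
		fn(&items[i])
	}
}

func main() {
	items := []counter{{}, {}}

	bumpCopies(items, func(c counter) { c.hits++ })
	fmt.Println(items) // [{0} {0}]

	bumpRefs(items, func(c *counter) { c.hits++ })
	fmt.Println(items) // [{1} {1}]
}
```

Because each pointer refers to the slice's own storage, writing to the slice from another goroutine while such tasks are still running would be a data race.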

func SetAt

func SetAt[T any](slice []T, idx int, val T) []T

Sets slice[idx] = val, growing the slice if needed, and returns the updated slice.
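A helper of this shape is convenient inside a processor function, where results can arrive in any order but each one carries its index. The following is a minimal sketch of the documented behaviour under the assumption that the slice is padded with zero values; setAt is a hypothetical stand-in, not the package's source.

```go
package main

import "fmt"

// setAt is a hypothetical stand-in for the documented SetAt: it stores val at
// slice[idx], first extending the slice with zero values if idx is past the end.
func setAt[T any](slice []T, idx int, val T) []T {
	if idx >= len(slice) {
		// Append just enough zero values so that idx becomes a valid index.
		slice = append(slice, make([]T, idx+1-len(slice))...)
	}
	slice[idx] = val
	return slice
}

func main() {
	var results []string
	// Results may arrive out of order; the index says where each one belongs.
	results = setAt(results, 2, "c")
	results = setAt(results, 0, "a")
	results = setAt(results, 1, "b")
	fmt.Println(results) // [a b c]
}
```

As with append, the returned slice must be assigned back, because growing may allocate a new backing array.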

Types

type Command

type Command uint8

Controls the scheduler. Values: CmdProceed (zero-value) and CmdStop.

type Executor

type Executor[T any] struct {
	// contains filtered or unexported fields
}

Parallel executor combined with a sync.Locker for results.

func New

func New[T any](limiter *Limiter, processor func(res T, idx int) Command) *Executor[T]

Creates a new parallel executor.

The 'processor' func is called synchronously (under lock) with the result of an execution and idx, a number that increases monotonically from 0 and reflects the order in which the tasks were scheduled.

The Executor is also a mutex around the data captured by the "processor" closure, from the moment it is created. To be safe, use WaitDoneAndLock() to access this data.
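The pattern described here (tasks run concurrently, results are handed one at a time to a processor together with a scheduling index) can be sketched with the standard library alone. Everything below is hypothetical: miniExecutor, run, waitAndLock and squares are illustrative names, and the sketch has no limiter or stop support. It is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// miniExecutor is a hypothetical, stripped-down illustration of the pattern.
type miniExecutor[T any] struct {
	mu        sync.Mutex // guards processor calls and the state they capture
	wg        sync.WaitGroup
	next      int // index handed to the next scheduled task
	processor func(res T, idx int)
}

// run schedules f on its own goroutine and tags it with a scheduling index.
// It assumes that run itself is called from a single goroutine.
func (e *miniExecutor[T]) run(f func() T) {
	idx := e.next
	e.next++
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		res := f() // the task itself runs without the lock
		e.mu.Lock()
		defer e.mu.Unlock()
		e.processor(res, idx) // results are processed one at a time
	}()
}

// waitAndLock waits for every task, then takes the lock so the caller can
// safely read the state captured by the processor.
func (e *miniExecutor[T]) waitAndLock() {
	e.wg.Wait()
	e.mu.Lock()
}

// squares computes i*i for 0 <= i < n concurrently and places each result
// at its scheduling index.
func squares(n int) []int {
	out := make([]int, n)
	e := &miniExecutor[int]{}
	e.processor = func(res, idx int) { out[idx] = res }
	for i := 0; i < n; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		e.run(func() int { return i * i })
	}
	e.waitAndLock()
	defer e.mu.Unlock()
	return out
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```

Because the processor only ever runs under the mutex, it can write to captured state such as out without further synchronization, and the index keeps the output in scheduling order even though tasks finish in any order.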

func (*Executor[T]) Go

func (pe *Executor[T]) Go(f func() T)

Executes function f in the parallel executor and passes the result to the executor's "processor" function.

func (*Executor[T]) Lock

func (pe *Executor[T]) Lock()

Acquires exclusive access to the state captured by the `processor` func of the executor. Does _not_ prevent tasks from being scheduled or run; it only prevents calls to the processor function. Finished goroutines pile up waiting on the lock, so you shouldn't hold it for long.

func (*Executor[T]) Unlock

func (pe *Executor[T]) Unlock()

Releases the exclusive access to the state captured by the `processor` func of the executor. Tasks are able to return their results to the processor function again.

func (*Executor[T]) UnlockResume

func (pe *Executor[T]) UnlockResume()

Releases the exclusive access to the state captured by the `processor` func of the executor. Tasks are able to return their results to the processor function again. Additionally resets the stopped status of the executor, making it possible to schedule and run new tasks.

func (*Executor[T]) WaitDoneAndLock

func (pe *Executor[T]) WaitDoneAndLock() (wasStopped bool)

Waits until all scheduled parallel tasks are done (or cancelled), then acquires the lock that guarantees exclusive access to the state captured by the `processor` func of the executor. Returns whether the executor was stopped (the processor function returned parexec.CmdStop).
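The interplay of stopping, waiting and resuming can be illustrated with a small standard-library sketch. All names below (stoppable, run, waitAndLock, unlockResume, demo) are hypothetical, and the exact cancellation rules are an assumption: here a stop only prevents new tasks from starting and later results from being handled, which may differ from what the package does.

```go
package main

import (
	"fmt"
	"sync"
)

type command uint8

const (
	cmdProceed command = iota // zero value: keep going
	cmdStop
)

// stoppable is a hypothetical sketch of stop/resume bookkeeping.
type stoppable struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	stopped bool // guarded by mu
	handle  func(res int) command
}

// run starts f on a goroutine unless a stop has been requested.
func (s *stoppable) run(f func() int) {
	s.mu.Lock()
	stopped := s.stopped
	s.mu.Unlock()
	if stopped {
		return // assumption: new tasks are dropped after a stop
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		res := f()
		s.mu.Lock()
		defer s.mu.Unlock()
		if !s.stopped && s.handle(res) == cmdStop {
			s.stopped = true
		}
	}()
}

// waitAndLock waits for running tasks, takes the lock, and reports the stop flag.
func (s *stoppable) waitAndLock() bool {
	s.wg.Wait()
	s.mu.Lock()
	return s.stopped
}

// unlockResume clears the stop flag and releases the lock.
func (s *stoppable) unlockResume() {
	s.stopped = false
	s.mu.Unlock()
}

// demo stops on a negative result, shows that a later task is skipped,
// then resumes and runs one more task.
func demo() (seen []int, wasStopped bool) {
	s := &stoppable{}
	s.handle = func(res int) command {
		seen = append(seen, res)
		if res < 0 {
			return cmdStop
		}
		return cmdProceed
	}
	s.run(func() int { return -1 })
	wasStopped = s.waitAndLock() // the handler asked to stop
	s.mu.Unlock()

	s.run(func() int { return 2 }) // skipped: still stopped
	s.waitAndLock()
	s.unlockResume()

	s.run(func() int { return 3 }) // runs again after the resume
	s.waitAndLock()
	s.mu.Unlock()
	return seen, wasStopped
}

func main() {
	seen, stopped := demo()
	fmt.Println(seen, stopped) // [-1 3] true
}
```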

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

func NewLimiter

func NewLimiter(limit int) *Limiter

Limits parallel executions to at most limit simultaneously.

Can be shared between multiple [Executor]s.

func (*Limiter) Return

func (l *Limiter) Return()

Returns a token taken with Take.

func (*Limiter) Take

func (l *Limiter) Take()

Takes a limiter token. The caller must [Return] it afterwards.
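A token limiter of this kind is commonly built as a counting semaphore on a buffered channel. The sketch below shows that technique under hypothetical names (tokenLimiter, take, give, peakConcurrency). It also shows one way a nil pointer could act as a "no limit" value, which is an assumption about how something like NoLimit might work, not a description of the package's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tokenLimiter is a hypothetical counting semaphore.
type tokenLimiter struct {
	tokens chan struct{}
}

func newTokenLimiter(limit int) *tokenLimiter {
	return &tokenLimiter{tokens: make(chan struct{}, limit)}
}

// take blocks until a token is free. A nil limiter never blocks.
func (l *tokenLimiter) take() {
	if l != nil {
		l.tokens <- struct{}{}
	}
}

// give returns a token obtained with take.
func (l *tokenLimiter) give() {
	if l != nil {
		<-l.tokens
	}
}

// peakConcurrency runs n tasks under the limiter and reports the highest
// number of tasks that were inside the limited region at the same time.
func peakConcurrency(l *tokenLimiter, n int) int64 {
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.take()
			defer l.give()
			cur := atomic.AddInt64(&running, 1)
			// Record a new peak if this goroutine observed one.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// With a limit of 2, no more than two tasks ever overlap.
	fmt.Println(peakConcurrency(newTokenLimiter(2), 50) <= 2) // true
}
```

Pairing take with a deferred give, as in the goroutine above, keeps the token from leaking if the task panics or returns early.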
