parallel

package
v0.0.0-...-d3172bc
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2025 License: Apache-2.0 Imports: 5 Imported by: 145

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func FanOutIn

func FanOutIn(gen func(chan<- func() error)) error

FanOutIn is useful to quickly parallelize a group of tasks.

You pass it a function which is expected to push simple `func() error` closures into the provided chan. Each function will be executed in parallel and their error results will be collated.

The function blocks until all functions are executed, and an errors.MultiError is returned if one or more of your fan-out tasks failed, otherwise this function returns nil.

This function is equivalent to WorkPool(0, gen).
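The fan-out/fan-in behavior described above can be pictured with the standard library alone. The following is a minimal sketch, not the package's implementation: `fanOutIn`, `errOdd`, and the returned `[]error` (standing in for errors.MultiError) are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOdd is a hypothetical error used only by this sketch.
var errOdd = errors.New("odd task failed")

// fanOutIn is a hypothetical, stdlib-only stand-in for the behavior described
// above: every task pushed by gen runs in its own goroutine, and all non-nil
// errors are collected once every task has finished.
func fanOutIn(gen func(chan<- func() error)) []error {
	taskC := make(chan func() error)
	go func() {
		defer close(taskC)
		gen(taskC)
	}()

	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for task := range taskC {
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			if err := task(); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(task)
	}
	wg.Wait()
	return errs
}

func main() {
	errs := fanOutIn(func(taskC chan<- func() error) {
		for i := 0; i < 4; i++ {
			i := i
			taskC <- func() error {
				if i%2 == 1 {
					return errOdd
				}
				return nil
			}
		}
	})
	fmt.Println("failed tasks:", len(errs)) // failed tasks: 2
}
```

The sketch returns nil when no task fails, matching the "otherwise this function returns nil" rule above.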

func Ignore

func Ignore(ch <-chan error)

Ignore can be used to consume the channel from Run. It blocks on all errors in the channel and discards them.

func Must

func Must(ch <-chan error)

Must can be used to consume the channel from Run. It asserts that none of the functions run return an error. If one returns non-nil, this will panic with the first error encountered (which may cause the channel to remain open and unprocessed, blocking other tasks).
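To make the difference between the two consumers concrete, here is a minimal stdlib-only sketch. The lowercase `ignore` and `must`, plus the helpers `results`, `mustPanics`, and `errBoom`, are hypothetical stand-ins written for this illustration and are not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error used only by this sketch.
var errBoom = errors.New("boom")

// ignore drains ch until it is closed, discarding every result.
func ignore(ch <-chan error) {
	for range ch {
	}
}

// must drains ch and panics on the first non-nil error. After the panic the
// rest of the channel is left unread, which is the hazard noted above.
func must(ch <-chan error) {
	for err := range ch {
		if err != nil {
			panic(err)
		}
	}
}

// results returns a closed, pre-filled channel so the demo needs no goroutines.
func results(errs ...error) <-chan error {
	ch := make(chan error, len(errs))
	for _, err := range errs {
		ch <- err
	}
	close(ch)
	return ch
}

// mustPanics reports whether must panics while consuming ch.
func mustPanics(ch <-chan error) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	must(ch)
	return false
}

func main() {
	ignore(results(nil, errBoom))                  // errors are silently dropped
	fmt.Println(mustPanics(results(nil, nil)))     // false
	fmt.Println(mustPanics(results(nil, errBoom))) // true
}
```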

func Run

func Run(workers int, gen func(chan<- func() error)) <-chan error

Run executes a generator function, dispatching each generated task to the Runner. Run returns immediately with an error channel that can be used to reap the results of those tasks.

The returned error channel must be consumed, or it can block additional functions from being run from gen. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).

Note that there is no association between the error channel's error order and the generated task order. However, the channel will return exactly one error result for each generated task.

If workers is <= 0, it will be unbounded; otherwise, a pool of at most workers sustained goroutines will be used to execute the tasks.
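The contract above (return immediately, one result per task, bounded concurrency, and a channel that must be drained) can be sketched with the standard library. This is an illustration under assumptions, not the package's implementation: `run` is a hypothetical name, and it limits concurrency with a token channel instead of sustained worker goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// run is a hypothetical stand-in: it starts gen in the background, executes
// tasks with at most `workers` running at once (unbounded when workers <= 0),
// and returns a channel that yields exactly one result per task, then closes.
func run(workers int, gen func(chan<- func() error)) <-chan error {
	errC := make(chan error)
	taskC := make(chan func() error)

	go func() {
		defer close(taskC)
		gen(taskC)
	}()

	go func() {
		defer close(errC)
		var wg sync.WaitGroup
		var slots chan struct{} // nil means "no limit"
		if workers > 0 {
			slots = make(chan struct{}, workers)
		}
		for task := range taskC {
			if slots != nil {
				slots <- struct{}{} // wait for a free slot
			}
			wg.Add(1)
			go func(task func() error) {
				defer wg.Done()
				// The slot is released only after the result is read, so an
				// unconsumed channel stalls further dispatch.
				errC <- task()
				if slots != nil {
					<-slots
				}
			}(task)
		}
		wg.Wait()
	}()
	return errC
}

func main() {
	errC := run(2, func(taskC chan<- func() error) {
		for i := 0; i < 5; i++ {
			taskC <- func() error { return nil }
		}
	})
	n := 0
	for range errC {
		n++
	}
	fmt.Println("results:", n) // results: 5
}
```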

func RunMulti

func RunMulti(ctx context.Context, workers int, fn func(MultiRunner) error) error

RunMulti initiates a nested RunMulti operation. It invokes an entry function, passing it a MultiRunner instance bound to the supplied constraints. Any nested parallel operations scheduled through that MultiRunner will not starve each other regardless of execution order.

This is useful when sharing the same outer Runner constraints with multiple tiers of parallel operations. A naive approach would be to re-use a Runner's WorkC() or Run() functions, but this can result in deadlock if the outer functions consume all available resources running their inner payloads, forcing their inner payloads to block forever.

The supplied Context will be monitored for cancellation. If the Context is canceled, new work dispatch will be inhibited. Any tasks added to the work channel after that point will not be executed, and RunMulti will treat them as if they ran and immediately returned the Context's Err() value.
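The cancellation rule can be illustrated in isolation. The sketch below assumes a hypothetical helper, `runOrSkip`, that a dispatcher might call for each task; it is not part of the package and only shows the "skipped tasks report the Context's Err()" behavior.

```go
package main

import (
	"context"
	"fmt"
)

// runOrSkip is a hypothetical helper: it runs task unless ctx is already
// canceled, in which case the task is skipped and ctx.Err() is reported as if
// the task itself had returned it.
func runOrSkip(ctx context.Context, task func() error) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return task()
}

// demo runs the same task before and after cancellation and reports both
// results along with how many times the task body actually executed.
func demo() (before, after error, ran int) {
	ctx, cancel := context.WithCancel(context.Background())
	task := func() error { ran++; return nil }

	before = runOrSkip(ctx, task)
	cancel()
	after = runOrSkip(ctx, task)
	return before, after, ran
}

func main() {
	before, after, ran := demo()
	fmt.Println(before)      // <nil>
	fmt.Println(after)       // context canceled
	fmt.Println("ran:", ran) // ran: 1
}
```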

func WorkPool

func WorkPool(workers int, gen func(chan<- func() error)) error

WorkPool creates a fixed-size pool of worker goroutines. A supplied generator method creates task functions and passes them through to the work pool.

WorkPool will use at most workers goroutines to execute the supplied tasks. If workers is <= 0, WorkPool will be unbounded and behave like FanOutIn.

WorkPool blocks until the generator completes and all workers have finished their tasks.

Example
val := int32(0)
err := WorkPool(16, func(workC chan<- func() error) {
	for i := 0; i < 256; i++ {
		workC <- func() error {
			atomic.AddInt32(&val, 1)
			return nil
		}
	}
})

if err != nil {
	fmt.Printf("Unexpected error: %s", err.Error())
}

fmt.Printf("got: %d", val)
Output:

got: 256

Types

type Buffer

type Buffer struct {
	Runner
	// contains filtered or unexported fields
}

A Buffer embeds a Runner, overriding its RunOne method to buffer tasks indefinitely without blocking.

func (*Buffer) Close

func (b *Buffer) Close()

Close flushes the remaining tasks in the Buffer and closes the underlying Runner.

Adding new tasks to the Buffer after Close has been invoked will cause a panic.

func (*Buffer) Run

func (b *Buffer) Run(gen func(chan<- func() error)) <-chan error

Run implements the same semantics as Runner's Run. However, if the dispatch pipeline is full, Run will buffer the work and return immediately rather than block.

func (*Buffer) RunOne

func (b *Buffer) RunOne(f func() error) <-chan error

RunOne implements the same semantics as Runner's RunOne. However, if the dispatch pipeline is full, RunOne will buffer the work and return immediately rather than block.

func (*Buffer) SetFIFO

func (b *Buffer) SetFIFO(fifo bool)

SetFIFO sets the Buffer's task dispatch order to FIFO (true) or LIFO (false). This determines the order in which buffered tasks will be dispatched. In FIFO (first in, first out) mode, the first tasks to be buffered will be dispatched first. In LIFO (last in, first out) mode, the last tasks to be buffered will be dispatched first.
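The two dispatch orders can be shown with a small slice-backed queue. This is a minimal sketch for illustration only: `taskBuffer`, `push`, `pop`, and `drain` are hypothetical names, and strings stand in for buffered tasks.

```go
package main

import "fmt"

// taskBuffer is a hypothetical buffer that hands out queued items in FIFO or
// LIFO order depending on its fifo flag.
type taskBuffer struct {
	fifo  bool
	items []string
}

// push appends an item to the buffer.
func (b *taskBuffer) push(item string) { b.items = append(b.items, item) }

// pop removes and returns the next item to dispatch; ok is false when empty.
func (b *taskBuffer) pop() (item string, ok bool) {
	if len(b.items) == 0 {
		return "", false
	}
	if b.fifo {
		// Oldest item first.
		item, b.items = b.items[0], b.items[1:]
	} else {
		// Newest item first.
		last := len(b.items) - 1
		item, b.items = b.items[last], b.items[:last]
	}
	return item, true
}

// drain buffers the given items and returns them in dispatch order.
func drain(fifo bool, items ...string) []string {
	b := &taskBuffer{fifo: fifo}
	for _, it := range items {
		b.push(it)
	}
	var out []string
	for {
		it, ok := b.pop()
		if !ok {
			return out
		}
		out = append(out, it)
	}
}

func main() {
	fmt.Println(drain(true, "a", "b", "c"))  // [a b c]
	fmt.Println(drain(false, "a", "b", "c")) // [c b a]
}
```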

func (*Buffer) WorkC

func (b *Buffer) WorkC() chan<- WorkItem

WorkC implements the same semantics as Runner's WorkC. However, this channel will not block pending work dispatch. Any tasks written to this channel that would block are instead buffered pending dispatch availability.

type MultiRunner

type MultiRunner interface {
	// RunMulti runs the supplied generator, returning an errors.MultiError with
	// the task results.
	//
	// Since it blocks on result, RunMulti is safe to chain with other RunMulti
	// operations without risk of deadlock, as the caller's blocking counts as one
	// of the run tokens.
	//
	// Note that there is no association between the MultiError's error order and
	// the generated task order.
	RunMulti(func(chan<- func() error)) error
}

MultiRunner can execute nested RunMulti against the same outer Runner.

type Runner

type Runner struct {
	// Sustained is the number of sustained goroutines to use in this Runner.
	// Sustained goroutines are spawned on demand, but continue running to
	// dispatch future work until the Runner is closed.
	//
	// If Sustained is <= 0, no sustained goroutines will be executed.
	//
	// This value will be ignored after the first task has been dispatched.
	Sustained int

	// Maximum is the maximum number of goroutines to spawn at any given time.
	//
	// If Maximum is <= 0, no maximum will be enforced.
	//
	// This value will be ignored after the first task has been dispatched.
	Maximum int
	// contains filtered or unexported fields
}

Runner manages parallel function dispatch.

The zero value of a Runner accepts an unbounded number of tasks and maintains no sustained goroutines.

Once started, a Runner must not be copied.

Once a task has been dispatched to Runner, it will continue accepting tasks and consuming resources (namely, its dispatch goroutine) until its Close method is called.

func (*Runner) Close

func (r *Runner) Close()

Close will instruct the Runner to not accept any more jobs and block until all current work is finished.

Close may only be called once; additional calls will panic.

The Runner's dispatch methods will panic if new work is dispatched after Close has been called.

func (*Runner) Run

func (r *Runner) Run(gen func(chan<- func() error)) <-chan error

Run executes a generator function, dispatching each generated task to the Runner. Run returns immediately with an error channel that can be used to reap the results of those tasks.

The returned error channel must be consumed, or it can block additional functions from being run from gen. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).

Note that there is no association between the error channel's error order and the generated task order. However, the channel will return exactly one error result for each generated task.

If the Runner has been closed, this will panic with a reference to the closed dispatch channel.

func (*Runner) RunOne

func (r *Runner) RunOne(f func() error) <-chan error

RunOne executes a single task in the Runner, returning with a channel that can be used to reap the result of that task.

The returned error channel must be consumed, or it can block additional functions from being run. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).

If the Runner has been closed, this will panic with a reference to the closed dispatch channel.

func (*Runner) WorkC

func (r *Runner) WorkC() chan<- WorkItem

WorkC returns a channel to which WorkItems can be written directly.

type Semaphore

type Semaphore chan SemaphoreToken

Semaphore is a sync.Locker that implements an n-semaphore.

Locking the semaphore acquires a semaphore token, possibly blocking until one is available.

Unlock releases an owned token, returning it to the semaphore.

For semaphore s, len(s) is the current number of acquired resources, and cap(s) is the total resource size of the semaphore.

Example

ExampleSemaphore demonstrates Semaphore's usage by processing 20 units of data in parallel. It protects the processing with a semaphore Locker to ensure that at most 5 units are processed at any given time.

sem := make(Semaphore, 5)

done := make([]int, 20)
wg := sync.WaitGroup{}
for i := 0; i < len(done); i++ {
	i := i

	wg.Add(1)
	sem.Lock()
	go func() {
		defer wg.Done()
		defer sem.Unlock()
		done[i] = i
	}()
}

wg.Wait()
sort.Ints(done)
fmt.Println("Got:", done)
Output:

Got: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19]

func (Semaphore) Lock

func (s Semaphore) Lock()

Lock acquires a semaphore resource, blocking until one is available.

func (Semaphore) TakeAll

func (s Semaphore) TakeAll()

TakeAll blocks until it holds all available semaphore resources. When it returns, the caller owns all of the resources in the semaphore.

func (Semaphore) Unlock

func (s Semaphore) Unlock()

Unlock releases a single semaphore resource.
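The three methods above, and the len/cap relationship noted in the type description, can be pictured with a buffered channel. The following is a hedged sketch rather than the package's code: `sem`, `lock`, `unlock`, and `takeAll` are hypothetical lowercase stand-ins.

```go
package main

import "fmt"

// sem is a hypothetical channel-backed counting semaphore. It mirrors the
// shape of the documented type but is not the package's implementation.
type sem chan struct{}

// lock acquires one token, blocking while every token is already held.
func (s sem) lock() { s <- struct{}{} }

// unlock releases one previously acquired token.
func (s sem) unlock() { <-s }

// takeAll acquires tokens one at a time until it holds cap(s) of them.
func (s sem) takeAll() {
	for i := 0; i < cap(s); i++ {
		s.lock()
	}
}

func main() {
	s := make(sem, 3)

	s.lock()
	fmt.Println(len(s), cap(s)) // 1 3

	s.unlock()
	s.takeAll()
	fmt.Println(len(s), cap(s)) // 3 3
}
```

Because acquiring a token is a channel send, len(s) counts held tokens and cap(s) is the semaphore's size, as described for Semaphore above.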

type SemaphoreToken

type SemaphoreToken struct{}

SemaphoreToken is a semaphore token.

type WorkItem

type WorkItem struct {
	// F is the work function to execute. This must be non-nil.
	F func() error
	// ErrC is the channel that will receive F's result. If nil or F panics, no
	// error will be sent.
	ErrC chan<- error

	// After, if not nil, is a callback method that will be invoked after the
	// result of F has been passed to ErrC.
	//
	// After is called by the same worker goroutine as F, so it will similarly
	// consume one worker during its execution.
	//
	// If F panics, After will still be called, and can be used to recover from
	// the panic.
	After func()
}

WorkItem is a single item of work that a Runner will execute. The supplied function, F, will be executed by a Runner goroutine and the result will be written to ErrC.

An optional callback method, After, may be supplied to operate in response to work completion.
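The interaction between F, ErrC, and After can be sketched as a single worker step. This is an assumption-laden illustration, not the Runner's actual code: `workItem`, `execute`, `demo`, and `errFailed` are hypothetical names, and After is deferred so that it runs, and may call recover, even if F panics.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailed is a hypothetical error used only by this sketch.
var errFailed = errors.New("task failed")

// workItem mirrors the documented fields of WorkItem for illustration.
type workItem struct {
	F     func() error
	ErrC  chan<- error
	After func()
}

// execute is a hypothetical worker body: it runs F, forwards the result to
// ErrC when one is set, and then calls After. Because After is deferred, it
// also runs when F panics, in which case no result is sent.
func execute(w workItem) {
	if w.After != nil {
		defer w.After()
	}
	err := w.F()
	if w.ErrC != nil {
		w.ErrC <- err
	}
}

// demo executes one failing item and one panicking item. It returns the first
// item's result, the value recovered from the second item's panic, and the
// number of results left in the channel afterwards.
func demo() (result error, recovered interface{}, pending int) {
	errC := make(chan error, 2)

	execute(workItem{F: func() error { return errFailed }, ErrC: errC})
	result = <-errC

	execute(workItem{
		F:     func() error { panic("boom") },
		ErrC:  errC,
		After: func() { recovered = recover() },
	})
	return result, recovered, len(errC)
}

func main() {
	result, recovered, pending := demo()
	fmt.Println(result, recovered, pending) // task failed boom 0
}
```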
