forkjoin

package
v0.13.1
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2023 License: GPL-3.0 Imports: 3 Imported by: 0

Documentation

Overview

Package forkjoin provides an API for "doing work concurrently (fork) and then waiting for the results (join)".

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New[I, O any](rootCtx context.Context, work Work[I, O], opts ...Option) (Fork[I], Join[I, O], context.CancelFunc)

New returns fork, join, and cancel functions with generic input type I and output type O. It provides an API for "doing work concurrently (fork) and then waiting for the results (join)".

It fails fast by default: on the first error, the contexts of all active work functions are cancelled and no further inputs are executed. The results of the remaining inputs have their errors set to context cancelled. Use WithoutFailFast to disable this behaviour.

Usage:

workFunc := func(ctx context.Context, input MyInput) (MyResult, error) {
  // ... do work
  return result, nil
}

fork, join, cancel := forkjoin.New[MyInput, MyResult](ctx, workFunc)
defer cancel() // Release any remaining resources.

for _, in := range inputs {
  fork(in) // Note that calling fork AFTER join panics!
}

resultChan := join()

// Either read results from the channel as they appear
for result := range resultChan { ... }

// Or block until all results are complete and flatten
results, firstErr := resultChan.Flatten()

Types

type Fork

type Fork[I any] func(I)

Fork enqueues the input to be processed asynchronously. Fork may block temporarily while the input buffer is full; see WithInputBuffer. Fork panics if called after Join.

type Join

type Join[I, O any] func() Results[I, O]

Join closes the input queue and returns the result channel. Join must be called only once; a second call panics. Fork panics if called after Join.
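
One plausible reason for the panic, assuming the input queue is a plain Go channel (an assumption, not something this documentation states), is that sending on a closed channel panics. The hypothetical helper forkAfterJoin below demonstrates that language rule in isolation.

```go
package main

import "fmt"

// forkAfterJoin is a hypothetical helper that closes a queue (the "join")
// and then tries to enqueue one more input (the late "fork").
// It reports whether the late send panicked.
func forkAfterJoin() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()

	queue := make(chan int, 1)
	close(queue) // Join: the input queue is closed.
	queue <- 1   // Late fork: sending on a closed channel panics.

	return false
}

func main() {
	fmt.Println("late fork panicked:", forkAfterJoin())
}
```

In practice this means all calls to Fork should complete before Join is called.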

type Option

type Option func(*options)

func WithInputBuffer

func WithInputBuffer(i int) Option

WithInputBuffer returns an option configuring a forkjoin with an input buffer of length i, overriding the default of 100. This is useful to prevent temporary blocking during calls to Fork when enqueuing more than 100 inputs.
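
The blocking behaviour can be pictured with a buffered Go channel. This is a sketch under the assumption that the input buffer behaves like a buffered channel; the helper tryEnqueue and the buffer length of 2 are hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the buffer had
// room. A plain send (queue <- v) would block in the case where this returns false.
func tryEnqueue(queue chan int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false // Buffer full.
	}
}

func main() {
	queue := make(chan int, 2) // Hypothetical buffer length of 2.

	fmt.Println(tryEnqueue(queue, 1)) // Room in the buffer.
	fmt.Println(tryEnqueue(queue, 2)) // Buffer is now full.
	fmt.Println(tryEnqueue(queue, 3)) // No room: a plain send would block.

	<-queue                           // A worker takes one input.
	fmt.Println(tryEnqueue(queue, 4)) // Room again.
}
```

A larger buffer lets the producer enqueue more inputs before it has to wait for workers to catch up.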

func WithWaitOnCancel

func WithWaitOnCancel() Option

WithWaitOnCancel returns an option configuring a forkjoin to wait for all workers to return when cancelling. By default, cancelling only cancels the worker contexts and closes the output channel, without waiting for the workers to return.
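
The difference between the two modes comes down to whether the caller waits for worker goroutines after cancelling. The sketch below shows the waiting variant with the standard library only; cancelAndWait is a hypothetical helper and does not reflect how this package implements the option.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// cancelAndWait is a hypothetical helper: it starts n workers that run until
// their context is cancelled, cancels them, waits for all of them to return,
// and reports how many had finished by the time it returns.
func cancelAndWait(n int) int64 {
	ctx, cancel := context.WithCancel(context.Background())

	var (
		wg       sync.WaitGroup
		finished int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done()                  // Work until cancelled.
			atomic.AddInt64(&finished, 1) // Cleanup after cancellation.
		}()
	}

	cancel()
	wg.Wait() // Without this wait, some workers might still be running.

	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println("workers finished:", cancelAndWait(4))
}
```

Waiting is the safer choice when workers hold resources that must be released before the caller continues; not waiting returns sooner.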

func WithWorkers

func WithWorkers(w int) Option

WithWorkers returns an option configuring a forkjoin with w workers.

func WithoutFailFast

func WithoutFailFast() Option

WithoutFailFast returns an option configuring a forkjoin to continue executing the remaining inputs when a work function returns an error, instead of stopping on the first error.

type Result

type Result[I, O any] struct {
	Input  I
	Output O
	Err    error
}

Result contains the input, the resulting output, and any error returned by the work function.

type Results

type Results[I, O any] <-chan Result[I, O]

Results is a receive-only channel of enqueued results.

func (Results[I, O]) Flatten

func (r Results[I, O]) Flatten() ([]O, error)

Flatten blocks until all results have completed, then returns all the outputs and the first "real error".

A real error is the error that triggered the fail fast; all subsequent results contain context cancelled errors.

type Work

type Work[I, O any] func(ctx context.Context, input I) (output O, err error)

Work defines the signature of the work function that workers call.
