Documentation ¶
Overview ¶
Package forkjoin provides an API for "doing work concurrently (fork) and then waiting for the results (join)". It is similar to errgroups, except that each "group" doesn't only return an error, but also a result.
This package was copied from Obol's Charon repo. See https://github.com/ObolNetwork/charon/blob/main/app/forkjoin/forkjoin.go.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New[I, O any](rootCtx context.Context, work Work[I, O], opts ...Option) (Fork[I], Join[I, O], context.CancelFunc)
New returns fork, join, and cancel functions with generic input type I and output type O. It provides an API for "doing work concurrently (fork) and then waiting for the results (join)".
New fails fast by default: on the first error, the contexts of all active work functions are canceled and no further inputs are executed. The results for the remaining inputs have their errors set to context canceled. See WithoutFailFast to disable this behavior.
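The fail-fast behavior described above can be illustrated with a small standalone sketch. This is not the package's implementation: the names `result`, `runFailFast`, and `demoFailFast` are hypothetical, and the sketch assumes a plain channel-based worker pool built only on the standard library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// result is a hypothetical type for this sketch: one input with its output or error.
type result struct {
	input int
	out   int
	err   error
}

var errBoom = errors.New("boom")

// runFailFast processes inputs on a fixed number of workers. On the first
// error it cancels the shared context; inputs that have not started yet are
// reported with context.Canceled instead of being executed.
func runFailFast(ctx context.Context, inputs []int, workers int,
	work func(context.Context, int) (int, error)) []result {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	queue := make(chan int, len(inputs))
	for _, in := range inputs {
		queue <- in
	}
	close(queue)

	results := make(chan result, len(inputs))
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range queue {
				if ctx.Err() != nil { // Already failed: skip the work.
					results <- result{input: in, err: ctx.Err()}
					continue
				}
				out, err := work(ctx, in)
				if err != nil {
					cancel() // Fail fast: stop everything else.
				}
				results <- result{input: in, out: out, err: err}
			}
		}()
	}
	wg.Wait()
	close(results)

	var all []result
	for r := range results {
		all = append(all, r)
	}
	return all
}

// demoFailFast runs four inputs on one worker; the work function fails on input 2.
func demoFailFast() (ok, failed, canceled int) {
	work := func(_ context.Context, in int) (int, error) {
		if in == 2 {
			return 0, errBoom
		}
		return in * 10, nil
	}
	for _, r := range runFailFast(context.Background(), []int{1, 2, 3, 4}, 1, work) {
		switch {
		case r.err == nil:
			ok++
		case errors.Is(r.err, context.Canceled):
			canceled++
		default:
			failed++
		}
	}
	return ok, failed, canceled
}

func main() {
	ok, failed, canceled := demoFailFast()
	fmt.Println(ok, failed, canceled) // prints "1 1 2"
}
```

With a single worker the order is deterministic: input 1 succeeds, input 2 fails and triggers the cancel, and inputs 3 and 4 are never executed.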
Usage:
	workFunc := func(ctx context.Context, input MyInput) (MyResult, error) {
		... do work
		return result, nil
	}

	fork, join, cancel := forkjoin.New[MyInput, MyResult](ctx, workFunc)
	defer cancel() // Release any remaining resources.

	for _, in := range inputs {
		fork(in) // Note that calling fork AFTER join panics!
	}

	resultChan := join()

	// Either read results from the channel as they appear
	for result := range resultChan { ... }

	// Or block until all results are complete and flatten
	results, firstErr := resultChan.Flatten()
Types ¶
type Fork ¶
type Fork[I any] func(I)
Fork enqueues the input to be processed asynchronously. Fork may block temporarily while the input buffer is full; see WithInputBuffer. Fork panics if called after Join.
type Join ¶
Join closes the input queue and returns the result channel. Fork panics if called after Join. Join must be called only once; a second call panics.
type Option ¶
type Option func(*options)
func WithInputBuffer ¶
WithInputBuffer returns an option configuring a forkjoin with an input buffer of length i overriding the default of 100. Useful to prevent temporary blocking during calls to Fork if enqueuing more than 100 inputs.
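To see why a larger input buffer avoids blocking, the following standalone sketch may help. It assumes the input queue behaves like a buffered Go channel, which the documentation above does not state explicitly. The helpers `tryEnqueue` and `countAccepted` are hypothetical and not part of forkjoin.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send. It returns false when a plain
// send would block because the buffer is full.
func tryEnqueue(queue chan<- int, in int) bool {
	select {
	case queue <- in:
		return true
	default:
		return false
	}
}

// countAccepted reports how many of n inputs fit into a buffer of the given
// size while no worker is draining the queue.
func countAccepted(bufSize, n int) int {
	queue := make(chan int, bufSize)
	accepted := 0
	for i := 0; i < n; i++ {
		if tryEnqueue(queue, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(countAccepted(100, 150)) // prints "100": the last 50 sends would block
	fmt.Println(countAccepted(200, 150)) // prints "150": every send fits
}
```

In practice workers drain the queue concurrently, so a full buffer only delays the caller until a slot frees up.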
func WithWaitOnCancel ¶
func WithWaitOnCancel() Option
WithWaitOnCancel returns an option configuring a forkjoin to wait for all workers to return when canceling. The default behavior just cancels the worker context and closes the output channel without waiting for the workers to return.
func WithWorkers ¶
WithWorkers returns an option configuring a forkjoin with w number of workers.
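The effect of a fixed worker count can be sketched without the package. The example below is an illustration under assumptions, not forkjoin's code: `peakConcurrency` is a hypothetical helper that runs tasks on w goroutines and records the highest number of tasks running at once, which can never exceed w.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peakConcurrency runs n no-op tasks on w workers and returns the highest
// number of tasks observed running at the same time.
func peakConcurrency(w, n int) int64 {
	queue := make(chan int, n)
	for i := 0; i < n; i++ {
		queue <- i
	}
	close(queue)

	var active, peak int64
	var wg sync.WaitGroup
	for i := 0; i < w; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range queue {
				cur := atomic.AddInt64(&active, 1)
				// Raise the recorded peak if this task pushed it higher.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				atomic.AddInt64(&active, -1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := peakConcurrency(4, 1000)
	fmt.Println(peak >= 1 && peak <= 4) // prints "true"
}
```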
func WithoutFailFast ¶
func WithoutFailFast() Option
WithoutFailFast returns an option configuring a forkjoin to not stop execution on any error.
type Results ¶
Results contains enqueued results.
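The usage example above calls Flatten on the result channel to block until all results are complete and obtain the first error. A minimal sketch of that idea follows. The `result` type, its field names, and the `flatten` function are hypothetical stand-ins, and dropping the outputs of failed results is a choice made for this sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for one entry on the result channel.
type result[O any] struct {
	Output O
	Err    error
}

// flatten drains the channel until it is closed. It returns the outputs of
// all successful results and the first error encountered, if any.
func flatten[O any](results <-chan result[O]) ([]O, error) {
	var (
		outputs  []O
		firstErr error
	)
	for r := range results {
		if r.Err != nil {
			if firstErr == nil {
				firstErr = r.Err
			}
			continue
		}
		outputs = append(outputs, r.Output)
	}
	return outputs, firstErr
}

// demoFlatten feeds two successes and two failures through flatten.
func demoFlatten() (int, string) {
	ch := make(chan result[int], 4)
	ch <- result[int]{Output: 1}
	ch <- result[int]{Err: errors.New("second failed")}
	ch <- result[int]{Output: 3}
	ch <- result[int]{Err: errors.New("later failure")}
	close(ch)

	outputs, err := flatten[int](ch)
	return len(outputs), err.Error()
}

func main() {
	n, msg := demoFlatten()
	fmt.Println(n, msg) // prints "2 second failed"
}
```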
func NewWithInputs ¶
func NewWithInputs[I, O any](ctx context.Context, work Work[I, O], inputs []I, opts ...Option) (Results[I, O], context.CancelFunc)
NewWithInputs is a convenience function that calls New and then forks all the inputs returning the join result and a cancel function.