Documentation ¶
Index ¶
- func FanOutIn(gen func(chan<- func() error)) error
- func Ignore(ch <-chan error)
- func Must(ch <-chan error)
- func Run(workers int, gen func(chan<- func() error)) <-chan error
- func RunMulti(c context.Context, workers int, fn func(MultiRunner) error) error
- func WorkPool(workers int, gen func(chan<- func() error)) error
- type Buffer
- type MultiRunner
- type Runner
- type Semaphore
- type SemaphoreToken
- type WorkItem
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FanOutIn ¶
func FanOutIn(gen func(chan<- func() error)) error
FanOutIn is useful to quickly parallelize a group of tasks.
You pass it a function which is expected to push simple `func() error` closures into the provided chan. Each closure will be executed in parallel and the error results will be collated.
The function blocks until all closures have executed. If one or more of the fan-out tasks failed, an errors.MultiError is returned; otherwise, this function returns nil.
This function is equivalent to WorkPool(0, gen).
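As a minimal sketch, assuming a hypothetical process helper that does the per-item work:

items := []string{"a", "b", "c"}
err := FanOutIn(func(work chan<- func() error) {
	for _, item := range items {
		item := item // capture the loop variable for the closure
		work <- func() error {
			return process(item) // process is a hypothetical per-item task
		}
	}
})
if err != nil {
	// err is an errors.MultiError collecting every failed task's error.
	fmt.Println("some tasks failed:", err)
}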
func Ignore ¶
func Ignore(ch <-chan error)
Ignore can be used to consume the channel from Run. It blocks on all errors in the channel and discards them.
func Must ¶
func Must(ch <-chan error)
Must can be used to consume the channel from Run. It asserts that none of the functions run return an error. If one returns non-nil, this will panic with the first error encountered (which may cause the channel to remain open and unprocessed, blocking other tasks).
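For example, a small sketch of consuming Run's channel with Must, for tasks that are known not to fail:

// Panics if any of the generated tasks returns a non-nil error.
Must(Run(4, func(work chan<- func() error) {
	for i := 0; i < 10; i++ {
		work <- func() error { return nil }
	}
}))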
func Run ¶
func Run(workers int, gen func(chan<- func() error)) <-chan error
Run executes a generator function, dispatching each generated task to the Runner. Run returns immediately with an error channel that can be used to reap the results of those tasks.
The returned error channel must be consumed, or it can block additional functions from being run from gen. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).
Note that there is no association between the error channel's error order and the generated task order. However, the channel will return exactly one error result for each generated task.
If workers is <= 0, the number of goroutines will be unbounded; otherwise, a pool of at most workers sustained goroutines will be used to execute the tasks.
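A sketch of Run with a bounded pool, consuming the channel with errors.MultiErrorFromErrors as described above (its exact signature, and the handle helper, are assumptions here):

errC := Run(8, func(work chan<- func() error) {
	for i := 0; i < 100; i++ {
		i := i // capture the loop variable
		work <- func() error {
			return handle(i) // handle is a hypothetical task function
		}
	}
})
// Consume every result; err is nil only if no task failed.
if err := errors.MultiErrorFromErrors(errC); err != nil {
	fmt.Println("failed:", err)
}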
func RunMulti ¶
func RunMulti(c context.Context, workers int, fn func(MultiRunner) error) error
RunMulti initiates a nested RunMulti operation. It invokes an entry function, passing it a MultiRunner instance bound to the supplied constraints. Any nested parallel operations scheduled through that MultiRunner will not starve each other regardless of execution order.
This is useful when sharing the same outer Runner constraints with multiple tiers of parallel operations. A naive approach would be to re-use a Runner's WorkC() or Run() functions, but this can result in deadlock if the outer functions consume all available resources running their inner payloads, forcing their inner payloads to block forever.
The supplied Context will be monitored for cancellation. If the Context is canceled, new work dispatch will be inhibited. Any functions added to the work channel will not be executed, and RunMulti will treat them as if they had run and immediately returned the Context's Err() value.
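A sketch of two tiers of parallelism sharing one pool of 16 workers; ctx, shards, and processItem are placeholders for illustration:

err := RunMulti(ctx, 16, func(mr MultiRunner) error {
	// Outer tier: one task per shard.
	return mr.RunMulti(func(work chan<- func() error) {
		for _, shard := range shards {
			shard := shard
			work <- func() error {
				// Inner tier: chaining RunMulti on the same MultiRunner is
				// deadlock-safe, since the blocked outer task counts as one
				// of the run tokens.
				return mr.RunMulti(func(innerWork chan<- func() error) {
					for _, item := range shard {
						item := item
						innerWork <- func() error { return processItem(item) }
					}
				})
			}
		}
	})
})
if err != nil {
	fmt.Println("nested run failed:", err)
}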
func WorkPool ¶
func WorkPool(workers int, gen func(chan<- func() error)) error
WorkPool creates a fixed-size pool of worker goroutines. A supplied generator method creates task functions and passes them through to the work pool.
WorkPool will use at most workers goroutines to execute the supplied tasks. If workers is <= 0, WorkPool will be unbounded and behave like FanOutIn.
WorkPool blocks until the generator completes and all workers have finished their tasks.
Example ¶
val := int32(0)
err := WorkPool(16, func(workC chan<- func() error) {
	for i := 0; i < 256; i++ {
		workC <- func() error {
			atomic.AddInt32(&val, 1)
			return nil
		}
	}
})
if err != nil {
	fmt.Printf("Unexpected error: %s", err.Error())
}
fmt.Printf("got: %d", val)
Output: got: 256
Types ¶
type Buffer ¶
type Buffer struct {
	Runner
	// contains filtered or unexported fields
}
A Buffer embeds a Runner, overriding its RunOne method to buffer tasks indefinitely without blocking.
func (*Buffer) Close ¶
func (b *Buffer) Close()
Close flushes the remaining tasks in the Buffer and closes the underlying Runner.
Adding new tasks to the Buffer after Close has been invoked will cause a panic.
func (*Buffer) Run ¶
Run implements the same semantics as Runner's Run. However, if the dispatch pipeline is full, Run will buffer the work and return immediately rather than block.
func (*Buffer) RunOne ¶
RunOne implements the same semantics as Runner's RunOne. However, if the dispatch pipeline is full, RunOne will buffer the work and return immediately rather than block.
func (*Buffer) SetFIFO ¶
SetFIFO sets the Buffer's task dispatch order to FIFO (true) or LIFO (false). This determines the order in which buffered tasks will be dispatched. In FIFO (first in, first out) mode, the first tasks to be buffered will be dispatched first. In LIFO (last in, first out) mode, the last tasks to be buffered will be dispatched first.
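A rough sketch of Buffer usage, assuming its zero value is usable in the same way a Runner's is:

var b Buffer
b.Maximum = 4   // promoted field from the embedded Runner: at most 4 goroutines
b.SetFIFO(true) // dispatch buffered tasks in the order they were added
defer b.Close() // flush anything still buffered, then close the Runner

// RunOne never blocks here: if the dispatch pipeline is full, the task is
// buffered and dispatched later.
Must(b.RunOne(func() error { return nil }))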
type MultiRunner ¶
type MultiRunner interface {
	// RunMulti runs the supplied generator, returning an errors.MultiError with
	// the task results.
	//
	// Since it blocks on result, RunMulti is safe to chain with other RunMulti
	// operations without risk of deadlock, as the caller's blocking counts as one
	// of the run tokens.
	//
	// Note that there is no association between the MultiError's error order and
	// the generated task order.
	RunMulti(func(chan<- func() error)) error
}
MultiRunner can execute nested RunMulti invocations against the same outer Runner without risk of deadlock.
type Runner ¶
type Runner struct {
	// Sustained is the number of sustained goroutines to use in this Runner.
	// Sustained goroutines are spawned on demand, but continue running to
	// dispatch future work until the Runner is closed.
	//
	// If Sustained is <= 0, no sustained goroutines will be executed.
	//
	// This value will be ignored after the first task has been dispatched.
	Sustained int

	// Maximum is the maximum number of goroutines to spawn at any given time.
	//
	// If Maximum is <= 0, no maximum will be enforced.
	//
	// This value will be ignored after the first task has been dispatched.
	Maximum int
	// contains filtered or unexported fields
}
Runner manages parallel function dispatch.
The zero value of a Runner accepts an unbounded number of tasks and maintains no sustained goroutines.
Once started, a Runner must not be copied.
Once a task has been dispatched, the Runner will continue accepting tasks and consuming resources (namely, its dispatch goroutine) until its Close method is called.
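A short sketch of configuring and using a Runner; the shape of its Run method is taken from its description below:

r := Runner{
	Sustained: 2, // up to 2 goroutines persist to dispatch future work
	Maximum:   8, // never run more than 8 goroutines at once
}
defer r.Close()

// Must consumes the returned error channel, panicking on any failure.
Must(r.Run(func(work chan<- func() error) {
	for i := 0; i < 32; i++ {
		work <- func() error { return nil }
	}
}))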
func (*Runner) Close ¶
func (r *Runner) Close()
Close will instruct the Runner to not accept any more jobs and block until all current work is finished.
Close may only be called once; additional calls will panic.
The Runner's dispatch methods will panic if new work is dispatched after Close has been called.
func (*Runner) Run ¶
Run executes a generator function, dispatching each generated task to the Runner. Run returns immediately with an error channel that can be used to reap the results of those tasks.
The returned error channel must be consumed, or it can block additional functions from being run from gen. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).
Note that there is no association between the error channel's error order and the generated task order. However, the channel will return exactly one error result for each generated task.
If the Runner has been closed, this will panic with a reference to the closed dispatch channel.
func (*Runner) RunOne ¶
RunOne executes a single task in the Runner, returning with a channel that can be used to reap the result of that task.
The returned error channel must be consumed, or it can block additional functions from being run by the Runner. A common consumption function is errors.MultiErrorFromErrors, which will buffer all non-nil errors into an errors.MultiError. Other functions to consider are Must and Ignore (in this package).
If the Runner has been closed, this will panic with a reference to the closed dispatch channel.
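For instance, a brief sketch that blocks directly on a single task's result (doOneThing is a placeholder):

var r Runner
defer r.Close()

if err := <-r.RunOne(func() error {
	return doOneThing() // hypothetical unit of work
}); err != nil {
	fmt.Println("task failed:", err)
}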
type Semaphore ¶
type Semaphore chan SemaphoreToken
Semaphore is a sync.Locker that implements an n-semaphore.
Locking the semaphore acquires a semaphore token, possibly blocking until one is available.
Unlock releases an owned token, returning it to the semaphore.
For semaphore s, len(s) is the current number of acquired resources, and cap(s) is the total resource size of the semaphore.
Example ¶
ExampleSemaphore demonstrates Semaphore's usage by processing 20 units of data in parallel. It protects the processing with a semaphore Locker to ensure that at most 5 units are processed at any given time.
sem := make(Semaphore, 5)

done := make([]int, 20)
wg := sync.WaitGroup{}
for i := 0; i < len(done); i++ {
	i := i
	wg.Add(1)
	sem.Lock()
	go func() {
		defer wg.Done()
		defer sem.Unlock()
		done[i] = i
	}()
}
wg.Wait()

sort.Ints(done)
fmt.Println("Got:", done)
Output: Got: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19]
func (Semaphore) Lock ¶
func (s Semaphore) Lock()
Lock acquires a semaphore resource, blocking until one is available.
type WorkItem ¶
type WorkItem struct {
	// F is the work function to execute. This must be non-nil.
	F func() error

	// ErrC is the channel that will receive F's result. If nil or F panics, no
	// error will be sent.
	ErrC chan<- error

	// After, if not nil, is a callback method that will be invoked after the
	// result of F has been passed to ErrC.
	//
	// After is called by the same worker goroutine as F, so it will similarly
	// consume one worker during its execution.
	//
	// If F panics, After will still be called, and can be used to recover from
	// the panic.
	After func()
}
WorkItem is a single item of work that a Runner will execute. The supplied function, F, will be executed by a Runner goroutine and the result will be written to ErrC.
An optional callback method, After, may be supplied to operate in response to work completion.
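A sketch of a WorkItem whose After callback recovers from a panic in F; it assumes the Runner work channel mentioned above (WorkC()) accepts WorkItem values, and doRiskyThing is a placeholder:

var r Runner
defer r.Close()

errC := make(chan error, 1)
r.WorkC() <- WorkItem{
	F: func() error {
		return doRiskyThing() // hypothetical work that may panic
	},
	ErrC: errC,
	After: func() {
		// Runs on the same worker goroutine once F's result has been sent
		// (or after F panics, in which case nothing was sent to ErrC).
		if p := recover(); p != nil {
			errC <- fmt.Errorf("task panicked: %v", p)
		}
	},
}
<-errC // exactly one result arrives, from F or from the recovery above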