Documentation ¶
Overview ¶
Package workerpool defines abstractions for parallelizing tasks.
Example (HTTP) ¶
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()

	w := workerpool.New[string](&workerpool.Config{
		Concurrency: 0,
	})

	urls := []string{
		"https://apple.com",
		"https://example.com",
		"https://google.com",
	}

	for _, u := range urls {
		// Make a local copy for the closure.
		u := u

		if err := w.Do(ctx, func() (string, error) {
			resp, err := http.Get(u)
			if err != nil {
				return "", err
			}
			defer resp.Body.Close()

			b, err := io.ReadAll(resp.Body)
			if err != nil {
				return "", err
			}
			return string(b), nil
		}); err != nil {
			// TODO: check err
		}
	}

	results, err := w.Done(ctx)
	if err != nil {
		// TODO: check err
	}

	for i, result := range results {
		fmt.Printf("%s: body(%d), err(%v)\n", urls[i], len(result.Value), result.Error)
	}
}
Output:
Example (Sleep) ¶
package main

import (
	"context"
	"time"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()

	pool := workerpool.New[*workerpool.Void](&workerpool.Config{
		Concurrency: 3,
	})

	for i := 0; i < 5; i++ {
		if err := pool.Do(ctx, func() (*workerpool.Void, error) {
			time.Sleep(10 * time.Millisecond)
			return nil, nil
		}); err != nil {
			// TODO: check err
		}
	}

	results, err := pool.Done(ctx)
	if err != nil {
		// TODO: check err
	}
	_ = results
}
Output:
Constants ¶
This section is empty.
Variables ¶
var ErrStopped = fmt.Errorf("worker is stopped")
ErrStopped is the error returned when the worker is stopped.
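Callers can detect this sentinel with errors.Is. A minimal sketch (the type parameter and job are illustrative, and it assumes calling Done on an empty pool simply returns no results):

package main

import (
	"context"
	"errors"
	"fmt"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()

	w := workerpool.New[int](&workerpool.Config{Concurrency: 2})

	// Drain the pool; after Done returns, the pool no longer accepts work.
	if _, err := w.Done(ctx); err != nil {
		fmt.Println(err)
	}

	// Enqueueing after Done fails with ErrStopped.
	err := w.Do(ctx, func() (int, error) { return 1, nil })
	if errors.Is(err, workerpool.ErrStopped) {
		fmt.Println("pool is stopped; not enqueueing more work")
	}
}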
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Concurrency is the maximum number of jobs to run in parallel.
	Concurrency int64

	// StopOnError instructs the worker pool to stop processing new work after
	// the first error is returned. In-flight jobs may still be processed, even
	// if they complete after the first error is returned.
	StopOnError bool
}
Config represents the input configuration to the worker.
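For illustration, a pool configured to run at most four jobs at once and to stop scheduling new work after the first failure might look like this (the type parameter and values are arbitrary):

package main

import (
	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	// Run at most 4 jobs in parallel and stop scheduling new work after the
	// first job returns an error; in-flight jobs may still complete.
	pool := workerpool.New[string](&workerpool.Config{
		Concurrency: 4,
		StopOnError: true,
	})
	_ = pool
}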
type Pool ¶
type Pool[T any] struct {
	// contains filtered or unexported fields
}
Pool represents an instance of a worker pool. It is safe for concurrent use, but see the function documentation for more specific semantics.
func New ¶
New creates a new worker pool that executes work in parallel, up to the maximum provided concurrency. Work is guaranteed to be executed in the order in which it was enqueued, but is not guaranteed to complete in the order in which it was enqueued (i.e. this is not a pipeline).
If the provided concurrency is less than 1, it defaults to the number of CPU cores.
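The following sketch illustrates the ordering guarantee: jobs may finish out of enqueue order, yet (as documented under Pool.Done below) results are reported in enqueue order. The sleep durations are arbitrary:

package main

import (
	"context"
	"fmt"
	"time"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()

	// Two workers run jobs in parallel; jobs start in enqueue order but may
	// finish out of order.
	pool := workerpool.New[int](&workerpool.Config{Concurrency: 2})

	for i := 0; i < 4; i++ {
		i := i
		if err := pool.Do(ctx, func() (int, error) {
			// Later jobs sleep less, so they tend to complete first...
			time.Sleep(time.Duration(4-i) * 10 * time.Millisecond)
			return i, nil
		}); err != nil {
			// TODO: check err
		}
	}

	// ...but Done still reports results in enqueue order: 0, 1, 2, 3.
	results, err := pool.Done(ctx)
	if err != nil {
		// TODO: check err
	}
	for _, r := range results {
		fmt.Println(r.Value)
	}
}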
func (*Pool[T]) Do ¶
Do adds new work into the queue. If there are no available workers in the pool, it blocks until a worker becomes available or until the provided context is cancelled. The function returns when the work has been successfully scheduled.
To wait for all work to be completed and read the results, call Pool.Done. This function only returns an error in two cases:
- The worker pool was stopped via a call to Pool.Done. You should not enqueue more work. The error will be ErrStopped.
- The incoming context was cancelled. You should probably not enqueue more work, but this is an application-specific decision. The error will be context.DeadlineExceeded or context.Canceled.
Never call Do from within a Do function because it will deadlock.
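A sketch of handling both error conditions when enqueueing (the jobs and timeout are illustrative):

package main

import (
	"context"
	"errors"
	"time"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	// A deadline bounds how long we are willing to block waiting for a
	// free worker.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	pool := workerpool.New[string](&workerpool.Config{
		Concurrency: 2,
	})

	for _, job := range []string{"a", "b", "c"} {
		job := job
		err := pool.Do(ctx, func() (string, error) {
			return job, nil
		})
		switch {
		case errors.Is(err, workerpool.ErrStopped):
			// The pool is stopped (e.g. Done was called); stop enqueueing.
			return
		case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
			// Our context expired while waiting for a worker.
			return
		}
	}

	if _, err := pool.Done(ctx); err != nil {
		// TODO: check err
	}
}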
func (*Pool[T]) Done ¶
Done immediately stops the worker pool and prevents new work from being enqueued. Then it waits for all existing work to finish and returns the results.
The results are returned in the order in which jobs were enqueued into the worker pool. Each result includes either the value returned by the job or its corresponding error.
The function will return an error if:
- The incoming context is cancelled. The error will be context.DeadlineExceeded or context.Canceled.
- Any of the worker jobs returned a non-nil error. The error will be a multi-error that wraps the individual job errors.
If the worker pool is already done, it returns ErrStopped.
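A sketch distinguishing the pool-level error from per-job errors, assuming (as in the HTTP example above) that the returned results remain inspectable alongside the error:

package main

import (
	"context"
	"errors"
	"fmt"

	workerpool "github.com/abcxyz/pkg/workerpool"
)

func main() {
	ctx := context.TODO()

	pool := workerpool.New[string](&workerpool.Config{Concurrency: 2})

	if err := pool.Do(ctx, func() (string, error) {
		return "", errors.New("job failed")
	}); err != nil {
		// TODO: check err
	}

	results, err := pool.Done(ctx)
	if err != nil {
		// Done reports an error because a job failed; the per-job
		// results below can still be inspected.
		fmt.Println("pool error:", err)
	}

	// Results arrive in enqueue order; inspect each job's value or error.
	for i, r := range results {
		fmt.Printf("job %d: value=%q err=%v\n", i, r.Value, r.Error)
	}
}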