Documentation
Overview ¶
Package worker provides a Pool of goroutines where you can submit Jobs to be run by an existing goroutine instead of spinning off a new goroutine. This pool provides goroutine reuse in a non-blocking way. The pool always has an available number of goroutines equal to the number of CPUs on the machine. Any Submit() call that would block on a full queue causes a new goroutine to be created. These extra goroutines continue to process jobs until they have been idle for a certain amount of time, at which point they are collected. This prevents work from being blocked by a full queue.
From this pool you can create other synchronization primitives, such as a Limited pool that allows reuse while limiting the number of concurrent goroutines. You can also create a Group object that allows safe execution of a number of goroutines and then waiting for them to finish.
The Pool will also provide metrics on the number of goroutines that have been created and are running.
The Pool is NOT for background processing. It is for processing that needs to be done in the foreground but for which you don't want to spin off a new goroutine per job. For background processing, you should use the base/concurrency/background package.
Example of creating and using a Pool:
    ctx := context.Background() // Use base/context package to create a context.

    // Create a new Pool.
    p, err := worker.New(ctx, "myPool")
    if err != nil {
        panic(err)
    }
    // Wait for the pool to finish and stop all goroutines. Because we didn't set a deadline,
    // this will wait up to 30 seconds. See Close() for details.
    defer p.Close(ctx)

    // Submit a job to the pool.
    // This will be run by an existing goroutine in the pool.
    // If there are no goroutines available, a new one will be created.
    // If the context is canceled before the job is run, the job will not be run.
    // If the context is canceled after the job starts, it is the responsibility of the job
    // to check the context and return if it is canceled.
    err = p.Submit(
        ctx,
        func() {
            fmt.Println("Hello, world!")
        },
    )
Generally you don't wait for the pool to finish; you simply let it run and submit jobs to it. You can call Close(), but it is not necessary. If you need to wait for a specific group of goroutines to finish, use the .Group() method.
Example of using the pool for a WaitGroup effect:
    g := p.Group()

    // Spin off a goroutine that will run a job.
    g.Go(
        ctx,
        func(ctx context.Context) error {
            // The passed context can be used to check for cancellation and
            // to record information in the OTEL span.
            if err := context.Cause(ctx); err != nil {
                return err
            }
            fmt.Println("Hello, world!")
            return nil
        },
    )

    // Wait for all goroutines to finish. If ctx is canceled, this returns immediately with an
    // error (which we aren't capturing, as we aren't cancelling).
    if err := g.Wait(ctx); err != nil {
        // Do something
    }
If you need to limit the number of concurrent goroutines that can run for something, you can create a Limited pool from the Pool.
Example of creating and using a Limited pool:
    // Create a Limited pool from the Pool.
    // This will limit the number of concurrent goroutines to 10.
    l, err := p.Limited(10)
    if err != nil {
        panic(err)
    }

    l.Submit(
        ctx,
        func() {
            fmt.Println("Hello, world!")
        },
    )

    // Again, generally we don't wait for the pool to finish.
    l.Wait()
You can also use the Limited pool with a WaitGroup effect:
    g := l.Group()

    g.Go(
        ctx,
        func(ctx context.Context) error {
            fmt.Println("Hello, world!")
            return nil
        },
    )

    if err := g.Wait(ctx); err != nil {
        // Do something
    }
This package also offers a Promise object for submitting a job and getting the result back. This is useful when you want to run a job, do some other work, and then get the result back.
    // Named prom to avoid shadowing the Pool p from the earlier examples.
    prom := NewPromise[string]()

    g.Go(
        ctx,
        func(ctx context.Context) error {
            prom.Set(ctx, "Hello, world!", nil)
            return nil
        },
    )

    fmt.Println(prom.Get().V)
Index ¶
- func Set(p *Pool)
- func WithRunnerTimeout(timeout time.Duration) func(poolOpts) (poolOpts, error)
- func WithSize(size int) func(poolOpts) (poolOpts, error)
- type Limited
- type Option
- type Pool
- func (p *Pool) Close(ctx context.Context) error
- func (p *Pool) GoRoutines() int
- func (p *Pool) Group() bSync.Group
- func (p *Pool) Len() int
- func (p *Pool) Limited(size int) (*Limited, error)
- func (p *Pool) Running() int
- func (p *Pool) Submit(ctx context.Context, f func()) error
- func (p *Pool) Wait()
- type Promise
- type PromiseOption
- type Response
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Set ¶
func Set(p *Pool)
Set sets the default pool to the given pool, overriding the current default. This is usually used only internally. If using init.Service(), use the appropriate call option to set the default pool.
func WithRunnerTimeout ¶
WithRunnerTimeout sets how long a goroutine runner that is not always available will wait for a job before timing out. If a timeout occurs, the goroutine is collected. If <= 0, there is no timeout, which means any new goroutine created will always be available and never be collected. That is usually not a good idea, as it can cause memory leaks via goroutine leaks. The default is 1 second.
func WithSize ¶
WithSize sets the number of goroutines that are always available. By default this is the number of CPUs on the machine. Any submissions that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse. For spiky workloads, the defaults should be sufficient. For constant high loads, you may want to increase this number. Remember that raising the number of goroutines above the number of CPUs will cause context switching and can slow down data-intensive work that doesn't require immediate responses.
Types ¶
type Limited ¶
type Limited struct {
// contains filtered or unexported fields
}
Limited is a worker pool that limits the number of concurrent jobs that can be run. This can be created from a Pool using the Limited() method.
func (*Limited) Group ¶
Group returns a Group that can be used to spin off goroutines and then wait for them to finish. This will use the Limited pool to limit the number of concurrent goroutines. Safer than a sync.WaitGroup.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool provides a worker pool that can be used to submit functions to be run by a goroutine. This provides goroutine reuse in a non-blocking way. The pool will have an always available number of goroutines equal to the number of CPUs on the machine. Any Submit() calls that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse. You can create other synchronization primitives such as a Limited pool that allows reuse while limiting the number of concurrent goroutines. You can also create a Group object that allows safe execution of a number of goroutines and then waiting for them to finish. Generally you only need a single Pool for an entire application. If using the base/context package, there is a Pool tied to the context that can be used.
func Default ¶
func Default() *Pool
Default returns the default pool. If it has not been set, it will be created.
func New ¶
New creates a new worker pool. The name is used for logging and metrics. The pool will have an always available number of goroutines equal to the number of CPUs on the machine. Any Submit() calls that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse. The context should have the meter provider via our context package to allow for metrics to be emitted.
func (*Pool) Close ¶
Close waits for all submitted jobs to stop, then stops all goroutines. Once called, the pool is no longer usable. Close will wait until the passed Context's deadline for everything to stop. If no deadline is set, the maximum wait time is 30 * time.Second; if the pool has not closed by then, Close returns. If you need to wait for all jobs to finish no matter how long it takes, use Wait() and then call Close(). However, this can lead to a deadlock if you are waiting for a job that never finishes. If ctx is canceled, Close will return immediately with the result of context.Cause(ctx).
func (*Pool) GoRoutines ¶
GoRoutines returns the number of goroutines that are currently in the pool.
func (*Pool) Group ¶
Group returns a bSync.Group that can be used to spin off goroutines and then wait for them to finish. This will use the Pool. Safer than a sync.WaitGroup.
func (*Pool) Limited ¶
Limited creates a Limited pool from the Pool. "size" is the number of goroutines that can execute concurrently.
type Promise ¶
Promise is a promise that can be used to return a value from a goroutine. It is a simple wrapper around a channel. It is designed to be used with the base/concurrency/sync.Pool type: it will automatically call Reset() when put back in the pool, which should save memory by not allocating a new Promise or channel.
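The channel-wrapper idea can be shown with a stdlib-only sketch. The names (response, promise, newPromise) are invented for the example; the real type is generic over input and output and adds context handling and sync.Pool recycling.

```go
// Sketch: a promise is a one-shot, buffered channel carrying a
// value/error pair. Set fulfills it; Get blocks until it is fulfilled.
package main

import (
	"errors"
	"fmt"
)

type response[T any] struct {
	V   T
	Err error
}

type promise[T any] struct {
	ch chan response[T]
}

func newPromise[T any]() promise[T] {
	// Buffer of 1 so Set never blocks, even if Get is never called.
	return promise[T]{ch: make(chan response[T], 1)}
}

func (p promise[T]) Set(v T, err error) { p.ch <- response[T]{V: v, Err: err} }

// Get blocks until Set has been called.
func (p promise[T]) Get() response[T] { return <-p.ch }

func main() {
	p := newPromise[string]()
	go p.Set("Hello, world!", nil)
	fmt.Println(p.Get().V) // Hello, world!

	q := newPromise[int]()
	go q.Set(0, errors.New("boom"))
	fmt.Println(q.Get().Err) // boom
}
```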
func NewPromise ¶
func NewPromise[I, O any](in I, options ...PromiseOption) Promise[I, O]
NewPromise creates a new Promise.
type PromiseOption ¶
PromiseOption is an option for NewPromise.
func WithPool ¶
func WithPool(p any) PromiseOption
WithPool sets a *sync.Pool[chan Response[T]] that is used to recycle the channel in the promise after it has had Get() called on it. If this is not the correct type, it will panic.
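The channel-recycling idea behind WithPool can be sketched with the standard library's sync.Pool: take a channel from the pool, use it for one result, and put it back instead of allocating a new one each time. chPool and deliver are invented names for this sketch, not this package's API.

```go
// Sketch: recycle result channels through a sync.Pool so repeated
// one-shot deliveries don't allocate a fresh channel every time.
package main

import (
	"fmt"
	"sync"
)

var chPool = sync.Pool{
	New: func() any { return make(chan string, 1) },
}

// deliver runs f in a goroutine and returns its result through a
// recycled channel.
func deliver(f func() string) string {
	ch := chPool.Get().(chan string)
	defer chPool.Put(ch) // channel is drained again after the receive
	go func() { ch <- f() }()
	return <-ch
}

func main() {
	fmt.Println(deliver(func() string { return "Hello, world!" })) // Hello, world!
}
```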