worker

package
v0.0.0-...-d6c7d84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: MIT Imports: 13 Imported by: 1

Documentation

Overview

Package worker provides a Pool of goroutines where you can submit Jobs to be run by an exisiting goroutine instead of spinning off a new goroutine. This pool provides goroutine reuse in a non-blocking way. The pool will have an always available number of goroutines equal to the number of CPUs on the machine. Any Submit() calls that cause a queue block will cause a new goroutine to be created. These extra goroutines will continue to process jobs until they are idle for a certain amount of time. At that point they will be collected. This prevents work from being blocked by a queue that is full.

This pool can create other syncronization primitives such as a Limited pool that allows reuse while limiting the number of concurrent goroutines. You can also create a Group object that will allow safe execution of a number of goroutines and then wait for them to finish.

The Pool will also provide metrics on the number of goroutines that have been created and are running.

The Pool is NOT for background processing. It is for processing that needs to be done in the foreground but you don't want to spin off a new goroutine for each job. For background processing, you should use the base/concurrency/background package.

Example of creating and using a Pool:

ctx := context.Background() // Use base/context package to create a context.

// Create a new Pool.
p, err := worker.New(ctx, "myPool")
if err != nil {
	panic(err)
}
// Wait for the pool to finish and stop all goroutines. Because we didn't set a deadline
// this will wait up to 30 seconds. See Close() for details.
defer p.Close(ctx)

// Submit a job to the pool.
// This will be run by an existing goroutine in the pool.
// If there are no goroutines available, a new one will be created.
// If the context is canceled before the job is run, the job will not be run.
// If the context is canceled after the job is run, it is the responsibility of the job to check the context
// and return if it is canceled.
err = p.Submit(
	ctx,
	func() { fmt.Println("Hello, world!") },
})

Generally you don't wait for the pool to finish. You can just let it run and submit jobs to it. You can call close, but it is not necessary. If you need to wait for a specific group of goroutines to finish, you can use the .Group() method.

Example of using the pool for a WaitGroup effect:

g := p.Group()

// Spin off a goroutine that will run a job.
g.Go(
	ctx,
	func(ctx context.Context) error {
	 	// The passed context can be used to check for cancellation and
		// to record information in the OTEL span.
		if err := context.Cause(ctx); err != nil {
			return err
		}
		fmt.Println("Hello, world!")
		return nil
	},
)

// Wait for all goroutines to finish. If ctx is canceled, it will return immediately with an error
// (which we aren't capturing as we aren't cancelling).
if err := g.Wait(ctx); err != nil {
	// Do something
}

If you need to limit the number of concurrent goroutines that can run for something, you can create a Limited pool from the Pool.

Example of creating and using a Limited pool:

// Create a Limited pool from the Pool.
// This will limit the number of concurrent goroutines to 10.
l, err := p.Limited(10)
if err != nil {
	panic(err)
}

l.Submit(
	ctx,
	func() { fmt.Println("Hello, world!") },
)

l.Wait() // Again, generally we don't wait for the pool to finish.

You can also use the Limited pool with a WaitGroup effect:

g := l.WaitGroup()

g.Go(
	ctx,
	func() error {
		fmt.Println("Hello, world!")
		return nil
	},
)

if err := g.Wait(ctx); err != nil {
	// Do something
}

This package also offers a Promise object for submitting a job and getting the result back. This is useful for when you want to run a job, do some other work, and then get the result back.

p := NewPromise[string]()

g.Go(
	ctx,
	func() error {
		p.Set(ctx, "Hello, world!", nil)
		return nil
	},
)

fmt.Println(p.Get().V)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Set

func Set(p *Pool)

Set sets the default pool to the given pool. This can be used to override the default pool. However, this is usually used only internally. If using init.Service(), use the appropriate call option to set the default pool.

func WithRunnerTimeout

func WithRunnerTimeout(timeout time.Duration) func(poolOpts) (poolOpts, error)

WithRunnerTimeout sets the time a goroutine runner that is not always available will wait for a job before timing out. If a timeout occurs, the goroutine will be collected. If <= 0, there is no timeout which means that any new goroutine created will always be available and never be collected. That is usually not a good idea that can cause memory leaks via goroutine leaks. Default is 1 second.

func WithSize

func WithSize(size int) func(poolOpts) (poolOpts, error)

WithSize sets the amount of goroutines that are always available. By default this is set to the number of CPUs on the machine. Any submissions that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse.For spikey workloads, the defaults should be sufficient. For constant high loads, you may want to increase this number. Remember that increased number of goroutines over the number of CPUs will cause context switching and slow down processing if doing data intensive work that doesn't require immediate responses.

Types

type Limited

type Limited struct {
	// contains filtered or unexported fields
}

Limited is a worker pool that limits the number of concurrent jobs that can be run. This can be created from a Pool using the Limited() method.

func (*Limited) Group

func (l *Limited) Group() bSync.Group

Group returns a Group that can be used to spin off goroutines and then wait for them to finish. This will use the Limited pool to limit the number of concurrent goroutines. Safer than a sync.WaitGroup.

func (*Limited) Submit

func (l *Limited) Submit(ctx context.Context, f func()) error

Submit submits function f to be run. Context can be cancelled before submit, however if the function is already submitted it is the responsibility of the function to honor/not honor cancellation.

func (*Limited) Wait

func (l *Limited) Wait()

Wait will wait for all goroutines in the pool to finish.

type Option

type Option func(poolOpts) (poolOpts, error)

Option is an option for New().

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool provides a worker pool that can be used to submit functions to be run by a goroutine. This provides goroutine reuse in a non-blocking way. The pool will have an always available number of goroutines equal to the number of CPUs on the machine. Any Submit() calls that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse. You can create other syncronization primitives such as a Limited pool that allows reuse while limiting the number of concurrent goroutines. You can also create a Group object that will allow safe execution of a number of goroutines and then wait for them to finish. Generally you only need to use a single Pool for an entire application. If using the base/context package, there is a Pool tied to the context that can be used.

func Default

func Default() *Pool

Default returns the default pool. If it has not been set, it will be created.

func New

func New(ctx context.Context, name string, options ...Option) (*Pool, error)

New creates a new worker pool. The name is used for logging and metrics. The pool will have an always available number of goroutines equal to the number of CPUs on the machine. Any Submit() calls that exceed this number will cause a new goroutine to be created and stored in a sync.Pool for reuse. The context should have the meter provider via our context package to allow for metrics to be emitted.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close waits for all submitted jobs to stop, then stops all goroutines. Once called the pool is no longer usable. Close will wait until the passed Context deadline for everything to stop. If the deadline is not set, this has a maximum wait time of 30 * time.Second. If the pool is not closed by then, it will return. If you need to wait for all jobs to finish no matter how long it takes, use Wait() then call Close(). However, this can lead to a deadlock if you are waiting for a job that never finishes. If ctx is cancelled, Close will return immediately with the results of context.Cause(ctx).

func (*Pool) GoRoutines

func (p *Pool) GoRoutines() int

GoRoutines returns the number of goroutines that are currently in the pool.

func (*Pool) Group

func (p *Pool) Group() bSync.Group

Group returns a sync.Group that can be used to spin off goroutines and then wait for them to finish. This will use the Pool. Safer than a sync.Group.

func (*Pool) Len

func (p *Pool) Len() int

Len returns the current size of the pool.

func (*Pool) Limited

func (p *Pool) Limited(size int) (*Limited, error)

Limited creates a Limited pool from the Pool. "size" is the number of goroutines that can execute concurrently.

func (*Pool) Running

func (p *Pool) Running() int

Running returns the number of running jobs in the pool.

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, f func()) error

Submit submits the function to be executed. If the context is canceled before the function is executed, the function will not be executed. Once the function is executed, it is the responsibility of the function to check the context and return if it is canceled.

func (*Pool) Wait

func (p *Pool) Wait()

Wait will wait for all goroutines in the pool to finish execution. The pool's goroutines will continue to run and be available for reuse. Rarely used. Generally you should use .Group() to wait for a specific group of goroutines to finish.

type Promise

type Promise[I, O any] = promises.Promise[I, O]

Promise is a promise that can be used to return a value from a goroutine. It is a simple wrapper around a channel that can be used to return a value from a goroutine. This is designed to be used with the base/concurrency/sync.Pool type. It will automatically call Reset() when put back in the pool and should save memory by not allocation a new Promise or channel.

func NewPromise

func NewPromise[I, O any](in I, options ...PromiseOption) Promise[I, O]

NewPromise creates a new Promise.

type PromiseOption

type PromiseOption = promises.Option

PromiseOption is an option for NewPromise.

func WithPool

func WithPool(p any) PromiseOption

WithPool sets a *sync.Pool[chan Response[T]] that is used to recycle the channel in the promise after it has had Get() called on it. If this is not the correct type, it will panic.

type Response

type Response[T any] = promises.Response[T]

Response is a response to some type of call that contains the value and an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL