goroutines

package
v0.0.0-...-a5b82e5 Latest Latest
Published: Apr 3, 2024 License: MIT Imports: 3 Imported by: 0

README

Pools Goroutines


Introduction

The packages contained here provide basic pooling for goroutines. These pools provide the ability to gather statistics, limit the number of goroutines in use and allow reuse of goroutines to lower allocations.

These pools are designed to avoid any and sync.Mutex to prioritize speed.

There are two pool implementations:

  • limited
  • pooled

limited simply uses a limiter to prevent more than X goroutines from running at any given time. There is still a separate goroutine for each request. In some cases this can be the best choice while costing extra allocs.

pooled has a set of goroutines running that are reused. This cuts down allocations in the long run.

Benchmarks

BenchmarkPooled-10                     6         184724028 ns/op         7140777 B/op     140006 allocs/op
BenchmarkPoolLimited-10                6         184942646 ns/op         8118250 B/op     152738 allocs/op
BenchmarkStandard-10                   1        1395970416 ns/op         6640000 B/op     120000 allocs/op
BenchmarkTunny-10                      1        1453848666 ns/op         6880048 B/op     139774 allocs/op
PASS
ok      github.com/johnsiilver/pools/goroutines/benchmarks      6.475s

Test Description

The benchmarks all use runtime.NumCPU() as the pool limit. Each goroutine calculates elliptic curve private keys several thousand times.

  • BenchmarkPooled is the pooled package.
  • BenchmarkPoolLimited is the limited package.
  • BenchmarkStandard is simply spinning off goroutines with a limiter and waitgroup.
  • BenchmarkTunny uses the popular github.com/Jeffail/tunny package.
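For readers who want a feel for the workload, the following is a rough sketch of what a baseline like BenchmarkStandard might look like. It is not the benchmark's source: the P-256 curve, the use of crypto/ecdsa, the job counts, and the names generateKeys and runStandard are all assumptions made for illustration.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// generateKeys creates n private keys and reports how many succeeded.
// The P-256 curve is an assumption; the README does not name a curve.
func generateKeys(n int) int {
	ok := 0
	for i := 0; i < n; i++ {
		if _, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader); err == nil {
			ok++
		}
	}
	return ok
}

// runStandard spins off one goroutine per job, with a channel limiter and
// a WaitGroup, and returns the total number of keys generated.
func runStandard(jobs, limit, keysPerJob int) int64 {
	limiter := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var generated atomic.Int64

	for i := 0; i < jobs; i++ {
		limiter <- struct{}{} // Wait for a free slot.
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-limiter }()
			generated.Add(int64(generateKeys(keysPerJob)))
		}()
	}
	wg.Wait()
	return generated.Load()
}

func main() {
	// Hypothetical sizes; the real benchmark uses several thousand keys per goroutine.
	fmt.Println("keys generated:", runStandard(100, runtime.NumCPU(), 10))
}
```

Wrapping runStandard in a testing.B loop would give a number comparable in spirit to the table above, though not necessarily in magnitude.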

Full Disclosure

Currently, I don't trust this benchmark. BenchmarkStandard doesn't look like I expect it to look. I expected similar results with BenchmarkPoolLimited, but that isn't the case. I've looked at it several times and I'm not seeing why. I haven't gotten down to pprofing it yet. Until I understand it, I am not confident in the speed differences.

I do know that all data comes out as expected, so I do believe the numbers for BenchmarkPooled and BenchmarkPoolLimited.

Documentation

Overview

Package goroutines provides the interfaces and definitions that goroutine pools must implement/use. Implementations are in sub-directories and can be used directly without using this package.

Pools are generally low-level constructs that are passed to higher-level constructs that provide safety and more targeted functionality. The prim package is a good example of this. There you will find an enhanced WaitGroup, functions for processing slices in parallel, map processing and more.

If you are looking to create pipelines, the pipeline package provides concurrent and parallel processing that will be more beneficial than trying to use pools directly.

As this is the parent package, we will show some basic examples that are valid across all implementations using the "pooled" sub-package that implements the Pool interface found here.

Example of using a pool where errors don't matter:

ctx := context.Background()
p, err := pooled.New("name", runtime.NumCPU())
if err != nil {
	panic(err)
}
defer p.Close()

for i := 0; i < 100; i++ {
	i := i

	p.Submit(
		ctx,
		func(ctx context.Context) {
			fmt.Println("Hello number ", i)
		},
	)
}

p.Wait()

Example of using a pool where errors occur, but shouldn't stop execution:

ctx := context.Background()
client := http.Client{}

p, err := pooled.New("name", runtime.NumCPU())
if err != nil {
	panic(err)
}
defer p.Close()

e := goroutines.Errors{}
ch := make(chan *http.Response, runtime.NumCPU())

// urls would just be some []string containing URLs.
for _, url := range urls {
	url := url

	p.Submit(
		ctx,
		func(ctx context.Context) {
			resp, err := client.Get(url)
			if err != nil {
				e.Record(err)
				return
			}
			ch <- resp
		},
	)
}

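// done is closed once the consumer has drained ch.
done := make(chan struct{})

go func() {
	defer close(done)
	for resp := range ch {
		if resp.StatusCode != http.StatusOK {
			fmt.Printf("URL(%s) has response %d\n", resp.Request.URL, resp.StatusCode)
			resp.Body.Close()
			continue
		}
		b, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			fmt.Printf("URL(%s) had error when reading the body: %s\n", resp.Request.URL, err)
			continue
		}
		fmt.Println(string(b))
	}
}()

p.Wait()
close(ch)
<-done // Wait for the consumer to finish printing.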

for _, err := range e.Errors() {
	fmt.Println("had http.Client error: ", err)
}

Example of using a pool where errors occur and should stop execution:

ctx, cancel := context.WithCancel(context.Background())
client := http.Client{}

p, err := pooled.New("name", runtime.NumCPU())
if err != nil {
	panic(err)
}
defer p.Close()

e := goroutines.Errors{}
ch := make(chan *http.Response, runtime.NumCPU())

// urls would just be some []string containing URLs.
for _, url := range urls {
	url := url

	if ctx.Err() != nil {
		break
	}

	p.Submit(
		ctx,
		func(ctx context.Context) {
			if ctx.Err() != nil {
				return
			}
			resp, err := client.Get(url)
			if err != nil {
				e.Record(err)
				cancel()
				return
			}
			ch <- resp
		},
	)
}

... Rest of the code from the last example


Types

type Errors

type Errors struct {
	// contains filtered or unexported fields
}

Errors is a concurrency safe way of capturing a set of errors in multiple goroutines.

func (*Errors) Error

func (e *Errors) Error() error

Error returns the first error received.

func (*Errors) Errors

func (e *Errors) Errors() []error

Errors returns all errors.

func (*Errors) Record

func (e *Errors) Record(err error)

Record writes an error to Errors.

type Job

type Job func(ctx context.Context)

Job is a job for a Pool to execute.

type Pool

type Pool interface {
	// Submit submits a Job to be run.
	Submit(ctx context.Context, runner Job, options ...SubmitOption) error
	// Close closes the goroutine pool. This will call Wait() before it closes.
	Close()
	// Wait will wait for all goroutines to finish. This should only be called if
	// you have stopped calling Submit().
	Wait()
	// Len indicates how big the pool is.
	Len() int
	// Running returns how many goroutines are currently in flight.
	Running() int

	// GetName gets the name of the goroutines pool. This may differ from the name
	// that was passed in when creating the pool. This can be caused by a naming conflict,
	// which causes the name to be appended by a dash(-) and a number.
	GetName() string

	pool.Preventer // Prevents outside packages from implementing Pool.
}

Pool is the minimum interface that any goroutine pool must implement. This can only be created by using a sub-package that implements this interface.

type SubmitOption

type SubmitOption func(opt pool.SubmitOptions) (pool.SubmitOptions, error)

SubmitOption is an option for Pool.Submit().
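The signature above follows the functional options pattern: each option receives the current option struct and returns a modified copy or an error. The sketch below shows how such options could be defined and applied. The real pool.SubmitOptions lives in an internal package and its fields are not shown here, so submitOptions, its fields, withPriority, withNonBlocking and applyOptions are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// submitOptions is a hypothetical stand-in for the internal pool.SubmitOptions.
type submitOptions struct {
	nonBlocking bool
	priority    int
}

// submitOption mirrors the shape of SubmitOption using the stand-in struct.
type submitOption func(opt submitOptions) (submitOptions, error)

// withPriority is a made-up option that validates its argument.
func withPriority(p int) submitOption {
	return func(opt submitOptions) (submitOptions, error) {
		if p < 0 {
			return opt, errors.New("priority must be >= 0")
		}
		opt.priority = p
		return opt, nil
	}
}

// withNonBlocking is a made-up option that cannot fail.
func withNonBlocking() submitOption {
	return func(opt submitOptions) (submitOptions, error) {
		opt.nonBlocking = true
		return opt, nil
	}
}

// applyOptions folds the options over a zero value, stopping at the first error.
func applyOptions(options ...submitOption) (submitOptions, error) {
	opts := submitOptions{}
	for _, o := range options {
		var err error
		if opts, err = o(opts); err != nil {
			return submitOptions{}, err
		}
	}
	return opts, nil
}

func main() {
	opts, err := applyOptions(withPriority(2), withNonBlocking())
	fmt.Printf("%+v %v\n", opts, err)
}
```

Passing the struct by value means a failing option cannot leave a half-modified configuration behind, which is one plausible reason for this signature.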

Directories

Path                Synopsis
internal/register   Package register has a registration method that a goroutines.Pool implementation can use to register the pool.
limited             Package limited provides a goroutine execution Pool that spins a goroutine per Submit() but is hard limited to the number of goroutines that can run at any time.
pooled              Package pooled provides a Pool of goroutines where you can submit Jobs to be run by an existing goroutine instead of spinning off a new goroutine.
