pool

package
v1.98.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

README

async/pool

This package is currently deprecated in favor of github.com/sourcegraph/conc/pool.

Migrating

Most of the functionality from the original package is available in the conc/pool package. The main difference is that there is no ability to control what happens when a worker is unavailable. Previously, one could include a dynamically resizable buffer or constant size buffer and how Schedule would react when the buffer was full. This is no longer possible, instead there is no buffer and the Go method will always block until a worker is available.

Creating a constant pool for workers (pool.New)

The majority use case of this package was to create a pool of goroutines, of a specific size, that would process work.

Old
p := pool.New(ctx,
  pool.ConstantSize(numOfWorkers),
)

p.Schedule(ctx, async.Func(func(ctx context.Context) error {
  // Do work
}))
New
p := pool.New().WithContext(ctx).WithMaxGoroutines(numOfWorkers)

// blocks if there is no available worker.
//
// Note: Use `pool.New().WithErrors()` if you want to keep the
// `func(ctx) error` signature.
p.Go(func(ctx context.Context) {
  // Do work
})

Creating a Pool and waiting for it to finish (pool.WithWait)

The pool.WithWait function is no longer necessary. Instead, the pool.Pool type has a Wait method that will block until all workers have finished.

Old
p := pool.New(ctx,
  pool.ConstantSize(numOfWorkers),
)

var wait func()
p, wait = pool.WithWait(p)

go p.Schedule(ctx, async.Func(func(ctx context.Context) error {
  // Do work
}))

wait()
New
p := pool.New().WithContext(ctx).WithMaxGoroutines(numOfWorkers)

// Note: Use `pool.New().WithErrors()` if you want to keep the
// `func(ctx) error` signature.
go p.Go(func(ctx context.Context) {
  // Do work
})

p.Wait()

Original README

See ORIGINAL_README.md for the original README.

Documentation

Overview

Package pool implements an async pool

Index

Examples

Constants

This section is empty.

Variables

View Source
var RejectWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	ctx, cancel := orerr.CancelWithError(ctx)
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	default:
		cancel(orerr.LimitExceededError{
			Kind: "PoolQueue",
		})
		return r.Run(ctx)
	}
})

RejectWhenFull tries to schedule async.Runner for period when context is alive When underlying buffered channel is full then it cancels the context with orerr.LimitExceededError

View Source
var WaitWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	}
})

WaitWhenFull tries to schedule async.Runner for period when context is alive It blocks When underlying buffered channel is full

Functions

This section is empty.

Types

type Option

type Option interface {
	Apply(*Options)
}

Option allows to functional options pattern to configure pool

type OptionFunc

type OptionFunc func(*Options)

OptionFunc help to implement Option interface

func BufferLength

func BufferLength(size int) OptionFunc

BufferLength helps to set BufferLength option

func ConstantSize deprecated

func ConstantSize(size int) OptionFunc

ConstantSize provides constant size for the pool.

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no equivalent because calls to (*Pool).Go() will block if there are no free workers. Control the size of the worker pool by calling (*Pool).WithMaxGoroutines(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

func ResizeEvery

func ResizeEvery(d time.Duration) OptionFunc

ResizeEvery helps to set ResizeEvery option

func Size

func Size(sizeFunc SizeFunc) OptionFunc

Size helps to set Size option

func (OptionFunc) Apply

func (of OptionFunc) Apply(opts *Options)

Apply implementation of Option interface

type Options

type Options struct {
	// Size allows to dynamically resolve number of workers that should spawned
	Size SizeFunc

	// ResizeEvery defined intervals when pool will be resized (shrank or grown)
	ResizeEvery time.Duration

	// ScheduleBehavior defines how exactly will Schedule method behave.
	// The WaitWhenFull is used by default if no value is provided
	ScheduleBehavior ScheduleBehavior

	// BufferLength defines size of buffered channel queue
	BufferLength int

	// Pool name for logging reasons
	Name string
}

A Options provides pool configuration

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool structure

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/async/pool"
)

func main() {
	var (
		concurrency = 5
		items       = 10
		sleepFor    = 5 * time.Millisecond
	)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Spawn pool of workers
	p := pool.New(ctx,
		pool.ConstantSize(concurrency),
		pool.ResizeEvery(5*time.Minute),
		pool.BufferLength(256),
		pool.WaitWhenFull,
	)
	defer p.Close()
	// Wrap it with timeout for schedule
	scheduler := pool.WithTimeout(5*time.Millisecond, p)

	// Lets wait for all scheduled items from this point
	scheduler, wait := pool.WithWait(scheduler)
	scheduler = pool.WithLogging("test-worker-pool", scheduler)

	output := make(chan string, items)
	now := time.Now()

	for i := 0; i < items; i++ {
		func(i int) {
			// All input and output is captured by closure
			scheduler.Schedule(ctx, async.Func(func(ctx context.Context) error {
				// It is very important to check the context error:
				// - Given context might be done
				// - Underlying buffered channel is full
				// - Pool is in shutdown phase
				if ctx.Err() != nil {
					return ctx.Err()
				}
				time.Sleep(sleepFor)
				batchN := (time.Since(now) / (sleepFor))
				output <- fmt.Sprintf("task_%d_%d", batchN, i)
				// returned error is logged but not returned by Schedule function
				return nil
			}))
		}(i)
	}
	wait()
	close(output)
	for s := range output {
		fmt.Println(s)
	}
	// Not using unordered output since it not deterministic
	// task_1_3
	// task_1_4
	// task_1_0
	// task_1_1
	// task_1_2
	// task_2_6
	// task_2_9
	// task_2_5
	// task_2_7
	// task_2_8
}
Output:

func New deprecated

func New(ctx context.Context, options ...Option) *Pool

New creates new instance of Pool and start goroutine that will spawn the workers Call Close() to release pool resource

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Use (*Pool).New().WithContext() instead. Replace calls to Schedule with (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

func (*Pool) Close

func (p *Pool) Close()

Close blocks until all workers finshes current items and terminates

func (*Pool) Schedule deprecated

func (p *Pool) Schedule(ctx context.Context, r async.Runner) error

Schedule tries to schedule runner for processing in the pool It is required to check provided context for an error. The async.Runner interface will be called in all cases: - When item gets successfully scheduled and withdrawn by worker - When the given context is Done and item is not scheduled (Timeout, buffered queue full) - When pool is in shutdown phase.

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Replace calls to Schedule with (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

type ScheduleBehavior

type ScheduleBehavior func(context.Context, chan unit, async.Runner) error

ScheduleBehavior defines the behavior of pool Schedule method

func (ScheduleBehavior) Apply

func (sb ScheduleBehavior) Apply(opts *Options)

Apply implementation of Option interface

type Scheduler

type Scheduler interface {
	// Schedule task for processing in the pool
	//
	// Deprecated: This library is being deprecated in favor of using
	// https://pkg.go.dev/github.com/sourcegraph/conc/pool instead.
	// Replaces calls to Schedule with (*Pool).Go().  For more information,
	// see the README:
	// https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
	Schedule(ctx context.Context, r async.Runner) error
}

func WithLogging deprecated added in v1.47.2

func WithLogging(name string, s Scheduler) Scheduler

WithLogging creates a scheduler which logs the errors returned from the scheduling as well as executing phase.

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no replacement for this function. Instead, log on each item when calling (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

func WithTimeout deprecated

func WithTimeout(timeout time.Duration, scheduler Scheduler) Scheduler

WithTimeout creates enqueuer that cancel enqueueing after given timeout

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no equivalent to this function in the new library. For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

func WithWait deprecated

func WithWait(s Scheduler) (scheduler Scheduler, wait func())

WithWait wraps a scheduler and returns a new scheduler and a function that blocks until all scheduled tasks are processed or have failed to enqueue.

Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Use (*Pool).Wait() instead. For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md

type SchedulerFunc

type SchedulerFunc func(ctx context.Context, r async.Runner) error

func (SchedulerFunc) Schedule

func (sf SchedulerFunc) Schedule(ctx context.Context, r async.Runner) error

type SizeFunc

type SizeFunc func() int

SizeFunc tells the pool whether it should increase or decrease number of workers

type Wait

type Wait struct {
	Scheduler Scheduler
	sync.WaitGroup
}

Wait is a scheduler that allows you to wait until all scheduled tasks are processed or have failed to enqueue. It can be used when you need to wait for all items from one batch to be processed.

func (*Wait) Schedule

func (w *Wait) Schedule(ctx context.Context, r async.Runner) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL