Documentation ¶
Overview ¶
Package pool implements an async worker pool.
Constants ¶
This section is empty.
Variables ¶
var RejectWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	ctx, cancel := orerr.CancelWithError(ctx)
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	default:
		cancel(orerr.LimitExceededError{
			Kind: "PoolQueue",
		})
		return r.Run(ctx)
	}
})
RejectWhenFull tries to schedule the async.Runner while the context is alive. When the underlying buffered channel is full, it cancels the context with orerr.LimitExceededError and runs the runner with that canceled context.
var WaitWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	}
})
WaitWhenFull tries to schedule the async.Runner while the context is alive. It blocks when the underlying buffered channel is full. If the context is done before a slot frees up, the runner is run with the done context.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option interface {
Apply(*Options)
}
Option enables the functional options pattern for configuring the pool
type OptionFunc ¶
type OptionFunc func(*Options)
OptionFunc helps to implement the Option interface
func BufferLength ¶
func BufferLength(size int) OptionFunc
BufferLength sets the BufferLength option
func ConstantSize
deprecated
func ConstantSize(size int) OptionFunc
ConstantSize provides constant size for the pool.
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no equivalent because calls to (*Pool).Go() will block if there are no free workers. Control the size of the worker pool by calling (*Pool).WithMaxGoroutines(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
func ResizeEvery ¶
func ResizeEvery(d time.Duration) OptionFunc
ResizeEvery sets the ResizeEvery option
func (OptionFunc) Apply ¶
func (of OptionFunc) Apply(opts *Options)
Apply implements the Option interface
type Options ¶
type Options struct {
	// Size dynamically resolves the number of workers that should be spawned
	Size SizeFunc

	// ResizeEvery defines the interval at which the pool is resized (shrunk or grown)
	ResizeEvery time.Duration

	// ScheduleBehavior defines how exactly the Schedule method behaves.
	// WaitWhenFull is used by default if no value is provided
	ScheduleBehavior ScheduleBehavior

	// BufferLength defines the size of the buffered channel queue
	BufferLength int

	// Pool name for logging purposes
	Name string
}
Options provides pool configuration
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool structure
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/async/pool"
)

func main() {
	var (
		concurrency = 5
		items       = 10
		sleepFor    = 5 * time.Millisecond
	)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Spawn a pool of workers
	p := pool.New(ctx,
		pool.ConstantSize(concurrency),
		pool.ResizeEvery(5*time.Minute),
		pool.BufferLength(256),
		pool.WaitWhenFull,
	)
	defer p.Close()

	// Wrap it with a timeout for scheduling
	scheduler := pool.WithTimeout(5*time.Millisecond, p)

	// Let's wait for all scheduled items from this point
	scheduler, wait := pool.WithWait(scheduler)
	scheduler = pool.WithLogging("test-worker-pool", scheduler)

	output := make(chan string, items)
	now := time.Now()

	for i := 0; i < items; i++ {
		func(i int) {
			// All input and output is captured by the closure
			scheduler.Schedule(ctx, async.Func(func(ctx context.Context) error {
				// It is very important to check the context error:
				// - Given context might be done
				// - Underlying buffered channel is full
				// - Pool is in shutdown phase
				if ctx.Err() != nil {
					return ctx.Err()
				}
				time.Sleep(sleepFor)
				batchN := (time.Since(now) / (sleepFor))
				output <- fmt.Sprintf("task_%d_%d", batchN, i)
				// The returned error is logged but not returned by the Schedule function
				return nil
			}))
		}(i)
	}
	wait()

	close(output)
	for s := range output {
		fmt.Println(s)
	}

	// Not using unordered output since it is not deterministic
	// task_1_3
	// task_1_4
	// task_1_0
	// task_1_1
	// task_1_2
	// task_2_6
	// task_2_9
	// task_2_5
	// task_2_7
	// task_2_8
}
func New
deprecated
New creates a new instance of Pool and starts a goroutine that spawns the workers. Call Close() to release the pool's resources.
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Use (*Pool).New().WithContext() instead. Replace calls to Schedule with (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
func (*Pool) Close ¶
func (p *Pool) Close()
Close blocks until all workers finish their current items and terminate
func (*Pool) Schedule
deprecated
Schedule tries to schedule the runner for processing in the pool. The runner is required to check the provided context for an error. The async.Runner interface will be called in all cases:
- When the item is successfully scheduled and picked up by a worker
- When the given context is done and the item is not scheduled (timeout, buffered queue full)
- When the pool is in the shutdown phase
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Replace calls to Schedule with (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
type ScheduleBehavior ¶
ScheduleBehavior defines the behavior of pool Schedule method
func (ScheduleBehavior) Apply ¶
func (sb ScheduleBehavior) Apply(opts *Options)
Apply implements the Option interface
type Scheduler ¶
type Scheduler interface {
	// Schedule task for processing in the pool
	//
	// Deprecated: This library is being deprecated in favor of using
	// https://pkg.go.dev/github.com/sourcegraph/conc/pool instead.
	// Replace calls to Schedule with (*Pool).Go(). For more information,
	// see the README:
	// https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
	Schedule(ctx context.Context, r async.Runner) error
}
func WithLogging
deprecated
added in v1.47.2
WithLogging creates a scheduler that logs errors returned from both the scheduling and the execution phase.
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no replacement for this function. Instead, log on each item when calling (*Pool).Go(). For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
func WithTimeout
deprecated
WithTimeout creates a scheduler that cancels enqueueing after the given timeout
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. There is no equivalent to this function in the new library. For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
func WithWait
deprecated
WithWait wraps a scheduler and returns a new scheduler and a function that blocks until all scheduled tasks are processed or have failed to enqueue.
Deprecated: This library is being deprecated in favor of using https://pkg.go.dev/github.com/sourcegraph/conc/pool instead. Use (*Pool).Wait() instead. For more information, see the README: https://github.com/getoutreach/gobox/tree/main/pkg/async/pool/README.md
type SchedulerFunc ¶
type SizeFunc ¶
type SizeFunc func() int
SizeFunc returns the desired number of workers, which tells the pool whether it should increase or decrease the number of workers