Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var RejectWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	ctx, cancel := orerr.CancelWithError(ctx)
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	default:
		cancel(orerr.LimitExceededError{
			Kind: "PoolQueue",
		})
		return r.Run(ctx)
	}
})
RejectWhenFull tries to schedule the async.Runner while the context is alive. When the underlying buffered channel is full, it cancels the context with orerr.LimitExceededError and calls the runner immediately with the cancelled context.
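The non-blocking behavior described above comes from the `default` branch of the `select`. The following is a minimal standard-library sketch of the same idea, not the package's implementation: `task`, `item`, `errQueueFull`, and `rejectWhenFull` are hypothetical stand-ins for `async.Runner`, the unexported `unit` type, `orerr.LimitExceededError`, and `RejectWhenFull`, and `context.WithCancelCause` is assumed to play the role of `orerr.CancelWithError`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical stand-in for orerr.LimitExceededError.
var errQueueFull = errors.New("pool queue limit exceeded")

// task is a hypothetical stand-in for async.Runner.
type task func(ctx context.Context) error

// item is a hypothetical stand-in for the package's unexported unit type.
type item struct {
	ctx     context.Context
	run     task
	release context.CancelCauseFunc // to be called by a worker after the task has run
}

// rejectWhenFull enqueues without blocking. When the queue has no free slot,
// it cancels the context with errQueueFull and runs the task inline, so the
// task can observe the rejection through context.Cause.
func rejectWhenFull(ctx context.Context, queue chan item, run task) error {
	ctx, cancel := context.WithCancelCause(ctx)
	select {
	case <-ctx.Done():
		cancel(nil) // ctx is already done; this only releases resources
		return run(ctx)
	case queue <- item{ctx: ctx, run: run, release: cancel}:
		return nil
	default:
		cancel(errQueueFull)
		return run(ctx)
	}
}

// demoReject fills a queue of capacity 1 and reports what happens to the
// second task.
func demoReject() string {
	queue := make(chan item, 1)
	noop := func(context.Context) error { return nil }
	_ = rejectWhenFull(context.Background(), queue, noop) // takes the only slot

	err := rejectWhenFull(context.Background(), queue, func(ctx context.Context) error {
		return context.Cause(ctx)
	})
	if errors.Is(err, errQueueFull) {
		return "rejected"
	}
	return "scheduled"
}

func main() {
	fmt.Println(demoReject())
}
```

In this sketch the rejected task still runs, but with a cancelled context, which is why a task is expected to check the context before doing any work.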
var WaitWhenFull = ScheduleBehavior(func(ctx context.Context, queue chan unit, r async.Runner) error {
	select {
	case <-ctx.Done():
		return r.Run(ctx)
	case queue <- unit{Context: ctx, Runner: r}:
		return nil
	}
})
WaitWhenFull tries to schedule the async.Runner while the context is alive. It blocks when the underlying buffered channel is full.
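To illustrate the blocking variant, here is a small standard-library sketch in which a full queue makes the caller wait until the context expires. The names `task`, `item`, and `waitWhenFull` are hypothetical stand-ins, and the 10 ms timeout is an arbitrary value chosen for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// task is a hypothetical stand-in for async.Runner.
type task func(ctx context.Context) error

// item is a hypothetical stand-in for the package's unexported unit type.
type item struct {
	ctx context.Context
	run task
}

// waitWhenFull blocks until either the queue accepts the item or the context
// is done. In the latter case the task runs inline with the done context.
func waitWhenFull(ctx context.Context, queue chan item, run task) error {
	select {
	case <-ctx.Done():
		return run(ctx)
	case queue <- item{ctx: ctx, run: run}:
		return nil
	}
}

// demoWait fills a queue of capacity 1, then schedules a second task under a
// short timeout. Nothing drains the queue, so the second call blocks until
// the deadline passes. It reports whether the task saw the deadline error.
func demoWait() bool {
	queue := make(chan item, 1)
	noop := func(context.Context) error { return nil }
	_ = waitWhenFull(context.Background(), queue, noop) // takes the only slot

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	err := waitWhenFull(ctx, queue, func(ctx context.Context) error {
		return ctx.Err()
	})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoWait())
}
```

Because there is no `default` branch, the only way out of a full queue is a free slot or a finished context, so callers that must not block indefinitely need a deadline or cancellation on the context they pass in.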
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option interface {
Apply(*Options)
}
Option allows the pool to be configured using the functional options pattern.
type OptionFunc ¶
type OptionFunc func(*Options)
OptionFunc is a function adapter that implements the Option interface.
func BufferLength ¶
func BufferLength(size int) OptionFunc
BufferLength sets the BufferLength option.
func ResizeEvery ¶
func ResizeEvery(d time.Duration) OptionFunc
ResizeEvery sets the ResizeEvery option.
func (OptionFunc) Apply ¶
func (of OptionFunc) Apply(opts *Options)
Apply implements the Option interface.
type Options ¶
type Options struct {
	// Size dynamically resolves the number of workers that should be spawned
	Size SizeFunc

	// ResizeEvery defines the interval at which the pool is resized (shrunk or grown)
	ResizeEvery time.Duration

	// ScheduleBehavior defines how exactly the Schedule method behaves.
	// WaitWhenFull is used by default if no value is provided
	ScheduleBehavior ScheduleBehavior

	// BufferLength defines the size of the buffered channel queue
	BufferLength int

	// Pool name for logging purposes
	Name string
}
Options provides the pool configuration.
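The Option, OptionFunc, and Options types above follow the functional options pattern. The sketch below re-declares a reduced copy of those types locally so that it compiles on its own. The bodies of `BufferLength` and `ResizeEvery` and the `build` helper are assumptions about how such options are typically applied, not the package's actual code.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a reduced local copy of the documented struct.
type Options struct {
	ResizeEvery  time.Duration
	BufferLength int
}

// Option mirrors the documented interface.
type Option interface {
	Apply(*Options)
}

// OptionFunc adapts a plain function to the Option interface.
type OptionFunc func(*Options)

// Apply calls the wrapped function.
func (of OptionFunc) Apply(opts *Options) { of(opts) }

// BufferLength returns an option that sets Options.BufferLength (assumed body).
func BufferLength(size int) OptionFunc {
	return func(opts *Options) { opts.BufferLength = size }
}

// ResizeEvery returns an option that sets Options.ResizeEvery (assumed body).
func ResizeEvery(d time.Duration) OptionFunc {
	return func(opts *Options) { opts.ResizeEvery = d }
}

// build is a hypothetical helper showing how a constructor could fold a list
// of options into one Options value, applying them in order.
func build(options ...Option) Options {
	var opts Options
	for _, o := range options {
		o.Apply(&opts)
	}
	return opts
}

func main() {
	opts := build(BufferLength(256), ResizeEvery(5*time.Minute))
	fmt.Println(opts.BufferLength, opts.ResizeEvery)
}
```

Since every option only needs an Apply method, any type can act as an option. This is how the documented ScheduleBehavior values such as WaitWhenFull can be passed to New alongside BufferLength and ResizeEvery.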
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of workers that process scheduled async.Runner items.
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/async/pool"
)

func main() {
	var (
		concurrency = 5
		items       = 10
		sleepFor    = 5 * time.Millisecond
	)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Spawn a pool of workers
	p := pool.New(ctx,
		pool.ConstantSize(concurrency),
		pool.ResizeEvery(5*time.Minute),
		pool.BufferLength(256),
		pool.WaitWhenFull,
	)
	defer p.Close()

	// Wrap it with a timeout for scheduling
	scheduler := pool.WithTimeout(5*time.Millisecond, p)

	// Wait for all items scheduled from this point on
	scheduler, wait := pool.WithWait(scheduler)

	output := make(chan string, items)
	now := time.Now()

	for i := 0; i < items; i++ {
		func(i int) {
			// All input and output is captured by the closure
			scheduler.Schedule(ctx, async.Func(func(ctx context.Context) error {
				// It is very important to check the context error:
				// - Given context might be done
				// - Underlying buffered channel is full
				// - Pool is in shutdown phase
				if ctx.Err() != nil {
					return ctx.Err()
				}
				time.Sleep(sleepFor)
				batchN := (time.Since(now) / (sleepFor))
				output <- fmt.Sprintf("task_%d_%d", batchN, i)
				// The returned error is logged but not returned by the Schedule function
				return nil
			}))
		}(i)
	}
	wait()
	close(output)
	for s := range output {
		fmt.Println(s)
	}
	// Not using unordered output since it is not deterministic
	// task_1_3
	// task_1_4
	// task_1_0
	// task_1_1
	// task_1_2
	// task_2_6
	// task_2_9
	// task_2_5
	// task_2_7
	// task_2_8
}
func New ¶
New creates a new instance of Pool and starts a goroutine that spawns the workers. Call Close() to release the pool's resources.
func (*Pool) Close ¶
func (p *Pool) Close()
Close blocks until all workers finish their current items and terminate.
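A blocking Close of this kind is commonly built on `sync.WaitGroup`. The sketch below shows that general technique with a hypothetical `miniPool` type. It is an assumption about the approach, not the package's implementation, and unlike the documented pool it drains every queued job before returning.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniPool is a hypothetical, heavily reduced worker pool.
type miniPool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// newMiniPool starts the given number of workers reading from a buffered queue.
func newMiniPool(workers, buffer int) *miniPool {
	p := &miniPool{queue: make(chan func(), buffer)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// The loop ends once the queue is closed and empty.
			for job := range p.queue {
				job()
			}
		}()
	}
	return p
}

// Close stops accepting jobs and blocks until every worker has returned.
func (p *miniPool) Close() {
	close(p.queue)
	p.wg.Wait()
}

// drainCount queues the given number of jobs, closes the pool, and returns
// how many jobs had run by the time Close returned.
func drainCount(workers, jobs int) int {
	p := newMiniPool(workers, jobs)
	var done atomic.Int64
	for i := 0; i < jobs; i++ {
		p.queue <- func() { done.Add(1) }
	}
	p.Close()
	return int(done.Load())
}

func main() {
	fmt.Println(drainCount(3, 10))
}
```

Because Close only returns after `wg.Wait()`, a caller that uses `defer p.Close()` can rely on no worker goroutine from this sketch outliving the call.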
func (*Pool) Schedule ¶
Schedule tries to schedule the runner for processing in the pool. The runner must check the provided context for an error, because the async.Runner is called in all cases:
- When the item is successfully scheduled and picked up by a worker
- When the given context is done and the item is not scheduled (timeout, buffered queue full)
- When the pool is in its shutdown phase
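Since the runner is invoked even when the item was never queued, a guard at the top of the runner keeps the work from executing under a finished context. The following standard-library sketch shows such a guard. `guard` and `runBoth` are hypothetical helpers written for this illustration and are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// guard wraps work so that it is skipped, and the context error returned,
// whenever the context passed to the runner is already done.
func guard(work func()) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		work()
		return nil
	}
}

// runBoth calls the guarded function once with a live context and once with
// a cancelled one. It returns how often the work ran and the error text seen
// on the cancelled call.
func runBoth() (executed int, skippedErr string) {
	run := guard(func() { executed++ })

	_ = run(context.Background()) // live context: the work runs

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := run(ctx); err != nil { // cancelled context: the work is skipped
		skippedErr = err.Error()
	}
	return executed, skippedErr
}

func main() {
	fmt.Println(runBoth())
}
```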
type ScheduleBehavior ¶
ScheduleBehavior defines the behavior of pool Schedule method
func (ScheduleBehavior) Apply ¶
func (sb ScheduleBehavior) Apply(opts *Options)
Apply implements the Option interface.
type Scheduler ¶
type Scheduler interface {
	// Schedule task for processing in the pool
	Schedule(ctx context.Context, r async.Runner) error
}
func WithTimeout ¶
WithTimeout creates a scheduler that cancels enqueueing after the given timeout.
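One way to build such a wrapper is to derive a timeout context before delegating to the wrapped scheduler. The sketch below does this with locally defined `runner` and `scheduler` interfaces and hypothetical `runnerFunc`, `schedulerFunc`, `withTimeout`, and `alwaysFull` helpers. It is an assumption about the technique rather than the package's code. In particular, the derived context here also bounds the running task, which the source does not confirm for the real WithTimeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runner and scheduler are local stand-ins for async.Runner and Scheduler.
type runner interface {
	Run(ctx context.Context) error
}

type scheduler interface {
	Schedule(ctx context.Context, r runner) error
}

// runnerFunc and schedulerFunc are hypothetical function adapters.
type runnerFunc func(ctx context.Context) error

func (f runnerFunc) Run(ctx context.Context) error { return f(ctx) }

type schedulerFunc func(ctx context.Context, r runner) error

func (f schedulerFunc) Schedule(ctx context.Context, r runner) error { return f(ctx, r) }

// withTimeout wraps next so that every Schedule call uses a context that
// expires after d. The timer is released once the runner has been called.
func withTimeout(d time.Duration, next scheduler) scheduler {
	return schedulerFunc(func(ctx context.Context, r runner) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		return next.Schedule(ctx, runnerFunc(func(ctx context.Context) error {
			defer cancel()
			return r.Run(ctx)
		}))
	})
}

// alwaysFull is a hypothetical scheduler whose queue never has room: it waits
// for the context to finish and then calls the runner inline.
var alwaysFull = schedulerFunc(func(ctx context.Context, r runner) error {
	<-ctx.Done()
	return r.Run(ctx)
})

// demoTimeout reports whether a task scheduled through the wrapper observes
// the deadline when the underlying scheduler never accepts it.
func demoTimeout() bool {
	s := withTimeout(10*time.Millisecond, alwaysFull)
	err := s.Schedule(context.Background(), runnerFunc(func(ctx context.Context) error {
		return ctx.Err()
	}))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoTimeout())
}
```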
type SchedulerFunc ¶
type SizeFunc ¶
type SizeFunc func() int
SizeFunc returns the number of workers the pool should have, which tells the pool whether to increase or decrease the number of workers.
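As a small illustration, the sketch below re-declares SizeFunc locally and pairs it with two hypothetical helpers: `constantSize`, which always reports the same worker count, and `resizeDelta`, which shows how a resize step could turn the reported size into a number of workers to start or stop. How the real pool performs its resize is not described in the source, so the second helper is purely an assumption.

```go
package main

import "fmt"

// SizeFunc mirrors the documented type.
type SizeFunc func() int

// constantSize is a hypothetical helper returning a SizeFunc that always
// reports n workers.
func constantSize(n int) SizeFunc {
	return func() int { return n }
}

// resizeDelta is a hypothetical helper: a positive result is the number of
// workers to start, a negative result the number of workers to stop.
func resizeDelta(current int, size SizeFunc) int {
	return size() - current
}

func main() {
	fmt.Println(resizeDelta(3, constantSize(5))) // grow by two workers
	fmt.Println(resizeDelta(5, constantSize(2))) // shrink by three workers
}
```

Because the size is a function rather than a fixed number, it can be re-evaluated on every resize interval, for example to read a value from live configuration.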