Documentation ¶
Overview ¶
Package sync implements synchronization facilities such as worker pools.
Types ¶
type PooledWorkerPool ¶
type PooledWorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go assigns the Work to be executed by a goroutine. Whether it waits
	// for an existing goroutine to become available is determined by the
	// GrowOnDemand() option. If GrowOnDemand is not set then the call to
	// Go() will block until a goroutine is available. If GrowOnDemand() is
	// set then it will expand the pool of goroutines to accommodate the
	// work. The newly allocated goroutine will temporarily participate in
	// the pool in an effort to amortize its allocation cost, but will
	// eventually be killed. This allows the pool to dynamically respond to
	// workloads without causing excessive memory pressure. The pool will
	// grow in size when the workload exceeds its capacity and shrink back
	// down to its original size if/when the burst subsides.
	Go(work Work)

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}
PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.
To prevent abnormally large goroutine stacks from persisting over the life-cycle of an application, the PooledWorkerPool will randomly kill an existing goroutine and spawn a new one in its place.
The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.
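The reuse-and-replace behaviour described above can be illustrated with a small standard-library sketch. This is not the library's implementation: `reusePool`, `newReusePool`, `spawn`, `runTasks`, and the `killProb` value are hypothetical names chosen for illustration, and sharding and GrowOnDemand are deliberately left out. Each worker is a long-lived goroutine that serves many tasks on the same stack and, with a small probability after each task, retires and starts a fresh replacement.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// reusePool is a hypothetical, simplified stand-in for a pool that re-uses
// goroutines. It is an illustration only, not the library's implementation.
type reusePool struct {
	workCh   chan func()
	killProb float64 // assumed chance that a worker retires after a task
}

func newReusePool(size int, killProb float64) *reusePool {
	p := &reusePool{workCh: make(chan func()), killProb: killProb}
	for i := 0; i < size; i++ {
		p.spawn()
	}
	return p
}

// spawn starts a long-lived worker that serves many tasks on one stack.
func (p *reusePool) spawn() {
	go func() {
		for work := range p.workCh {
			work()
			if rand.Float64() < p.killProb {
				// Retire this goroutine (and its possibly large stack)
				// and hand over to a fresh one, keeping the pool size fixed.
				p.spawn()
				return
			}
		}
	}()
}

// Go blocks until one of the workers is free to receive the work.
func (p *reusePool) Go(work func()) { p.workCh <- work }

// runTasks submits n trivial tasks and returns how many completed.
func runTasks(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)
	p := newReusePool(4, 0.1)
	wg.Add(n)
	for i := 0; i < n; i++ {
		p.Go(func() {
			defer wg.Done()
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	wg.Wait()
	return done
}

func main() {
	fmt.Println(runTasks(100)) // prints 100
}
```

Because a retiring worker starts its replacement before returning, the number of live workers stays constant, so a blocked call to `Go` is always eventually served.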
func NewPooledWorkerPool ¶
func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)
NewPooledWorkerPool creates a new worker pool.
type PooledWorkerPoolOptions ¶
type PooledWorkerPoolOptions interface {
	// SetGrowOnDemand sets whether the GrowOnDemand feature is enabled.
	SetGrowOnDemand(value bool) PooledWorkerPoolOptions

	// GrowOnDemand returns whether the GrowOnDemand feature is enabled.
	GrowOnDemand() bool

	// SetNumShards sets the number of worker channel shards.
	SetNumShards(value int64) PooledWorkerPoolOptions

	// NumShards returns the number of worker channel shards.
	NumShards() int64

	// SetKillWorkerProbability sets the probability to kill a worker.
	SetKillWorkerProbability(value float64) PooledWorkerPoolOptions

	// KillWorkerProbability returns the probability to kill a worker.
	KillWorkerProbability() float64

	// SetNowFn sets the now function.
	SetNowFn(value NowFn) PooledWorkerPoolOptions

	// NowFn returns the now function.
	NowFn() NowFn

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options
}
PooledWorkerPoolOptions is the set of options for a PooledWorkerPool.
func NewPooledWorkerPoolOptions ¶
func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions
NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options.
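Every setter on PooledWorkerPoolOptions returns a PooledWorkerPoolOptions, so calls can be chained. The sketch below shows one common way such an interface can be backed, assuming copy-on-set semantics through value receivers. The `poolOptions` struct, `newPoolOptions`, and the default values are hypothetical and only model three of the options; the library's own implementation and defaults may differ.

```go
package main

import "fmt"

// poolOptions is a hypothetical stand-in that models three of the options.
// It is not the library's type, and its defaults are illustrative only.
type poolOptions struct {
	growOnDemand          bool
	numShards             int64
	killWorkerProbability float64
}

func newPoolOptions() poolOptions {
	return poolOptions{numShards: 1}
}

// Each setter works on a copy (value receiver) and returns it, so the
// options value it was called on is left unchanged.
func (o poolOptions) SetGrowOnDemand(value bool) poolOptions {
	o.growOnDemand = value
	return o
}

func (o poolOptions) SetNumShards(value int64) poolOptions {
	o.numShards = value
	return o
}

func (o poolOptions) SetKillWorkerProbability(value float64) poolOptions {
	o.killWorkerProbability = value
	return o
}

func (o poolOptions) GrowOnDemand() bool             { return o.growOnDemand }
func (o poolOptions) NumShards() int64               { return o.numShards }
func (o poolOptions) KillWorkerProbability() float64 { return o.killWorkerProbability }

func main() {
	base := newPoolOptions()
	tuned := base.
		SetGrowOnDemand(true).
		SetNumShards(8).
		SetKillWorkerProbability(0.001)

	// base is untouched; tuned carries the chained changes.
	fmt.Println(base.GrowOnDemand(), tuned.GrowOnDemand(), tuned.NumShards())
}
```

With copy-on-set semantics, a shared set of base options can be handed to several callers and customised by each without the changes leaking between them.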
type WorkerPool ¶
type WorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go waits until the next worker becomes available and executes the
	// work on it.
	Go(work Work)

	// GoIfAvailable performs the work inside a worker if one is available and
	// returns true, or false otherwise.
	GoIfAvailable(work Work) bool

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}
WorkerPool provides a pool for goroutines.
Example ¶
package main

import (
	"fmt"
	"log"
	"sync"

	xsync "github.com/m3db/m3/src/x/sync"
)

type response struct {
	a int
}

func main() {
	var (
		wg          sync.WaitGroup
		workers     = xsync.NewWorkerPool(3)
		errorCh     = make(chan error, 1)
		numRequests = 9
		responses   = make([]response, numRequests)
	)

	wg.Add(numRequests)
	workers.Init()

	for i := 0; i < numRequests; i++ {
		// Capture loop variable.
		i := i

		// Execute request on worker pool.
		workers.Go(func() {
			defer wg.Done()

			var err error

			// Perform some work which may fail.
			resp := response{a: i}

			if err != nil {
				// Return the first error that is encountered.
				select {
				case errorCh <- err:
				default:
				}

				return
			}

			// Can concurrently modify responses since each iteration updates a
			// different index.
			responses[i] = resp
		})
	}

	// Wait for all requests to finish.
	wg.Wait()

	close(errorCh)
	if err := <-errorCh; err != nil {
		log.Fatal(err)
	}

	var total int
	for _, r := range responses {
		total += r.a
	}

	fmt.Printf("Total is %v", total)
}
Output: Total is 36
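The example above exercises only Go. To illustrate how the three submission methods differ when no worker is free, here is a standard-library sketch that models pool capacity as tokens in a buffered channel. The names `tokenPool`, `newTokenPool`, `run`, and `demo` are hypothetical, and the sketch is an assumption about one way to obtain these semantics, not a statement of how WorkerPool is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// tokenPool is a hypothetical sketch: one token in the buffered channel
// represents one free worker slot.
type tokenPool struct {
	tokens chan struct{}
}

func newTokenPool(size int) *tokenPool {
	p := &tokenPool{tokens: make(chan struct{}, size)}
	for i := 0; i < size; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

// run executes work on a new goroutine and returns the token afterwards.
func (p *tokenPool) run(work func()) {
	go func() {
		defer func() { p.tokens <- struct{}{} }()
		work()
	}()
}

// Go blocks until a token is available.
func (p *tokenPool) Go(work func()) {
	<-p.tokens
	p.run(work)
}

// GoIfAvailable never blocks: it reports false if no token is free.
func (p *tokenPool) GoIfAvailable(work func()) bool {
	select {
	case <-p.tokens:
		p.run(work)
		return true
	default:
		return false
	}
}

// GoWithTimeout waits at most timeout for a token.
func (p *tokenPool) GoWithTimeout(work func(), timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-p.tokens:
		p.run(work)
		return true
	case <-timer.C:
		return false
	}
}

// demo occupies the only slot, tries both non-blocking variants, then
// releases the slot and tries again.
func demo() (ifAvailable, withTimeout, afterRelease bool) {
	p := newTokenPool(1)
	release := make(chan struct{})
	p.Go(func() { <-release }) // holds the single token until released

	ifAvailable = p.GoIfAvailable(func() {})
	withTimeout = p.GoWithTimeout(func() {}, 10*time.Millisecond)

	close(release)
	afterRelease = p.GoWithTimeout(func() {}, time.Second)
	return ifAvailable, withTimeout, afterRelease
}

func main() {
	a, b, c := demo()
	fmt.Println(a, b, c) // prints: false false true
}
```

Callers that must not stall, such as request handlers with their own deadlines, would typically check the boolean result and shed or retry the work when it is false.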
func NewWorkerPool ¶
func NewWorkerPool(size int) WorkerPool
NewWorkerPool creates a new worker pool.