Documentation ¶
Overview ¶
Package sync implements synchronization facililites such as worker pools.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CoreFn ¶ added in v1.4.0
type CoreFn func() int
CoreFn is a function that returns the ID of the CPU currently running this goroutine.
type NewPooledWorkerFn ¶ added in v1.2.0
type NewPooledWorkerFn func(opts NewPooledWorkerOptions) (PooledWorkerPool, error)
NewPooledWorkerFn returns a pooled worker pool that Init must be called on.
type NewPooledWorkerOptions ¶ added in v1.2.0
type NewPooledWorkerOptions struct {
InstrumentOptions instrument.Options
}
NewPooledWorkerOptions is a set of new instrument worker pool options.
type PooledWorkerPool ¶
type PooledWorkerPool interface { // Init initializes the pool. Init() // Go assign the Work to be executed by a Goroutine. Whether or not // it waits for an existing Goroutine to become available or not // is determined by the GrowOnDemand() option. If GrowOnDemand is not // set then the call to Go() will block until a goroutine is available. // If GrowOnDemand() is set then it will expand the pool of goroutines to // accommodate the work. The newly allocated goroutine will temporarily // participate in the pool in an effort to amortize its allocation cost, but // will eventually be killed. This allows the pool to dynamically respond to // workloads without causing excessive memory pressure. The pool will grow in // size when the workload exceeds its capacity and shrink back down to its // original size if/when the burst subsides. Go(work Work) // GoWithTimeout waits up to the given timeout for a worker to become // available, returning true if a worker becomes available, or false // otherwise. GoWithTimeout(work Work, timeout time.Duration) bool // GoWithContext waits until a worker is available or the provided ctx is // canceled. GoWithContext(ctx gocontext.Context, work Work) bool // FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls. // This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize // iterations. // This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not // checked the calling goroutine blocks waiting for a worker. FastContextCheck(batchSize int) PooledWorkerPool }
PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.
In order to prevent abnormally large goroutine stacks from persisting over the life-cycle of an application, the PooledWorkerPool will randomly kill existing goroutines and spawn a new one.
The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.
func NewPooledWorkerPool ¶
func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)
NewPooledWorkerPool creates a new worker pool.
type PooledWorkerPoolOptions ¶
type PooledWorkerPoolOptions interface { // SetGrowOnDemand sets whether the GrowOnDemand feature is enabled. SetGrowOnDemand(value bool) PooledWorkerPoolOptions // GrowOnDemand returns whether the GrowOnDemand feature is enabled. GrowOnDemand() bool // SetNumShards sets the number of worker channel shards. SetNumShards(value int64) PooledWorkerPoolOptions // NumShards returns the number of worker channel shards. NumShards() int64 // SetKillWorkerProbability sets the probability to kill a worker. SetKillWorkerProbability(value float64) PooledWorkerPoolOptions // KillWorkerProbability returns the probability to kill a worker. KillWorkerProbability() float64 // SetNowFn sets the now function. SetNowFn(value NowFn) PooledWorkerPoolOptions // NowFn returns the now function. NowFn() NowFn // SetInstrumentOptions sets the instrument options. SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions // InstrumentOptions returns the now function. InstrumentOptions() instrument.Options }
PooledWorkerPoolOptions is the options for a PooledWorkerPool.
func NewPooledWorkerPoolOptions ¶
func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions
NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options
type ScheduleResult ¶ added in v1.2.0
type ScheduleResult struct { // Available is true if the goroutine was scheduled in the worker pool. False if the request timed out before a // worker became available. Available bool // WaitTime is how long the goroutine had to wait before receiving a worker from the pool or timing out. WaitTime time.Duration }
ScheduleResult is the result of scheduling a goroutine in the worker pool.
type WorkerPool ¶
type WorkerPool interface { // Init initializes the pool. Init() // Go waits until the next worker becomes available and executes it. Go(work Work) // GoInstrument instruments Go with timing information. GoInstrument(work Work) ScheduleResult // GoIfAvailable performs the work inside a worker if one is available and // returns true, or false otherwise. GoIfAvailable(work Work) bool // GoWithTimeout waits up to the given timeout for a worker to become // available, returning true if a worker becomes available, or false // otherwise. GoWithTimeout(work Work, timeout time.Duration) bool // GoWithTimeoutInstrument instruments GoWithTimeout with timing information. GoWithTimeoutInstrument(work Work, timeout time.Duration) ScheduleResult // GoWithContext waits until a worker is available or the provided ctx is canceled. GoWithContext(ctx context.Context, work Work) ScheduleResult // FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls. // This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize // iterations. // This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not // checked the calling goroutine blocks waiting for a worker. FastContextCheck(batchSize int) WorkerPool // Size returns the size of the worker pool. Size() int }
WorkerPool provides a pool for goroutines.
Example ¶
package main import ( "fmt" "log" "sync" xsync "github.com/m3db/m3/src/x/sync" ) type response struct { a int } func main() { var ( wg sync.WaitGroup workers = xsync.NewWorkerPool(3) errorCh = make(chan error, 1) numRequests = 9 responses = make([]response, numRequests) ) wg.Add(numRequests) workers.Init() for i := 0; i < numRequests; i++ { // Capture loop variable. i := i // Execute request on worker pool. workers.Go(func() { defer wg.Done() var err error // Perform some work which may fail. resp := response{a: i} if err != nil { // Return the first error that is encountered. select { case errorCh <- err: default: } return } // Can concurrently modify responses since each iteration updates a // different index. responses[i] = resp }) } // Wait for all requests to finish. wg.Wait() close(errorCh) if err := <-errorCh; err != nil { log.Fatal(err) } var total int for _, r := range responses { total += r.a } fmt.Printf("Total is %v", total) }
Output: Total is 36
func NewWorkerPool ¶
func NewWorkerPool(size int) WorkerPool
NewWorkerPool creates a new worker pool.