sync

package
v1.0.0-rc.2

Warning: This package is not in the latest version of its module.
Published: Nov 12, 2020 License: Apache-2.0 Imports: 12 Imported by: 55

Documentation

Overview

Package sync implements synchronization facilities such as worker pools.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CPUCore added in v0.15.4

func CPUCore() int

CPUCore returns the current CPU core.

func NumCores added in v0.15.4

func NumCores() int

NumCores returns the number of cores read from /proc/cpuinfo, or 1 if that information is not available.
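
The fallback behavior described for NumCores can be illustrated with a minimal sketch. It assumes that cores are counted as lines beginning with "processor" in /proc/cpuinfo; the helper names countProcessors and numCoresOrOne are hypothetical and are not part of this package, whose actual parsing may differ.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// countProcessors counts the "processor" entries in cpuinfo-formatted text.
// Hypothetical helper for illustration; not part of the package API.
func countProcessors(cpuinfo string) int {
	n := 0
	scanner := bufio.NewScanner(strings.NewReader(cpuinfo))
	for scanner.Scan() {
		if strings.HasPrefix(scanner.Text(), "processor") {
			n++
		}
	}
	return n
}

// numCoresOrOne reads the file at path and returns the processor count,
// falling back to 1 when the file is unreadable or lists no processors.
func numCoresOrOne(path string) int {
	data, err := os.ReadFile(path)
	if err != nil {
		return 1
	}
	if n := countProcessors(string(data)); n > 0 {
		return n
	}
	return 1
}

func main() {
	// On systems without /proc/cpuinfo this prints 1.
	fmt.Println(numCoresOrOne("/proc/cpuinfo"))
}
```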

Types

type NowFn

type NowFn func() time.Time

NowFn is a function that returns the current time.

type PooledWorkerPool

type PooledWorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go assigns the Work to be executed by a goroutine. Whether it waits
	// for an existing goroutine to become available is determined by the
	// GrowOnDemand() option. If GrowOnDemand is not set, the call to Go()
	// blocks until a goroutine is available. If GrowOnDemand() is set, the
	// pool expands its set of goroutines to accommodate the work. The newly
	// allocated goroutine temporarily participates in the pool in an effort
	// to amortize its allocation cost, but is eventually killed. This allows
	// the pool to respond dynamically to workloads without causing excessive
	// memory pressure: it grows when the workload exceeds its capacity and
	// shrinks back to its original size if and when the burst subsides.
	Go(work Work)

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}

PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.

To prevent abnormally large goroutine stacks from persisting over the life-cycle of an application, the PooledWorkerPool randomly kills existing goroutines and spawns new ones in their place.

The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.
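
The goroutine re-use and random replacement described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the package's implementation: the type reusePool, its fields, and the kill probability of 0.1 are all hypothetical, and sharding of the worker channels and the GrowOnDemand behavior are omitted.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

type work func()

// reusePool is a hypothetical pool whose goroutines are long-lived and are
// handed work over a channel instead of being spawned per call.
type reusePool struct {
	workCh   chan work
	killProb float64 // assumed chance of retiring a worker after each task
}

func newReusePool(size int, killProb float64) *reusePool {
	p := &reusePool{workCh: make(chan work), killProb: killProb}
	for i := 0; i < size; i++ {
		go p.worker()
	}
	return p
}

// worker runs tasks on the same goroutine, so a stack grown by one task is
// already large enough for the next. Occasionally it retires and starts a
// fresh replacement so that an oversized stack does not live forever.
func (p *reusePool) worker() {
	for w := range p.workCh {
		w()
		if rand.Float64() < p.killProb {
			go p.worker()
			return
		}
	}
}

// Go blocks until one of the pooled goroutines accepts the work.
func (p *reusePool) Go(w work) { p.workCh <- w }

// runSum adds the integers 1..n using the pool and returns the total.
func runSum(n int) int {
	var (
		p     = newReusePool(4, 0.1)
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	wg.Add(n)
	for i := 1; i <= n; i++ {
		i := i
		p.Go(func() {
			defer wg.Done()
			mu.Lock()
			total += i
			mu.Unlock()
		})
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(runSum(100)) // prints 5050
}
```

The pool size stays constant in this sketch because a retiring worker starts its replacement before it returns.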

func NewPooledWorkerPool

func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)

NewPooledWorkerPool creates a new worker pool.

type PooledWorkerPoolOptions

type PooledWorkerPoolOptions interface {
	// SetGrowOnDemand sets whether the GrowOnDemand feature is enabled.
	SetGrowOnDemand(value bool) PooledWorkerPoolOptions

	// GrowOnDemand returns whether the GrowOnDemand feature is enabled.
	GrowOnDemand() bool

	// SetNumShards sets the number of worker channel shards.
	SetNumShards(value int64) PooledWorkerPoolOptions

	// NumShards returns the number of worker channel shards.
	NumShards() int64

	// SetKillWorkerProbability sets the probability to kill a worker.
	SetKillWorkerProbability(value float64) PooledWorkerPoolOptions

	// KillWorkerProbability returns the probability to kill a worker.
	KillWorkerProbability() float64

	// SetNowFn sets the now function.
	SetNowFn(value NowFn) PooledWorkerPoolOptions

	// NowFn returns the now function.
	NowFn() NowFn

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options
}

PooledWorkerPoolOptions holds the options for a PooledWorkerPool.

func NewPooledWorkerPoolOptions

func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions

NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options.
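
Because each setter returns a PooledWorkerPoolOptions, calls can be chained. The sketch below shows one way such a chainable options type can be built, assuming that setters return a modified copy and leave the receiver untouched. The type sketchOptions, its fields, and its default values are hypothetical and do not reflect the package's real defaults.

```go
package main

import "fmt"

// sketchOptions is a hypothetical stand-in for an options value. The field
// names and defaults are illustrative only.
type sketchOptions struct {
	growOnDemand bool
	numShards    int64
	killProb     float64
}

// newSketchOptions returns options populated with assumed defaults.
func newSketchOptions() sketchOptions {
	return sketchOptions{growOnDemand: false, numShards: 2, killProb: 0.01}
}

// The setters use value receivers, so each one modifies its own copy and
// returns it; the options the caller started from are not changed.
func (o sketchOptions) SetGrowOnDemand(v bool) sketchOptions {
	o.growOnDemand = v
	return o
}

func (o sketchOptions) SetNumShards(v int64) sketchOptions {
	o.numShards = v
	return o
}

func (o sketchOptions) SetKillWorkerProbability(v float64) sketchOptions {
	o.killProb = v
	return o
}

func main() {
	base := newSketchOptions()
	tuned := base.SetGrowOnDemand(true).SetNumShards(8)

	fmt.Println(base.growOnDemand, base.numShards)   // prints "false 2"
	fmt.Println(tuned.growOnDemand, tuned.numShards) // prints "true 8"
}
```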

type Work

type Work func()

Work is a unit of work to be executed.

type WorkerPool

type WorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go waits until the next worker becomes available and executes the work.
	Go(work Work)

	// GoIfAvailable performs the work inside a worker if one is available and
	// returns true, or false otherwise.
	GoIfAvailable(work Work) bool

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}

WorkerPool provides a pool for goroutines.

Example
package main

import (
	"fmt"
	"log"
	"sync"

	xsync "github.com/m3db/m3/src/x/sync"
)

type response struct {
	a int
}

func main() {
	var (
		wg          sync.WaitGroup
		workers     = xsync.NewWorkerPool(3)
		errorCh     = make(chan error, 1)
		numRequests = 9
		responses   = make([]response, numRequests)
	)

	wg.Add(numRequests)
	workers.Init()

	for i := 0; i < numRequests; i++ {
		// Capture loop variable.
		i := i

		// Execute request on worker pool.
		workers.Go(func() {
			defer wg.Done()

			var err error

			// Perform some work which may fail.
			resp := response{a: i}

			if err != nil {
				// Return the first error that is encountered.
				select {
				case errorCh <- err:
				default:
				}

				return
			}

			// Can concurrently modify responses since each iteration updates a
			// different index.
			responses[i] = resp
		})
	}

	// Wait for all requests to finish.
	wg.Wait()

	close(errorCh)
	if err := <-errorCh; err != nil {
		log.Fatal(err)
	}

	var total int
	for _, r := range responses {
		total += r.a
	}

	fmt.Printf("Total is %v", total)
}
Output:

Total is 36
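
The three WorkerPool methods differ only in how long they wait for a free worker: Go waits indefinitely, GoIfAvailable does not wait, and GoWithTimeout waits up to a deadline. The sketch below illustrates those semantics with a buffered channel of tokens. It is a self-contained approximation under that assumption; the type tokenPool and the function demo are hypothetical and are not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

type work func()

// tokenPool is a hypothetical pool that limits concurrency with tokens: a
// task may start only after taking a token, and returns it when finished.
type tokenPool struct{ tokens chan struct{} }

func newTokenPool(size int) *tokenPool {
	p := &tokenPool{tokens: make(chan struct{}, size)}
	for i := 0; i < size; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

// run executes w on a new goroutine and hands the token back afterwards.
func (p *tokenPool) run(w work) {
	go func() {
		defer func() { p.tokens <- struct{}{} }()
		w()
	}()
}

// Go blocks until a token is free.
func (p *tokenPool) Go(w work) {
	<-p.tokens
	p.run(w)
}

// GoIfAvailable never blocks; it reports whether the work was started.
func (p *tokenPool) GoIfAvailable(w work) bool {
	select {
	case <-p.tokens:
		p.run(w)
		return true
	default:
		return false
	}
}

// GoWithTimeout blocks for at most timeout; it reports whether the work
// was started.
func (p *tokenPool) GoWithTimeout(w work, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-p.tokens:
		p.run(w)
		return true
	case <-timer.C:
		return false
	}
}

// demo occupies the only worker and then tries each non-blocking variant.
func demo() (ifAvailable, shortTimeout, afterRelease bool) {
	p := newTokenPool(1)
	release := make(chan struct{})
	p.Go(func() { <-release }) // holds the single token until released

	ifAvailable = p.GoIfAvailable(func() {})
	shortTimeout = p.GoWithTimeout(func() {}, 10*time.Millisecond)

	close(release) // the busy task finishes and returns its token
	afterRelease = p.GoWithTimeout(func() {}, time.Second)
	return ifAvailable, shortTimeout, afterRelease
}

func main() {
	a, b, c := demo()
	fmt.Println(a, b, c) // prints "false false true"
}
```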

func NewWorkerPool

func NewWorkerPool(size int) WorkerPool

NewWorkerPool creates a new worker pool.
