sync

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2021 License: Apache-2.0 Imports: 15 Imported by: 55

Documentation

Overview

Package sync implements synchronization facililites such as worker pools.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CPUCore added in v0.15.4

func CPUCore() int

CPUCore returns the current CPU core.

func NumCores added in v0.15.4

func NumCores() int

NumCores returns the number of cores returned from OS-dependent checkNumCores(), if not available only returns 1

Types

type CoreFn added in v1.4.0

type CoreFn func() int

CoreFn is a function that returns the ID of the CPU currently running this goroutine.

type NewPooledWorkerFn added in v1.2.0

type NewPooledWorkerFn func(opts NewPooledWorkerOptions) (PooledWorkerPool, error)

NewPooledWorkerFn returns a pooled worker pool that Init must be called on.

type NewPooledWorkerOptions added in v1.2.0

type NewPooledWorkerOptions struct {
	InstrumentOptions instrument.Options
}

NewPooledWorkerOptions is a set of new instrument worker pool options.

type NowFn

type NowFn func() time.Time

NowFn is a function that returns the current time.

type PooledWorkerPool

type PooledWorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go assign the Work to be executed by a Goroutine. Whether or not
	// it waits for an existing Goroutine to become available or not
	// is determined by the GrowOnDemand() option. If GrowOnDemand is not
	// set then the call to Go() will block until a goroutine is available.
	// If GrowOnDemand() is set then it will expand the pool of goroutines to
	// accommodate the work. The newly allocated goroutine will temporarily
	// participate in the pool in an effort to amortize its allocation cost, but
	// will eventually be killed. This allows the pool to dynamically respond to
	// workloads without causing excessive memory pressure. The pool will grow in
	// size when the workload exceeds its capacity and shrink back down to its
	// original size if/when the burst subsides.
	Go(work Work)

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool

	// GoWithContext waits until a worker is available or the provided ctx is
	// canceled.
	GoWithContext(ctx gocontext.Context, work Work) bool

	// FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls.
	// This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize
	// iterations.
	// This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not
	// checked the calling goroutine blocks waiting for a worker.
	FastContextCheck(batchSize int) PooledWorkerPool
}

PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.

In order to prevent abnormally large goroutine stacks from persisting over the life-cycle of an application, the PooledWorkerPool will randomly kill existing goroutines and spawn a new one.

The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.

func NewPooledWorkerPool

func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)

NewPooledWorkerPool creates a new worker pool.

type PooledWorkerPoolOptions

type PooledWorkerPoolOptions interface {
	// SetGrowOnDemand sets whether the GrowOnDemand feature is enabled.
	SetGrowOnDemand(value bool) PooledWorkerPoolOptions

	// GrowOnDemand returns whether the GrowOnDemand feature is enabled.
	GrowOnDemand() bool

	// SetNumShards sets the number of worker channel shards.
	SetNumShards(value int64) PooledWorkerPoolOptions

	// NumShards returns the number of worker channel shards.
	NumShards() int64

	// SetKillWorkerProbability sets the probability to kill a worker.
	SetKillWorkerProbability(value float64) PooledWorkerPoolOptions

	// KillWorkerProbability returns the probability to kill a worker.
	KillWorkerProbability() float64

	// SetNowFn sets the now function.
	SetNowFn(value NowFn) PooledWorkerPoolOptions

	// NowFn returns the now function.
	NowFn() NowFn

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions

	// InstrumentOptions returns the now function.
	InstrumentOptions() instrument.Options
}

PooledWorkerPoolOptions is the options for a PooledWorkerPool.

func NewPooledWorkerPoolOptions

func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions

NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options

type ScheduleResult added in v1.2.0

type ScheduleResult struct {
	// Available is true if the goroutine was scheduled in the worker pool. False if the request timed out before a
	// worker became available.
	Available bool
	// WaitTime is how long the goroutine had to wait before receiving a worker from the pool or timing out.
	WaitTime time.Duration
}

ScheduleResult is the result of scheduling a goroutine in the worker pool.

type Work

type Work func()

Work is a unit of item to be worked on.

type WorkerPool

type WorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go waits until the next worker becomes available and executes it.
	Go(work Work)

	// GoInstrument instruments Go with timing information.
	GoInstrument(work Work) ScheduleResult

	// GoIfAvailable performs the work inside a worker if one is available and
	// returns true, or false otherwise.
	GoIfAvailable(work Work) bool

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool

	// GoWithTimeoutInstrument instruments GoWithTimeout with timing information.
	GoWithTimeoutInstrument(work Work, timeout time.Duration) ScheduleResult

	// GoWithContext waits until a worker is available or the provided ctx is canceled.
	GoWithContext(ctx context.Context, work Work) ScheduleResult

	// FastContextCheck returns a wrapper worker pool that only checks the context deadline every batchSize calls.
	// This is useful for tight looping code that wants to amortize the cost of the ctx deadline check over batchSize
	// iterations.
	// This should only be used for code that can guarantee the wait time for a worker is low since if the ctx is not
	// checked the calling goroutine blocks waiting for a worker.
	FastContextCheck(batchSize int) WorkerPool

	// Size returns the size of the worker pool.
	Size() int
}

WorkerPool provides a pool for goroutines.

Example
package main

import (
	"fmt"
	"log"
	"sync"

	xsync "github.com/m3db/m3/src/x/sync"
)

type response struct {
	a int
}

func main() {
	var (
		wg          sync.WaitGroup
		workers     = xsync.NewWorkerPool(3)
		errorCh     = make(chan error, 1)
		numRequests = 9
		responses   = make([]response, numRequests)
	)

	wg.Add(numRequests)
	workers.Init()

	for i := 0; i < numRequests; i++ {
		// Capture loop variable.
		i := i

		// Execute request on worker pool.
		workers.Go(func() {
			defer wg.Done()

			var err error

			// Perform some work which may fail.
			resp := response{a: i}

			if err != nil {
				// Return the first error that is encountered.
				select {
				case errorCh <- err:
				default:
				}

				return
			}

			// Can concurrently modify responses since each iteration updates a
			// different index.
			responses[i] = resp
		})
	}

	// Wait for all requests to finish.
	wg.Wait()

	close(errorCh)
	if err := <-errorCh; err != nil {
		log.Fatal(err)
	}

	var total int
	for _, r := range responses {
		total += r.a
	}

	fmt.Printf("Total is %v", total)
}
Output:

Total is 36

func NewWorkerPool

func NewWorkerPool(size int) WorkerPool

NewWorkerPool creates a new worker pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL