Documentation ¶
Overview ¶
Package sync is a replacement for the sync package in the Go standard library. This package provides replacements for the sync.WaitGroup and sync.Pool types. These replacements provide additional safety and functionality over the standard library versions.
The Group type is a replacement for the stdlib sync.WaitGroup type, with some DNA from the golang.org/x/sync/errgroup package. Using it as a replacement for WaitGroup looks like this:
	g := Group{}

	for i := 0; i < 100; i++ {
		// Generally you can ignore the returned error, as it only occurs if the context is cancelled.
		g.Go(
			ctx,
			func(ctx context.Context) error {
				// Do some work
				return nil
			},
		)
	}

	// err will be of the Errors type, which implements the error interface.
	// This will be nil if no errors were returned.
	// If it is not nil, it will contain IndexErr values that wrap each error.
	// If you did not use the WithIndex() option, the IndexErr will have an index of -1.
	err := g.Wait(ctx)
You can also cause the Group to cancel all goroutines if any of them returns an error by setting the CancelOnErr field. The supplied context.CancelFunc will be called the moment an error is encountered. Work that has not yet been submitted will be cancelled, but any goroutines that have already started will continue to run unless they check the context themselves. Here is an example:
	ctx, cancel := context.WithCancel(context.Background())
	g := Group{CancelOnErr: cancel}

	for i := 0; i < 100; i++ {
		err := g.Go(
			ctx,
			func(ctx context.Context) error {
				if i == 3 {
					return errors.New("error")
				}
				return nil
			},
		)
		// The returned err is simply used to allow us to stop the loop when we hit an error.
		// It does not contain any errors from the Group.
		if err != nil {
			break
		}
	}

	if err := g.Wait(ctx); err != nil {
		fmt.Println(err)
	}
You can also use an exponential backoff to retry failed function calls. This is done by setting the Backoff field.
Finally, you can also set the .Pool so this uses a worker.Pool type for concurrency control and reuse. However, it is recommended to generally create a Group{} from one of the types in the worker package. This will prepopulate the Group with a pool and if using a Limited pool, it will have the prescribed concurrency level.
And in most situations, you should get the Pool from our context.Context object. This will provide a default worker.Pool.
This package also introduces the Pool type, which is a replacement for the stdlib sync.Pool type. It provides additional safety and functionality over the standard library version in that it uses generics to ensure that you cannot put the wrong type into the pool. It also provides some advanced functionality for resetting types that implement the Resetter interface. Finally, it is tied in with OpenTelemetry to provide metrics.
Using it as a replacement for Pool is as follows:
	p := sync.Pool[*bytes.Buffer]{
		New: func() *bytes.Buffer {
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
	}

	b := p.Get(ctx)
	defer p.Put(b)
If you want to get all the benefits of Pool metrics, create the Pool with NewPool():
	p := sync.NewPool[*bytes.Buffer](
		ctx,
		"bufferPool", // name
		func() *bytes.Buffer {
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
	)
There are also a few new synchronization primitives for reducing lock contention.
WProtect provides a type that allows you to protect a value via atomic.Pointer. This is useful for when you have a value that is read frequently and written to infrequently. It is not a replacement for a mutex as you must make a deep copy of the value to change it.
Example:
	wp := bsync.WProtect[Record, *Record]{}
	wp.Set(&Record{Value: 1}) // This takes a lock that only locks our writers; readers are never blocked.
The key thing here is that you must make a deep copy of the value to change it. This is because the value is protected by an atomic.Pointer. This can be done via the GetModifySet() method.
This package also provides a ShardedMap type. A ShardedMap splits its contents across multiple underlying maps, each with its own lock, which reduces lock contention when you have maps that are written to frequently.
Using the benchmarks I found for tidwall's sharded map, I modified the benchmarks for testing this:
	go version go1.23.4 darwin/arm64

	number of cpus: 10
	number of keys: 1000000
	keysize: 10
	random seed: 1737043406258270000

	-- sync.Map --
	set: 1,000,000 ops over 10 threads in 781ms, 1,280,559/sec, 780 ns/op
	get: 1,000,000 ops over 10 threads in 343ms, 2,911,610/sec, 343 ns/op
	rng: 100 ops over 10 threads in 591ms, 169/sec, 5913101 ns/op
	del: 1,000,000 ops over 10 threads in 387ms, 2,584,923/sec, 386 ns/op

	-- stdlib map --
	set: 1,000,000 ops over 10 threads in 511ms, 1,956,468/sec, 511 ns/op
	get: 1,000,000 ops over 10 threads in 146ms, 6,828,640/sec, 146 ns/op
	rng: 100 ops over 10 threads in 127ms, 787/sec, 1269165 ns/op
	del: 1,000,000 ops over 10 threads in 351ms, 2,848,299/sec, 351 ns/op

	-- github.com/orcaman/concurrent-map --
	set: 1,000,000 ops over 10 threads in 134ms, 7,477,008/sec, 133 ns/op
	get: 1,000,000 ops over 10 threads in 30ms, 33,587,102/sec, 29 ns/op
	rng: 100 ops over 10 threads in 2228ms, 44/sec, 22282607 ns/op
	del: 1,000,000 ops over 10 threads in 76ms, 13,135,001/sec, 76 ns/op

	-- github.com/tidwall/shardmap --
	set: 1,000,000 ops over 10 threads in 61ms, 16,479,736/sec, 60 ns/op
	get: 1,000,000 ops over 10 threads in 29ms, 34,268,482/sec, 29 ns/op
	rng: 100 ops over 10 threads in 139ms, 718/sec, 1392182 ns/op
	del: 1,000,000 ops over 10 threads in 48ms, 20,699,879/sec, 48 ns/op

	-- sync.ShardedMap -- [ours]
	set: 1,000,000 ops over 10 threads in 199ms, 5,027,980/sec, 198 ns/op
	get: 1,000,000 ops over 10 threads in 66ms, 15,164,247/sec, 65 ns/op
	del: 1,000,000 ops over 10 threads in 154ms, 6,475,097/sec, 154 ns/op

These results show ours as an improvement over both the stdlib map with locks and sync.Map. It is not as fast as tidwall's sharded map, but it is generic and can use non-string keys. We may improve this in the future by using a faster hashmap implementation, which is what tidwall's sharded map uses. However, a new map implementation is coming to the stdlib in 1.24, so we are going to wait to see how that performs.
In case anyone is wondering: having the sharded versions use a sync.Pool for the maphash adds significant overhead. maphash currently beats 32-bit FNV by significant margins. The key generation that is done with fmt.Sprintf is as fast as the unsafe methods I tried.
Index ¶
- func OnceFunc(f func()) func()
- func OnceValue[T any](f func() T) func() T
- func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
- type Cond
- type Copier
- type Errors
- type GoOption
- type Group
- type IndexErr
- type Locker
- type Map
- type Mutex
- type Once
- type Option
- type Pool
- type RWMutex
- type Resetter
- type ShardedMap
- type WProtect
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OnceFunc ¶
func OnceFunc(f func()) func()
OnceFunc returns a function that invokes f only once. The returned function may be called concurrently. If f panics, the returned function will panic with the same value on every call.
func OnceValue ¶
func OnceValue[T any](f func() T) func() T
OnceValue returns a function that invokes f only once and returns the value returned by f. The returned function may be called concurrently.
If f panics, the returned function will panic with the same value on every call.
func OnceValues ¶
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceValues returns a function that invokes f only once and returns the values returned by f. The returned function may be called concurrently.
If f panics, the returned function will panic with the same value on every call.
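The descriptions above match the standard library functions of the same names, so the following sketch uses the stdlib sync.OnceValue (Go 1.21+) to show the behavior. That this package's OnceValue behaves identically is an assumption based on the matching documentation; the loadConfig helper is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// loadConfig is a hypothetical helper. It returns a getter that runs the
// expensive function at most once and caches its result. calls counts how
// many times the wrapped function actually executes.
func loadConfig(calls *int) func() string {
	return sync.OnceValue(func() string {
		*calls++
		return "loaded"
	})
}

func main() {
	calls := 0
	get := loadConfig(&calls)

	first := get()  // Runs the wrapped function.
	second := get() // Returns the cached value without running it again.
	fmt.Println(first, second, calls)
}
```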
Types ¶
type Cond ¶
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
Each Cond has an associated Locker L (often a *Mutex or *RWMutex), which must be held when changing the condition and when calling the [Cond.Wait] method.
A Cond must not be copied after first use.
In the terminology of the Go memory model, Cond arranges that a call to [Cond.Broadcast] or [Cond.Signal] “synchronizes before” any Wait call that it unblocks.
For many simple use cases, users will be better off using channels than a Cond (Broadcast corresponds to closing a channel, and Signal corresponds to sending on a channel).
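As a small illustration of the channel alternative mentioned above, this sketch uses close() on a channel as the equivalent of Broadcast. The releaseAll helper is hypothetical and exists only for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseAll is a hypothetical helper. It starts n goroutines that all block
// on the same channel, wakes them together by closing it, and returns how
// many were released.
func releaseAll(n int) int {
	ready := make(chan struct{})
	var wg sync.WaitGroup
	var released atomic.Int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ready // Blocks until the channel is closed.
			released.Add(1)
		}()
	}

	close(ready) // Plays the role of Broadcast: every waiter is unblocked.
	wg.Wait()
	return int(released.Load())
}

func main() {
	fmt.Println(releaseAll(5))
}
```

Unlike a Cond, a closed channel cannot be reused for a second broadcast; a fresh channel is needed for each event.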
For more on replacements for sync.Cond, see Roberto Clapis's series on advanced concurrency patterns, as well as Bryan Mills's talk on concurrency patterns.
type Copier ¶
type Copier[T any] interface {
	// Copy returns a copy of the value. Thread-safe.
	Copy() *T
}
Copier is a type that can be copied. The copy must not modify the original value and must make a deep copy of the value. This includes making copies of any pointer or reference types. That is, you must make new maps and slices and copy the old values into the new ones before modifying them.
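A minimal sketch of a conforming Copy() method follows. The Record type and its fields are hypothetical, and the Copier interface is redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// Copier is redeclared here so the example compiles on its own.
type Copier[T any] interface {
	Copy() *T
}

// Record is a hypothetical type containing reference types (a slice and a map).
type Record struct {
	Value int
	Tags  []string
	Attrs map[string]string
}

// Compile-time check that *Record satisfies Copier[Record].
var _ Copier[Record] = (*Record)(nil)

// Copy returns a deep copy: the slice and map are reallocated so that the
// copy shares no memory with the original.
func (r *Record) Copy() *Record {
	n := &Record{Value: r.Value}
	n.Tags = append([]string(nil), r.Tags...)
	n.Attrs = make(map[string]string, len(r.Attrs))
	for k, v := range r.Attrs {
		n.Attrs[k] = v
	}
	return n
}

func main() {
	orig := &Record{Value: 1, Tags: []string{"a"}, Attrs: map[string]string{"k": "v"}}
	c := orig.Copy()
	c.Tags[0] = "changed"
	c.Attrs["k"] = "changed"
	fmt.Println(orig.Tags[0], orig.Attrs["k"]) // The original is untouched.
}
```

A shallow copy (`n := *r`) would not be enough here, because both values would still point at the same slice backing array and the same map.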
type Errors ¶
type Errors struct {
// contains filtered or unexported fields
}
Errors implements the error interface and stores a collection of errors.
func (*Errors) Error ¶
Error returns all the errors joined together with errors.Join() and then converted to a string.
type GoOption ¶
type GoOption func(goOpts) goOpts
GoOption is an option for the .Go method.
func WithBackoff ¶
func WithBackoff(b *exponential.Backoff) GoOption
WithBackoff sets a backoff to use for retries. This overrides any backoff set on the Group for this call to Go().
func WithIndex ¶
WithIndex sets the index of the error in the Errors. This allows you to track an error back to data that was entered in some type of order, like from a slice. So if you take data from []string, you can tell which ones failed (like index 2) so that you can reprocess or do whatever is required.
type Group ¶
type Group struct {
	// Pool is an optional goroutines.Pool for concurrency control and reuse.
	Pool WorkerPool
	// CancelOnErr holds a CancelFunc that will be called if any goroutine
	// returns an error. This will automatically be called when Wait() is
	// finished and then reset to nil to allow reuse.
	CancelOnErr context.CancelFunc
	// Backoff is an optional backoff to use for retries.
	// If nil, no retries will be done.
	Backoff *exponential.Backoff
	// contains filtered or unexported fields
}
Group provides a Group implementation that allows launching goroutines in a safer way by handling the .Add() and .Done() methods of a standard sync.WaitGroup. This prevents problems where you forget to increment or decrement the sync.WaitGroup. In addition, you can use a goroutines.Pool object to allow concurrency control and goroutine reuse (if you don't, it just uses a goroutine per call). It provides a Running() method that keeps track of how many goroutines are running. This can be used with the goroutines.Pool stats to understand what goroutines are in use. It has a CancelOnErr field to allow mimicking of the golang.org/x/sync/errgroup package. Finally, we provide OTEL support in the Group that can be named via the Group.Name string. This will provide span messages on the current span when Wait() is called and record any errors in the span.
func (*Group) Go ¶
func (w *Group) Go(ctx context.Context, f func(ctx context.Context) error, options ...GoOption) error
Go spins off a goroutine that executes f(). This will use the underlying Pool if provided. The context is used to allow cancellation of work before it is submitted to the underlying pool for execution. If the context passed on the first call to Go() has a span attached to it, the Group will create a new span to record all execution in. That span will be attached to the context passed to Go() for each subsequent call. This is done once per Group until the Group is reset. The passed context is then passed to the function f, which must deal with individual context cancellation and recording any span information. The returned error only occurs if the function "f" is not run. This happens when the Context is cancelled.
type IndexErr ¶
type IndexErr struct {
	// Index is the index of the error.
	Index int
	// Err is the error that was returned.
	Err error
}
IndexErr is an error that includes the index of the error. This will correlate with the order in which Group.Go() was called.
type Map ¶
Map is like a Go map[any]any but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
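Use case (1), a cache that only grows, can be sketched as follows. The example uses the standard library sync.Map, on the assumption that this package's Map mirrors it since the documentation text is the same; the upper function and cache variable are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// cache stores each computed result once; entries are never overwritten.
var cache sync.Map

// upper returns the upper-cased form of s, computing it at most a handful of
// times under contention and serving all later calls from the cache.
func upper(s string) string {
	if v, ok := cache.Load(s); ok {
		return v.(string)
	}
	// LoadOrStore keeps the first value stored for the key, so concurrent
	// callers all observe the same entry.
	v, _ := cache.LoadOrStore(s, strings.ToUpper(s))
	return v.(string)
}

func main() {
	fmt.Println(upper("go"), upper("go"))
}
```

The type assertions on every read are the type-safety cost that the paragraph above refers to.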
The zero Map is empty and ready for use. A Map must not be copied after first use.
In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. [Map.Load], [Map.LoadAndDelete], [Map.LoadOrStore], [Map.Swap], [Map.CompareAndSwap], and [Map.CompareAndDelete] are read operations; [Map.Delete], [Map.LoadAndDelete], [Map.Store], and [Map.Swap] are write operations; [Map.LoadOrStore] is a write operation when it returns loaded set to false; [Map.CompareAndSwap] is a write operation when it returns swapped set to true; and [Map.CompareAndDelete] is a write operation when it returns deleted set to true.
type Mutex ¶
A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.
A Mutex must not be copied after first use.
In the terminology of the Go memory model, the n'th call to [Mutex.Unlock] “synchronizes before” the m'th call to [Mutex.Lock] for any n < m. A successful call to [Mutex.TryLock] is equivalent to a call to Lock. A failed call to TryLock does not establish any “synchronizes before” relation at all.
type Once ¶
Once is an object that will perform exactly one action.
A Once must not be copied after first use.
In the terminology of the Go memory model, the return from f “synchronizes before” the return from any call of once.Do(f).
type Option ¶
type Option func(opts) (opts, error)
Option is an option for constructors in this package.
func WithBuffer ¶
WithBuffer sets the buffer for the pool. This is a channel of available values that are held outside the underlying sync.Pool. If this is set to 10, a channel with a capacity of 10 will be created. The Pool will always use the buffer before creating new values. It will always attempt to put values back into the buffer before putting them in the pool. If not set, the pool will only use the sync.Pool.
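The buffer-then-pool ordering described above can be sketched with a buffered channel in front of a standard sync.Pool. The bufferedPool type is hypothetical and is not how this package is necessarily implemented; it only shows the Get and Put ordering.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufferedPool is a hypothetical sketch: a fixed-size channel of values
// backed by a regular sync.Pool for overflow.
type bufferedPool struct {
	buf  chan *bytes.Buffer
	pool sync.Pool
}

func newBufferedPool(size int) *bufferedPool {
	return &bufferedPool{
		buf:  make(chan *bytes.Buffer, size),
		pool: sync.Pool{New: func() any { return new(bytes.Buffer) }},
	}
}

// Get takes from the buffer first and only falls back to the sync.Pool
// (which may allocate) when the buffer is empty.
func (p *bufferedPool) Get() *bytes.Buffer {
	select {
	case b := <-p.buf:
		return b
	default:
		return p.pool.Get().(*bytes.Buffer)
	}
}

// Put tries the buffer first and only hands the value to the sync.Pool
// when the buffer is full.
func (p *bufferedPool) Put(b *bytes.Buffer) {
	b.Reset()
	select {
	case p.buf <- b:
	default:
		p.pool.Put(b)
	}
}

func main() {
	p := newBufferedPool(1)
	b := p.Get()
	b.WriteString("hello")
	p.Put(b)
	fmt.Println(p.Get() == b) // The buffered value is handed out again.
}
```

Values held in the channel are not subject to the garbage collector clearing the sync.Pool, which is the usual motivation for a buffer like this.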
func WithMeterOptions ¶
func WithMeterOptions(meterOpts ...metric.MeterOption) Option
WithMeterOptions sets the options for a meter.
func WithMeterPrefixLevel ¶
WithMeterPrefixLevel sets the prefix for a meter. By default this is the "[package path]/[package name]" of the caller of NewPool. However, occasionally you may want this to be a level higher in the call stack. 0 is the same as the default (the caller of NewPool), 1 is the caller of the function that called NewPool, etc.
type Pool ¶
type Pool[T any] struct {
	// contains filtered or unexported fields
}
Pool is an advanced generics-based sync.Pool. The generics make for less verbose code and prevent accidental assignments of the wrong type, which can cause a panic. This is NOT a drop-in replacement for sync.Pool, as its methods require a context object.
In addition, it provides:
- OTEL tracing for the pool.
- OTEL metrics for the pool if created with NewPool().
- Automatic resetting: if the type T implements the Resetter interface, the Reset() method will be called on the value before it is returned to the pool.
If you have a type implementing Resetter, use the ./reset package to validate your reset in tests.
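To illustrate the type-safety and reset points, here is a minimal sketch of a generic wrapper around the standard sync.Pool. The typedPool, resetter and scratch names are hypothetical; the real Pool also takes a context and records telemetry, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
)

// resetter mirrors the shape of the Resetter interface.
type resetter interface{ Reset() }

// typedPool is a hypothetical generic wrapper around sync.Pool.
type typedPool[T any] struct {
	pool sync.Pool
}

func newTypedPool[T any](newFn func() T) *typedPool[T] {
	return &typedPool[T]{pool: sync.Pool{New: func() any { return newFn() }}}
}

// Get returns a T, so callers never write a type assertion themselves.
func (p *typedPool[T]) Get() T { return p.pool.Get().(T) }

// Put only accepts a T, so the wrong type cannot be stored. If T has a
// Reset() method, it is called before the value goes back into the pool.
func (p *typedPool[T]) Put(v T) {
	if r, ok := any(v).(resetter); ok {
		r.Reset()
	}
	p.pool.Put(v)
}

// scratch is a hypothetical pooled type with a Reset method.
type scratch struct{ data []byte }

func (s *scratch) Reset() { s.data = s.data[:0] }

func main() {
	p := newTypedPool(func() *scratch { return &scratch{data: make([]byte, 0, 64)} })
	s := p.Get()
	s.data = append(s.data, 1, 2, 3)
	p.Put(s)
	fmt.Println(len(p.Get().data)) // Whether reused or newly created, the value is empty.
}
```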
func NewPool ¶
NewPool creates a new Pool for use. This returns a non-pointer value. If passing Pool, you must use a reference to it like a normal sync.Pool (aka use &pool not pool). "name" is used to create a new meter with the name:
"[package path]/[package name]:sync.Pool([type stored])/[name]".
If you are providing a type that implements Resetter, use the ./reset package to validate your reset in tests.
type RWMutex ¶
A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.
A RWMutex must not be copied after first use.
If any goroutine calls [RWMutex.Lock] while the lock is already held by one or more readers, concurrent calls to [RWMutex.RLock] will block until the writer has acquired (and released) the lock, to ensure that the lock eventually becomes available to the writer. Note that this prohibits recursive read-locking.
In the terminology of the Go memory model, the n'th call to [RWMutex.Unlock] “synchronizes before” the m'th call to Lock for any n < m, just as for Mutex. For any call to RLock, there exists an n such that the n'th call to Unlock “synchronizes before” that call to RLock, and the corresponding call to [RWMutex.RUnlock] “synchronizes before” the n+1'th call to Lock.
type Resetter ¶
type Resetter interface {
Reset()
}
Resetter is an interface that can be implemented by a type to allow its values to be reset. You can do validations that your resetter is doing what you want in tests using the ./reset package.
type ShardedMap ¶
type ShardedMap[K comparable, V any] struct {
	// N is the number of maps in the ShardedMap. If N is 0, it will be set to runtime.NumCPU().
	N int
	// contains filtered or unexported fields
}
ShardedMap is a map that is sharded by a hash of the key. This allows for multiple readers and writers to access the map without blocking as long as they are accessing different maps. This is useful for when you have a large number of keys and you want to reduce contention on the map. There is no iteration over the map because there is no good way to do this without locking all maps and worrying about deadlocks.
func (*ShardedMap[K, V]) Del ¶
func (s *ShardedMap[K, V]) Del(k K) (prev V, ok bool)
Del deletes the value for the given key. It returns the previous value and if the key existed. If ok is false, prev is the zero value for the value type.
func (*ShardedMap[K, V]) Get ¶
func (s *ShardedMap[K, V]) Get(k K) (value V, ok bool)
Get returns the value for the given key. If ok is false, the key was not found.
func (*ShardedMap[K, V]) Map ¶
func (s *ShardedMap[K, V]) Map() map[K]V
Map will take all maps and merge them into a single map. This is a blocking operation that must lock all maps.
func (*ShardedMap[K, V]) Set ¶
func (s *ShardedMap[K, V]) Set(k K, v V) (prev V, ok bool)
Set sets the value for the given key. This will return the previous value and if the key existed. If ok is false, prev is the zero value for the value type.
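The sharding technique behind these methods can be sketched as follows. This is a simplified, non-generic illustration with string keys and hash/maphash (maphash.String requires Go 1.19+); the shardedMap type and its internals are hypothetical and are not this package's implementation.

```go
package main

import (
	"fmt"
	"hash/maphash"
	"sync"
)

// shard is one independently locked map.
type shard struct {
	mu sync.RWMutex
	m  map[string]int
}

// shardedMap is a hypothetical, simplified sharded map.
type shardedMap struct {
	seed   maphash.Seed
	shards []*shard
}

func newShardedMap(n int) *shardedMap {
	s := &shardedMap{seed: maphash.MakeSeed(), shards: make([]*shard, n)}
	for i := range s.shards {
		s.shards[i] = &shard{m: make(map[string]int)}
	}
	return s
}

// shardFor hashes the key and picks the shard that owns it. Keys that land
// in different shards never contend for the same lock.
func (s *shardedMap) shardFor(k string) *shard {
	return s.shards[maphash.String(s.seed, k)%uint64(len(s.shards))]
}

// Set stores v and reports the previous value, if any.
func (s *shardedMap) Set(k string, v int) (prev int, ok bool) {
	sh := s.shardFor(k)
	sh.mu.Lock()
	defer sh.mu.Unlock()
	prev, ok = sh.m[k]
	sh.m[k] = v
	return prev, ok
}

// Get returns the value for k and whether it was present.
func (s *shardedMap) Get(k string) (int, bool) {
	sh := s.shardFor(k)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	v, ok := sh.m[k]
	return v, ok
}

func main() {
	m := newShardedMap(4)
	m.Set("a", 1)
	prev, existed := m.Set("a", 2)
	fmt.Println(prev, existed)
}
```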
type WProtect ¶
WProtect provides a type that protects a value from concurrent writes and reads, but only locks for writes. This is useful for values that are not updated often but have heavy reads you do not wish to lock for. This is highly performant in comparison to a RWMutex, but it does require that you do not modify the value retrieved, and any value you store must be a modified copy that does not contain any references to the original value. V is the value type that is stored. C is the pointer type *V, constrained so that it must have a Copy() method. The result of the Copy() method is what is stored when Set() is called.
func (*WProtect[V, C]) Get ¶
func (m *WProtect[V, C]) Get() *V
Get returns the value. The value must not be mutated in any way. Thread-safe.
func (*WProtect[V, C]) GetModifySet ¶
func (m *WProtect[V, C]) GetModifySet(modifier func(v *V))
GetModifySet is a function that allows you to modify the value in a thread-safe manner. The modifier function will receive a copy of the value currently stored by using the type's Copy() method.
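The read-without-locking pattern described for WProtect can be sketched with atomic.Pointer (Go 1.19+) and a mutex that only writers take. The cowValue and settings types are hypothetical and non-generic; this is an illustration of the copy-on-write idea, not this package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// settings is a hypothetical value that is read often and changed rarely.
type settings struct {
	Hosts []string
}

// copy returns a deep copy so that published values are never mutated.
func (s *settings) copy() *settings {
	return &settings{Hosts: append([]string(nil), s.Hosts...)}
}

// cowValue is a hypothetical copy-on-write holder.
type cowValue struct {
	mu  sync.Mutex // Serializes writers only; readers never take it.
	ptr atomic.Pointer[settings]
}

func newCOW(initial *settings) *cowValue {
	c := &cowValue{}
	c.ptr.Store(initial.copy())
	return c
}

// Get is a single atomic load. The result must be treated as read-only.
func (c *cowValue) Get() *settings { return c.ptr.Load() }

// GetModifySet copies the current value, applies modify to the copy, and
// publishes the copy atomically. Readers see either the old or new value.
func (c *cowValue) GetModifySet(modify func(*settings)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	next := c.ptr.Load().copy()
	modify(next)
	c.ptr.Store(next)
}

func main() {
	v := newCOW(&settings{Hosts: []string{"a"}})
	old := v.Get()
	v.GetModifySet(func(s *settings) { s.Hosts = append(s.Hosts, "b") })
	fmt.Println(len(old.Hosts), len(v.Get().Hosts))
}
```

A reader that obtained the old pointer before the update keeps a consistent snapshot, which is why the stored value must never be modified in place.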
type WorkerPool ¶
type WorkerPool interface {
	// Submit submits a function to the pool for execution. The context is for the
	// pool use, not the function itself.
	Submit(ctx context.Context, f func()) error
}
WorkerPool is an interface that represents a pool of workers to which work can be submitted.
Directories ¶
Path | Synopsis |
---|---|
benchmarks | |
reset | Package reset provides a validator to validate that your Reset() method on your type works as intended. |