Documentation
¶
Index ¶
- Variables
- func Lock(fn func()) bool
- func SafeClose(closer io.Closer) (justClosed bool, err error)
- func SafeCloseChan[T any](ch chan T) (justClosed bool)
- func SafeSendChan[T any](ch chan T, value T) (ok bool)
- type Broadcaster
- type Exchanger
- type Listener
- type Map
- func (m *Map[K, V]) CompareAndDelete(key K, old V) (deleted bool)
- func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool
- func (m *Map[K, V]) Delete(key K)
- func (m *Map[K, V]) Load(key K) (value V, ok bool)
- func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)
- func (m *Map[K, V]) Range(f func(key K, value V) bool)
- func (m *Map[K, V]) Store(key K, value V)
- func (m *Map[K, V]) Swap(key K, value V) (previous V, loaded bool)
- type Notifier
- type Phaser
- func (p *Phaser) Arrive() int32
- func (p *Phaser) ArriveAndLeave() int32
- func (p *Phaser) ArriveAndWait() int32
- func (p *Phaser) Arrived() int32
- func (p *Phaser) BulkJoin(parties int32) int32
- func (p *Phaser) ForceTermination()
- func (p *Phaser) IsTerminated() bool
- func (p *Phaser) Join() int32
- func (p *Phaser) Leave() int32
- func (p *Phaser) Parties() int32
- func (p *Phaser) Phase() int32
- func (p *Phaser) Wait(phase int) int32
- type Pool
- type Shard
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosedChannel = errors.New("send after close")
ErrClosedChannel is returned when a send is attempted on a closed channel.
Functions ¶
func SafeClose ¶ added in v0.3.0
SafeClose closes the closer safely. The closer can be a file, a network connection, etc.
func SafeCloseChan ¶ added in v0.3.0
SafeCloseChan closes the channel ch safely.
func SafeSendChan ¶ added in v0.3.0
SafeSendChan sends value to the channel ch safely even if ch has been closed.
Types ¶
type Broadcaster ¶ added in v0.6.0
type Broadcaster[T any] struct { // contains filtered or unexported fields }
func NewBroadcaster ¶ added in v0.6.0
func NewBroadcaster[T any]() *Broadcaster[T]
Broadcast sends a signal with a value to all goroutines waiting on the broadcaster.
func (*Broadcaster[T]) Broadcast ¶ added in v0.6.0
func (b *Broadcaster[T]) Broadcast(v T)
Broadcast broadcasts a signal to all waiting function and unblocks them.
func (*Broadcaster[T]) Go ¶ added in v0.6.0
func (b *Broadcaster[T]) Go(fn func(v T))
Go waits until something is broadcasted, and runs the given function in a new goroutine with the value that was broadcasted.
func (*Broadcaster[T]) Reset ¶ added in v0.6.0
func (b *Broadcaster[T]) Reset()
type Exchanger ¶ added in v0.2.2
type Exchanger[T any] struct { // contains filtered or unexported fields }
Exchanger is a synchronization primitive that allows two goroutines to exchange values. It is similar to a rendezvous point or barrier at which two goroutines swap values and proceed. It consists of two channels and each goroutine owns one channel. Each goroutine calls Exchange with the value to give to the other goroutine and receives the value from the other goroutine. It is a rendezvous because both goroutines wait for the other before exchanging values. It is a barrier because both goroutines block until both have called Exchange.
Example ¶
package main import ( "bytes" "fmt" "sync" syncx "github.com/smallnest/exp/sync" ) func main() { buf1 := bytes.NewBuffer(make([]byte, 1024)) buf2 := bytes.NewBuffer(make([]byte, 1024)) exchanger := syncx.NewExchanger[*bytes.Buffer]() var wg sync.WaitGroup wg.Add(2) expect := 0 go func() { defer wg.Done() buf := buf1 for i := 0; i < 10; i++ { for j := 0; j < 1024; j++ { buf.WriteByte(byte(j / 256)) expect += j / 256 } buf = exchanger.Exchange(buf) } }() var got int go func() { defer wg.Done() buf := buf2 for i := 0; i < 10; i++ { buf = exchanger.Exchange(buf) for _, b := range buf.Bytes() { got += int(b) } buf.Reset() } }() wg.Wait() fmt.Println(got) fmt.Println(expect == got) }
Output: 15360 true
func NewExchanger ¶ added in v0.2.2
NewExchanger creates a new exchanger.
func (*Exchanger[T]) Exchange ¶ added in v0.2.2
func (e *Exchanger[T]) Exchange(value T) T
Exchange exchanges value between two goroutines. It returns the value received from the other goroutine.
It panics if called from neither left nor right goroutine.
If the other goroutine has not called Exchange yet, it blocks.
type Listener ¶
type Listener[T any] struct { // contains filtered or unexported fields }
Listener is a handle to a horn listener.
type Map ¶
type Map[K comparable, V any] struct { // contains filtered or unexported fields }
Map is like a Go map[interface{}]interface{} but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
The zero Map is empty and ready for use. A Map must not be copied after first use.
In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. Load, LoadAndDelete, LoadOrStore, Swap, CompareAndSwap, and CompareAndDelete are read operations; Delete, LoadAndDelete, Store, and Swap are write operations; LoadOrStore is a write operation when it returns loaded set to false; CompareAndSwap is a write operation when it returns swapped set to true; and CompareAndDelete is a write operation when it returns deleted set to true.
func (*Map[K, V]) CompareAndDelete ¶
CompareAndDelete deletes the entry for key if its value is equal to old. The old value must be of a comparable type.
If there is no current value for key in the map, CompareAndDelete returns false (even if the old value is the nil interface value).
func (*Map[K, V]) CompareAndSwap ¶
CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old. The old value must be of a comparable type.
func (*Map[K, V]) Load ¶
Load returns the value stored in the map for a key, or nil if no value is present. The ok result indicates whether value was found in the map.
func (*Map[K, V]) LoadAndDelete ¶
LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
func (*Map[K, V]) LoadOrStore ¶
LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.
func (*Map[K, V]) Range ¶
Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.
Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently (including by f), Range may reflect any mapping for that key from any point during the Range call. Range does not block other methods on the receiver; even f itself may call any method on m.
Range may be O(N) with the number of elements in the map even if f returns false after a constant number of calls.
type Notifier ¶ added in v0.1.3
type Notifier[T any] struct { // contains filtered or unexported fields }
func NewNotifier ¶ added in v0.1.3
func (*Notifier[T]) Close ¶ added in v0.1.3
func (h *Notifier[T]) Close()
Close will close the horn and all listeners will be closed. Any subsequent calls to Send will return an error.
func (*Notifier[T]) Listen ¶ added in v0.1.3
Listen will return a new listener that can be used to receive values from the horn. The listener will be closed when the horn is closed.
func (*Notifier[T]) Send ¶ added in v0.1.3
Send will send the value to all listeners. If a listener is not ready to receive the value, it will be blocked. If the horn is closed, an error will be returned.
func (*Notifier[T]) SendNonblocking ¶ added in v0.1.3
SendNonblocking will send the value to all listeners. If a listener is not ready to receive the value, it will be skipped. If the horn is closed, an error will be returned.
type Phaser ¶
type Phaser struct {
// contains filtered or unexported fields
}
Phaser is a reusable synchronization barrier, similar in functionality to java Phaser.
func NewPhaserWithAction ¶ added in v0.1.3
func (*Phaser) ArriveAndLeave ¶ added in v0.1.2
ArriveAndLeave arrives at this phaser and leaves from it without waiting for others to arrive. Just like java.util.concurrent.Phaser's arriveAndDeregister() method.
func (*Phaser) ArriveAndWait ¶ added in v0.1.2
ArriveAndWait arrives at this phaser and waits others. Just like java.util.concurrent.Phaser's arriveAndAwaitAdvance() method.
func (*Phaser) BulkJoin ¶ added in v0.1.2
BulkJoin adds a number of new parties to this phaser. Just like java.util.concurrent.Phaser's bulkRegister(int parties) method.
func (*Phaser) ForceTermination ¶
func (p *Phaser) ForceTermination()
ForceTermination forces this phaser to enter termination state.
func (*Phaser) IsTerminated ¶
IsTerminated returns true if this phaser has been terminated.
func (*Phaser) Join ¶ added in v0.1.2
Join adds a new party to this phaser. Just like java.util.concurrent.Phaser's register() method.
func (*Phaser) Leave ¶ added in v0.1.2
Leave leaves from this phaser without waiting for others to arrive. Just like java.util.concurrent.Phaser's deregister() method.
func (*Phaser) Parties ¶ added in v0.1.2
Parties returns the number of parties joined in this phaser.
type Pool ¶ added in v0.4.0
type Pool[T any] struct { // contains filtered or unexported fields }
A Pool is a generic wrapper around a sync.Pool.
func New ¶ added in v0.4.0
New creates a new Pool with the provided new function.
The equivalent sync.Pool construct is "sync.Pool{New: fn}"
type Shard ¶ added in v0.2.0
type Shard[T any] struct { // contains filtered or unexported fields }
Shard is a container of values of the same type have n data of type T. Each P has a shard of n data.
func NewShard ¶ added in v0.2.0
NewShard creates a new Shard and initializes it with runtime.GOMAXPROCS.