async

Version: v0.9.0
Published: Sep 7, 2024 License: MIT Imports: 10 Imported by: 18

README


Async is a synchronization and asynchronous computation package for Go.

Overview

  • ConcurrentMap - Implements the generic async.Map interface in a thread-safe manner by delegating load/store operations to the underlying sync.Map.
  • ShardedMap - Implements the generic async.Map interface in a thread-safe manner, delegating load/store operations to one of the underlying async.SynchronizedMaps (shards), using a key hash to calculate the shard number.
  • Future - A placeholder object for a value that may not yet exist.
  • Promise - A writable, single-assignment container that completes a Future. Where a Future is a read-only placeholder for a result that does not yet exist, a Promise is the side that supplies that result.
  • Executor - A worker pool for executing asynchronous tasks, where each submission returns a Future instance representing the result of the task.
  • Task - A data type for controlling possibly lazy and asynchronous computations.
  • Once - An object similar to sync.Once whose Do method takes f func() (T, error) and returns (T, error).
  • Value - An object similar to atomic.Value, but without the consistent type constraint.
  • WaitGroupContext - A WaitGroup with context.Context support for graceful unblocking.
  • ReentrantLock - A mutex that allows a goroutine to acquire the lock on a resource more than once.
  • PriorityLock - A non-reentrant mutex that allows for the specification of lock acquisition priority.

Examples

Examples can be found in the examples directory and in the tests.

License

Licensed under the MIT License.

Documentation

Constants

This section is empty.

Variables

var (
	ErrExecutorQueueFull = errors.New("async: executor queue is full")
	ErrExecutorShutDown  = errors.New("async: executor is shut down")
)

Functions

func GoroutineID

func GoroutineID() (uint64, error)

GoroutineID returns the current goroutine id.
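Go does not expose goroutine ids through a public API, so the following is only a sketch of one commonly used technique: parsing the header line of a stack trace. The helper name goroutineID is hypothetical, and the package's own implementation may work differently.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"strconv"
)

// goroutineID is a hypothetical helper (not the package's code). It reads
// the stack header, which looks like "goroutine 123 [running]:", and parses
// the numeric id out of it.
func goroutineID() (uint64, error) {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false)
	// Split on whitespace; the second field is the numeric id.
	fields := bytes.Fields(buf[:n])
	if len(fields) < 2 {
		return 0, fmt.Errorf("unexpected stack header: %q", buf[:n])
	}
	return strconv.ParseUint(string(fields[1]), 10, 64)
}

func main() {
	id, err := goroutineID()
	if err != nil {
		panic(err)
	}
	fmt.Println("current goroutine id:", id)
}
```

The returned error mirrors the (uint64, error) signature above: the header format is an implementation detail of the runtime, so a caller should be prepared for parsing to fail.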

Types

type ConcurrentMap added in v0.6.0

type ConcurrentMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ConcurrentMap implements the async.Map interface in a thread-safe manner by delegating load/store operations to the underlying sync.Map. A ConcurrentMap must not be copied.

The sync.Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a sync.Map may significantly reduce lock contention compared to a Go map paired with a separate sync.Mutex or sync.RWMutex.

func NewConcurrentMap added in v0.6.0

func NewConcurrentMap[K comparable, V any]() *ConcurrentMap[K, V]

NewConcurrentMap returns a new ConcurrentMap instance.

func (*ConcurrentMap[K, V]) Clear added in v0.6.0

func (cm *ConcurrentMap[K, V]) Clear()

Clear removes all of the mappings from this map.

func (*ConcurrentMap[K, V]) ComputeIfAbsent added in v0.6.0

func (cm *ConcurrentMap[K, V]) ComputeIfAbsent(key K, mappingFunction func(K) *V) *V

ComputeIfAbsent attempts to compute a value using the given mapping function and enters it into the map, if the specified key is not already associated with a value.

func (*ConcurrentMap[K, V]) ContainsKey added in v0.6.0

func (cm *ConcurrentMap[K, V]) ContainsKey(key K) bool

ContainsKey returns true if this map contains a mapping for the specified key.

func (*ConcurrentMap[K, V]) Get added in v0.6.0

func (cm *ConcurrentMap[K, V]) Get(key K) *V

Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.

func (*ConcurrentMap[K, V]) GetOrDefault added in v0.6.0

func (cm *ConcurrentMap[K, V]) GetOrDefault(key K, defaultValue *V) *V

GetOrDefault returns the value to which the specified key is mapped, or defaultValue if this map contains no mapping for the key.

func (*ConcurrentMap[K, V]) IsEmpty added in v0.6.0

func (cm *ConcurrentMap[K, V]) IsEmpty() bool

IsEmpty returns true if this map contains no key-value mappings.

func (*ConcurrentMap[K, V]) KeySet added in v0.6.0

func (cm *ConcurrentMap[K, V]) KeySet() []K

KeySet returns a slice of the keys contained in this map.

func (*ConcurrentMap[K, V]) Put added in v0.6.0

func (cm *ConcurrentMap[K, V]) Put(key K, value *V)

Put associates the specified value with the specified key in this map.

func (*ConcurrentMap[K, V]) Remove added in v0.6.0

func (cm *ConcurrentMap[K, V]) Remove(key K) *V

Remove removes the mapping for a key from this map if it is present, returning the previous value or nil if none.

func (*ConcurrentMap[K, V]) Size added in v0.6.0

func (cm *ConcurrentMap[K, V]) Size() int

Size returns the number of key-value mappings in this map.

func (*ConcurrentMap[K, V]) Values added in v0.6.0

func (cm *ConcurrentMap[K, V]) Values() []*V

Values returns a slice of the values contained in this map.
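To illustrate how a typed map can delegate to sync.Map, here is a minimal standard-library sketch. The typedMap type and its methods are hypothetical stand-ins, not the ConcurrentMap source; in particular, how the real ComputeIfAbsent handles concurrent callers is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// typedMap is a hypothetical generic wrapper around sync.Map.
type typedMap[K comparable, V any] struct {
	m sync.Map
}

// Put stores value under key.
func (t *typedMap[K, V]) Put(key K, value *V) { t.m.Store(key, value) }

// Get returns the stored pointer, or nil if the key is absent.
func (t *typedMap[K, V]) Get(key K) *V {
	v, ok := t.m.Load(key)
	if !ok {
		return nil
	}
	return v.(*V)
}

// ComputeIfAbsent returns the existing value for key, or stores and returns
// f(key). Assumption of this sketch: f may run more than once under
// contention, but only one result is ever stored.
func (t *typedMap[K, V]) ComputeIfAbsent(key K, f func(K) *V) *V {
	if v := t.Get(key); v != nil {
		return v
	}
	actual, _ := t.m.LoadOrStore(key, f(key))
	return actual.(*V)
}

func main() {
	var m typedMap[string, int]
	one := 1
	m.Put("a", &one)
	v := m.ComputeIfAbsent("b", func(string) *int { n := 2; return &n })
	fmt.Println(*m.Get("a"), *v, m.Get("missing") == nil)
}
```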

type Executor added in v0.9.0

type Executor[T any] struct {
	// contains filtered or unexported fields
}

Executor implements the ExecutorService interface.

func NewExecutor added in v0.9.0

func NewExecutor[T any](ctx context.Context, config *ExecutorConfig) *Executor[T]

NewExecutor returns a new Executor.

func (*Executor[T]) Shutdown added in v0.9.0

func (e *Executor[T]) Shutdown() error

Shutdown shuts down the executor. Once the executor service is shut down, no new tasks can be submitted and any pending tasks will be cancelled.

func (*Executor[T]) Status added in v0.9.0

func (e *Executor[T]) Status() ExecutorStatus

Status returns the current status of the executor.

func (*Executor[T]) Submit added in v0.9.0

func (e *Executor[T]) Submit(f func(context.Context) (T, error)) (Future[T], error)

Submit submits a function to the executor. The function will be executed asynchronously and the result will be available via the returned future.
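The worker-pool idea behind Submit can be sketched with channels alone. Everything below (pool, submit, errQueueFull, squareVia) is a hypothetical illustration built on the standard library; it is not the Executor implementation, and it delivers results on a channel rather than a Future.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errQueueFull = errors.New("sketch: queue is full")

// result carries the outcome of one submitted function.
type result[T any] struct {
	val T
	err error
}

// job pairs a function with the channel its result is delivered on.
type job[T any] struct {
	f   func(context.Context) (T, error)
	out chan result[T]
}

// pool is a hypothetical fixed-size worker pool with a bounded queue.
type pool[T any] struct {
	queue chan job[T]
}

func newPool[T any](ctx context.Context, workers, queueSize int) *pool[T] {
	p := &pool[T]{queue: make(chan job[T], queueSize)}
	for i := 0; i < workers; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done(): // stop the worker when the context ends
					return
				case j := <-p.queue:
					v, err := j.f(ctx)
					j.out <- result[T]{val: v, err: err}
				}
			}
		}()
	}
	return p
}

// submit enqueues f without blocking and fails fast when the queue is full.
func (p *pool[T]) submit(f func(context.Context) (T, error)) (<-chan result[T], error) {
	out := make(chan result[T], 1) // buffered so a worker never blocks on delivery
	select {
	case p.queue <- job[T]{f: f, out: out}:
		return out, nil
	default:
		return nil, errQueueFull
	}
}

// squareVia submits one task to a fresh pool and waits for its result.
func squareVia(n int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := newPool[int](ctx, 2, 4)
	out, err := p.submit(func(context.Context) (int, error) { return n * n, nil })
	if err != nil {
		return 0, err
	}
	r := <-out
	return r.val, r.err
}

func main() {
	fmt.Println(squareVia(7))
}
```

The non-blocking send in submit is one way to obtain the fail-fast behavior that an error such as ErrExecutorQueueFull suggests. Unlike the Shutdown described above, this sketch does not cancel tasks that are still queued when the context ends.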

type ExecutorConfig added in v0.9.0

type ExecutorConfig struct {
	WorkerPoolSize int
	QueueSize      int
}

ExecutorConfig represents the Executor configuration.

func NewExecutorConfig added in v0.9.0

func NewExecutorConfig(workerPoolSize, queueSize int) *ExecutorConfig

NewExecutorConfig returns a new ExecutorConfig. workerPoolSize must be positive and queueSize non-negative.

type ExecutorService added in v0.9.0

type ExecutorService[T any] interface {
	// Submit submits a function to the executor service.
	// The function will be executed asynchronously and the result will be
	// available via the returned future.
	Submit(func(context.Context) (T, error)) (Future[T], error)

	// Shutdown shuts down the executor service.
	// Once the executor service is shut down, no new tasks can be submitted
	// and any pending tasks will be cancelled.
	Shutdown() error

	// Status returns the current status of the executor service.
	Status() ExecutorStatus
}

ExecutorService is an interface that defines a task executor.

type ExecutorStatus added in v0.9.0

type ExecutorStatus uint32

ExecutorStatus represents the status of an ExecutorService.

const (
	ExecutorStatusRunning ExecutorStatus = iota
	ExecutorStatusTerminating
	ExecutorStatusShutDown
)

type Future

type Future[T any] interface {

	// Map creates a new Future by applying a function to the successful
	// result of this Future.
	Map(func(T) (T, error)) Future[T]

	// FlatMap creates a new Future by applying a function to the successful
	// result of this Future.
	FlatMap(func(T) (Future[T], error)) Future[T]

	// Join blocks until the Future is completed and returns either a result
	// or an error.
	Join() (T, error)

	// Get blocks until the Future is completed or context is canceled and
	// returns either a result or an error.
	Get(context.Context) (T, error)

	// Recover handles any error that this Future might contain using a
	// resolver function.
	Recover(func() (T, error)) Future[T]

	// RecoverWith handles any error that this Future might contain using
	// another Future.
	RecoverWith(Future[T]) Future[T]
	// contains filtered or unexported methods
}

Future represents a value which may or may not currently be available, but will be available at some point, or an error if that value could not be made available.

func FutureFirstCompletedOf

func FutureFirstCompletedOf[T any](futures ...Future[T]) Future[T]

FutureFirstCompletedOf asynchronously returns a new Future that completes with the result of the first Future in the list to complete, regardless of whether that Future completes with a success or a failure.

func FutureSeq

func FutureSeq[T any](futures []Future[T]) Future[[]any]

FutureSeq reduces many Futures into a single Future. The resulting slice may contain both T values and errors.

func FutureTimer

func FutureTimer[T any](d time.Duration) Future[T]

FutureTimer returns a Future that is resolved after the given duration. It is useful in combination with FutureFirstCompletedOf for implementing timeouts.

type Map added in v0.6.0

type Map[K comparable, V any] interface {

	// Clear removes all of the mappings from this map.
	Clear()

	// ComputeIfAbsent attempts to compute a value using the given mapping
	// function and enters it into the map, if the specified key is not
	// already associated with a value.
	ComputeIfAbsent(key K, mappingFunction func(K) *V) *V

	// ContainsKey returns true if this map contains a mapping for the
	// specified key.
	ContainsKey(key K) bool

	// Get returns the value to which the specified key is mapped, or nil if
	// this map contains no mapping for the key.
	Get(key K) *V

	// GetOrDefault returns the value to which the specified key is mapped, or
	// defaultValue if this map contains no mapping for the key.
	GetOrDefault(key K, defaultValue *V) *V

	// IsEmpty returns true if this map contains no key-value mappings.
	IsEmpty() bool

	// KeySet returns a slice of the keys contained in this map.
	KeySet() []K

	// Put associates the specified value with the specified key in this map.
	Put(key K, value *V)

	// Remove removes the mapping for a key from this map if it is present,
	// returning the previous value or nil if none.
	Remove(key K) *V

	// Size returns the number of key-value mappings in this map.
	Size() int

	// Values returns a slice of the values contained in this map.
	Values() []*V
}

A Map is an object that maps keys to values.

type Once added in v0.6.0

type Once[T any] struct {
	// contains filtered or unexported fields
}

Once is an object that will execute the given function exactly once. Any subsequent call will return the previous result.

func (*Once[T]) Do added in v0.6.0

func (o *Once[T]) Do(f func() (T, error)) (T, error)

Do calls the function f if and only if Do is being called for the first time for this instance of Once. In other words, given

var once Once[T]

if once.Do(f) is called multiple times, only the first call will invoke f, even if f has a different value in each invocation. A new instance of Once is required for each function to execute.

The return values for each subsequent call will be the result of the first execution.

If f panics, Do considers it to have returned; future calls of Do return without calling f.
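The behavior described above can be approximated by wrapping sync.Once and caching the result. This is a minimal sketch under that assumption; the once type below is hypothetical and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// once is a hypothetical generic wrapper over sync.Once that caches the
// result of the first call.
type once[T any] struct {
	o   sync.Once
	val T
	err error
}

// Do runs f on the first call only and returns the cached result afterwards.
func (o *once[T]) Do(f func() (T, error)) (T, error) {
	o.o.Do(func() { o.val, o.err = f() })
	return o.val, o.err
}

func main() {
	var o once[int]
	calls := 0
	for i := 0; i < 3; i++ {
		v, err := o.Do(func() (int, error) { calls++; return 42, nil })
		fmt.Println(v, err)
	}
	fmt.Println("calls:", calls)
}
```

Because sync.Once also treats a panicking function as having returned, later calls in this sketch return the zero value and a nil error without invoking f again.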

type PriorityLock added in v0.9.0

type PriorityLock struct {
	// contains filtered or unexported fields
}

PriorityLock is a non-reentrant mutex that allows specifying a priority level when acquiring the lock. It extends the standard sync.Locker interface with an additional locking method, LockP, which takes a priority level as an argument.

The current implementation may cause starvation for lower priority lock requests.

func NewPriorityLock added in v0.9.0

func NewPriorityLock(maxPriority int) *PriorityLock

NewPriorityLock instantiates and returns a new PriorityLock, specifying the maximum priority level that can be used in the LockP method. It panics if the maximum priority level is non-positive or exceeds the hard limit.

func (*PriorityLock) Lock added in v0.9.0

func (pl *PriorityLock) Lock()

Lock will block the calling goroutine until it acquires the lock, using the highest available priority.

func (*PriorityLock) LockP added in v0.9.0

func (pl *PriorityLock) LockP(priority int)

LockP blocks the calling goroutine until it acquires the lock. Requests with higher priorities acquire the lock first. If the provided priority is outside the valid range, it will be assigned the boundary value.

func (*PriorityLock) Unlock added in v0.9.0

func (pl *PriorityLock) Unlock()

Unlock releases the previously acquired lock. It will panic if the lock is already unlocked.

type Promise

type Promise[T any] interface {

	// Success completes the underlying Future with a value.
	Success(T)

	// Failure fails the underlying Future with an error.
	Failure(error)

	// Future returns the underlying Future.
	Future() Future[T]
}

Promise represents a writable, single-assignment container, which completes a Future.

func NewPromise

func NewPromise[T any]() Promise[T]

NewPromise returns a new Promise.
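A single-assignment container can be sketched with a channel that is closed exactly once. The promise type below is a hypothetical illustration, not the package's Promise; in particular, silently ignoring a second completion is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// promise is a hypothetical single-assignment container.
type promise[T any] struct {
	once sync.Once
	done chan struct{}
	val  T
	err  error
}

func newPromise[T any]() *promise[T] {
	return &promise[T]{done: make(chan struct{})}
}

// complete records the first outcome; later calls are ignored.
func (p *promise[T]) complete(v T, err error) {
	p.once.Do(func() {
		p.val, p.err = v, err
		close(p.done) // closing the channel releases every waiter
	})
}

// Success completes the promise with a value.
func (p *promise[T]) Success(v T) { p.complete(v, nil) }

// Failure completes the promise with an error.
func (p *promise[T]) Failure(e error) {
	var zero T
	p.complete(zero, e)
}

// Join blocks until the promise is completed.
func (p *promise[T]) Join() (T, error) {
	<-p.done
	return p.val, p.err
}

func main() {
	p := newPromise[string]()
	go p.Success("hello")
	fmt.Println(p.Join())

	p.Failure(errors.New("ignored")) // already completed, so this has no effect
	fmt.Println(p.Join())
}
```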

type ReentrantLock

type ReentrantLock struct {
	// contains filtered or unexported fields
}

ReentrantLock allows goroutines to enter the lock more than once. It implements the sync.Locker interface.

A ReentrantLock must not be copied after first use.

func (*ReentrantLock) Lock

func (r *ReentrantLock) Lock()

Lock locks the resource. It panics if the GoroutineID call returns an error.

func (*ReentrantLock) Unlock

func (r *ReentrantLock) Unlock()

Unlock unlocks the resource. It panics on an attempt to unlock a lock that is not locked.

type ShardedMap added in v0.8.0

type ShardedMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ShardedMap implements the async.Map interface in a thread-safe manner, delegating load/store operations to one of the underlying async.SynchronizedMaps (shards), using a key hash to calculate the shard number. A ShardedMap must not be copied.

func NewShardedMap added in v0.8.0

func NewShardedMap[K comparable, V any](shards int) *ShardedMap[K, V]

NewShardedMap returns a new ShardedMap, where shards is the number of partitions for this map. It uses the 64-bit FNV-1a hash function to calculate the shard number for a key. If the shards argument is not positive, NewShardedMap will panic.

func NewShardedMapWithHash added in v0.8.0

func NewShardedMapWithHash[K comparable, V any](shards int, hashFunc func(K) uint64) *ShardedMap[K, V]

NewShardedMapWithHash returns a new ShardedMap, where shards is the number of partitions for this map, and hashFunc is a hash function to calculate the shard number for a key. If shards is not positive or hashFunc is nil, NewShardedMapWithHash will panic.
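Shard selection by key hash can be sketched as follows. The helper shardFor is hypothetical and handles only string keys; how the package turns an arbitrary comparable key into bytes for hashing is not described here, so treat this as an illustration of the FNV-1a-plus-modulo idea only.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor is a hypothetical helper: it hashes key with 64-bit FNV-1a and
// reduces the hash modulo the number of shards.
func shardFor(key string, shards int) int {
	if shards <= 0 {
		panic("shards must be positive")
	}
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum64() % uint64(shards))
}

func main() {
	for _, k := range []string{"alpha", "beta", "gamma"} {
		fmt.Printf("%s -> shard %d\n", k, shardFor(k, 4))
	}
}
```

The same key always maps to the same shard, so each shard can be guarded by its own lock and operations on keys in different shards do not contend with each other.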

func (*ShardedMap[K, V]) Clear added in v0.8.0

func (sm *ShardedMap[K, V]) Clear()

Clear removes all of the mappings from this map.

func (*ShardedMap[K, V]) ComputeIfAbsent added in v0.8.0

func (sm *ShardedMap[K, V]) ComputeIfAbsent(key K, mappingFunction func(K) *V) *V

ComputeIfAbsent attempts to compute a value using the given mapping function and enters it into the map, if the specified key is not already associated with a value.

func (*ShardedMap[K, V]) ContainsKey added in v0.8.0

func (sm *ShardedMap[K, V]) ContainsKey(key K) bool

ContainsKey returns true if this map contains a mapping for the specified key.

func (*ShardedMap[K, V]) Get added in v0.8.0

func (sm *ShardedMap[K, V]) Get(key K) *V

Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.

func (*ShardedMap[K, V]) GetOrDefault added in v0.8.0

func (sm *ShardedMap[K, V]) GetOrDefault(key K, defaultValue *V) *V

GetOrDefault returns the value to which the specified key is mapped, or defaultValue if this map contains no mapping for the key.

func (*ShardedMap[K, V]) IsEmpty added in v0.8.0

func (sm *ShardedMap[K, V]) IsEmpty() bool

IsEmpty returns true if this map contains no key-value mappings.

func (*ShardedMap[K, V]) KeySet added in v0.8.0

func (sm *ShardedMap[K, V]) KeySet() []K

KeySet returns a slice of the keys contained in this map.

func (*ShardedMap[K, V]) Put added in v0.8.0

func (sm *ShardedMap[K, V]) Put(key K, value *V)

Put associates the specified value with the specified key in this map.

func (*ShardedMap[K, V]) Remove added in v0.8.0

func (sm *ShardedMap[K, V]) Remove(key K) *V

Remove removes the mapping for a key from this map if it is present, returning the previous value or nil if none.

func (*ShardedMap[K, V]) Size added in v0.8.0

func (sm *ShardedMap[K, V]) Size() int

Size returns the number of key-value mappings in this map.

func (*ShardedMap[K, V]) Values added in v0.8.0

func (sm *ShardedMap[K, V]) Values() []*V

Values returns a slice of the values contained in this map.

type SynchronizedMap added in v0.8.0

type SynchronizedMap[K comparable, V any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SynchronizedMap implements the async.Map interface in a thread-safe manner, delegating load/store operations to a Go map and using a sync.RWMutex for synchronization.

func NewSynchronizedMap added in v0.8.0

func NewSynchronizedMap[K comparable, V any]() *SynchronizedMap[K, V]

NewSynchronizedMap returns a new SynchronizedMap.

func (*SynchronizedMap[K, V]) Clear added in v0.8.0

func (sync *SynchronizedMap[K, V]) Clear()

Clear removes all of the mappings from this map.

func (*SynchronizedMap[K, V]) ComputeIfAbsent added in v0.8.0

func (sync *SynchronizedMap[K, V]) ComputeIfAbsent(key K, mappingFunction func(K) *V) *V

ComputeIfAbsent attempts to compute a value using the given mapping function and enters it into the map, if the specified key is not already associated with a value.

func (*SynchronizedMap[K, V]) ContainsKey added in v0.8.0

func (sync *SynchronizedMap[K, V]) ContainsKey(key K) bool

ContainsKey returns true if this map contains a mapping for the specified key.

func (*SynchronizedMap[K, V]) Get added in v0.8.0

func (sync *SynchronizedMap[K, V]) Get(key K) *V

Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.

func (*SynchronizedMap[K, V]) GetOrDefault added in v0.8.0

func (sync *SynchronizedMap[K, V]) GetOrDefault(key K, defaultValue *V) *V

GetOrDefault returns the value to which the specified key is mapped, or defaultValue if this map contains no mapping for the key.

func (*SynchronizedMap[K, V]) IsEmpty added in v0.8.0

func (sync *SynchronizedMap[K, V]) IsEmpty() bool

IsEmpty returns true if this map contains no key-value mappings.

func (*SynchronizedMap[K, V]) KeySet added in v0.8.0

func (sync *SynchronizedMap[K, V]) KeySet() []K

KeySet returns a slice of the keys contained in this map.

func (*SynchronizedMap[K, V]) Put added in v0.8.0

func (sync *SynchronizedMap[K, V]) Put(key K, value *V)

Put associates the specified value with the specified key in this map.

func (*SynchronizedMap[K, V]) Remove added in v0.8.0

func (sync *SynchronizedMap[K, V]) Remove(key K) *V

Remove removes the mapping for a key from this map if it is present, returning the previous value or nil if none.

func (*SynchronizedMap[K, V]) Size added in v0.8.0

func (sync *SynchronizedMap[K, V]) Size() int

Size returns the number of key-value mappings in this map.

func (*SynchronizedMap[K, V]) Values added in v0.8.0

func (sync *SynchronizedMap[K, V]) Values() []*V

Values returns a slice of the values contained in this map.
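The combination of a Go map and a sync.RWMutex described for SynchronizedMap can be sketched in a few lines. The lockedMap type is a hypothetical stand-in that covers only three operations; it is not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical map guarded by a sync.RWMutex.
type lockedMap[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]*V
}

func newLockedMap[K comparable, V any]() *lockedMap[K, V] {
	return &lockedMap[K, V]{m: make(map[K]*V)}
}

// Get takes the read lock, so concurrent readers do not block each other.
func (l *lockedMap[K, V]) Get(key K) *V {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.m[key] // nil when the key is absent
}

// Put takes the write lock for exclusive access.
func (l *lockedMap[K, V]) Put(key K, value *V) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.m[key] = value
}

// Size returns the number of entries under the read lock.
func (l *lockedMap[K, V]) Size() int {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return len(l.m)
}

func main() {
	m := newLockedMap[string, int]()
	n := 10
	m.Put("ten", &n)
	fmt.Println(*m.Get("ten"), m.Get("absent") == nil, m.Size())
}
```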

type Task added in v0.4.0

type Task[T any] struct {
	// contains filtered or unexported fields
}

Task is a data type for controlling possibly lazy and asynchronous computations.

func NewTask added in v0.4.0

func NewTask[T any](taskFunc func() (T, error)) *Task[T]

NewTask returns a new Task associated with the specified function.

func (*Task[T]) Call added in v0.4.0

func (task *Task[T]) Call() Future[T]

Call starts executing the task using a goroutine. It returns a Future which can be used to retrieve the result or error of the task when it is completed.

type Value added in v0.7.0

type Value struct {
	// contains filtered or unexported fields
}

A Value provides an atomic load and store of a value of any type. The behavior is analogous to atomic.Value, except that the stored values are not required to share the same concrete type. It can be useful for storing different implementations of an interface.

func (*Value) CompareAndSwap added in v0.7.0

func (v *Value) CompareAndSwap(old any, new any) (swapped bool)

CompareAndSwap executes the compare-and-swap operation for the Value. The current implementation is not atomic.

func (*Value) Load added in v0.7.0

func (v *Value) Load() (val any)

Load returns the value set by the most recent Store. It returns nil if there has been no call to Store for this Value.

func (*Value) Store added in v0.7.0

func (v *Value) Store(val any)

Store sets the value of the Value v to val. Store(nil) panics.

func (*Value) Swap added in v0.7.0

func (v *Value) Swap(new any) (old any)

Swap stores new into Value and returns the previous value. It returns nil if the Value is empty. Swap(nil) panics.
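One way to lift the consistent-type restriction of atomic.Value is to wrap every stored value in the same struct type. The sketch below assumes that approach; box and anyValue are hypothetical names, and the package may be implemented differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// box gives every stored value the same concrete type, which is what
// atomic.Value requires.
type box struct{ v any }

// anyValue is a hypothetical wrapper that accepts values of differing types.
type anyValue struct{ inner atomic.Value }

// Store saves val; like atomic.Value, it panics on nil.
func (a *anyValue) Store(val any) {
	if val == nil {
		panic("sketch: store of nil value")
	}
	a.inner.Store(box{val})
}

// Load returns the most recently stored value, or nil if none was stored.
func (a *anyValue) Load() any {
	b, ok := a.inner.Load().(box)
	if !ok {
		return nil // nothing stored yet
	}
	return b.v
}

func main() {
	var v anyValue
	fmt.Println(v.Load())
	v.Store(1)
	v.Store("two") // a plain atomic.Value would panic on the type change
	fmt.Println(v.Load())
}
```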

type WaitGroupContext added in v0.5.0

type WaitGroupContext struct {
	// contains filtered or unexported fields
}

A WaitGroupContext waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished or the given context is done.

func NewWaitGroupContext added in v0.5.0

func NewWaitGroupContext(ctx context.Context) *WaitGroupContext

NewWaitGroupContext returns a new WaitGroupContext with Context ctx.

func (*WaitGroupContext) Add added in v0.5.0

func (wgc *WaitGroupContext) Add(delta int)

Add adds delta, which may be negative, to the WaitGroupContext counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.

func (*WaitGroupContext) Done added in v0.5.0

func (wgc *WaitGroupContext) Done()

Done decrements the WaitGroupContext counter by one.

func (*WaitGroupContext) Wait added in v0.5.0

func (wgc *WaitGroupContext) Wait()

Wait blocks until the wait group counter is zero or ctx is done.
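Waiting until either a counter reaches zero or a context is done can be sketched on top of sync.WaitGroup. The helpers waitOrDone and demo are hypothetical and do not reflect how WaitGroupContext is implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrDone is a hypothetical helper: it reports true if wg finished before
// ctx was done, and false otherwise.
func waitOrDone(ctx context.Context, wg *sync.WaitGroup) bool {
	finished := make(chan struct{})
	go func() {
		wg.Wait() // note: this goroutine lingers if wg never reaches zero
		close(finished)
	}()
	select {
	case <-finished:
		return true
	case <-ctx.Done():
		return false
	}
}

// demo waits on one pending unit of work with a 100ms deadline.
func demo(markDone bool) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	if markDone {
		go wg.Done()
	}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return waitOrDone(ctx, &wg)
}

func main() {
	fmt.Println("finished:", demo(true))
	fmt.Println("finished:", demo(false))
}
```

The second call shows the graceful unblocking case: the work is never marked done, so only the context deadline releases the waiter.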

Directories

Path Synopsis
examples
internal
