Documentation ¶
Overview ¶
Package sync2 provides a set of functions and types for:
- context-aware functionality that isn't present in the standard library,
- offloading memory through the file system,
- controlling execution of tasks that can run repetitively, concurrently, or asynchronously.
Index ¶
- func Concurrently(fns ...func() error) []error
- func Copy(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error)
- func Go(fns ...func()) (wait func())
- func IsManuallyTriggeredCycle(ctx context.Context) bool
- func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
- func NewTeeInmemory(readers int, blockSize int64) ([]PipeReader, PipeWriter, error)
- func Sleep(ctx context.Context, duration time.Duration) bool
- func WithTimeout(timeout time.Duration, do, onTimeout func())
- type Cooldown
- func (cooldown *Cooldown) Close()
- func (cooldown *Cooldown) Run(ctx context.Context, fn func(ctx context.Context) error) error
- func (cooldown *Cooldown) SetInterval(interval time.Duration)
- func (cooldown *Cooldown) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
- func (cooldown *Cooldown) Stop()
- func (cooldown *Cooldown) Trigger()
- type Cycle
- func (cycle *Cycle) ChangeInterval(interval time.Duration)
- func (cycle *Cycle) Close()
- func (cycle *Cycle) Disabled() bool
- func (cycle *Cycle) Pause()
- func (cycle *Cycle) Restart()
- func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error
- func (cycle *Cycle) SetDelayStart()
- func (cycle *Cycle) SetInterval(interval time.Duration)
- func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
- func (cycle *Cycle) Stop()
- func (cycle *Cycle) Trigger()
- func (cycle *Cycle) TriggerWait()
- type Event
- type Fence
- type Limiter
- type PipeReader
- type PipeWriter
- type ReadCache
- type ReadCacheOf
- func (cache *ReadCacheOf[T]) Get(ctx context.Context, now time.Time) (state T, err error)
- func (cache *ReadCacheOf[T]) Init(refresh time.Duration, stale time.Duration, ...) error
- func (cache *ReadCacheOf[T]) RefreshAndGet(ctx context.Context, now time.Time) (state T, err error)
- func (cache *ReadCacheOf[T]) Run(ctx context.Context) error
- func (cache *ReadCacheOf[T]) Wait(ctx context.Context) (state T, err error)
- type ReceiverClosableChan
- type Semaphore
- type SuccessThreshold
- type Throttle
- func (throttle *Throttle) Consume(amount int64) error
- func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error)
- func (throttle *Throttle) Err() error
- func (throttle *Throttle) Fail(err error)
- func (throttle *Throttle) Produce(amount int64) error
- func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error
- func (throttle *Throttle) WaitUntilAbove(limit int64) error
- func (throttle *Throttle) WaitUntilBelow(limit int64) error
- type WorkGroup
- type Workplace
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Concurrently ¶
Concurrently runs fns concurrently and returns the non-nil errors.
func Go ¶
func Go(fns ...func()) (wait func())
Go runs fns concurrently and returns a func to wait for them to complete.
See also Concurrently and errs2.Group.
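A minimal sketch of combining Concurrently and Go; the tasks and the error value below are purely illustrative.

package main

import (
	"errors"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	// Concurrently runs all functions and collects the non-nil errors.
	errs := sync2.Concurrently(
		func() error { return nil },
		func() error { return errors.New("task failed") },
	)
	fmt.Println("errors:", len(errs))

	// Go starts the functions and returns a func that waits for them to finish.
	wait := sync2.Go(
		func() { fmt.Println("background task A") },
		func() { fmt.Println("background task B") },
	)
	wait()
}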
func IsManuallyTriggeredCycle ¶
IsManuallyTriggeredCycle returns whether ctx comes from a context that was started due to a `Trigger` or `TriggerWait` call in Cycle.
func NewTeeFile ¶
func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
NewTeeFile returns a tee that uses the file system to offload memory.
func NewTeeInmemory ¶
func NewTeeInmemory(readers int, blockSize int64) ([]PipeReader, PipeWriter, error)
NewTeeInmemory returns a tee that keeps everything in memory.
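A minimal sketch of a tee where one writer feeds two independent readers; the block size of 4096 is an arbitrary choice for illustration.

package main

import (
	"fmt"
	"io"
	"sync"

	"storj.io/common/sync2"
)

func main() {
	readers, writer, err := sync2.NewTeeInmemory(2, 4096)
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup

	// Each reader independently sees the full written stream.
	for i, r := range readers {
		i, r := i, r
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { _ = r.Close() }()
			data, err := io.ReadAll(r)
			fmt.Println("reader", i, "got", len(data), "bytes, err:", err)
		}()
	}

	// Closing the writer ends the stream for all readers.
	_, _ = writer.Write([]byte("hello tee"))
	_ = writer.Close()

	wg.Wait()
}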
func WithTimeout ¶
WithTimeout calls `do`; if the timeout is reached before `do` has finished, it calls `onTimeout` concurrently.
Once WithTimeout has returned, it is guaranteed not to call onTimeout.
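A minimal sketch of WithTimeout used to report a slow operation; the durations are arbitrary.

package main

import (
	"fmt"
	"time"

	"storj.io/common/sync2"
)

func main() {
	sync2.WithTimeout(50*time.Millisecond,
		func() {
			// The slow operation being guarded.
			time.Sleep(200 * time.Millisecond)
		},
		func() {
			// Called concurrently once the timeout elapses while do is still running.
			fmt.Println("operation exceeded 50ms")
		},
	)
	fmt.Println("done")
}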
Types ¶
type Cooldown ¶
type Cooldown struct {
// contains filtered or unexported fields
}
Cooldown implements an event that can only occur once in a given timeframe.
Cooldown control methods PANIC after Close has been called and have no effect after Stop has been called.
Start or Run (only one of them, not both) must be called only once.
func NewCooldown ¶
NewCooldown creates a new cooldown with the specified interval.
func (*Cooldown) Close ¶
func (cooldown *Cooldown) Close()
Close closes all resources associated with it.
It MUST NOT be called concurrently.
func (*Cooldown) Run ¶
Run waits for a message on the trigger channel, then runs the specified function. Afterwards it will sleep for the cooldown duration and drain the trigger channel.
Run PANICS if it's called after Stop has been called.
func (*Cooldown) SetInterval ¶
SetInterval allows changing the interval before starting.
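A minimal sketch of a Cooldown driving a debounced flush. It assumes NewCooldown takes the interval as its only argument, as its description above suggests; the interval and sleep are arbitrary.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	// Assumed: NewCooldown(interval) creates the cooldown.
	cooldown := sync2.NewCooldown(time.Second)
	defer cooldown.Close()

	ctx, cancel := context.WithCancel(context.Background())
	var group errgroup.Group

	// Start waits for Trigger calls and runs fn at most once per interval.
	cooldown.Start(ctx, &group, func(ctx context.Context) error {
		fmt.Println("flushing")
		return nil
	})

	// Rapid triggers are coalesced by the cooldown interval.
	for i := 0; i < 5; i++ {
		cooldown.Trigger()
	}

	time.Sleep(100 * time.Millisecond)
	cancel()
	_ = group.Wait()
}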
type Cycle ¶
type Cycle struct {
// contains filtered or unexported fields
}
Cycle implements a controllable recurring event.
Cycle control methods PANIC after Close has been called and have no effect after Stop has been called.
Start or Run (only one of them, not both) must be called only once.
func (*Cycle) ChangeInterval ¶
ChangeInterval allows changing the ticker interval after the cycle has started. The interval of a disabled loop (interval=-1) cannot be changed; attempting to do so will panic.
func (*Cycle) Close ¶
func (cycle *Cycle) Close()
Close closes all resources associated with it.
It MUST NOT be called concurrently.
func (*Cycle) Run ¶
Run runs the specified function on an interval.
`fn` is started every interval; when `fn` is not fast enough, some of those executions may be skipped.
Run PANICS if it's called after Stop has been called.
func (*Cycle) SetDelayStart ¶
func (cycle *Cycle) SetDelayStart()
SetDelayStart makes the cycle wait one interval before the first trigger on Start/Run.
func (*Cycle) SetInterval ¶
SetInterval allows changing the interval before starting.
func (*Cycle) Start ¶
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
Start runs the specified function with an errgroup.
func (*Cycle) Trigger ¶
func (cycle *Cycle) Trigger()
Trigger ensures that the loop runs at least once. If the loop is currently running, it waits for the previous run to complete and then runs again. Note that it will not run if the cycle is disabled. TODO: Trigger should probably run the cycle even if the cycle interval is disabled.
func (*Cycle) TriggerWait ¶
func (cycle *Cycle) TriggerWait()
TriggerWait ensures that the loop runs at least once and waits for its completion. If the loop is currently running, it waits for the previous run to complete and then runs again. Note that it will not run if the cycle is disabled. TODO: Trigger should probably run the cycle even if the cycle interval is disabled.
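A minimal sketch of a Cycle running a periodic task inside an errgroup. It assumes a NewCycle constructor that takes the interval, analogous to NewCooldown above but not shown in this excerpt.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	// Assumed: NewCycle(interval) creates the cycle.
	cycle := sync2.NewCycle(time.Minute)
	defer cycle.Close()

	ctx, cancel := context.WithCancel(context.Background())
	var group errgroup.Group

	// Start runs fn once per interval inside the errgroup.
	cycle.Start(ctx, &group, func(ctx context.Context) error {
		fmt.Println("tick, manually triggered:", sync2.IsManuallyTriggeredCycle(ctx))
		return nil
	})

	// TriggerWait forces one execution immediately and waits for it to finish.
	cycle.TriggerWait()

	cancel()
	_ = group.Wait()
}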
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
Event allows signaling another goroutine that something has happened. This primitive is useful for signaling a single goroutine to update its internal state.
An Event doesn't need initialization. An Event must not be copied after first use.
func (*Event) Signal ¶
func (event *Event) Signal()
Signal signals once. Signal guarantees that at least one goroutine is released from Wait or the next call to Wait. Multiple signals may be coalesced into a single wait release. In other words, N signals result in 1 to N releases from [Wait] or [Signaled].
func (*Event) Signaled ¶
func (event *Event) Signaled() chan struct{}
Signaled returns a channel that is notified when a signal happens. Only one goroutine should call `Wait` or `Signaled`. The implementation allows concurrent calls, however the exact behaviour is hard to reason about.
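A minimal sketch of one goroutine refreshing its state when signaled; the coalescing of the two Signal calls follows the documentation above.

package main

import (
	"fmt"
	"sync"

	"storj.io/common/sync2"
)

func main() {
	var event sync2.Event // an Event needs no initialization

	var wg sync.WaitGroup
	wg.Add(1)

	// A single goroutine watches for signals and refreshes its state.
	go func() {
		defer wg.Done()
		<-event.Signaled()
		fmt.Println("state refreshed")
	}()

	// Multiple signals may coalesce into a single release.
	event.Signal()
	event.Signal()

	wg.Wait()
}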
type Fence ¶
type Fence struct {
// contains filtered or unexported fields
}
Fence allows waiting for something to happen.
func (*Fence) Done ¶
func (fence *Fence) Done() chan struct{}
Done returns a channel that will be closed when the fence is released.
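A minimal sketch of waiting on a Fence. The Release call is an assumption; only Done is shown in this excerpt.

package main

import (
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	var fence sync2.Fence

	done := make(chan struct{})
	// A waiter blocks on Done until the fence is released.
	go func() {
		defer close(done)
		<-fence.Done()
		fmt.Println("initialization finished, proceeding")
	}()

	// Release is assumed here; it is not shown in the excerpt above.
	fence.Release()
	<-done
}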
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements concurrent goroutine limiting.
After calling Wait or Close, no new goroutines are allowed to start.
func NewLimiter ¶
NewLimiter creates a new limiter with limit set to n.
func (*Limiter) Close ¶
func (limiter *Limiter) Close()
Close waits for all running goroutines to finish and disallows new goroutines from starting.
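A minimal sketch of limiting concurrent work. The Go and Wait methods are assumed from the Limiter's purpose; only NewLimiter and Close are shown in this excerpt.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	// At most 2 of the submitted goroutines run at the same time.
	limiter := sync2.NewLimiter(2)
	ctx := context.Background()

	for i := 0; i < 5; i++ {
		i := i
		// Assumed: Go(ctx, fn) starts fn when a slot is free and
		// returns false once the limiter no longer accepts work.
		started := limiter.Go(ctx, func() {
			fmt.Println("processing item", i)
		})
		if !started {
			break
		}
	}

	// Assumed: Wait blocks for the running goroutines and disallows new ones.
	limiter.Wait()
}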
type PipeReader ¶
type PipeReader interface {
	io.ReadCloser
	CloseWithError(reason error) error
}
PipeReader allows closing the reader with an error.
type PipeWriter ¶
type PipeWriter interface {
	io.WriteCloser
	CloseWithError(reason error) error
}
PipeWriter allows closing the writer with an error.
type ReadCache ¶
type ReadCache = ReadCacheOf[any]
ReadCache is a backwards-compatible alias for ReadCacheOf[any].
type ReadCacheOf ¶
type ReadCacheOf[T any] struct {
	// contains filtered or unexported fields
}
ReadCacheOf implements refreshing of state based on a refresh timeout, but also allows for stale reads up to a certain duration.
func NewReadCache ¶
func NewReadCache[T any](refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) (*ReadCacheOf[T], error)
NewReadCache returns a new ReadCacheOf.
func (*ReadCacheOf[T]) Init ¶
func (cache *ReadCacheOf[T]) Init(refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) error
Init initializes the cache in place. This is only needed when NewReadCache was not used to create the cache.
func (*ReadCacheOf[T]) RefreshAndGet ¶
RefreshAndGet refreshes the cache and returns the latest result.
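A minimal sketch of a ReadCacheOf whose read function simply returns the current time; the refresh and stale durations are arbitrary.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	// The read function fetches fresh state; here it just returns the current time.
	cache, err := sync2.NewReadCache(time.Minute, 10*time.Minute,
		func(ctx context.Context) (time.Time, error) {
			return time.Now(), nil
		})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Run drives the background refreshing until the context is canceled.
	var group errgroup.Group
	group.Go(func() error { return cache.Run(ctx) })

	// Get returns cached state, allowing stale reads up to the stale duration.
	state, err := cache.Get(ctx, time.Now())
	fmt.Println("cached state:", state, "err:", err)

	cancel()
	_ = group.Wait()
}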
type ReceiverClosableChan ¶
type ReceiverClosableChan[T any] struct {
	// contains filtered or unexported fields
}
ReceiverClosableChan is a channel with altered semantics from the go runtime channels. It is designed to work well in a many-producer, single-receiver environment, where the receiver consumes until it is shut down and must signal to many senders to stop sending.
func MakeReceiverClosableChan ¶
func MakeReceiverClosableChan[T any](bufferSize int) *ReceiverClosableChan[T]
MakeReceiverClosableChan makes a new buffered channel of the given buffer size. A zero buffer size is currently undefined behavior.
func (*ReceiverClosableChan[T]) BlockingSend ¶
func (c *ReceiverClosableChan[T]) BlockingSend(v T) (ok bool)
BlockingSend will send the value into the channel's buffer. If the buffer is full, BlockingSend will block. BlockingSend will fail and return false if StopReceiving is called.
func (*ReceiverClosableChan[T]) Receive ¶
func (c *ReceiverClosableChan[T]) Receive(ctx context.Context) (v T, err error)
Receive returns the next request, until and unless ctx is canceled. Receive does not stop if there are no more requests and StopReceiving has been called, as it is expected that the caller of Receive is who called StopReceiving. The error is not nil if and only if the context was canceled.
func (*ReceiverClosableChan[T]) StopReceiving ¶
func (c *ReceiverClosableChan[T]) StopReceiving() (drained []T)
StopReceiving will cause all currently blocked and future sends to return false. StopReceiving will return what remains in the queue.
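A minimal sketch of many producers and a single receiver; the buffer size and the number of values consumed are arbitrary.

package main

import (
	"context"
	"fmt"
	"sync"

	"storj.io/common/sync2"
)

func main() {
	queue := sync2.MakeReceiverClosableChan[int](16)

	// Many producers send until the receiver shuts the channel down.
	var producers sync.WaitGroup
	for p := 0; p < 3; p++ {
		p := p
		producers.Add(1)
		go func() {
			defer producers.Done()
			for i := 0; ; i++ {
				if !queue.BlockingSend(p*100 + i) {
					return // StopReceiving was called
				}
			}
		}()
	}

	// The single receiver consumes a few values and then shuts down.
	ctx := context.Background()
	for i := 0; i < 5; i++ {
		v, err := queue.Receive(ctx)
		if err != nil {
			break
		}
		fmt.Println("received", v)
	}

	remaining := queue.StopReceiving()
	producers.Wait()
	fmt.Println("drained", len(remaining), "values after shutdown")
}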
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore implements a closable semaphore.
func NewSemaphore ¶
NewSemaphore creates a semaphore with the specified size.
func (*Semaphore) Close ¶
func (sema *Semaphore) Close()
Close closes the semaphore from further use.
type SuccessThreshold ¶
type SuccessThreshold struct {
// contains filtered or unexported fields
}
SuccessThreshold tracks an operation made up of a known number of concurrent tasks. It notifies the caller when a specific success threshold has been reached, without interrupting the remaining tasks.
func NewSuccessThreshold ¶
func NewSuccessThreshold(tasks int, successThreshold float64) (*SuccessThreshold, error)
NewSuccessThreshold creates a SuccessThreshold for the given number of tasks and the given successThreshold.
It returns an error if tasks is less than or equal to 1, or if successThreshold is less than or equal to 0 or greater than or equal to 1.
func (*SuccessThreshold) Failure ¶
func (threshold *SuccessThreshold) Failure()
Failure tells the SuccessThreshold that one task was a failure.
func (*SuccessThreshold) FailureCount ¶
func (threshold *SuccessThreshold) FailureCount() int
FailureCount returns the number of failures so far.
func (*SuccessThreshold) Success ¶
func (threshold *SuccessThreshold) Success()
Success tells the SuccessThreshold that one task was successful.
func (*SuccessThreshold) SuccessCount ¶
func (threshold *SuccessThreshold) SuccessCount() int
SuccessCount returns the number of successes so far.
func (*SuccessThreshold) Wait ¶
func (threshold *SuccessThreshold) Wait(ctx context.Context)
Wait blocks the caller until the successThreshold is reached or all the tasks have finished.
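A minimal sketch of waiting for an 80% success threshold across 10 concurrent tasks; the failure pattern below is illustrative only.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	const tasks = 10

	// Return as soon as 80% of the tasks have succeeded.
	threshold, err := sync2.NewSuccessThreshold(tasks, 0.8)
	if err != nil {
		panic(err)
	}

	for i := 0; i < tasks; i++ {
		i := i
		go func() {
			// Simulate a task outcome; every third task fails in this sketch.
			if i%3 == 0 {
				threshold.Failure()
			} else {
				threshold.Success()
			}
		}()
	}

	// Wait returns once the threshold is reached or all tasks have reported.
	threshold.Wait(context.Background())
	fmt.Println("successes:", threshold.SuccessCount(), "failures:", threshold.FailureCount())
}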
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle implements two-sided throttling, between a consumer and producer.
Example ¶
package main

import (
	"fmt"
	"io"
	"math/rand"
	"sync"
	"time"

	"storj.io/common/sync2"
)

func main() {
	throttle := sync2.NewThrottle()

	var wg sync.WaitGroup
	wg.Add(2)

	// consumer
	go func() {
		defer wg.Done()
		totalConsumed := int64(0)
		for {
			available, err := throttle.ConsumeOrWait(8)
			if err != nil {
				return
			}
			fmt.Println("- consuming ", available, " total=", totalConsumed)
			totalConsumed += available

			// do work for available amount
			time.Sleep(time.Duration(available) * time.Millisecond)
		}
	}()

	// producer
	go func() {
		defer wg.Done()
		step := int64(8)
		for total := int64(64); total >= 0; total -= step {
			err := throttle.ProduceAndWaitUntilBelow(step, step*3)
			if err != nil {
				return
			}
			fmt.Println("+ producing", step, " left=", total)
			time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond)
		}
		throttle.Fail(io.EOF)
	}()

	wg.Wait()
	fmt.Println("done", throttle.Err())
}
Output:
func (*Throttle) ConsumeOrWait ¶
ConsumeOrWait tries to consume at most maxAmount.
func (*Throttle) ProduceAndWaitUntilBelow ¶
ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold.
func (*Throttle) WaitUntilAbove ¶
WaitUntilAbove waits until availability rises above the limit.
func (*Throttle) WaitUntilBelow ¶
WaitUntilBelow waits until availability drops below limit.
type WorkGroup ¶
type WorkGroup struct {
// contains filtered or unexported fields
}
WorkGroup implements waitable and closable group of workers.
func (*WorkGroup) Close ¶
func (group *WorkGroup) Close()
Close prevents new work from being started.
func (*WorkGroup) Go ¶
Go starts fn and tracks the execution. It returns false when the WorkGroup has been closed.
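A minimal sketch of a WorkGroup. The Wait call is an assumption based on the "waitable and closable" description; only Go and Close are shown in this excerpt.

package main

import (
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	var group sync2.WorkGroup

	// Go returns false once the group has been closed.
	for i := 0; i < 3; i++ {
		i := i
		if !group.Go(func() { fmt.Println("worker", i) }) {
			break
		}
	}

	// Close prevents new work from starting.
	group.Close()
	// Assumed: Wait blocks until the running workers have finished.
	group.Wait()
}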
type Workplace ¶
type Workplace struct {
// contains filtered or unexported fields
}
Workplace allows controlling separate jobs that must not run concurrently.
func (*Workplace) Cancel ¶
func (place *Workplace) Cancel()
Cancel cancels any active jobs and prevents new ones from being started. It does not wait for the active jobs to finish.
func (*Workplace) Done ¶
func (place *Workplace) Done() <-chan struct{}
Done returns a channel for waiting on the current job to complete. If there's no active job, it returns a closed channel.
func (*Workplace) Start ¶
func (place *Workplace) Start(root context.Context, jobTag interface{}, shouldCancel func(jobTag interface{}) bool, fn func(ctx context.Context)) (started bool)
Start tries to spawn a goroutine in the background. It returns false when it cannot cancel the previous job, when the context is canceled, or when the workplace itself has been canceled.
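A minimal sketch of starting and canceling a job in a Workplace. NewWorkplace is a hypothetical constructor used only for illustration, and the jobKind tags are made up; only the methods documented above are shown in this excerpt.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

type jobKind string

func main() {
	// Assumed: NewWorkplace creates an empty workplace.
	place := sync2.NewWorkplace()

	started := place.Start(context.Background(), jobKind("repair"),
		// shouldCancel decides whether an already running job may be
		// canceled in favor of the new one.
		func(active interface{}) bool { return active == jobKind("low-priority") },
		func(ctx context.Context) {
			fmt.Println("repair job running")
			<-ctx.Done()
		},
	)
	fmt.Println("started:", started)

	// Cancel stops the active job without waiting; Done can be used to wait for it.
	place.Cancel()
	<-place.Done()
}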
Source Files ¶
Directories ¶
Path | Synopsis
---|---
mpscqueue | Package mpscqueue is a multi-producer, single-consumer queue.
race2 | Package race2 exposes race detector API such that some assembly code can be manually instrumented for race detector.