Documentation ¶
Overview ¶
Package sync2 provides a set of functions and types for:
- Having context aware functionalities which aren't present in the standard library.
- For offloading memory through the file system.
- To control execution of tasks which can run repetitively, concurrently or asynchronously.
Index ¶
- func Copy(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error)
- func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error)
- func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error)
- func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
- func NewTeeInmemory(readers int, allocMemory int64) ([]PipeReader, PipeWriter, error)
- func Sleep(ctx context.Context, duration time.Duration) bool
- type Cycle
- func (cycle *Cycle) ChangeInterval(interval time.Duration)
- func (cycle *Cycle) Close()
- func (cycle *Cycle) Pause()
- func (cycle *Cycle) Restart()
- func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error
- func (cycle *Cycle) SetInterval(interval time.Duration)
- func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
- func (cycle *Cycle) Stop()
- func (cycle *Cycle) Trigger()
- func (cycle *Cycle) TriggerWait()
- type Fence
- type Limiter
- type MultiPipe
- type PipeReader
- type PipeWriter
- type ReadAtWriteAtCloser
- type Semaphore
- type Throttle
- func (throttle *Throttle) Consume(amount int64) error
- func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error)
- func (throttle *Throttle) Err() error
- func (throttle *Throttle) Fail(err error)
- func (throttle *Throttle) Produce(amount int64) error
- func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error
- func (throttle *Throttle) WaitUntilAbove(limit int64) error
- func (throttle *Throttle) WaitUntilBelow(limit int64) error
- type WorkGroup
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewPipeFile ¶
func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error)
NewPipeFile returns a pipe that uses file-system to offload memory
func NewPipeMemory ¶
func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error)
NewPipeMemory returns a pipe that uses an in-memory buffer
func NewTeeFile ¶
func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
NewTeeFile returns a tee that uses file-system to offload memory
func NewTeeInmemory ¶ added in v0.12.0
func NewTeeInmemory(readers int, allocMemory int64) ([]PipeReader, PipeWriter, error)
NewTeeInmemory returns a tee that uses inmemory
Types ¶
type Cycle ¶
type Cycle struct {
// contains filtered or unexported fields
}
Cycle implements a controllable recurring event.
Cycle control methods PANICS after Close has been called and don't have any effect after Stop has been called.
Start or Run (only one of them, not both) must be only called once.
func (*Cycle) ChangeInterval ¶
ChangeInterval allows to change the ticker interval after it has started.
func (*Cycle) Close ¶
func (cycle *Cycle) Close()
Close closes all resources associated with it.
It MUST NOT be called concurrently.
func (*Cycle) Run ¶
Run runs the specified in an interval.
Every interval `fn` is started. When `fn` is not fast enough, it may skip some of those executions.
Run PANICS if it's called after Stop has been called.
func (*Cycle) SetInterval ¶
SetInterval allows to change the interval before starting.
func (*Cycle) Start ¶
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
Start runs the specified function with an errgroup.
func (*Cycle) Trigger ¶
func (cycle *Cycle) Trigger()
Trigger ensures that the loop is done at least once. If it's currently running it waits for the previous to complete and then runs.
func (*Cycle) TriggerWait ¶
func (cycle *Cycle) TriggerWait()
TriggerWait ensures that the loop is done at least once and waits for completion. If it's currently running it waits for the previous to complete and then runs.
type Fence ¶
type Fence struct {
// contains filtered or unexported fields
}
Fence allows to wait for something to happen.
func (*Fence) Done ¶ added in v0.25.0
func (fence *Fence) Done() chan struct{}
Done returns channel that will be closed when the fence is released.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements concurrent goroutine limiting
func NewLimiter ¶
NewLimiter creates a new limiter with limit set to n
type MultiPipe ¶
type MultiPipe struct {
// contains filtered or unexported fields
}
MultiPipe is a multipipe backed by a single file
func NewMultiPipeFile ¶
NewMultiPipeFile returns a new MultiPipe that is created in tempdir if tempdir == "" the fill will be created it into os.TempDir
func NewMultiPipeMemory ¶
NewMultiPipeMemory returns a new MultiPipe that is using a memory buffer
func (*MultiPipe) Pipe ¶
func (multipipe *MultiPipe) Pipe(index int) (PipeReader, PipeWriter)
Pipe returns the two ends of a block stream pipe
type PipeReader ¶
type PipeReader interface { io.ReadCloser CloseWithError(reason error) error }
PipeReader allows closing the reader with an error
type PipeWriter ¶
type PipeWriter interface { io.WriteCloser CloseWithError(reason error) error }
PipeWriter allows closing the writer with an error
type ReadAtWriteAtCloser ¶
ReadAtWriteAtCloser implements all io.ReaderAt, io.WriterAt and io.Closer
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore implements a closable semaphore
func NewSemaphore ¶
NewSemaphore creates a semaphore with the specified size.
func (*Semaphore) Close ¶
func (sema *Semaphore) Close()
Close closes the semaphore from further use.
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle implements two-sided throttling, between a consumer and producer
Example ¶
package main import ( "fmt" "io" "math/rand" "sync" "time" "storj.io/storj/internal/sync2" ) func main() { throttle := sync2.NewThrottle() var wg sync.WaitGroup // consumer go func() { defer wg.Done() totalConsumed := int64(0) for { available, err := throttle.ConsumeOrWait(8) if err != nil { return } fmt.Println("- consuming ", available, " total=", totalConsumed) totalConsumed += available // do work for available amount time.Sleep(time.Duration(available) * time.Millisecond) } }() // producer go func() { defer wg.Done() step := int64(8) for total := int64(64); total >= 0; total -= step { err := throttle.ProduceAndWaitUntilBelow(step, step*3) if err != nil { return } fmt.Println("+ producing", step, " left=", total) time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond) } throttle.Fail(io.EOF) }() wg.Wait() fmt.Println("done", throttle.Err()) }
Output:
func (*Throttle) ConsumeOrWait ¶
ConsumeOrWait tries to consume at most maxAmount
func (*Throttle) ProduceAndWaitUntilBelow ¶
ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold
func (*Throttle) WaitUntilAbove ¶
WaitUntilAbove waits until availability drops below limit
func (*Throttle) WaitUntilBelow ¶
WaitUntilBelow waits until availability drops below limit
type WorkGroup ¶
type WorkGroup struct {
// contains filtered or unexported fields
}
WorkGroup implements waitable and closable group of workers
func (*WorkGroup) Close ¶
func (group *WorkGroup) Close()
Close prevents from new work being started.
func (*WorkGroup) Go ¶
Go starts func and tracks the execution. Returns false when WorkGroup has been closed.