Documentation ¶
Index ¶
- func Copy(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error)
- func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error)
- func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error)
- func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
- func Sleep(ctx context.Context, duration time.Duration) bool
- type Cycle
- func (cycle *Cycle) ChangeInterval(interval time.Duration)
- func (cycle *Cycle) Close()
- func (cycle *Cycle) Pause()
- func (cycle *Cycle) Restart()
- func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error
- func (cycle *Cycle) SetInterval(interval time.Duration)
- func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
- func (cycle *Cycle) Stop()
- func (cycle *Cycle) Trigger()
- func (cycle *Cycle) TriggerWait()
- type Fence
- type Limiter
- type MultiPipe
- type PipeReader
- type PipeWriter
- type ReadAtWriteAtCloser
- type Semaphore
- type Throttle
- func (throttle *Throttle) Consume(amount int64) error
- func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error)
- func (throttle *Throttle) Err() error
- func (throttle *Throttle) Fail(err error)
- func (throttle *Throttle) Produce(amount int64) error
- func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error
- func (throttle *Throttle) WaitUntilAbove(limit int64) error
- func (throttle *Throttle) WaitUntilBelow(limit int64) error
- type WorkGroup
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewPipeFile ¶
func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error)
NewPipeFile returns a pipe that uses file-system to offload memory
func NewPipeMemory ¶
func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error)
NewPipeMemory returns a pipe that uses an in-memory buffer
func NewTeeFile ¶
func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)
NewTeeFile returns a tee that uses file-system to offload memory
Types ¶
type Cycle ¶
type Cycle struct {
// contains filtered or unexported fields
}
Cycle implements a controllable recurring event.
Cycle control methods don't have any effect after the cycle has completed.
func (*Cycle) ChangeInterval ¶
ChangeInterval allows changing the ticker interval after the cycle has started.
func (*Cycle) Run ¶
Run runs the specified function in an interval.
Every interval `fn` is started. When `fn` is not fast enough, it may skip some of those executions.
func (*Cycle) SetInterval ¶
SetInterval allows changing the interval before starting.
func (*Cycle) Start ¶
func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)
Start runs the specified function with an errgroup
func (*Cycle) Trigger ¶
func (cycle *Cycle) Trigger()
Trigger ensures that the loop is done at least once. If it's currently running it waits for the previous to complete and then runs.
func (*Cycle) TriggerWait ¶
func (cycle *Cycle) TriggerWait()
TriggerWait ensures that the loop is done at least once and waits for completion. If it's currently running it waits for the previous to complete and then runs.
type Fence ¶
type Fence struct {
// contains filtered or unexported fields
}
Fence allows waiting for something to happen.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements concurrent goroutine limiting
func NewLimiter ¶
NewLimiter creates a new limiter with limit set to n
type MultiPipe ¶
type MultiPipe struct {
// contains filtered or unexported fields
}
MultiPipe is a multipipe backed by a single file
func NewMultiPipeFile ¶
NewMultiPipeFile returns a new MultiPipe that is created in tempdir. If tempdir == "", the file will be created in os.TempDir.
func NewMultiPipeMemory ¶
NewMultiPipeMemory returns a new MultiPipe that is using a memory buffer
func (*MultiPipe) Pipe ¶
func (multipipe *MultiPipe) Pipe(index int) (PipeReader, PipeWriter)
Pipe returns the two ends of a block stream pipe
type PipeReader ¶
type PipeReader interface { io.ReadCloser CloseWithError(reason error) error }
PipeReader allows closing the reader with an error
type PipeWriter ¶
type PipeWriter interface { io.WriteCloser CloseWithError(reason error) error }
PipeWriter allows closing the writer with an error
type ReadAtWriteAtCloser ¶
ReadAtWriteAtCloser implements all of io.ReaderAt, io.WriterAt and io.Closer
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore implements a closable semaphore
func NewSemaphore ¶
NewSemaphore creates a semaphore with the specified size.
func (*Semaphore) Close ¶
func (sema *Semaphore) Close()
Close closes the semaphore from further use.
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle implements two-sided throttling, between a consumer and producer
Example ¶
package main

import (
	"fmt"
	"io"
	"math/rand"
	"sync"
	"time"

	"storj.io/storj/internal/sync2"
)

// main demonstrates two-sided throttling between a producer and a consumer
// that share a single sync2.Throttle. The producer adds work in fixed-size
// steps, the consumer takes up to 8 units at a time, and the producer ends
// the exchange by failing the throttle with io.EOF.
func main() {
	throttle := sync2.NewThrottle()

	var wg sync.WaitGroup
	// Register both goroutines before starting them. Without this, the first
	// deferred wg.Done() panics with "sync: negative WaitGroup counter" and
	// wg.Wait() below would not wait for the goroutines at all.
	wg.Add(2)

	// consumer
	go func() {
		defer wg.Done()
		totalConsumed := int64(0)
		for {
			// Consume at most 8 units per iteration.
			available, err := throttle.ConsumeOrWait(8)
			if err != nil {
				// NOTE(review): presumably this is the error passed to
				// throttle.Fail by the producer; confirm against Throttle.
				return
			}
			fmt.Println("- consuming ", available, " total=", totalConsumed)
			totalConsumed += available

			// do work for available amount
			time.Sleep(time.Duration(available) * time.Millisecond)
		}
	}()

	// producer
	go func() {
		defer wg.Done()

		step := int64(8)
		for total := int64(64); total >= 0; total -= step {
			// Add one step of work, then wait until the throttle is below
			// three steps before producing more.
			err := throttle.ProduceAndWaitUntilBelow(step, step*3)
			if err != nil {
				return
			}

			fmt.Println("+ producing", step, " left=", total)
			// Simulate a variable amount of time spent producing.
			time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond)
		}

		// Signal that production has finished.
		throttle.Fail(io.EOF)
	}()

	wg.Wait()

	fmt.Println("done", throttle.Err())
}
Output:
func (*Throttle) ConsumeOrWait ¶
ConsumeOrWait tries to consume at most maxAmount
func (*Throttle) ProduceAndWaitUntilBelow ¶
ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold
func (*Throttle) WaitUntilAbove ¶
WaitUntilAbove waits until availability rises above limit
func (*Throttle) WaitUntilBelow ¶
WaitUntilBelow waits until availability drops below limit
type WorkGroup ¶
type WorkGroup struct {
// contains filtered or unexported fields
}
WorkGroup implements waitable and closable group of workers
func (*WorkGroup) Close ¶
func (group *WorkGroup) Close()
Close prevents new work from being started.
func (*WorkGroup) Go ¶
Go starts func and tracks the execution. Returns false when WorkGroup has been closed.