sync2

package
v0.25.2
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2019 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package sync2 provides a set of functions and types for:

  • Context-aware functionality that isn't present in the standard library.
  • Offloading memory through the file system.
  • Controlling the execution of tasks that run repetitively, concurrently or asynchronously.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Copy

func Copy(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error)

Copy implements copying with cancellation
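The idea of a cancellable copy can be sketched with the standard library alone. The following is not the sync2 implementation: ctxReader, copyWithContext and demoCopy are hypothetical names, and the sketch only notices cancellation between reads, so a Read that is already blocked is not interrupted.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
)

// ctxReader is a hypothetical helper: it fails a Read once ctx is done.
type ctxReader struct {
	ctx context.Context
	r   io.Reader
}

func (c ctxReader) Read(p []byte) (int, error) {
	if err := c.ctx.Err(); err != nil {
		return 0, err
	}
	return c.r.Read(p)
}

// copyWithContext sketches a cancellable copy built on io.Copy.
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
	return io.Copy(dst, ctxReader{ctx: ctx, r: src})
}

// demoCopy copies once with a live context and once with a canceled one.
func demoCopy() (written int64, canceledErr error) {
	var sb strings.Builder
	written, _ = copyWithContext(context.Background(), &sb, strings.NewReader("hello"))

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before copying so the first Read already fails
	_, canceledErr = copyWithContext(ctx, io.Discard, strings.NewReader("hello"))
	return written, canceledErr
}

func main() {
	written, err := demoCopy()
	fmt.Println(written, err)
}
```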

func NewPipeFile

func NewPipeFile(tempdir string) (PipeReader, PipeWriter, error)

NewPipeFile returns a pipe that uses file-system to offload memory

func NewPipeMemory

func NewPipeMemory(pipeSize int64) (PipeReader, PipeWriter, error)

NewPipeMemory returns a pipe that uses an in-memory buffer

func NewTeeFile

func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)

NewTeeFile returns a tee that uses file-system to offload memory

func NewTeeInmemory added in v0.12.0

func NewTeeInmemory(readers int, allocMemory int64) ([]PipeReader, PipeWriter, error)

NewTeeInmemory returns a tee that uses an in-memory buffer

func Sleep

func Sleep(ctx context.Context, duration time.Duration) bool

Sleep implements sleeping with cancellation
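A cancellable sleep is commonly built from a timer and a select. The sketch below uses only the standard library and assumes that the boolean result reports whether the full duration elapsed; the documentation above does not state what Sleep's return value means. sleepCtx and demoSleep are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx sketches a cancellable sleep.
// Assumption: true means the full duration elapsed, false means ctx ended first.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// demoSleep sleeps once to completion and once with an already canceled context.
func demoSleep() (full, interrupted bool) {
	full = sleepCtx(context.Background(), 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	interrupted = sleepCtx(ctx, time.Hour) // returns at once instead of waiting an hour
	return full, interrupted
}

func main() {
	fmt.Println(demoSleep())
}
```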

Types

type Cycle

type Cycle struct {
	// contains filtered or unexported fields
}

Cycle implements a controllable recurring event.

Cycle control methods panic after Close has been called and have no effect after Stop has been called.

Exactly one of Start or Run must be called, and it must be called only once.

func NewCycle

func NewCycle(interval time.Duration) *Cycle

NewCycle creates a new cycle with the specified interval.

func (*Cycle) ChangeInterval

func (cycle *Cycle) ChangeInterval(interval time.Duration)

ChangeInterval changes the ticker interval after the cycle has started.

func (*Cycle) Close

func (cycle *Cycle) Close()

Close closes all resources associated with it.

It MUST NOT be called concurrently.

func (*Cycle) Pause

func (cycle *Cycle) Pause()

Pause pauses the cycle.

func (*Cycle) Restart

func (cycle *Cycle) Restart()

Restart restarts the ticker from 0.

func (*Cycle) Run

func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error

Run runs the specified function at the configured interval.

`fn` is started once per interval. When `fn` takes longer than the interval, some of those executions may be skipped.

Run PANICS if it's called after Stop has been called.
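The skipping behaviour described for Run resembles what a plain time.Ticker loop does, since a ticker drops ticks for a slow receiver. The sketch below is not how Cycle is implemented; runEvery, errStop and demoRun are hypothetical names. It also assumes that fn runs once immediately and that the loop ends when fn returns an error or the context is done, neither of which the documentation above confirms.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errStop is a hypothetical sentinel used to end the demo loop.
var errStop = errors.New("stop")

// runEvery sketches an interval loop: it calls fn, then waits for the next tick.
// Ticks that fire while fn is still running are dropped by time.Ticker.
func runEvery(ctx context.Context, interval time.Duration, fn func(ctx context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		if err := fn(ctx); err != nil {
			return err
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demoRun runs fn until its third call returns errStop.
func demoRun() (int, error) {
	calls := 0
	err := runEvery(context.Background(), time.Millisecond, func(ctx context.Context) error {
		calls++
		if calls == 3 {
			return errStop
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(demoRun())
}
```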

func (*Cycle) SetInterval

func (cycle *Cycle) SetInterval(interval time.Duration)

SetInterval changes the interval before the cycle has started.

func (*Cycle) Start

func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)

Start runs the specified function with an errgroup.

func (*Cycle) Stop

func (cycle *Cycle) Stop()

Stop stops the cycle permanently

func (*Cycle) Trigger

func (cycle *Cycle) Trigger()

Trigger ensures that the loop runs at least once. If the loop is currently running, it waits for the previous run to complete and then runs again.

func (*Cycle) TriggerWait

func (cycle *Cycle) TriggerWait()

TriggerWait ensures that the loop runs at least once and waits for that run to complete. If the loop is currently running, it waits for the previous run to complete and then runs again.

type Fence

type Fence struct {
	// contains filtered or unexported fields
}

Fence lets callers wait for something to happen.

func (*Fence) Done added in v0.25.0

func (fence *Fence) Done() chan struct{}

Done returns channel that will be closed when the fence is released.

func (*Fence) Release

func (fence *Fence) Release()

Release releases everyone from Wait

func (*Fence) Released

func (fence *Fence) Released() bool

Released returns whether the fence has been released.

func (*Fence) Wait

func (fence *Fence) Wait(ctx context.Context) bool

Wait waits for the fence to be released. Returns true when it was successfully released.
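A fence of this shape can be approximated with a channel that is closed exactly once. The sketch below is an illustration, not the sync2 code: the lowercase fence type, newFence and demoFence are hypothetical, and unlike the documented Fence, which lists no constructor, this version needs one to allocate the channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fence is a hypothetical stand-in for a one-shot release barrier.
type fence struct {
	once sync.Once
	done chan struct{}
}

func newFence() *fence { return &fence{done: make(chan struct{})} }

// Release unblocks all current and future waiters; repeated calls are harmless.
func (f *fence) Release() { f.once.Do(func() { close(f.done) }) }

// Wait reports true if the fence was released and false if ctx ended first.
func (f *fence) Wait(ctx context.Context) bool {
	select {
	case <-f.done:
		return true
	case <-ctx.Done():
		return false
	}
}

// demoFence waits once with a canceled context, then once after Release.
func demoFence() (beforeRelease, afterRelease bool) {
	f := newFence()

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	beforeRelease = f.Wait(ctx) // fence not released, context done

	f.Release()
	f.Release() // second call is a no-op
	afterRelease = f.Wait(context.Background())
	return beforeRelease, afterRelease
}

func main() {
	fmt.Println(demoFence())
}
```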

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements concurrent goroutine limiting

func NewLimiter

func NewLimiter(n int) *Limiter

NewLimiter creates a new limiter with limit set to n

func (*Limiter) Go

func (limiter *Limiter) Go(ctx context.Context, fn func()) bool

Go tries to start fn as a goroutine. When the limit is reached, it waits until fn can be run or the context is canceled.

func (*Limiter) Wait

func (limiter *Limiter) Wait()

Wait waits for all running goroutines to finish
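Goroutine limiting of this kind is often built from a buffered channel used as a pool of slots, plus a sync.WaitGroup. The sketch below shows that general technique under the assumption that Go returns false when the context ends before a slot is free; the lowercase limiter type and demoLimiter are hypothetical and not the sync2 implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// limiter is a hypothetical sketch: at most cap(slots) functions run at once.
type limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimiter(n int) *limiter { return &limiter{slots: make(chan struct{}, n)} }

// Go blocks until a slot is free or ctx is done; it reports whether fn was started.
func (l *limiter) Go(ctx context.Context, fn func()) bool {
	select {
	case l.slots <- struct{}{}:
	case <-ctx.Done():
		return false
	}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // free the slot after fn returns
		fn()
	}()
	return true
}

// Wait blocks until every started function has returned.
func (l *limiter) Wait() { l.wg.Wait() }

// demoLimiter runs ten tasks with a limit of three and records peak concurrency.
func demoLimiter() (completed, peak int) {
	l := newLimiter(3)
	var mu sync.Mutex
	running := 0

	for i := 0; i < 10; i++ {
		l.Go(context.Background(), func() {
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond)

			mu.Lock()
			running--
			completed++
			mu.Unlock()
		})
	}
	l.Wait()
	return completed, peak
}

func main() {
	fmt.Println(demoLimiter())
}
```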

type MultiPipe

type MultiPipe struct {
	// contains filtered or unexported fields
}

MultiPipe is a multipipe backed by a single file

func NewMultiPipeFile

func NewMultiPipeFile(tempdir string, pipeCount, pipeSize int64) (*MultiPipe, error)

NewMultiPipeFile returns a new MultiPipe backed by a file created in tempdir. If tempdir == "", the file is created in os.TempDir.

func NewMultiPipeMemory

func NewMultiPipeMemory(pipeCount, pipeSize int64) (*MultiPipe, error)

NewMultiPipeMemory returns a new MultiPipe that is using a memory buffer

func (*MultiPipe) Pipe

func (multipipe *MultiPipe) Pipe(index int) (PipeReader, PipeWriter)

Pipe returns the two ends of a block stream pipe

type PipeReader

type PipeReader interface {
	io.ReadCloser
	CloseWithError(reason error) error
}

PipeReader allows closing the reader with an error

type PipeWriter

type PipeWriter interface {
	io.WriteCloser
	CloseWithError(reason error) error
}

PipeWriter allows closing the writer with an error
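The standard library's io.Pipe returns types that already have a CloseWithError method, so they satisfy interfaces of this shape. The sketch below redeclares the two interfaces locally (as pipeReader and pipeWriter, hypothetical names) and shows an error passed to the writer's CloseWithError surfacing on the reading side. Whether the sync2 pipes propagate errors in the same way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Local copies of the interface shapes, so the sketch needs no external package.
type pipeReader interface {
	io.ReadCloser
	CloseWithError(reason error) error
}

type pipeWriter interface {
	io.WriteCloser
	CloseWithError(reason error) error
}

// demoPipe writes some data, then closes the writer with an error.
// The reader receives the data first and the error afterwards.
func demoPipe() (string, error) {
	pr, pw := io.Pipe()
	var r pipeReader = pr // *io.PipeReader satisfies the interface
	var w pipeWriter = pw // *io.PipeWriter satisfies the interface

	errAborted := errors.New("upload aborted") // hypothetical failure reason

	go func() {
		_, _ = w.Write([]byte("partial"))
		_ = w.CloseWithError(errAborted)
	}()

	data, err := io.ReadAll(r)
	return string(data), err
}

func main() {
	fmt.Println(demoPipe())
}
```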

type ReadAtWriteAtCloser

type ReadAtWriteAtCloser interface {
	io.ReaderAt
	io.WriterAt
	io.Closer
}

ReadAtWriteAtCloser combines io.ReaderAt, io.WriterAt and io.Closer

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore implements a closable semaphore

func NewSemaphore

func NewSemaphore(size int) *Semaphore

NewSemaphore creates a semaphore with the specified size.

func (*Semaphore) Close

func (sema *Semaphore) Close()

Close closes the semaphore from further use.

func (*Semaphore) Init

func (sema *Semaphore) Init(size int)

Init initializes semaphore to the specified size.

func (*Semaphore) Lock

func (sema *Semaphore) Lock() bool

Lock locks the semaphore.

func (*Semaphore) Unlock

func (sema *Semaphore) Unlock()

Unlock unlocks the semaphore.
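One way to picture a closable semaphore is a buffered channel of slots combined with a second channel that signals closure. The sketch below assumes that Lock returns false once the semaphore has been closed, which the documentation above does not spell out. The lowercase semaphore type, newSemaphore and demoSemaphore are hypothetical names, not the sync2 implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// semaphore is a hypothetical sketch of a closable counting semaphore.
type semaphore struct {
	slots  chan struct{}
	closed chan struct{}
	once   sync.Once
}

func newSemaphore(size int) *semaphore {
	return &semaphore{
		slots:  make(chan struct{}, size),
		closed: make(chan struct{}),
	}
}

// Lock acquires a slot. Assumption: it reports false if the semaphore is closed.
func (s *semaphore) Lock() bool {
	// Check for closure first so a closed semaphore never hands out slots.
	select {
	case <-s.closed:
		return false
	default:
	}
	select {
	case s.slots <- struct{}{}:
		return true
	case <-s.closed:
		return false
	}
}

// Unlock releases a previously acquired slot.
func (s *semaphore) Unlock() { <-s.slots }

// Close wakes blocked lockers and rejects future ones; repeated calls are harmless.
func (s *semaphore) Close() { s.once.Do(func() { close(s.closed) }) }

// demoSemaphore locks once, unlocks, closes, and then tries to lock again.
func demoSemaphore() (beforeClose, afterClose bool) {
	s := newSemaphore(1)
	beforeClose = s.Lock()
	s.Unlock()
	s.Close()
	afterClose = s.Lock()
	return beforeClose, afterClose
}

func main() {
	fmt.Println(demoSemaphore())
}
```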

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle implements two-sided throttling, between a consumer and producer

Example
package main

import (
	"fmt"
	"io"
	"math/rand"
	"sync"
	"time"

	"storj.io/storj/internal/sync2"
)

func main() {
	throttle := sync2.NewThrottle()
	var wg sync.WaitGroup
	wg.Add(2) // one for the consumer, one for the producer; without this, wg.Done panics

	// consumer
	go func() {
		defer wg.Done()
		totalConsumed := int64(0)
		for {
			available, err := throttle.ConsumeOrWait(8)
			if err != nil {
				return
			}
			fmt.Println("- consuming ", available, " total=", totalConsumed)
			totalConsumed += available

			// do work for available amount
			time.Sleep(time.Duration(available) * time.Millisecond)
		}
	}()

	// producer
	go func() {
		defer wg.Done()

		step := int64(8)
		for total := int64(64); total >= 0; total -= step {
			err := throttle.ProduceAndWaitUntilBelow(step, step*3)
			if err != nil {
				return
			}

			fmt.Println("+ producing", step, " left=", total)
			time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond)
		}

		throttle.Fail(io.EOF)
	}()

	wg.Wait()

	fmt.Println("done", throttle.Err())
}

func NewThrottle

func NewThrottle() *Throttle

NewThrottle returns a new Throttle primitive

func (*Throttle) Consume

func (throttle *Throttle) Consume(amount int64) error

Consume subtracts amount from the throttle

func (*Throttle) ConsumeOrWait

func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error)

ConsumeOrWait tries to consume at most maxAmount

func (*Throttle) Err

func (throttle *Throttle) Err() error

Err returns the finishing error

func (*Throttle) Fail

func (throttle *Throttle) Fail(err error)

Fail stops both consumer and allocator

func (*Throttle) Produce

func (throttle *Throttle) Produce(amount int64) error

Produce adds amount to the throttle

func (*Throttle) ProduceAndWaitUntilBelow

func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error

ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold

func (*Throttle) WaitUntilAbove

func (throttle *Throttle) WaitUntilAbove(limit int64) error

WaitUntilAbove waits until availability goes above limit

func (*Throttle) WaitUntilBelow

func (throttle *Throttle) WaitUntilBelow(limit int64) error

WaitUntilBelow waits until availability drops below limit

type WorkGroup

type WorkGroup struct {
	// contains filtered or unexported fields
}

WorkGroup implements waitable and closable group of workers

func (*WorkGroup) Close

func (group *WorkGroup) Close()

Close prevents new work from being started.

func (*WorkGroup) Done

func (group *WorkGroup) Done()

Done finishes a pending work item

func (*WorkGroup) Go

func (group *WorkGroup) Go(fn func()) bool

Go starts func and tracks the execution. Returns false when WorkGroup has been closed.

func (*WorkGroup) Start

func (group *WorkGroup) Start() bool

Start returns true when work can be started

func (*WorkGroup) Wait

func (group *WorkGroup) Wait()

Wait waits for all workers to finish.
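The Start, Done, Go, Close and Wait methods suggest a wait group that can also refuse new work. The sketch below shows one way to get that behaviour with a mutex and a sync.WaitGroup; the lowercase workGroup type and demoWorkGroup are hypothetical and not the sync2 implementation. In this sketch Close should be called before Wait, because sync.WaitGroup does not allow new work to be added from a zero counter while a Wait is in progress.

```go
package main

import (
	"fmt"
	"sync"
)

// workGroup is a hypothetical sketch of a waitable, closable group of workers.
type workGroup struct {
	mu     sync.Mutex
	closed bool
	wg     sync.WaitGroup
}

// Start registers one unit of work; it reports false once the group is closed.
func (g *workGroup) Start() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return false
	}
	g.wg.Add(1)
	return true
}

// Done marks one unit of work registered by Start as finished.
func (g *workGroup) Done() { g.wg.Done() }

// Go runs fn in a goroutine and tracks it; it reports false if the group is closed.
func (g *workGroup) Go(fn func()) bool {
	if !g.Start() {
		return false
	}
	go func() {
		defer g.Done()
		fn()
	}()
	return true
}

// Close rejects any work started afterwards.
func (g *workGroup) Close() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
}

// Wait blocks until all tracked work has finished.
func (g *workGroup) Wait() { g.wg.Wait() }

// demoWorkGroup starts two workers, closes the group, then tries to start a third.
func demoWorkGroup() (ran int, acceptedAfterClose bool) {
	var g workGroup
	var mu sync.Mutex
	for i := 0; i < 2; i++ {
		g.Go(func() {
			mu.Lock()
			ran++
			mu.Unlock()
		})
	}
	g.Close()
	acceptedAfterClose = g.Go(func() {})
	g.Wait()
	return ran, acceptedAfterClose
}

func main() {
	fmt.Println(demoWorkGroup())
}
```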
