syncs

v1.3.2
Published: Jul 18, 2023 License: MIT Imports: 4 Imported by: 15

README

Syncs - additional synchronization primitives

The syncs package offers additional synchronization primitives, such as Semaphore, SizedGroup, and ErrSizedGroup, to help manage concurrency in Go programs. Use them to control and limit the number of concurrent goroutines while handling errors and early termination.

Install and update

go get -u github.com/go-pkgz/syncs

Details

Semaphore

Semaphore implements the sync.Locker interface with an additional TryLock function and a specified capacity. It is thread-safe. The Lock function increases the count, while Unlock decreases it. Lock blocks once the count has reached the capacity and resumes when another goroutine calls Unlock; Unlock blocks when the count is 0. The TryLock function never blocks: it returns false if locking failed (i.e. the semaphore is full) and true otherwise.

	sema := syncs.NewSemaphore(10) // make semaphore with capacity 10
	for i := 0; i < 10; i++ {
		sema.Lock() // all 10 locks will pass, i.e. won't block
	}
	sema.Lock() // this is the 11th, it blocks until Unlock is called elsewhere

	// in some other place/goroutine
	sema.Unlock() // decrease semaphore counter
	ok := sema.TryLock() // try to lock, will return false if semaphore is locked
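
To make the blocking rules above concrete, here is a minimal sketch of a counting semaphore built on a buffered channel. The names chanSemaphore and newChanSemaphore are hypothetical, and this is an illustration of the technique under that assumption, not necessarily how the package implements it.

```go
package main

import "fmt"

// chanSemaphore is a hypothetical counting semaphore backed by a buffered
// channel. It only illustrates the Lock/Unlock/TryLock semantics described
// above and is not taken from the syncs package.
type chanSemaphore struct {
	slots chan struct{}
}

// newChanSemaphore creates a semaphore with the given capacity (at least 1).
func newChanSemaphore(capacity int) *chanSemaphore {
	if capacity < 1 {
		capacity = 1
	}
	return &chanSemaphore{slots: make(chan struct{}, capacity)}
}

// Lock takes a slot and blocks while all slots are taken.
func (s *chanSemaphore) Lock() { s.slots <- struct{}{} }

// Unlock frees a slot and blocks if no slot is currently held.
func (s *chanSemaphore) Unlock() { <-s.slots }

// TryLock takes a slot if one is free and reports whether it succeeded.
func (s *chanSemaphore) TryLock() bool {
	select {
	case s.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	sema := newChanSemaphore(2)
	sema.Lock()
	sema.Lock()
	fmt.Println(sema.TryLock()) // false: both slots are held
	sema.Unlock()
	fmt.Println(sema.TryLock()) // true: one slot was freed
}
```
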
SizedGroup

SizedGroup combines Semaphore and WaitGroup to provide a wait group that allows a limited number of goroutines to run in parallel.

By default, locking happens inside the goroutine. This means every call will be non-blocking, but some goroutines may wait if the semaphore is locked. Technically, it doesn't limit the number of goroutines but rather the number of running (active) goroutines.

To block goroutines from starting, use the Preemptive option. Important: With Preemptive, the Go call can block. If the maximum size is reached, the call will wait until the number of running goroutines drops below the maximum. This not only limits the number of running goroutines but also the number of waiting goroutines.

	swg := syncs.NewSizedGroup(5) // wait group with max size=5
	for i := 0; i < 10; i++ {
		swg.Go(func(ctx context.Context) {
			doThings(ctx) // only 5 of these will run in parallel
		})
	}
	swg.Wait()
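
The difference between the default mode and Preemptive comes down to where the slot is taken: inside the goroutine or before it is spawned. The sketch below shows both placements using only the standard library. runLimited is a hypothetical helper written for this illustration and is not part of syncs.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts n tasks with at most limit running at once and returns
// the highest number of tasks observed running at the same time.
// With preemptive=false the slot is taken inside the goroutine, so all n
// goroutines are spawned right away and some of them wait.
// With preemptive=true the slot is taken before spawning, so the submitting
// loop itself blocks when the limit is reached.
func runLimited(n, limit int, preemptive bool) int32 {
	slots := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var active, peak int32

	task := func() {
		cur := atomic.AddInt32(&active, 1)
		for { // record the peak concurrency seen so far
			old := atomic.LoadInt32(&peak)
			if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulated work
		atomic.AddInt32(&active, -1)
	}

	for i := 0; i < n; i++ {
		wg.Add(1)
		if preemptive {
			slots <- struct{}{} // blocks the caller when the limit is reached
		}
		go func() {
			defer wg.Done()
			if !preemptive {
				slots <- struct{}{} // the goroutine exists but waits here
			}
			defer func() { <-slots }()
			task()
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println(runLimited(20, 3, false) <= 3) // in-goroutine locking
	fmt.Println(runLimited(20, 3, true) <= 3)  // preemptive locking
}
```

In both modes the number of running tasks never exceeds the limit; only the number of goroutines that exist while waiting differs.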

Another option is Discard, which will skip (won't start) goroutines if the semaphore is locked. In other words, if a defined number of goroutines are already running, the call will be discarded. Discard is useful when you don't care about the results of extra goroutines; i.e., you just want to run some tasks in parallel but can allow some number of them to be ignored. This flag sets Preemptive as well, because otherwise, it doesn't make sense.

	swg := syncs.NewSizedGroup(5, syncs.Discard) // wait group with max size=5 and discarding extra goroutines
	for i := 0; i < 10; i++ {
		swg.Go(func(ctx context.Context) {
			doThings(ctx) // only 5 of these will run in parallel and the other 5 can be discarded
		})
	}
	swg.Wait()
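
The discard behaviour can be pictured as a non-blocking attempt to take a slot before spawning. The following sketch assumes that model; runDiscarding is a hypothetical helper for illustration only and does not use the syncs API.

```go
package main

import (
	"fmt"
	"sync"
)

// runDiscarding submits n tasks with at most limit running at once.
// A task that cannot get a slot immediately is dropped instead of queued.
// Every started task keeps its slot until the submit loop has finished,
// which makes the started/discarded split deterministic for this demo.
func runDiscarding(n, limit int) (started, discarded int) {
	slots := make(chan struct{}, limit)
	release := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		select {
		case slots <- struct{}{}: // a slot is free: start the task
			started++
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-slots }()
				<-release // simulated work that outlives the submit loop
			}()
		default: // all slots are busy: discard the task
			discarded++
		}
	}
	close(release)
	wg.Wait()
	return started, discarded
}

func main() {
	started, discarded := runDiscarding(10, 5)
	fmt.Println(started, discarded) // prints "5 5"
}
```
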
ErrSizedGroup

ErrSizedGroup is a SizedGroup with error control. It works the same as errgroup.Group, i.e., it returns the first error. It can work as a regular errgroup.Group or with early termination. It is thread-safe.

ErrSizedGroup supports waiting inside the goroutine (the default) as well as waiting before the goroutine starts, using the Preemptive and Discard options (see above). Other options include TermOnErr, which skips (won't start) all other goroutines if any error is returned, and Context for early termination/timeouts.

	ewg := syncs.NewErrSizedGroup(5, syncs.Preemptive) // error wait group with max size=5; Go blocks while 5 goroutines are running
	for i := 0; i < 10; i++ {
		ewg.Go(func() error { // Go here could be blocked if trying to run >5 at the same time
			return doThings() // only 5 of these will run in parallel
		})
	}
	err := ewg.Wait()
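
The interaction between a size limit, error collection, and TermOnErr-style early termination can be sketched with the standard library alone. runWithErrors, sampleTasks, and errBoom are hypothetical names introduced for this illustration; the sketch shows the idea and makes no claim about the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBoom = errors.New("boom")

// sampleTasks returns four tasks; only the second one fails.
func sampleTasks() []func() error {
	ok := func() error { return nil }
	fail := func() error { return errBoom }
	return []func() error{ok, fail, ok, ok}
}

// runWithErrors runs tasks with at most limit running at once and collects
// their errors. When termOnErr is true, tasks submitted after the first
// recorded error are skipped. It returns how many tasks actually ran.
func runWithErrors(tasks []func() error, limit int, termOnErr bool) (ran int, errs []error) {
	slots := make(chan struct{}, limit)
	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards errs
	)
	for _, task := range tasks {
		slots <- struct{}{} // preemptive: wait for a free slot before spawning
		mu.Lock()
		failed := len(errs) > 0
		mu.Unlock()
		if termOnErr && failed {
			<-slots // give the slot back and skip this task
			continue
		}
		ran++
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			defer func() { <-slots }() // released only after the error is recorded
			if err := task(); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(task)
	}
	wg.Wait()
	return ran, errs
}

func main() {
	ran, errs := runWithErrors(sampleTasks(), 1, true)
	fmt.Println(ran, len(errs)) // prints "2 1"
	ran, errs = runWithErrors(sampleTasks(), 1, false)
	fmt.Println(ran, len(errs)) // prints "4 1"
}
```

A limit of 1 is used in main so that the tasks run one after another and the printed counts are deterministic; with a larger limit, tasks already running when the error occurs still finish.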

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Discard added in v1.3.0

func Discard(o *options)

Discard drops new goroutines when the semaphore is full, i.e. when no more goroutines are allowed to run.

func Preemptive

func Preemptive(o *options)

Preemptive sets a locking mode that prevents spawning goroutines that would only wait for a free slot. May cause the Go call to block!

func TermOnErr

func TermOnErr(o *options)

TermOnErr prevents new goroutines from starting after the first error.

Types

type ErrSizedGroup

type ErrSizedGroup struct {
	// contains filtered or unexported fields
}

ErrSizedGroup is a SizedGroup with error control. It works the same as errgroup.Group, i.e. it returns the first error. It can work as a regular errgroup.Group or with early termination. Thread safe. ErrSizedGroup enforces constructor usage and doesn't allow direct creation of errSizedGroup.

Example (Go)

Illustrates the use of an ErrSizedGroup for concurrent, limited execution of goroutines.

// create sized waiting group allowing maximum 10 goroutines
grp := NewErrSizedGroup(10)

var c uint32
for i := 0; i < 1000; i++ {
	// Go call is non-blocking, like regular go statement
	grp.Go(func() error {
		// do some work in 10 goroutines in parallel
		atomic.AddUint32(&c, 1)
		time.Sleep(10 * time.Millisecond)
		return nil
	})
}
// Note: grp.Go acts like go command - never blocks. This code will be executed right away
log.Print("all 1000 jobs submitted")

// wait for completion
if err := grp.Wait(); err != nil {
	panic(err)
}

func NewErrSizedGroup

func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup

NewErrSizedGroup makes a wait group that limits the number of live goroutines. By default, all goroutines are started but wait inside for a free slot. To limit the number of started goroutines, use the Preemptive option. TermOnErr skips (won't start) all remaining goroutines once any error is returned.

func (*ErrSizedGroup) Go

func (g *ErrSizedGroup) Go(f func() error)

Go calls the given function in a new goroutine. With TermOnErr, the first call to return a non-nil error cancels the group, and its error is returned by Wait. Without TermOnErr, all errors are collected in a multierror.

func (*ErrSizedGroup) Wait

func (g *ErrSizedGroup) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns all errors (if any) wrapped with multierror from them.

type GroupOption

type GroupOption func(o *options)

GroupOption is the functional option type used to configure a group.
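
For readers unfamiliar with the functional options pattern, the sketch below shows how options of this shape are typically applied. The settings struct and the lower-case option functions are hypothetical stand-ins, since the package's options struct is unexported and its fields are not shown here.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the package's unexported options
// struct; the field names are assumptions made for this illustration.
type settings struct {
	preemptive bool
	discard    bool
	termOnErr  bool
}

// option has the same shape as GroupOption: a function mutating settings.
type option func(s *settings)

func preemptive(s *settings) { s.preemptive = true }

// discard also enables preemptive mode, mirroring the README's note that
// Discard sets Preemptive as well.
func discard(s *settings) {
	s.discard = true
	s.preemptive = true
}

func termOnErr(s *settings) { s.termOnErr = true }

// apply builds settings by running every option in order, the way a
// constructor taking a variadic option list usually does.
func apply(opts ...option) settings {
	var s settings
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", apply(discard, termOnErr))
}
```
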

func Context

func Context(ctx context.Context) GroupOption

Context passes ctx to the group; goroutines will be canceled if ctx is canceled.
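
Cancellation is cooperative in Go: a goroutine stops early only if it watches the context it was given. The sketch below illustrates that general pattern with the standard library; runUntilCanceled and demoCancel are hypothetical helpers and do not use the syncs API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUntilCanceled starts n workers that each wait for either ctx to be
// canceled or a long simulated job to finish. It returns how many workers
// stopped because of the cancellation.
func runUntilCanceled(ctx context.Context, n int) int32 {
	var wg sync.WaitGroup
	var stopped int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ctx.Done(): // cancellation or timeout observed
				atomic.AddInt32(&stopped, 1)
			case <-time.After(time.Minute): // simulated long job
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&stopped)
}

// demoCancel runs n workers under a context with a short timeout.
func demoCancel(n int) int32 {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return runUntilCanceled(ctx, n)
}

func main() {
	fmt.Println(demoCancel(4)) // all 4 workers stop on the timeout
}
```
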

type Locker added in v1.3.0

type Locker interface {
	sync.Locker
	TryLock() bool
}

Locker is a superset of sync.Locker interface with TryLock method.
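
Since Go 1.18, *sync.Mutex has a TryLock method, so it satisfies an interface of this shape. The sketch below declares a local copy of the interface (tryLocker, a hypothetical name) to show the compile-time check and a typical non-blocking use; withTryLock and newMutex are likewise illustrative helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// tryLocker mirrors the shape of the Locker interface shown above.
type tryLocker interface {
	sync.Locker
	TryLock() bool
}

// Compile-time check: *sync.Mutex provides TryLock since Go 1.18.
var _ tryLocker = (*sync.Mutex)(nil)

// newMutex returns a mutex behind the tryLocker interface.
func newMutex() tryLocker { return &sync.Mutex{} }

// withTryLock runs fn only if the lock can be taken without waiting and
// reports whether fn was run.
func withTryLock(l tryLocker, fn func()) bool {
	if !l.TryLock() {
		return false
	}
	defer l.Unlock()
	fn()
	return true
}

func main() {
	l := newMutex()
	fmt.Println(withTryLock(l, func() {})) // true: the lock was free

	l.Lock()
	fmt.Println(withTryLock(l, func() {})) // false: the lock is held
	l.Unlock()
}
```
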

func NewSemaphore

func NewSemaphore(capacity int) Locker

NewSemaphore makes a Semaphore with the given capacity.

type MultiError added in v1.3.2

type MultiError struct {
	// contains filtered or unexported fields
}

MultiError is a thread safe container for multi-error type that implements error interface

func (*MultiError) Error added in v1.3.2

func (m *MultiError) Error() string

Error returns multi-error string

func (*MultiError) ErrorOrNil added in v1.3.2

func (m *MultiError) ErrorOrNil() error

ErrorOrNil returns nil if no errors or multierror if errors occurred

func (*MultiError) Errors added in v1.3.2

func (m *MultiError) Errors() []error
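
A thread-safe multi-error container can be sketched as a mutex-guarded slice that implements the error interface. The type multiErr, its add method, the message format, and the collect helper below are assumptions made for illustration; they are not the package's MultiError.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// multiErr is a hypothetical mutex-guarded error collection.
type multiErr struct {
	mu   sync.Mutex
	errs []error
}

// add appends a non-nil error; it is safe to call from many goroutines.
func (m *multiErr) add(err error) {
	if err == nil {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.errs = append(m.errs, err)
}

// Error joins all collected messages (the format is an assumption).
func (m *multiErr) Error() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	msgs := make([]string, 0, len(m.errs))
	for _, e := range m.errs {
		msgs = append(msgs, e.Error())
	}
	return fmt.Sprintf("%d error(s) occurred: %s", len(msgs), strings.Join(msgs, "; "))
}

// ErrorOrNil returns an untyped nil when nothing was collected, so callers
// can safely compare the result with nil.
func (m *multiErr) ErrorOrNil() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.errs) == 0 {
		return nil
	}
	return m
}

// collect records n failures concurrently and returns the combined error.
func collect(n int) error {
	m := &multiErr{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.add(fmt.Errorf("task %d failed", i))
		}(i)
	}
	wg.Wait()
	return m.ErrorOrNil()
}

func main() {
	fmt.Println(collect(0) == nil) // true: no errors collected
	fmt.Println(collect(3) != nil) // true: three errors collected
}
```

Returning a literal nil from ErrorOrNil matters: returning a nil *multiErr through the error interface would compare as non-nil.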

type SizedGroup

type SizedGroup struct {
	// contains filtered or unexported fields
}

SizedGroup has the same role as sync.WaitGroup but adds a limit on the number of goroutines started concurrently. It uses Go() scheduling similar to errgroup.Group and is thread safe. SizedGroup enforces constructor usage and doesn't allow direct creation of sizedGroup.

Example (Go)

Illustrates the use of a SizedGroup for concurrent, limited execution of goroutines.

grp := NewSizedGroup(10) // create sized waiting group allowing maximum 10 goroutines

var c uint32
for i := 0; i < 1000; i++ {
	grp.Go(func(ctx context.Context) { // Go call is non-blocking, like regular go statement
		// do some work in 10 goroutines in parallel
		atomic.AddUint32(&c, 1)
		time.Sleep(10 * time.Millisecond)
	})
}
// Note: grp.Go acts like go command - never blocks. This code will be executed right away
log.Print("all 1000 jobs submitted")

grp.Wait() // wait for completion

func NewSizedGroup

func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup

NewSizedGroup makes a wait group that limits the number of live goroutines.

func (*SizedGroup) Go

func (g *SizedGroup) Go(fn func(ctx context.Context))

Go calls the given function in a new goroutine. By default every call is non-blocking, but some goroutines may wait if the semaphore is locked.

func (*SizedGroup) Wait

func (g *SizedGroup) Wait()

Wait blocks until the SizedGroup counter is zero. See sync.WaitGroup documentation for more information.
