sema

package
v0.0.0-...-9392aba Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2024 License: MIT Imports: 4 Imported by: 1

Documentation

Overview

Package sema implements semaphores and semaphore-related scheduling

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Schedule

func Schedule(worker func(uint64) error, count uint64, concurrency Concurrency) (err error)

Schedule schedules count instances of worker to be called. When count is non-positive, no calls are scheduled and immediately returns nil. Each call to worker receives a unique id, from 0 up to count (exclusive count).

Workers are approximately started in order. Any call to worker(i) is called before worker(j) for any i < j. Concurrency determines the amount of concurrency that takes place for scheduling.

There is no synchronization mechanism beyond the limits themselves. In particular for Limit != 1, the order guarantee might be broken: While the invocation of worker(i) occurs before worker(j), no such guarantee for the first statement within those invocation is true.

If an error occurs (indicated by a non-nil return from worker), Schedule waits for all ongoing worker calls to return. It may schedule further calls to worker, as determined by concurrency.Force. It then returns the non-nil error which triggered the error stop.

If no error occurs, schedule returns nil.

Example
var counter atomic.Uint64

// because we have a parallelism of 1, we run exactly in order!
_ = Schedule(func(i uint64) error {
	counter.Add(1)
	return nil
}, 1000, Concurrency{
	Limit: 0,
	Force: false,
})

fmt.Print(counter.Load())
Output:

1000
Example (Error)
err := Schedule(func(i uint64) error {
	// the first invocation produces an error and returns immediately!
	if i == 0 {
		return errors.New("first function error")
	}

	// concurrently with the first invocation, we have at most one other
	// so give the first function some time to produce an error
	time.Sleep(100 * time.Millisecond)

	// the third and fourth invocations should never be called
	// since by the time the first function finishes
	// the second one has already produced an error
	if i > 1 {
		panic("never reached")
	}
	return nil
}, 4, Concurrency{
	Limit: 2,
	Force: false,
})
fmt.Println(err)
Output:

first function error
Example (Force)
var counter atomic.Uint64

err := Schedule(func(i uint64) error {
	// count the number of invocations
	counter.Add(1)

	// the first function returns an error
	// but because of force = True, the execution is not aborted
	if i == 0 {
		return errors.New("first function error")
	}

	// ... work ...
	time.Sleep(50 * time.Millisecond)
	return nil
}, 10, Concurrency{
	Limit: 2,
	Force: true,
})

fmt.Println(err)
fmt.Println(counter.Load(), "workers called")
Output:

first function error
10 workers called
Example (Order)
// because we have a parallelism of 1, we run exactly in order!
_ = Schedule(func(i uint64) error {
	fmt.Print(i, ";")
	return nil
}, 4, Concurrency{
	Limit: 1,
	Force: false,
})
Output:

0;1;2;3;

Types

type Concurrency

type Concurrency struct {
	Limit int  // Limit indicates the maximum number of concurrent operations. 0 or negative implies no limit.
	Force bool // Force indicates if a failed operation should still allow future operations to start
}

Concurrency represents the amount of concurrency of an operation

type Pool

type Pool[V any] struct {
	// New creates a new item in the pool.
	New func() V

	// Discard is called before an item is (permanently) remove from the pool.
	// Unlike a finalizer, Discard is guaranteed to be called.
	Discard func(V)

	// Limit is the maximum number of objects in the pool.
	// Limit <= 0 means there is no limit on the number of objects in the pool.
	// When Use requests an item, and the limit is already reached,
	// Use will block until an item becomes available.
	Limit int
	// contains filtered or unexported fields
}

Pool holds a finite set of lazily created objects.

Example
//spellchecker:words sema
package main

//spellchecker:words errors sync atomic github pkglib sema
import (
	"errors"
	"fmt"
	"sync/atomic"

	"github.com/tkw1536/pkglib/sema"
)

type Thing uint64

func (t *Thing) OK() error {
	fmt.Printf("OK(%d)\n", *t)
	return nil
}

var errTest = errors.New("test error")

func (t *Thing) Fail() error {
	fmt.Printf("Fail(%d)\n", *t)
	return errTest
}

func (t *Thing) Close() {
	fmt.Printf("Close(%d)\n", *t)
}

func main() {
	var counter atomic.Uint64
	p := sema.Pool[*Thing]{
		Limit: 1, // at most one item in the pool
		New: func() *Thing {
			// create a new thing (and print creating it)
			id := counter.Add(1)
			fmt.Printf("New(%d)\n", id)
			return ((*Thing)(&id))
		},
		Discard: (*Thing).Close,
	}
	defer p.Close()

	// the first time an item from the pool is requested, it is created using New()
	_ = p.Use((*Thing).OK)

	// calling it again, re-uses it
	_ = p.Use((*Thing).OK)

	// failing causes it to be destroyed
	_ = p.Use((*Thing).Fail)

	// and calling it again re-creates another one
	_ = p.Use((*Thing).OK)

}
Output:

New(1)
OK(1)
OK(1)
Fail(1)
Close(1)
New(2)
OK(2)
Close(2)

func (*Pool[V]) Close

func (pool *Pool[V]) Close()

Close discards all objects currently in the pool. Note that any active Use calls are not waited for.

func (*Pool[V]) Use

func (pool *Pool[V]) Use(f func(V) error) error

Use borrows an object from the pool, passes it to f, and then returns it to the pool. Use blocks until f has returned. If f returns an error (or f panics) the returned object is discarded, and a new object is created once needed.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore guards parallel access to a shared resource. It should always be created using New.

The resource can be acquired using a call to [Lock]. and released using a call to [Unlock].

func New

func New(size int) Semaphore

New creates a new Semaphore, guarding a resource of at most the given size. The resource is assumed to be entirely available.

A size <= 0 indicates an infinite limit, and all Lock and Unlock calls are no-ops.

Note that if size is statically known to be 1, a Mutex should be used instead.

Example
// create a new semaphore with two elements
sema := New(2)

// some very finite resource pool
var resource atomic.Uint64
resource.Store(2)

// create N = 100 workers that each attempt to use the finite resource
N := 100
var worked atomic.Uint64
var wg sync.WaitGroup
wg.Add(N)
for range N {
	go func() {
		// accounting: keep track that we did some work and that we're done!
		defer wg.Done()
		worked.Add(1)

		// Lock the semaphore
		// the lock can be locked at most twice
		sema.Lock()
		defer sema.Unlock()

		// check that the resource is available
		// since we are protected by the semaphore, this is guaranteed to be the case
		if resource.Load() == 0 {
			panic("no resource available")
		}

		// while we are working, take the resources away
		resource.Add(^uint64(0))
		defer resource.Add(1)

		// ... deep computation ...
		time.Sleep(10 * time.Millisecond)
	}()
}

wg.Wait()

fmt.Printf("Worked %d times", worked.Load())
Output:

Worked 100 times
Example (One)
// a semaphore with value one behaves just like a mutex
sema := New(1)
nothing := time.Nanosecond

// do a bunch of locks and unlocks
N := 1000

for range N {
	sema.Lock()
	time.Sleep(nothing)
	sema.Unlock()
}

// and nothing was blocked!
fmt.Println("nothing blocked")
Output:

nothing blocked
Example (Panic)
sema := New(2)

// an unlock without a corresponding unlock will always panic
didPanic, value := testlib.DoesPanic(func() {
	sema.Unlock()
})
if !didPanic {
	panic("did not panic")
}

fmt.Printf("Unlock() panic = %#v", value)
Output:

Unlock() panic = "Semaphore: Unlock without Lock"
Example (Simple)
sema := New(2)

// we can lock it two times
sema.Lock()
sema.Lock()

// this call would block
// sema.Lock()
fmt.Println("two lock calls")

// before need to unlock to acquire again
sema.Unlock()
sema.Lock()

fmt.Println("another lock call only after unlock")
Output:

two lock calls
another lock call only after unlock
Example (Two)
// a semaphore with value >= 2 is a regular semaphore
sema := New(2)
nothing := time.Nanosecond

// do a bunch of locks and unlocks
N := 1000

// can lock it twice, before requiring an unlock
for range N {
	sema.Lock()
	sema.Lock()

	time.Sleep(nothing)

	sema.Unlock()
	sema.Unlock()
}

fmt.Println("nothing blocked")
Output:

nothing blocked
Example (Zero)
// a zero or negative limit creates a semaphore without any limits
sema := New(0)

N := 1000

// so we can call Lock as many times as we want
for range N {
	sema.Lock()
}

sema.Unlock()

// and nothing was blocked!
fmt.Println("nothing blocked")
Output:

nothing blocked

func (Semaphore) Len

func (s Semaphore) Len() int

Len returns the maximum size of the resource guarded by this semaphore, or 0 if said limit is infinite.

func (Semaphore) Lock

func (s Semaphore) Lock()

Lock atomically acquires a unit of the guarded resource. When the resource is not available, it blocks until such a resource is available.

func (Semaphore) TryLock

func (s Semaphore) TryLock() bool

TryLock attempts to atomically acquire the resource without blocking. When it succeeds, it returns true, otherwise it returns false.

Calls to [TryLock] never block; they always return immediately.

func (Semaphore) Unlock

func (s Semaphore) Unlock()

Unlock releases one unit of the resource that has been previously acquired. Calls to Unlock never block.

Calls to Unlock without an acquired resource are a programming error and may block forever.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL