Documentation ¶
Overview ¶
Package sema implements semaphores and semaphore-related scheduling
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Schedule ¶
func Schedule(worker func(uint64) error, count uint64, concurrency Concurrency) (err error)
Schedule schedules count instances of worker to be called. When count is non-positive, no calls are scheduled and immediately returns nil. Each call to worker receives a unique id, from 0 up to count (exclusive count).
Workers are approximately started in order. Any call to worker(i) is called before worker(j) for any i < j. Concurrency determines the amount of concurrency that takes place for scheduling.
There is no synchronization mechanism beyond the limits themselves. In particular for Limit != 1, the order guarantee might be broken: While the invocation of worker(i) occurs before worker(j), no such guarantee for the first statement within those invocation is true.
If an error occurs (indicated by a non-nil return from worker), Schedule waits for all ongoing worker calls to return. It may schedule further calls to worker, as determined by concurrency.Force. It then returns the non-nil error which triggered the error stop.
If no error occurs, schedule returns nil.
Example ¶
var counter atomic.Uint64 // because we have a parallelism of 1, we run exactly in order! _ = Schedule(func(i uint64) error { counter.Add(1) return nil }, 1000, Concurrency{ Limit: 0, Force: false, }) fmt.Print(counter.Load())
Output: 1000
Example (Error) ¶
err := Schedule(func(i uint64) error { // the first invocation produces an error and returns immediately! if i == 0 { return errors.New("first function error") } // concurrently with the first invocation, we have at most one other // so give the first function some time to produce an error time.Sleep(100 * time.Millisecond) // the third and fourth invocations should never be called // since by the time the first function finishes // the second one has already produced an error if i > 1 { panic("never reached") } return nil }, 4, Concurrency{ Limit: 2, Force: false, }) fmt.Println(err)
Output: first function error
Example (Force) ¶
var counter atomic.Uint64 err := Schedule(func(i uint64) error { // count the number of invocations counter.Add(1) // the first function returns an error // but because of force = True, the execution is not aborted if i == 0 { return errors.New("first function error") } // ... work ... time.Sleep(50 * time.Millisecond) return nil }, 10, Concurrency{ Limit: 2, Force: true, }) fmt.Println(err) fmt.Println(counter.Load(), "workers called")
Output: first function error 10 workers called
Example (Order) ¶
// because we have a parallelism of 1, we run exactly in order! _ = Schedule(func(i uint64) error { fmt.Print(i, ";") return nil }, 4, Concurrency{ Limit: 1, Force: false, })
Output: 0;1;2;3;
Types ¶
type Concurrency ¶
type Concurrency struct { Limit int // Limit indicates the maximum number of concurrent operations. 0 or negative implies no limit. Force bool // Force indicates if a failed operation should still allow future operations to start }
Concurrency represents the amount of concurrency of an operation
type Pool ¶
type Pool[V any] struct { // New creates a new item in the pool. New func() V // Discard is called before an item is (permanently) remove from the pool. // Unlike a finalizer, Discard is guaranteed to be called. Discard func(V) // Limit is the maximum number of objects in the pool. // Limit <= 0 means there is no limit on the number of objects in the pool. // When Use requests an item, and the limit is already reached, // Use will block until an item becomes available. Limit int // contains filtered or unexported fields }
Pool holds a finite set of lazily created objects.
Example ¶
//spellchecker:words sema package main //spellchecker:words errors sync atomic github pkglib sema import ( "errors" "fmt" "sync/atomic" "github.com/tkw1536/pkglib/sema" ) type Thing uint64 func (t *Thing) OK() error { fmt.Printf("OK(%d)\n", *t) return nil } var errTest = errors.New("test error") func (t *Thing) Fail() error { fmt.Printf("Fail(%d)\n", *t) return errTest } func (t *Thing) Close() { fmt.Printf("Close(%d)\n", *t) } func main() { var counter atomic.Uint64 p := sema.Pool[*Thing]{ Limit: 1, // at most one item in the pool New: func() *Thing { // create a new thing (and print creating it) id := counter.Add(1) fmt.Printf("New(%d)\n", id) return ((*Thing)(&id)) }, Discard: (*Thing).Close, } defer p.Close() // the first time an item from the pool is requested, it is created using New() _ = p.Use((*Thing).OK) // calling it again, re-uses it _ = p.Use((*Thing).OK) // failing causes it to be destroyed _ = p.Use((*Thing).Fail) // and calling it again re-creates another one _ = p.Use((*Thing).OK) }
Output: New(1) OK(1) OK(1) Fail(1) Close(1) New(2) OK(2) Close(2)
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore guards parallel access to a shared resource. It should always be created using New.
The resource can be acquired using a call to [Lock]. and released using a call to [Unlock].
func New ¶
New creates a new Semaphore, guarding a resource of at most the given size. The resource is assumed to be entirely available.
A size <= 0 indicates an infinite limit, and all Lock and Unlock calls are no-ops.
Note that if size is statically known to be 1, a Mutex should be used instead.
Example ¶
// create a new semaphore with two elements sema := New(2) // some very finite resource pool var resource atomic.Uint64 resource.Store(2) // create N = 100 workers that each attempt to use the finite resource N := 100 var worked atomic.Uint64 var wg sync.WaitGroup wg.Add(N) for range N { go func() { // accounting: keep track that we did some work and that we're done! defer wg.Done() worked.Add(1) // Lock the semaphore // the lock can be locked at most twice sema.Lock() defer sema.Unlock() // check that the resource is available // since we are protected by the semaphore, this is guaranteed to be the case if resource.Load() == 0 { panic("no resource available") } // while we are working, take the resources away resource.Add(^uint64(0)) defer resource.Add(1) // ... deep computation ... time.Sleep(10 * time.Millisecond) }() } wg.Wait() fmt.Printf("Worked %d times", worked.Load())
Output: Worked 100 times
Example (One) ¶
// a semaphore with value one behaves just like a mutex sema := New(1) nothing := time.Nanosecond // do a bunch of locks and unlocks N := 1000 for range N { sema.Lock() time.Sleep(nothing) sema.Unlock() } // and nothing was blocked! fmt.Println("nothing blocked")
Output: nothing blocked
Example (Panic) ¶
sema := New(2) // an unlock without a corresponding unlock will always panic didPanic, value := testlib.DoesPanic(func() { sema.Unlock() }) if !didPanic { panic("did not panic") } fmt.Printf("Unlock() panic = %#v", value)
Output: Unlock() panic = "Semaphore: Unlock without Lock"
Example (Simple) ¶
sema := New(2) // we can lock it two times sema.Lock() sema.Lock() // this call would block // sema.Lock() fmt.Println("two lock calls") // before need to unlock to acquire again sema.Unlock() sema.Lock() fmt.Println("another lock call only after unlock")
Output: two lock calls another lock call only after unlock
Example (Two) ¶
// a semaphore with value >= 2 is a regular semaphore sema := New(2) nothing := time.Nanosecond // do a bunch of locks and unlocks N := 1000 // can lock it twice, before requiring an unlock for range N { sema.Lock() sema.Lock() time.Sleep(nothing) sema.Unlock() sema.Unlock() } fmt.Println("nothing blocked")
Output: nothing blocked
Example (Zero) ¶
// a zero or negative limit creates a semaphore without any limits sema := New(0) N := 1000 // so we can call Lock as many times as we want for range N { sema.Lock() } sema.Unlock() // and nothing was blocked! fmt.Println("nothing blocked")
Output: nothing blocked
func (Semaphore) Len ¶
Len returns the maximum size of the resource guarded by this semaphore, or 0 if said limit is infinite.
func (Semaphore) Lock ¶
func (s Semaphore) Lock()
Lock atomically acquires a unit of the guarded resource. When the resource is not available, it blocks until such a resource is available.