Documentation ¶
Overview ¶
Package syncevent provides efficient primitives for goroutine synchronization based on event bitmasks.
Example (IoReadinessInterrputible) ¶
const ( evReady = Set(1 << iota) evInterrupt ) errNotReady := fmt.Errorf("not ready for I/O") // State of some I/O object. var ( br Broadcaster ready atomicbitops.Uint32 ) doIO := func() error { if ready.Load() == 0 { return errNotReady } return nil } go func() { // The I/O object eventually becomes ready for I/O. time.Sleep(100 * time.Millisecond) // When it does, it first ensures that future calls to isReady() return // true, then broadcasts the readiness event to Receivers. ready.Store(1) br.Broadcast(evReady) }() // Each user of the I/O object owns a Waiter. var w Waiter w.Init() // The Waiter may be asynchronously interruptible, e.g. for signal // handling in the sentry. go func() { time.Sleep(200 * time.Millisecond) w.Receiver().Notify(evInterrupt) }() // To use the I/O object: // // Optionally, if the I/O object is likely to be ready, attempt I/O first. err := doIO() if err == nil { // Success, we're done. return /* nil */ } if err != errNotReady { // Failure, I/O failed for some reason other than readiness. return /* err */ } // Subscribe for readiness events from the I/O object. id := br.SubscribeEvents(w.Receiver(), evReady) // When we are finished blocking, unsubscribe from readiness events and // remove readiness events from the pending event set. defer UnsubscribeAndAck(&br, w.Receiver(), evReady, id) for { // Attempt I/O again. This must be done after the call to SubscribeEvents, // since the I/O object might have become ready between the previous call // to doIO and the call to SubscribeEvents. err = doIO() if err == nil { return /* nil */ } if err != errNotReady { return /* err */ } // Block until either the I/O object indicates it is ready, or we are // interrupted. events := w.Wait() if events&evInterrupt != 0 { // In the specific case of sentry signal handling, signal delivery // is handled by another system, so we aren't responsible for // acknowledging evInterrupt. return /* errInterrupted */ } // Note that, in a concurrent context, the I/O object might become // ready and then not ready again. To handle this: // // - evReady must be acknowledged before calling doIO() again (rather // than after), so that if the I/O object becomes ready *again* after // the call to doIO(), the readiness event is not lost. // // - We must loop instead of just calling doIO() once after receiving // evReady. w.Ack(evReady) }
Output:
Index ¶
Examples ¶
Constants ¶
const ( // NoEvents is a Set containing no events. NoEvents = Set(0) // AllEvents is a Set containing all possible events. AllEvents = ^Set(0) // MaxEvents is the number of distinct events that can be represented by a Set. MaxEvents = 64 )
Variables ¶
This section is empty.
Functions ¶
func PutWaiter ¶
func PutWaiter(w *Waiter)
PutWaiter releases an unused Waiter previously returned by GetWaiter.
func UnsubscribeAndAck ¶
func UnsubscribeAndAck(src Source, r *Receiver, filter Set, id SubscriptionID)
UnsubscribeAndAck is a convenience function that unsubscribes r from the given events from src and also clears them from r.
Types ¶
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
Broadcaster is an implementation of Source that supports any number of subscribed Receivers.
The zero value of Broadcaster is valid and has no subscribed Receivers. Broadcaster is not copyable by value.
All Broadcaster methods may be called concurrently from multiple goroutines.
func (*Broadcaster) Broadcast ¶
func (b *Broadcaster) Broadcast(events Set)
Broadcast notifies all Receivers subscribed to the Broadcaster of the subset of events to which they subscribed. The order in which Receivers are notified is unspecified.
func (*Broadcaster) FilteredEvents ¶
func (b *Broadcaster) FilteredEvents() Set
FilteredEvents returns the set of events for which Broadcast will notify at least one Receiver, i.e. the union of filters for all subscribed Receivers.
func (*Broadcaster) SubscribeEvents ¶
func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID
SubscribeEvents implements Source.SubscribeEvents.
func (*Broadcaster) UnsubscribeEvents ¶
func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID)
UnsubscribeEvents implements Source.UnsubscribeEvents.
type NoopSource ¶
type NoopSource struct{}
NoopSource implements Source by never sending events to subscribed Receivers.
func (NoopSource) SubscribeEvents ¶
func (NoopSource) SubscribeEvents(*Receiver, Set) SubscriptionID
SubscribeEvents implements Source.SubscribeEvents.
func (NoopSource) UnsubscribeEvents ¶
func (NoopSource) UnsubscribeEvents(SubscriptionID)
UnsubscribeEvents implements Source.UnsubscribeEvents.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver is an event sink that holds pending events and invokes a callback whenever new events become pending. Receiver's methods may be called concurrently from multiple goroutines.
Receiver.Init() must be called before first use.
func (*Receiver) Init ¶
func (r *Receiver) Init(cb ReceiverCallback)
Init must be called before first use of r.
func (*Receiver) PendingAndAckAll ¶
PendingAndAckAll unsets all events as pending and returns the set of previously-pending events.
PendingAndAckAll should only be used in preference to a call to Pending followed by a conditional call to Ack when the caller expects events to be pending (e.g. after a call to ReceiverCallback.NotifyPending()).
type ReceiverCallback ¶
type ReceiverCallback interface { // NotifyPending is called when the corresponding Receiver has new pending // events. // // NotifyPending is called synchronously from Receiver.Notify(), so // implementations must not take locks that may be held by callers of // Receiver.Notify(). NotifyPending may be called concurrently from // multiple goroutines. NotifyPending() }
ReceiverCallback receives callbacks from a Receiver.
type Set ¶
type Set uint64
Set is a bitmask where each bit represents a distinct user-defined event. The event package does not treat any bits in Set specially.
type Source ¶
type Source interface { // SubscribeEvents causes the Source to notify the given Receiver of the // given subset of events. // // Preconditions: // * r != nil. // * The ReceiverCallback for r must not take locks that are ordered // prior to the Source; for example, it cannot call any Source // methods. SubscribeEvents(r *Receiver, filter Set) SubscriptionID // UnsubscribeEvents causes the Source to stop notifying the Receiver // subscribed by a previous call to SubscribeEvents that returned the given // SubscriptionID. // // Preconditions: UnsubscribeEvents may be called at most once for any // given SubscriptionID. UnsubscribeEvents(id SubscriptionID) }
Source represents an event source.
type SubscriptionID ¶
type SubscriptionID uint64
SubscriptionID identifies a call to Source.SubscribeEvents.
type Waiter ¶
type Waiter struct {
// contains filtered or unexported fields
}
Waiter allows a goroutine to block on pending events received by a Receiver.
Waiter.Init() must be called before first use.
func GetWaiter ¶
func GetWaiter() *Waiter
GetWaiter returns an unused Waiter. PutWaiter should be called to release the Waiter once it is no longer needed.
Where possible, users should prefer to associate each goroutine that calls Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of Waiters in hot paths.
func (*Waiter) Notify ¶
Notify marks the given events as pending, possibly unblocking concurrent calls to w.Wait() or w.WaitFor().
func (*Waiter) NotifyPending ¶
func (w *Waiter) NotifyPending()
NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter should not call NotifyPending.
func (*Waiter) Receiver ¶
Receiver returns the Receiver that receives events that unblock calls to w.Wait().
func (*Waiter) Wait ¶
Wait blocks until at least one event is pending, then returns the set of pending events. It does not affect the set of pending events; callers must call w.Ack() to do so, or use w.WaitAndAck() instead.
Precondition: Only one goroutine may call any Wait* method at a time.
func (*Waiter) WaitAndAckAll ¶
WaitAndAckAll blocks until at least one event is pending, then marks all events as not pending and returns the set of previously-pending events.
Precondition: Only one goroutine may call any Wait* method at a time.
func (*Waiter) WaitFor ¶
WaitFor blocks until at least one event in es is pending, then returns the set of pending events (including those not in es). It does not affect the set of pending events; callers must call w.Ack() to do so.
Precondition: Only one goroutine may call any Wait* method at a time.