syncevent

package
v0.0.0-202406181927 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2024 License: Apache-2.0, MIT Imports: 4 Imported by: 0

Documentation

Overview

Package syncevent provides efficient primitives for goroutine synchronization based on event bitmasks.

Example (IoReadinessInterrputible)
const (
	evReady = Set(1 << iota)
	evInterrupt
)
errNotReady := fmt.Errorf("not ready for I/O")

// State of some I/O object.
var (
	br    Broadcaster
	ready atomicbitops.Uint32
)
doIO := func() error {
	if ready.Load() == 0 {
		return errNotReady
	}
	return nil
}
go func() {
	// The I/O object eventually becomes ready for I/O.
	time.Sleep(100 * time.Millisecond)
	// When it does, it first ensures that future calls to isReady() return
	// true, then broadcasts the readiness event to Receivers.
	ready.Store(1)
	br.Broadcast(evReady)
}()

// Each user of the I/O object owns a Waiter.
var w Waiter
w.Init()
// The Waiter may be asynchronously interruptible, e.g. for signal
// handling in the sentry.
go func() {
	time.Sleep(200 * time.Millisecond)
	w.Receiver().Notify(evInterrupt)
}()

// To use the I/O object:
//
// Optionally, if the I/O object is likely to be ready, attempt I/O first.
err := doIO()
if err == nil {
	// Success, we're done.
	return /* nil */
}
if err != errNotReady {
	// Failure, I/O failed for some reason other than readiness.
	return /* err */
}
// Subscribe for readiness events from the I/O object.
id := br.SubscribeEvents(w.Receiver(), evReady)
// When we are finished blocking, unsubscribe from readiness events and
// remove readiness events from the pending event set.
defer UnsubscribeAndAck(&br, w.Receiver(), evReady, id)
for {
	// Attempt I/O again. This must be done after the call to SubscribeEvents,
	// since the I/O object might have become ready between the previous call
	// to doIO and the call to SubscribeEvents.
	err = doIO()
	if err == nil {
		return /* nil */
	}
	if err != errNotReady {
		return /* err */
	}
	// Block until either the I/O object indicates it is ready, or we are
	// interrupted.
	events := w.Wait()
	if events&evInterrupt != 0 {
		// In the specific case of sentry signal handling, signal delivery
		// is handled by another system, so we aren't responsible for
		// acknowledging evInterrupt.
		return /* errInterrupted */
	}
	// Note that, in a concurrent context, the I/O object might become
	// ready and then not ready again. To handle this:
	//
	//	- evReady must be acknowledged before calling doIO() again (rather
	//		than after), so that if the I/O object becomes ready *again* after
	//		the call to doIO(), the readiness event is not lost.
	//
	//	- We must loop instead of just calling doIO() once after receiving
	//		evReady.
	w.Ack(evReady)
}
Output:

Index

Examples

Constants

View Source
const (
	// NoEvents is a Set containing no events.
	NoEvents = Set(0)

	// AllEvents is a Set containing all possible events.
	AllEvents = ^Set(0)

	// MaxEvents is the number of distinct events that can be represented by a Set.
	MaxEvents = 64
)

Variables

This section is empty.

Functions

func PutWaiter

func PutWaiter(w *Waiter)

PutWaiter releases an unused Waiter previously returned by GetWaiter.

func UnsubscribeAndAck

func UnsubscribeAndAck(src Source, r *Receiver, filter Set, id SubscriptionID)

UnsubscribeAndAck is a convenience function that unsubscribes r from the given events from src and also clears them from r.

Types

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster is an implementation of Source that supports any number of subscribed Receivers.

The zero value of Broadcaster is valid and has no subscribed Receivers. Broadcaster is not copyable by value.

All Broadcaster methods may be called concurrently from multiple goroutines.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(events Set)

Broadcast notifies all Receivers subscribed to the Broadcaster of the subset of events to which they subscribed. The order in which Receivers are notified is unspecified.

func (*Broadcaster) FilteredEvents

func (b *Broadcaster) FilteredEvents() Set

FilteredEvents returns the set of events for which Broadcast will notify at least one Receiver, i.e. the union of filters for all subscribed Receivers.

func (*Broadcaster) SubscribeEvents

func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID

SubscribeEvents implements Source.SubscribeEvents.

func (*Broadcaster) UnsubscribeEvents

func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID)

UnsubscribeEvents implements Source.UnsubscribeEvents.

type NoopSource

type NoopSource struct{}

NoopSource implements Source by never sending events to subscribed Receivers.

func (NoopSource) SubscribeEvents

func (NoopSource) SubscribeEvents(*Receiver, Set) SubscriptionID

SubscribeEvents implements Source.SubscribeEvents.

func (NoopSource) UnsubscribeEvents

func (NoopSource) UnsubscribeEvents(SubscriptionID)

UnsubscribeEvents implements Source.UnsubscribeEvents.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver is an event sink that holds pending events and invokes a callback whenever new events become pending. Receiver's methods may be called concurrently from multiple goroutines.

Receiver.Init() must be called before first use.

func (*Receiver) Ack

func (r *Receiver) Ack(es Set)

Ack unsets the given events as pending.

func (*Receiver) Init

func (r *Receiver) Init(cb ReceiverCallback)

Init must be called before first use of r.

func (*Receiver) Notify

func (r *Receiver) Notify(es Set)

Notify sets the given events as pending.

func (*Receiver) Pending

func (r *Receiver) Pending() Set

Pending returns the set of pending events.

func (*Receiver) PendingAndAckAll

func (r *Receiver) PendingAndAckAll() Set

PendingAndAckAll unsets all events as pending and returns the set of previously-pending events.

PendingAndAckAll should only be used in preference to a call to Pending followed by a conditional call to Ack when the caller expects events to be pending (e.g. after a call to ReceiverCallback.NotifyPending()).

type ReceiverCallback

type ReceiverCallback interface {
	// NotifyPending is called when the corresponding Receiver has new pending
	// events.
	//
	// NotifyPending is called synchronously from Receiver.Notify(), so
	// implementations must not take locks that may be held by callers of
	// Receiver.Notify(). NotifyPending may be called concurrently from
	// multiple goroutines.
	NotifyPending()
}

ReceiverCallback receives callbacks from a Receiver.

type Set

type Set uint64

Set is a bitmask where each bit represents a distinct user-defined event. The event package does not treat any bits in Set specially.

type Source

type Source interface {
	// SubscribeEvents causes the Source to notify the given Receiver of the
	// given subset of events.
	//
	// Preconditions:
	//	* r != nil.
	//	* The ReceiverCallback for r must not take locks that are ordered
	//		prior to the Source; for example, it cannot call any Source
	//		methods.
	SubscribeEvents(r *Receiver, filter Set) SubscriptionID

	// UnsubscribeEvents causes the Source to stop notifying the Receiver
	// subscribed by a previous call to SubscribeEvents that returned the given
	// SubscriptionID.
	//
	// Preconditions: UnsubscribeEvents may be called at most once for any
	// given SubscriptionID.
	UnsubscribeEvents(id SubscriptionID)
}

Source represents an event source.

type SubscriptionID

type SubscriptionID uint64

SubscriptionID identifies a call to Source.SubscribeEvents.

type Waiter

type Waiter struct {
	// contains filtered or unexported fields
}

Waiter allows a goroutine to block on pending events received by a Receiver.

Waiter.Init() must be called before first use.

func GetWaiter

func GetWaiter() *Waiter

GetWaiter returns an unused Waiter. PutWaiter should be called to release the Waiter once it is no longer needed.

Where possible, users should prefer to associate each goroutine that calls Waiter.Wait() with a distinct pre-allocated Waiter to avoid allocation of Waiters in hot paths.

func (*Waiter) Ack

func (w *Waiter) Ack(es Set)

Ack marks the given events as not pending.

func (*Waiter) Init

func (w *Waiter) Init()

Init must be called before first use of w.

func (*Waiter) Notify

func (w *Waiter) Notify(es Set)

Notify marks the given events as pending, possibly unblocking concurrent calls to w.Wait() or w.WaitFor().

func (*Waiter) NotifyPending

func (w *Waiter) NotifyPending()

NotifyPending implements ReceiverCallback.NotifyPending. Users of Waiter should not call NotifyPending.

func (*Waiter) Pending

func (w *Waiter) Pending() Set

Pending returns the set of pending events.

func (*Waiter) Receiver

func (w *Waiter) Receiver() *Receiver

Receiver returns the Receiver that receives events that unblock calls to w.Wait().

func (*Waiter) Wait

func (w *Waiter) Wait() Set

Wait blocks until at least one event is pending, then returns the set of pending events. It does not affect the set of pending events; callers must call w.Ack() to do so, or use w.WaitAndAck() instead.

Precondition: Only one goroutine may call any Wait* method at a time.

func (*Waiter) WaitAndAckAll

func (w *Waiter) WaitAndAckAll() Set

WaitAndAckAll blocks until at least one event is pending, then marks all events as not pending and returns the set of previously-pending events.

Precondition: Only one goroutine may call any Wait* method at a time.

func (*Waiter) WaitFor

func (w *Waiter) WaitFor(es Set) Set

WaitFor blocks until at least one event in es is pending, then returns the set of pending events (including those not in es). It does not affect the set of pending events; callers must call w.Ack() to do so.

Precondition: Only one goroutine may call any Wait* method at a time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL