Documentation ¶
Index ¶
- func WaitAny[T any, N NotifierLoader[T]](ctx context.Context, fn func(T) bool, notifiers ...N) (T, int)
- func WaitAnyMethod[T any, V any](ctx context.Context, fn func(T) bool, method func(V) (T, <-chan struct{}), ...) (T, int)
- type Channel
- type NotifierLoader
- type Ring
- func (r *Ring[T]) All() iter.Seq[T]
- func (r *Ring[T]) Cap() int
- func (r *Ring[T]) Copy(out []T) int
- func (r *Ring[T]) Len() int
- func (r *Ring[T]) PeekFront() (T, bool)
- func (r *Ring[T]) PeekIndex(i int) (T, bool)
- func (r *Ring[T]) PopFront() (T, bool)
- func (r *Ring[T]) PopIndex(i int) (T, bool)
- func (r *Ring[T]) PushBack(e T) bool
- func (r *Ring[T]) Reset()
- func (r *Ring[T]) Scan(fn func(T) bool) (T, int)
- type StatefulNotifier
- func (n *StatefulNotifier[T]) Load() (T, <-chan struct{})
- func (n *StatefulNotifier[T]) Store(value T)
- func (n *StatefulNotifier[T]) Update(fn func(T) T) T
- func (n *StatefulNotifier[T]) Wait(ctx context.Context, fn func(T) bool) (T, error)
- func (n *StatefulNotifier[T]) Watch(ctx context.Context) iter.Seq[T]
- type Subscription
Functions ¶
func WaitAny ¶ added in v0.2.0
func WaitAny[T any, N NotifierLoader[T]](ctx context.Context, fn func(T) bool, notifiers ...N) (T, int)
WaitAny blocks until one of the given states matches the condition function, or else the context is canceled. It returns the value that satisfied the condition, along with the index of the notifier that was matched.
Note that, like Wait, WaitAny may miss intermediate updates if multiple updates occur quickly.
If the context was canceled, the value will be the zero value and the index will be -1.
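The behavior described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `notifier` type, `newNotifier`, `waitAny`, and `demo` are hypothetical stand-ins, and the use of `reflect.Select` to wait on a variable number of channels is an assumption about one possible approach.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"sync"
)

// notifier is a hypothetical stand-in for a type that satisfies NotifierLoader.
type notifier[T any] struct {
	mu    sync.Mutex
	value T
	ch    chan struct{} // closed on the next Store
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, ch: make(chan struct{})}
}

func (n *notifier[T]) Load() (T, <-chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.value, n.ch
}

func (n *notifier[T]) Store(v T) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = v
	close(n.ch)
	n.ch = make(chan struct{})
}

// waitAny checks every notifier, then sleeps until any of them changes or
// the context is done. On cancellation it returns the zero value and -1.
func waitAny[T any](ctx context.Context, fn func(T) bool, ns ...*notifier[T]) (T, int) {
	for {
		cases := make([]reflect.SelectCase, 0, len(ns)+1)
		for i, n := range ns {
			v, changed := n.Load()
			if fn(v) {
				return v, i
			}
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(changed)})
		}
		// The context's Done channel is always the last case.
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())})
		if chosen, _, _ := reflect.Select(cases); chosen == len(ns) {
			var zero T
			return zero, -1
		}
	}
}

// demo waits on two notifiers while the second one is updated concurrently.
func demo() (int, int) {
	a, b := newNotifier(0), newNotifier(0)
	go b.Store(10)
	return waitAny(context.Background(), func(v int) bool { return v > 5 }, a, b)
}

func main() {
	v, i := demo()
	fmt.Println(v, i) // prints "10 1"
}
```

Because each pass re-reads the current value of every notifier, values that were stored and overwritten between passes are never observed, which is the "missed intermediate updates" caveat noted above.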
func WaitAnyMethod ¶ added in v0.2.1
func WaitAnyMethod[T any, V any](ctx context.Context, fn func(T) bool, method func(V) (T, <-chan struct{}), objs ...V) (T, int)
WaitAnyMethod is like WaitAny, but takes a list of objects along with a method that returns a value and a notifier channel. This allows it to be used with types that expose a similar operation under a different method name, or with `method` acting as an adapter function.
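As an illustration of the adapter idea, the sketch below passes a Go method expression as `method`. Everything here is hypothetical: the `job` type and its `Status` method are invented for the example, and `waitAnyMethod` is a simplified local stand-in rather than the package's function.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// job is a hypothetical type whose accessor is named Status instead of Load.
type job struct {
	status  string
	changed chan struct{}
}

// Status returns the current status and a channel that signals a change.
func (j *job) Status() (string, <-chan struct{}) { return j.status, j.changed }

// waitAnyMethod is a local sketch with the same shape as WaitAnyMethod:
// it calls method on each object instead of requiring a Load method.
func waitAnyMethod[T any, V any](ctx context.Context, fn func(T) bool, method func(V) (T, <-chan struct{}), objs ...V) (T, int) {
	for {
		cases := make([]reflect.SelectCase, 0, len(objs)+1)
		for i, o := range objs {
			v, changed := method(o)
			if fn(v) {
				return v, i
			}
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(changed)})
		}
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())})
		if chosen, _, _ := reflect.Select(cases); chosen == len(objs) {
			var zero T
			return zero, -1
		}
	}
}

// demo passes the method expression (*job).Status as the adapter.
func demo() (string, int) {
	a := &job{status: "running", changed: make(chan struct{})}
	b := &job{status: "done", changed: make(chan struct{})}
	isDone := func(s string) bool { return s == "done" }
	return waitAnyMethod(context.Background(), isDone, (*job).Status, a, b)
}

func main() {
	v, i := demo()
	fmt.Println(v, i) // prints "done 1"
}
```

A method expression such as `(*job).Status` already has the type `func(*job) (string, <-chan struct{})`, so no wrapper is needed; a closure can be used instead when the underlying method has a different shape.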
Types ¶
type Channel ¶ added in v0.2.0
type Channel[T any] struct { // contains filtered or unexported fields }
Channel is a publish/subscribe channel. It is similar to a Go channel with infinite capacity, with a couple of important differences.
1. Multiple receivers. There may be multiple receivers (or publishers), and all receivers get all messages.
2. Persistence. Messages are not persisted. If no receivers are listening when a message is published, it will be lost. When a receiver subscribes, it will only receive messages published after the subscription is created.
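The two differences listed above can be illustrated with a small sketch. The `broker` type, `newBroker`, `subscribe`, and `publish` are hypothetical names, and the sketch delivers synchronously to callbacks rather than buffering, so it shows only the fan-out and no-persistence semantics, not how Channel is built.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical, simplified stand-in for a publish/subscribe channel.
type broker[T any] struct {
	mu   sync.Mutex
	subs map[int]func(T)
	next int
}

func newBroker[T any]() *broker[T] {
	return &broker[T]{subs: make(map[int]func(T))}
}

// subscribe registers fn and returns a function that cancels the subscription.
func (b *broker[T]) subscribe(fn func(T)) (cancel func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	b.subs[id] = fn
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs, id)
	}
}

// publish delivers v to every current subscriber. With no subscribers,
// v is simply dropped.
func (b *broker[T]) publish(v T) {
	b.mu.Lock()
	fns := make([]func(T), 0, len(b.subs))
	for _, fn := range b.subs {
		fns = append(fns, fn)
	}
	b.mu.Unlock()
	// Call the callbacks outside the lock so they may cancel or publish.
	for _, fn := range fns {
		fn(v)
	}
}

func main() {
	b := newBroker[string]()
	b.publish("lost") // no subscribers yet, so this message is dropped

	var got1, got2 []string
	cancel1 := b.subscribe(func(s string) { got1 = append(got1, s) })
	cancel2 := b.subscribe(func(s string) { got2 = append(got2, s) })
	b.publish("hello") // both subscribers receive it
	cancel1()
	b.publish("world") // only the second subscriber remains
	cancel2()
	fmt.Println(got1, got2) // prints "[hello] [hello world]"
}
```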
func (*Channel[T]) Close ¶ added in v0.2.4
func (c *Channel[T]) Close()
Close the channel. This will prevent any new values from being published, and will cause all subscribers to stop receiving values after the last message. For receive iterators, this will cause the iterator to terminate.
func (*Channel[T]) Publish ¶ added in v0.2.0
func (c *Channel[T]) Publish(value T)
Publish a new value to the channel. This value will be sent to all subscribers. Note that values are not persisted, so if no subscribers are listening when a value is published, it will be lost.
func (*Channel[T]) Receive ¶ added in v0.2.4
Receive subscribes to updates on the channel and returns a sequence of values. The subscription is set up before the function returns, so it is safe to publish values immediately after calling Receive. The sequence may be infinite; it will only terminate if the channel is closed.
func (*Channel[T]) Subscribe ¶ added in v0.2.0
func (c *Channel[T]) Subscribe(fn func(T)) *Subscription[T]
Subscribe is like Watch, but without the context. The subscription will run until it is canceled. The subscription is set up before the function returns, so it is safe to publish values immediately after calling Subscribe.
type NotifierLoader ¶ added in v0.2.1
type NotifierLoader[T any] interface { Load() (T, <-chan struct{}) }
type Ring ¶
type Ring[T any] struct { // contains filtered or unexported fields }
Ring is a fixed-size ring buffer that supports pushing and popping elements, as well as copying elements into a slice, and removing an element by index. The ring is implemented as a single slice, which is never reallocated.
Note that no synchronization is done. If the ring is accessed concurrently, it must be synchronized externally.
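A fixed-size ring over a single slice can be sketched roughly as below. The `ring` type, its field names, and `newRing` are hypothetical (no constructor is shown on this page); the sketch only illustrates the push and pop behavior described above and, like Ring, does no synchronization.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size ring buffer backed by one slice.
type ring[T any] struct {
	buf  []T
	head int // index of the first element
	n    int // number of stored elements
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// pushBack appends e, or returns false if the ring is full.
func (r *ring[T]) pushBack(e T) bool {
	if r.n == len(r.buf) {
		return false
	}
	r.buf[(r.head+r.n)%len(r.buf)] = e
	r.n++
	return true
}

// popFront removes and returns the first element, or false if empty.
func (r *ring[T]) popFront() (T, bool) {
	var zero T
	if r.n == 0 {
		return zero, false
	}
	e := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.n--
	return e, true
}

func main() {
	r := newRing[int](2)
	fmt.Println(r.pushBack(1), r.pushBack(2), r.pushBack(3)) // prints "true true false"
	v, ok := r.popFront()
	fmt.Println(v, ok)         // prints "1 true"
	fmt.Println(r.pushBack(3)) // prints "true"; the write wraps to slot 0
}
```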
func (*Ring[T]) Cap ¶
Cap returns the fixed size of the ring. This is constant for the lifetime of the ring.
func (*Ring[T]) Copy ¶
Copy makes a copy of the first n elements of the ring into the out slice. It returns the number of elements copied. This does not consume elements from the ring.
func (*Ring[T]) PeekFront ¶ added in v0.1.5
PeekFront returns the first element in the ring without removing it.
func (*Ring[T]) PeekIndex ¶ added in v0.1.5
PeekIndex returns the element at the given index without removing it. If the index is out of bounds, it returns false. The index is 0-based, with 0 being the first element in the ring. PeekIndex(0) is equivalent to PeekFront.
func (*Ring[T]) PopFront ¶
PopFront removes and returns the first element in the ring. If the ring is empty, it returns false.
func (*Ring[T]) PopIndex ¶
PopIndex removes and returns the element at the given index. This will require copying elements to maintain the ring structure, which has a time complexity of O(n) in the worst case.
If the index is out of bounds, it returns false. The index is 0-based, with 0 being the first element in the ring. PopIndex(0) is equivalent to PopFront.
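To show where the O(n) cost comes from, here is a sketch of removal by index. The `ring` type and its methods are hypothetical, and this version always shifts the elements after the removed one toward the front; the package may shift differently (for example, treating index 0 as a plain front pop).

```go
package main

import "fmt"

// ring is a hypothetical fixed-size ring buffer backed by one slice.
type ring[T any] struct {
	buf  []T
	head int // index of the first element
	n    int // number of stored elements
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

func (r *ring[T]) pushBack(e T) bool {
	if r.n == len(r.buf) {
		return false
	}
	r.buf[(r.head+r.n)%len(r.buf)] = e
	r.n++
	return true
}

// popIndex removes and returns the element at logical index i (0 = front).
func (r *ring[T]) popIndex(i int) (T, bool) {
	var zero T
	if i < 0 || i >= r.n {
		return zero, false
	}
	c := len(r.buf)
	e := r.buf[(r.head+i)%c]
	// Close the gap: move each later element one slot toward the front.
	for j := i; j < r.n-1; j++ {
		r.buf[(r.head+j)%c] = r.buf[(r.head+j+1)%c]
	}
	r.buf[(r.head+r.n-1)%c] = zero
	r.n--
	return e, true
}

func main() {
	r := newRing[string](3)
	r.pushBack("a")
	r.pushBack("b")
	r.pushBack("c")
	v, ok := r.popIndex(1)
	fmt.Println(v, ok) // prints "b true"
	first, _ := r.popIndex(0)
	second, _ := r.popIndex(0)
	fmt.Println(first, second) // prints "a c"
}
```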
func (*Ring[T]) PushBack ¶
PushBack adds the element to the ring. If the ring is full, it returns false.
type StatefulNotifier ¶ added in v0.2.0
type StatefulNotifier[T any] struct { // contains filtered or unexported fields }
StatefulNotifier holds a value and notifies listeners when the value is updated. Unlike a Channel, it does not queue updates, so a listener (calling Load) may not see every update if multiple updates occur between calls to Load.
func NewStatefulNotifier ¶ added in v0.2.0
func NewStatefulNotifier[T any](initial T) *StatefulNotifier[T]
func (*StatefulNotifier[T]) Load ¶ added in v0.2.0
func (n *StatefulNotifier[T]) Load() (T, <-chan struct{})
Load returns the current value, along with a channel that will unblock when the value is updated.
func (*StatefulNotifier[T]) Store ¶ added in v0.2.0
func (n *StatefulNotifier[T]) Store(value T)
Store updates the value and unblocks any listeners.
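One common way to get the Load and Store behavior described above is to close a channel on every update and replace it with a fresh one. The sketch below assumes that technique; the `notifier` type and `newNotifier` are hypothetical, and nothing on this page confirms that StatefulNotifier is implemented this way.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for StatefulNotifier.
type notifier[T any] struct {
	mu    sync.Mutex
	value T
	ch    chan struct{} // closed on the next Store
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, ch: make(chan struct{})}
}

// Load returns the current value and a channel that is closed on the next Store.
func (n *notifier[T]) Load() (T, <-chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.value, n.ch
}

// Store replaces the value, wakes every waiter by closing the old channel,
// and installs a fresh channel for future waiters.
func (n *notifier[T]) Store(v T) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = v
	close(n.ch)
	n.ch = make(chan struct{})
}

func main() {
	n := newNotifier(1)
	v, changed := n.Load()
	n.Store(2)
	<-changed // returns immediately because Store closed the channel
	v2, _ := n.Load()
	fmt.Println(v, v2) // prints "1 2"
}
```

Closing a channel wakes all receivers at once, which is why a single Store can unblock any number of listeners.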
func (*StatefulNotifier[T]) Update ¶ added in v0.2.2
func (n *StatefulNotifier[T]) Update(fn func(T) T) T
Update will atomically provide the current value to the update function and store the result of the function. Note that this will call the user's function with a lock held, so if the function blocks, then other calls to the notifier will block.
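The read-modify-write guarantee can be illustrated with a concurrent counter. The `notifier` type and its `update` method are hypothetical stand-ins; in particular, returning the newly stored value is an assumption, since this page does not say what Update returns.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for StatefulNotifier.
type notifier[T any] struct {
	mu    sync.Mutex
	value T
	ch    chan struct{} // closed on every update
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, ch: make(chan struct{})}
}

// update applies fn to the current value while holding the lock, so no other
// update can interleave. It returns the new value (an assumption).
func (n *notifier[T]) update(fn func(T) T) T {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = fn(n.value)
	close(n.ch) // wake listeners
	n.ch = make(chan struct{})
	return n.value
}

func main() {
	n := newNotifier(0)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.update(func(v int) int { return v + 1 })
		}()
	}
	wg.Wait()
	// An identity update reads the final value; no increments were lost.
	fmt.Println(n.update(func(v int) int { return v })) // prints "100"
}
```

Because fn runs with the lock held, it should be short and must not call back into the same notifier, or it will deadlock in this sketch.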
func (*StatefulNotifier[T]) Wait ¶ added in v0.2.0
func (n *StatefulNotifier[T]) Wait(ctx context.Context, fn func(T) bool) (T, error)
Wait blocks until the given condition function returns true or the context is canceled. It returns the value that satisfied the condition.
Note that Wait may miss intermediate updates if multiple updates occur quickly. If every update should be processed, use Channel instead.
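The following sketch shows why intermediate updates can be missed: a waiter only sees whatever value is current each time it wakes. The `notifier` type, `wait`, and `demo` are hypothetical, and returning the zero value with `ctx.Err()` on cancellation is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for StatefulNotifier.
type notifier[T any] struct {
	mu    sync.Mutex
	value T
	ch    chan struct{} // closed on the next Store
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, ch: make(chan struct{})}
}

func (n *notifier[T]) Load() (T, <-chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.value, n.ch
}

func (n *notifier[T]) Store(v T) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = v
	close(n.ch)
	n.ch = make(chan struct{})
}

// wait re-checks the condition against the current value after each wake-up.
func wait[T any](ctx context.Context, n *notifier[T], fn func(T) bool) (T, error) {
	for {
		v, changed := n.Load()
		if fn(v) {
			return v, nil
		}
		select {
		case <-changed:
			// The value changed at least once; loop and load the latest.
		case <-ctx.Done():
			var zero T
			return zero, ctx.Err()
		}
	}
}

// demo stores 1 through 5 concurrently while waiting for a value of at least 3.
func demo() (int, error) {
	n := newNotifier(0)
	go func() {
		for i := 1; i <= 5; i++ {
			n.Store(i)
		}
	}()
	return wait(context.Background(), n, func(v int) bool { return v >= 3 })
}

func main() {
	v, err := demo()
	fmt.Println(v, err) // v is 3, 4, or 5 depending on scheduling; err is nil
}
```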
func (*StatefulNotifier[T]) Watch ¶ added in v0.2.4
func (n *StatefulNotifier[T]) Watch(ctx context.Context) iter.Seq[T]
Watch returns an iterator which will yield the current value and any updates. Note that updates may be missed if multiple updates occur quickly. If all updates should be processed, use a Channel instead. If the context is canceled, the iterator terminates.
type Subscription ¶ added in v0.2.0
type Subscription[T any] struct { // contains filtered or unexported fields }
func (*Subscription[T]) Cancel ¶ added in v0.2.0
func (s *Subscription[T]) Cancel()