Documentation ¶
Overview ¶
Package chans contains utility constraints, functions, and types regarding Go channels, such as the PubSub type for fan-out events.
Index ¶
- Variables
- func RecvContext[C ~<-chan V, V any](ctx context.Context, ch C) (V, bool)
- func RecvQueued[C Receiver[V], V any](ch C, maxValues int) []V
- func RecvQueuedFull[C Receiver[V], B ~[]V, V any](ch C, buf B) int
- func RecvTimeout[C Receiver[V], V any](ch C, timeout time.Duration) (V, bool)
- func SendContext[C Sender[V], V any](ctx context.Context, ch C, value V) bool
- func SendTimeout[C Sender[V], V any](ch C, value V, timeout time.Duration) bool
- type Chan
- type PubSub
- func (o *PubSub[T]) Pub(ev T)
- func (o *PubSub[T]) PubSlice(evs []T)
- func (o *PubSub[T]) PubSliceSync(evs []T)
- func (o *PubSub[T]) PubSliceWait(evs []T)
- func (o *PubSub[T]) PubSync(ev T)
- func (o *PubSub[T]) PubWait(ev T)
- func (o *PubSub[T]) Sub() <-chan T
- func (o *PubSub[T]) SubBuf(size int) <-chan T
- func (o *PubSub[T]) Unsub(sub <-chan T) error
- func (o *PubSub[T]) UnsubAll() error
- func (o *PubSub[T]) WithOnly(sub <-chan T) *PubSub[T]
- type Receiver
- type Sender
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAlreadyUnsubscribed       = errors.New("already unsubscribed")
	ErrSubscriptionNotInitalized = errors.New("subscription is not initialized")
)
Errors specific to the listener and subscriptions.
Functions ¶
func RecvContext ¶
RecvContext receives a value from a channel, or cancels when the given context is cancelled.
func RecvQueued ¶
RecvQueued receives values from a channel until there are no more values in the channel's queue buffer, it has received maxValues values, or the channel is closed, whichever comes first.
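The non-blocking drain described above can be approximated with a `select` that has a `default` case. This is a sketch under assumptions, not the package's source: `recvQueued` is a hypothetical name, and it takes a plain `<-chan V` rather than the package's Receiver constraint.

```go
package main

import "fmt"

// recvQueued is a hypothetical stand-in: it collects up to maxValues values
// that are already available on ch, and never blocks.
func recvQueued[V any](ch <-chan V, maxValues int) []V {
	var out []V
	for len(out) < maxValues {
		select {
		case v, ok := <-ch:
			if !ok {
				return out // channel closed
			}
			out = append(out, v)
		default:
			return out // nothing queued right now
		}
	}
	return out
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Println(recvQueued(ch, 2)) // [1 2]
	fmt.Println(recvQueued(ch, 5)) // [3]
}
```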
func RecvQueuedFull ¶
RecvQueuedFull receives values from a channel until there are no more values in the channel's queue buffer, it has filled buf with values, or the channel is closed, whichever comes first. It then returns the number of values that were received.
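Filling a caller-supplied buffer lets the same slice be reused across calls. The sketch below illustrates that pattern with the standard library only. `recvQueuedFull` here is a hypothetical reimplementation, and the loop in `main` is just one possible way to consume the result.

```go
package main

import "fmt"

// recvQueuedFull is a hypothetical stand-in: it fills buf with values that
// are already available on ch and returns how many it stored, never blocking.
func recvQueuedFull[V any](ch <-chan V, buf []V) int {
	for i := range buf {
		select {
		case v, ok := <-ch:
			if !ok {
				return i // channel closed
			}
			buf[i] = v
		default:
			return i // nothing queued right now
		}
	}
	return len(buf)
}

func main() {
	ch := make(chan string, 8)
	for _, s := range []string{"a", "b", "c"} {
		ch <- s
	}
	buf := make([]string, 2) // reused for every batch
	for {
		n := recvQueuedFull(ch, buf)
		if n == 0 {
			break
		}
		fmt.Println(buf[:n]) // prints [a b], then [c]
	}
}
```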
func RecvTimeout ¶
RecvTimeout receives a value from a channel, or cancels after a given timeout. If the timeout duration is zero or negative, then no limit is used.
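A timeout-bounded receive can be sketched with `time.NewTimer` and `select`. This is an illustration, not the package's code: `recvTimeout` is a hypothetical name, and returning false on timeout or on a closed channel is an assumption. The handling of a zero or negative timeout follows the description above.

```go
package main

import (
	"fmt"
	"time"
)

// recvTimeout is a hypothetical stand-in for a receive with a deadline.
// A zero or negative timeout means "wait without limit".
func recvTimeout[V any](ch <-chan V, timeout time.Duration) (V, bool) {
	if timeout <= 0 {
		v, ok := <-ch
		return v, ok
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if the receive wins
	select {
	case v, ok := <-ch:
		return v, ok
	case <-timer.C:
		var zero V
		return zero, false
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "ready"

	v, ok := recvTimeout(ch, 50*time.Millisecond)
	fmt.Printf("%q %v\n", v, ok) // "ready" true

	v, ok = recvTimeout(ch, 50*time.Millisecond) // nothing queued: times out
	fmt.Printf("%q %v\n", v, ok)                 // "" false
}
```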
func SendContext ¶
SendContext sends a value to a channel, or cancels when the given context is cancelled.
Types ¶
type Chan ¶
type Chan[T any] interface { ~chan T | ~chan<- T | ~<-chan T }
Chan is a constraint that permits any type of channel, be it a receive-only, send-only, or bidirectional channel.
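As a rough illustration of what such a constraint allows, the sketch below copies the constraint definition shown above into a standalone program and uses it in a helper that works for every channel direction. The helper `queuedInts` is hypothetical and not part of the package.

```go
package main

import "fmt"

// Chan is copied from the constraint definition shown above.
type Chan[T any] interface {
	~chan T | ~chan<- T | ~<-chan T
}

// queuedInts is a hypothetical helper. len and cap are valid for every
// channel direction, so they can be used on any type in the constraint.
func queuedInts[C Chan[int]](ch C) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2

	var recvOnly <-chan int = ch
	var sendOnly chan<- int = ch

	fmt.Println(queuedInts(ch))       // 2 4
	fmt.Println(queuedInts(recvOnly)) // 2 4
	fmt.Println(queuedInts(sendOnly)) // 2 4
}
```

The element type is fixed to `int` in the helper so that the channel type can be inferred from the argument alone.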
type PubSub ¶
type PubSub[T any] struct {
	OnPubTimeout    func(ev T)    // called if Pub or PubWait times out
	PubTimeoutAfter time.Duration // times out Pub & PubWait, if positive
	DefaultBuffer   int
	// contains filtered or unexported fields
}
PubSub is a type that allows publishing an event which will be sent out to all subscribed channels. A sort of "fan-out message queue".
Example ¶
// SPDX-FileCopyrightText: 2022 Kalle Fagerberg
//
// SPDX-License-Identifier: MIT

package main

import (
	"fmt"
	"sync"

	"gopkg.in/typ.v4/chans"
)

func printMessages(prefix string, ch <-chan string, wg *sync.WaitGroup) {
	for msg := range ch {
		fmt.Println(prefix, msg)
	}
	wg.Done()
}

func main() {
	var pub chans.PubSub[string]
	var wg sync.WaitGroup

	sub1 := pub.Sub()
	sub2 := pub.Sub()
	wg.Add(2)

	go printMessages("sub1:", sub1, &wg)
	go printMessages("sub2:", sub2, &wg)

	pub.PubWait("hello there")
	pub.UnsubAll()
	wg.Wait()
}

Output:
sub1: hello there
sub2: hello there
func (*PubSub[T]) Pub ¶
func (o *PubSub[T]) Pub(ev T)
Pub sends the event to all subscriptions in their own goroutines and returns immediately without waiting for any of the channels to finish sending.
func (*PubSub[T]) PubSlice ¶
func (o *PubSub[T]) PubSlice(evs []T)
PubSlice sends a slice of events to all subscriptions in their own goroutines and returns immediately without waiting for any of the channels to finish sending.
func (*PubSub[T]) PubSliceSync ¶
func (o *PubSub[T]) PubSliceSync(evs []T)
PubSliceSync blocks while sending a slice of events synchronously to all subscriptions without starting a single goroutine. Useful in performance-critical use cases where a low number of subscribers (0-3) is expected.
func (*PubSub[T]) PubSliceWait ¶
func (o *PubSub[T]) PubSliceWait(evs []T)
PubSliceWait blocks while sending a slice of events to all subscriptions in their own goroutines, and waits until all have received the message or timed out.
func (*PubSub[T]) PubSync ¶
func (o *PubSub[T]) PubSync(ev T)
PubSync blocks while sending the event synchronously to all subscriptions without starting a single goroutine. Useful in performance-critical use cases where a low number of subscribers (0-3) is expected.
func (*PubSub[T]) PubWait ¶
func (o *PubSub[T]) PubWait(ev T)
PubWait blocks while sending the event to all subscriptions in their own goroutines, and waits until all have received the message or timed out.
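To make the difference between the goroutine-per-subscriber strategy (PubWait) and the sequential strategy (PubSync) concrete, here is a stripped-down fan-out sketch built on the standard library. It is not the package's implementation: the `fanOut` type and its methods are hypothetical, and it omits timeouts and unsubscription entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical, minimal publisher used only to contrast the
// two blocking strategies.
type fanOut[T any] struct {
	mu   sync.Mutex
	subs []chan T
}

// sub registers a new subscriber channel with the given buffer size.
func (f *fanOut[T]) sub(size int) <-chan T {
	ch := make(chan T, size)
	f.mu.Lock()
	f.subs = append(f.subs, ch)
	f.mu.Unlock()
	return ch
}

// pubWait sends to every subscriber in its own goroutine and waits for all
// sends to complete, so one slow subscriber does not delay the others.
func (f *fanOut[T]) pubWait(ev T) {
	f.mu.Lock()
	defer f.mu.Unlock()
	var wg sync.WaitGroup
	for _, ch := range f.subs {
		wg.Add(1)
		go func(ch chan T) {
			defer wg.Done()
			ch <- ev
		}(ch)
	}
	wg.Wait()
}

// pubSync sends to the subscribers one after another on the calling
// goroutine, avoiding goroutine startup cost.
func (f *fanOut[T]) pubSync(ev T) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		ch <- ev
	}
}

func main() {
	var f fanOut[string]
	a := f.sub(2)
	b := f.sub(2)
	f.pubWait("first")
	f.pubSync("second")
	fmt.Println(<-a, <-a) // first second
	fmt.Println(<-b, <-b) // first second
}
```

In the sequential variant a single blocked subscriber stalls delivery to every subscriber after it, which is one reason it suits only a small number of subscribers.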
func (*PubSub[T]) Sub ¶
func (o *PubSub[T]) Sub() <-chan T
Sub subscribes to events in a newly created channel using the default buffer size for this PubSub. If no default is configured, the buffer size will be 0.
func (*PubSub[T]) SubBuf ¶
SubBuf subscribes to events in a newly created channel with a specified buffer size.