chans

package
v4.3.0 Latest
Published: May 4, 2023 License: MIT Imports: 5 Imported by: 3

This package is not in the latest version of its module.

Documentation

Overview

Package chans contains utility constraints, functions, and types regarding Go channels, such as the PubSub type for fan-out events.

Constants

This section is empty.

Variables

var (
	ErrAlreadyUnsubscribed       = errors.New("already unsubscribed")
	ErrSubscriptionNotInitalized = errors.New("subscription is not initialized")
)

Errors specific to listeners and subscriptions.

Functions

func RecvContext

func RecvContext[C ~<-chan V, V any](ctx context.Context, ch C) (V, bool)

RecvContext receives a value from a channel, or cancels when the given context is cancelled.

func RecvQueued

func RecvQueued[C Receiver[V], V any](ch C, maxValues int) []V

RecvQueued receives values from a channel until there are no more values in the channel's queue buffer, it has received maxValues values, or the channel is closed, whichever comes first.
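
The three stop conditions can be sketched with a non-blocking select inside a loop. The recvQueued function below is a hypothetical stand-in, assuming the drain never blocks waiting for new values; the package's own implementation may differ.

```go
package main

import "fmt"

// recvQueued is a hypothetical stand-in: it drains what is already
// buffered, without ever blocking, and stops at maxValues values.
func recvQueued[V any](ch <-chan V, maxValues int) []V {
	var values []V
	for len(values) < maxValues {
		select {
		case v, ok := <-ch:
			if !ok {
				return values // channel closed
			}
			values = append(values, v)
		default:
			return values // nothing queued right now
		}
	}
	return values
}

func main() {
	ch := make(chan int, 5)
	for i := 1; i <= 3; i++ {
		ch <- i
	}
	fmt.Println(recvQueued(ch, 2))  // [1 2]
	fmt.Println(recvQueued(ch, 10)) // [3]
}
```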

func RecvQueuedFull

func RecvQueuedFull[C Receiver[V], B ~[]V, V any](ch C, buf B) int

RecvQueuedFull receives values from a channel until there are no more values in the channel's queue buffer, it has filled buf with values, or the channel is closed, whichever comes first, and then returns the number of values that were received.
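
A caller-supplied buffer lets the same slice be reused across batches. The sketch below shows that pattern with a hypothetical recvQueuedFull stand-in, assuming values are written to buf starting at index 0; it is not the package's implementation.

```go
package main

import "fmt"

// recvQueuedFull is a hypothetical stand-in: it copies already queued
// values into buf without blocking and returns how many were written.
func recvQueuedFull[V any](ch <-chan V, buf []V) int {
	for i := range buf {
		select {
		case v, ok := <-ch:
			if !ok {
				return i // channel closed
			}
			buf[i] = v
		default:
			return i // nothing queued right now
		}
	}
	return len(buf)
}

func main() {
	ch := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		ch <- i
	}

	buf := make([]int, 3) // reused for every batch
	for {
		n := recvQueuedFull(ch, buf)
		if n == 0 {
			break
		}
		fmt.Println(buf[:n])
	}
}
```

With five queued values and a buffer of three, the loop prints a batch of three followed by a batch of two.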

func RecvTimeout

func RecvTimeout[C Receiver[V], V any](ch C, timeout time.Duration) (V, bool)

RecvTimeout receives a value from a channel, or cancels after a given timeout. If the timeout duration is zero or negative, then no limit is used.

func SendContext

func SendContext[C Sender[V], V any](ctx context.Context, ch C, value V) bool

SendContext sends a value to a channel, or cancels when the given context is cancelled.

func SendTimeout

func SendTimeout[C Sender[V], V any](ch C, value V, timeout time.Duration) bool

SendTimeout sends a value to a channel, or cancels after a given duration.

Types

type Chan

type Chan[T any] interface {
	~chan T | ~chan<- T | ~<-chan T
}

Chan is a constraint that permits any type of channel, be it a receive-only, send-only, or bidirectional channel.
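
As a sketch of what this constraint allows, the example below copies the definition locally and uses it in a hypothetical helper named usage. Because some permitted types forbid sending and others forbid receiving, only direction-independent operations such as len and cap are available under Chan. The element type is passed explicitly here, on the assumption that it cannot be inferred from a constraint that mixes both directions.

```go
package main

import "fmt"

// Chan mirrors the constraint documented above so the sketch compiles alone.
type Chan[T any] interface {
	~chan T | ~chan<- T | ~<-chan T
}

// usage is a hypothetical helper: len and cap are valid for channels of
// every direction, so they are usable under this constraint.
func usage[T any, C Chan[T]](ch C) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2

	var sendOnly chan<- int = ch
	var recvOnly <-chan int = ch

	fmt.Println(usage[int](ch))       // 2 4
	fmt.Println(usage[int](sendOnly)) // 2 4
	fmt.Println(usage[int](recvOnly)) // 2 4
}
```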

type PubSub

type PubSub[T any] struct {
	OnPubTimeout    func(ev T)    // called if Pub or PubWait times out
	PubTimeoutAfter time.Duration // times out Pub & PubWait, if positive
	DefaultBuffer   int
	// contains filtered or unexported fields
}

PubSub publishes events to all subscribed channels, acting as a sort of "fan-out message queue".

Example
// SPDX-FileCopyrightText: 2022 Kalle Fagerberg
//
// SPDX-License-Identifier: MIT

package main

import (
	"fmt"
	"sync"

	"gopkg.in/typ.v4/chans"
)

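// printMessages prints every message received on ch until the channel is
// closed, then signals completion on wg.
func printMessages(prefix string, ch <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range ch {
		fmt.Println(prefix, msg)
	}
}

func main() {
	var pub chans.PubSub[string]
	var wg sync.WaitGroup

	sub1 := pub.Sub()
	sub2 := pub.Sub()

	wg.Add(2)
	go printMessages("sub1:", sub1, &wg)
	go printMessages("sub2:", sub2, &wg)

	// Block until both subscribers have received the event.
	pub.PubWait("hello there")
	// Unsubscribe so the range loops in printMessages can finish.
	pub.UnsubAll()
	wg.Wait()
}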
Output:

sub1: hello there
sub2: hello there

func (*PubSub[T]) Pub

func (o *PubSub[T]) Pub(ev T)

Pub sends the event to all subscriptions in their own goroutines and returns immediately without waiting for any of the channels to finish sending.

func (*PubSub[T]) PubSlice

func (o *PubSub[T]) PubSlice(evs []T)

PubSlice sends a slice of events to all subscriptions in their own goroutines and returns immediately without waiting for any of the channels to finish sending.

func (*PubSub[T]) PubSliceSync

func (o *PubSub[T]) PubSliceSync(evs []T)

PubSliceSync blocks while sending a slice of events synchronously to all subscriptions without starting a single goroutine. Useful in performance-critical use cases where a low number of subscribers (0-3) is expected.

func (*PubSub[T]) PubSliceWait

func (o *PubSub[T]) PubSliceWait(evs []T)

PubSliceWait blocks while sending a slice of events to all subscriptions in their own goroutines, and waits until all have received the message or timed out.

func (*PubSub[T]) PubSync

func (o *PubSub[T]) PubSync(ev T)

PubSync blocks while sending the event synchronously to all subscriptions without starting a single goroutine. Useful in performance-critical use cases where a low number of subscribers (0-3) is expected.

func (*PubSub[T]) PubWait

func (o *PubSub[T]) PubWait(ev T)

PubWait blocks while sending the event to all subscriptions in their own goroutines, and waits until all have received the message or timed out.
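
The "one goroutine per subscription, then wait for all of them" behavior can be sketched with a sync.WaitGroup. The fanOut type and its sub and pubWait methods below are hypothetical and heavily simplified: there is no timeout handling and no unsubscribing, and nothing here is taken from the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical, stripped-down stand-in for a publisher.
// It has no timeout handling and no unsubscribe support.
type fanOut[T any] struct {
	mu   sync.RWMutex
	subs []chan T
}

// sub registers and returns a new subscription channel.
func (f *fanOut[T]) sub(size int) <-chan T {
	ch := make(chan T, size)
	f.mu.Lock()
	f.subs = append(f.subs, ch)
	f.mu.Unlock()
	return ch
}

// pubWait sends ev to every subscriber in its own goroutine and
// blocks until all sends have completed.
func (f *fanOut[T]) pubWait(ev T) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	var wg sync.WaitGroup
	for _, ch := range f.subs {
		wg.Add(1)
		go func(ch chan<- T) {
			defer wg.Done()
			ch <- ev
		}(ch)
	}
	wg.Wait()
}

func main() {
	var f fanOut[string]
	a := f.sub(1)
	b := f.sub(1)

	f.pubWait("hello")
	fmt.Println(<-a, <-b) // hello hello
}
```

Dropping the wg.Wait call would give the fire-and-forget behavior, while replacing the goroutines with a plain loop of sends would give the synchronous behavior.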

func (*PubSub[T]) Sub

func (o *PubSub[T]) Sub() <-chan T

Sub subscribes to events in a newly created channel using the default buffer size for this PubSub. If no default is configured, the buffer size will be 0.

func (*PubSub[T]) SubBuf

func (o *PubSub[T]) SubBuf(size int) <-chan T

SubBuf subscribes to events in a newly created channel with a specified buffer size.

func (*PubSub[T]) Unsub

func (o *PubSub[T]) Unsub(sub <-chan T) error

Unsub unsubscribes a previously subscribed channel.

func (*PubSub[T]) UnsubAll

func (o *PubSub[T]) UnsubAll() error

UnsubAll unsubscribes all subscription channels, rendering them all useless.

func (*PubSub[T]) WithOnly

func (o *PubSub[T]) WithOnly(sub <-chan T) *PubSub[T]

WithOnly returns a new publisher that only contains the given subscription channel. Useful if you need to send events only to a single specific subscription.

type Receiver

type Receiver[T any] interface {
	~chan T | ~<-chan T
}

Receiver is a constraint that permits a receive-only channel or a bidirectional (send and receive) channel.

type Sender

type Sender[T any] interface {
	~chan T | ~chan<- T
}

Sender is a constraint that permits a send-only channel or a bidirectional (send and receive) channel.
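
To show how constraints of this shape can be used, the sketch below copies the Receiver and Sender definitions locally and writes two hypothetical non-blocking helpers, trySend and tryRecv, against them. The jobQueue type is also made up for the example; it demonstrates that the ~ in the constraints admits named channel types.

```go
package main

import "fmt"

// Local copies of the documented constraints so the sketch compiles alone.
type Receiver[T any] interface {
	~chan T | ~<-chan T
}

type Sender[T any] interface {
	~chan T | ~chan<- T
}

// trySend is a hypothetical helper: a send that never blocks.
func trySend[C Sender[V], V any](ch C, value V) bool {
	select {
	case ch <- value:
		return true
	default:
		return false
	}
}

// tryRecv is a hypothetical helper: a receive that never blocks.
func tryRecv[C Receiver[V], V any](ch C) (V, bool) {
	select {
	case v, ok := <-ch:
		return v, ok
	default:
		var zero V
		return zero, false
	}
}

// jobQueue is a named channel type; the ~ in the constraints admits it.
type jobQueue chan string

func main() {
	q := make(jobQueue, 1)
	var in chan<- string = q  // send-only view
	var out <-chan string = q // receive-only view

	fmt.Println(trySend(q, "a"))  // true
	fmt.Println(trySend(in, "b")) // false: the buffer is full
	fmt.Println(tryRecv(out))     // a true
}
```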
