broadcast

v0.1.0 Latest
Published: Mar 16, 2022 License: Apache-2.0 Imports: 2 Imported by: 18

README

broadcast

Notification broadcaster in Go

What?

broadcast is a library for sending repeated notifications to multiple goroutines, with guaranteed delivery and user-defined message types.

Why?

Why not Channels?

The standard way to handle notifications is via a chan struct{}. However, a message sent to a channel is received by only one goroutine.

The only operation that is broadcast to multiple goroutines is a channel closure. Yet, if the channel is closed, there's no way to send a message again.

❌ Repeated notifications to multiple goroutines

✅ Guaranteed delivery
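
The limitation described above can be reproduced with the standard library alone. The following is a minimal sketch, not part of the broadcast package; the helper names sendOnce and closeOnce are made up for this illustration. A single send reaches exactly one of the waiting goroutines, while a close reaches all of them but cannot be repeated.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sendOnce (hypothetical helper) starts n receivers on one unbuffered channel,
// sends a single value, and returns how many receivers got it.
func sendOnce(n int) int {
	ch := make(chan struct{})
	done := make(chan struct{})
	var received int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ch:
				atomic.AddInt32(&received, 1)
			case <-done: // released without a notification
			}
		}()
	}
	ch <- struct{}{} // handed to exactly one receiver
	close(done)      // let the remaining receivers exit
	wg.Wait()
	return int(atomic.LoadInt32(&received))
}

// closeOnce (hypothetical helper) starts n receivers and closes the channel,
// returning how many receivers were woken by the closure.
func closeOnce(n int) int {
	ch := make(chan struct{})
	var woken int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // a receive on a closed channel returns immediately
			atomic.AddInt32(&woken, 1)
		}()
	}
	close(ch) // wakes every receiver, but the channel cannot be reused
	wg.Wait()
	return int(atomic.LoadInt32(&woken))
}

func main() {
	fmt.Printf("send:  %d of 3 receivers notified\n", sendOnce(3))
	fmt.Printf("close: %d of 3 receivers notified\n", closeOnce(3))
}
```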

Why not sync.Cond?

sync.Cond is the standard library's condition variable: a rendezvous point for goroutines waiting for a specific condition.

There's one caveat to keep in mind, though: the Broadcast() method doesn't guarantee that a goroutine will receive the notification. Indeed, the notification will be lost if the listener goroutine isn't waiting on the Wait() method.

✅ Repeated notifications to multiple goroutines

❌ Guaranteed delivery

How?

Step by Step

First, we need to create a Relay for a message type (empty struct in this case):

relay := broadcast.NewRelay[struct{}]()

Once a Relay is created, we can create a new listener using the Listener method. Because the library relies on channels internally, the method accepts a channel capacity:

list := relay.Listener(1) // Create a new listener backed by a channel with a capacity of one
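
To see why the capacity matters, here is a small sketch using plain Go channels. It assumes the capacity is passed through to a buffered channel, which the text above suggests but which is not verified against the library's source; trySend is a hypothetical helper. A buffered channel accepts as many pending notifications as its capacity before a sender has to wait or give up.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send and reports
// whether the channel accepted the value.
func trySend(ch chan<- struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan struct{}, 1)
	fmt.Println(trySend(buffered)) // the buffer has room
	fmt.Println(trySend(buffered)) // the buffer is full
	<-buffered                     // the listener consumes the pending notification
	fmt.Println(trySend(buffered)) // room again

	unbuffered := make(chan struct{})
	fmt.Println(trySend(unbuffered)) // no receiver is ready, so nothing is accepted
}
```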

A Relay can send a notification in three different manners:

  • Notify: block until a notification is sent to all the listeners
  • NotifyCtx: send a notification to all listeners unless the provided context times out or is canceled
  • Broadcast: send a notification to all listeners in a non-blocking manner; delivery isn't guaranteed

On the Listener side, we can access the internal channel using Ch:

<-list.Ch() // Wait on a notification

We can close a Listener and a Relay using Close:

list.Close() 
relay.Close()

Closing a Relay and Listeners can be done concurrently in a safe manner.
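
One common way to make such concurrent closes safe is to guard the listener registry with a mutex, so that each channel is closed exactly once regardless of which side gets there first. The sketch below shows that idea with hypothetical hub and listener types; it is an assumption about how this could be done, not a description of the library's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// hub and listener are hypothetical types used only for this sketch.
type hub struct {
	mu        sync.Mutex
	closed    bool
	listeners map[*listener]struct{}
}

type listener struct {
	hub *hub
	ch  chan struct{}
}

func newHub() *hub {
	return &hub{listeners: make(map[*listener]struct{})}
}

// Listen registers a new listener, or returns an already closed one if the
// hub has been closed.
func (h *hub) Listen() *listener {
	h.mu.Lock()
	defer h.mu.Unlock()
	l := &listener{hub: h, ch: make(chan struct{})}
	if h.closed {
		close(l.ch)
		return l
	}
	h.listeners[l] = struct{}{}
	return l
}

// Close closes the listener's channel only if it is still registered.
func (l *listener) Close() {
	l.hub.mu.Lock()
	defer l.hub.mu.Unlock()
	if _, ok := l.hub.listeners[l]; ok {
		delete(l.hub.listeners, l)
		close(l.ch)
	}
}

// Close closes every listener that is still registered.
func (h *hub) Close() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.closed = true
	for l := range h.listeners {
		delete(h.listeners, l)
		close(l.ch)
	}
}

// raceClose closes n listeners and their hub concurrently and returns how
// many listener channels ended up closed.
func raceClose(n int) int {
	h := newHub()
	ls := make([]*listener, n)
	for i := range ls {
		ls[i] = h.Listen()
	}
	var wg sync.WaitGroup
	for _, l := range ls {
		wg.Add(1)
		go func(l *listener) {
			defer wg.Done()
			l.Close()
		}(l)
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		h.Close()
	}()
	wg.Wait()

	closed := 0
	for _, l := range ls {
		select {
		case _, ok := <-l.ch:
			if !ok {
				closed++
			}
		default:
		}
	}
	return closed
}

func main() {
	fmt.Println("closed channels:", raceClose(100))
}
```

Because removal from the registry and the channel close happen under the same lock, a double close (which would panic) cannot occur.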

Example

type msg string

const (
    msgA msg = "A"
    msgB msg = "B"
    msgC msg = "C"
)

relay := broadcast.NewRelay[msg]() // Create a relay for msg values
defer relay.Close()

// Listener goroutines
for i := 0; i < 2; i++ {
    go func(i int) {
        l := relay.Listener(1)  // Create a listener with a buffer capacity of 1
        for n := range l.Ch() { // Range over notifications until the channel is closed
            fmt.Printf("listener %d has received a notification: %v\n", i, n)
        }
    }(i)
}

// Notifiers
time.Sleep(time.Second)                                     // Allow time for the listeners to be created
relay.Notify(msgA)                                          // Send notification with guaranteed delivery
ctx, cancel := context.WithTimeout(context.Background(), 0) // Context with immediate timeout
defer cancel()                                              // Release the context's resources
relay.NotifyCtx(ctx, msgB)                                  // Send notification respecting context cancellation
time.Sleep(time.Second)                                     // Allow time for previous messages to be processed
relay.Broadcast(msgC)                                       // Send notification without guaranteed delivery
time.Sleep(time.Second)                                     // Allow time for previous messages to be processed

Documentation

Overview

Package broadcast allows sending repeated notifications to multiple goroutines.

Types

type Listener

type Listener[T any] struct {
	// contains filtered or unexported fields
}

Listener is a Relay listener.

func (*Listener[T]) Ch

func (l *Listener[T]) Ch() <-chan T

Ch returns the Listener channel.

func (*Listener[T]) Close

func (l *Listener[T]) Close()

Close closes a listener. It can safely be called concurrently with Relay.Close().

type Relay

type Relay[T any] struct {
	// contains filtered or unexported fields
}

Relay is the struct in charge of handling the listeners and dispatching the notifications.

func NewRelay

func NewRelay[T any]() *Relay[T]

NewRelay is the factory to create a Relay.

func (*Relay[T]) Broadcast

func (r *Relay[T]) Broadcast(v T)

Broadcast broadcasts a notification to all the listeners. The notification is sent in a non-blocking manner, so there's no guarantee that a listener receives it.

func (*Relay[T]) Close

func (r *Relay[T]) Close()

Close closes a relay. It can safely be called concurrently with Listener.Close().

func (*Relay[T]) Listener

func (r *Relay[T]) Listener(capacity int) *Listener[T]

Listener creates a new listener given a channel capacity.

func (*Relay[T]) Notify

func (r *Relay[T]) Notify(v T)

Notify sends a notification to all the listeners. It guarantees that all the listeners will receive the notification.

func (*Relay[T]) NotifyCtx (added in v0.0.4)

func (r *Relay[T]) NotifyCtx(ctx context.Context, v T)

NotifyCtx tries sending a notification to all the listeners until the context times out or is canceled.
