qnq

package
v0.0.0-...-ec3616b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2025 License: 0BSD Imports: 5 Imported by: 0

Documentation

Overview

Package qnq provides a representation for message queuing, event handling and callback flows. Where there is a fan-out messaging pattern, ie. Pub/Sub semantics.

Variables of type Chan are references to a message queue, T-typed messages can be sent to the Chan using the Chan.Send method and/or subscriptions can be attached by calling [Chan.Register] with a Listener.

These message queues can be used to model event streams, let's say you want to model an event that is fired whenever somebody changes their name,

type NameChangeEvent struct {
    Customer string
    OldName string
    NewName string
}

var NameChanges qnq.Chan[NameChangeEvent]

At the location of your updateName function you would send an event:

err := NameChanges.Send(ctx, NameChangeEvent{ ... })

Another system may wish to listen to these events, so that they can update their customer records. This system can subscribe to the name changes queue that was passed (as a dependency) to their execution.

NameChanges.Attach(ctx, func(ctx context.Context, event NameChangeEvent) error {
   // update our copy of the customer
   return nil
})

Index

Constants

View Source
const ErrEmptyChannel = raise("empty channel")

ErrEmptyChannel is raised if there are no listeners on the channel.

Variables

This section is empty.

Functions

func Open

func Open[T any](db Channels) *T

Open a Channels structure, with each field being a Chan with a 'qnq' tag that specifies the name that will be used to call [OpenChan] on it.

Types

type Chan

type Chan[T any] struct {
	// contains filtered or unexported fields
}

Chan is similar to a Go channel, except that each value sent to it is broadcast to each registered Listener. All operations on a Chan are goroutine safe and lockless. At-least-once delivery semantics.

func (*Chan[T]) Listen

func (ch *Chan[T]) Listen(ctx context.Context, subscription string, listener Listener[T])

Listen registers the given listener for the lifetime of the context, any messages delivered to the channel after this function returns will be delivered to the specified listener. There may also be pending messages buffered for this listener.

The subscription name identifies the listener and may be used as a durable buffer to ensure that messages are not lost if the listener is not available.

The handler must return a nil error in order to acknowledge the message (otherwise it will not be removed from the Chan). The handler should always process incoming messages idempotently, as they may be delivered more than once.

func (*Chan[T]) Send

func (ch *Chan[T]) Send(ctx context.Context, value T) error

Send will broadcast the given message to all registered listeners, with at-least-once delivery. If a reciever returns an error, it will be returned by Chan.Send and it is the caller's responsibility to retry the send operation. Chan.Send will return an error if no listeners are registered.

type Channels

type Channels interface {
	// Send a message to the named channel, returning an error if the message
	// was not durably buffered by an at-least-once sender, or acknowledged.
	Send(ctx context.Context, topic Topic, val any) error

	// Recv a channel of messages from the named channel and subscription ID
	// each message is a decoder that returns an acknowledgement function
	// which should be called as soon as processing
	// is complete.
	Recv(ctx context.Context, topic Topic, subscription string) <-chan func(any) (func(error), error)
}

Channels interface can be used to implement a delivery mechanism for Chan.

type Listener

type Listener[Message any] func(context.Context, Message) error

Listener for values of type Message on a Chan.

type Topic

type Topic string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL