eventbus

package
v1.31.0
Warning

This package is not in the latest version of its module.

Published: Oct 4, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package eventbus provides a simple event bus implementation.

Functions

func Relay

func Relay[T any](ctx context.Context, source Source[T], destination Receiver[T], options ...SubscriptionOption[T])

Relay will relay from source to destination. It runs a separate goroutine, consuming events from the source and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.

func RelayWithFilter

func RelayWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], destination Receiver[R], options ...SubscriptionOption[R])

RelayWithFilter will relay from source to destination with the specified filter. It runs a separate goroutine, consuming events from the source, running the filter, and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.

func RelayWithMerge

func RelayWithMerge[T any](ctx context.Context, source Source[T], merge SubscriptionMerger[T], destination Receiver[T], maxLatency time.Duration, maxEventsToMerge int, options ...SubscriptionOption[T])

RelayWithMerge will relay from source to destination, merging events before sending them to the destination. This can be used when there are lots of small individual events that can be more efficiently processed as a few larger events.

Types

type Receiver

type Receiver[T any] interface {
	// Send the event to this receiver
	Send(ctx context.Context, event T)
}

Receiver receives events of type T.

type RoutingKey

type RoutingKey[T any, K comparable] func(event T) K

RoutingKey is a function that returns a route key for a specified event. Events will only be dispatched to subscribers that subscribe with a Context where SubscriptionKey returns this key.

type RoutingSource

type RoutingSource[K, T any] interface {
	Source[T]

	// Routes returns the current number of routes
	Routes() int

	// HasRoute returns true if there is a route for the specified key
	HasRoute(K) bool
}

RoutingSource is a special Source that can route events to subscribers based on the Context used to subscribe.

func NewRoutingSource

func NewRoutingSource[K comparable, T any](routingKey RoutingKey[T, K], subscriptionKey SubscriptionKey[K]) RoutingSource[K, T]

NewRoutingSource creates a new source that routes events using the specified RoutingKey and SubscriptionKey functions.

type Source

type Source[T any] interface {
	// Send the event to this receiver
	Send(ctx context.Context, event T)

	// Subscribe adds a subscriber to the source and automatically unsubscribes when the context is done. If the
	// context is nil, the unsubscribe function must be called to unsubscribe. Instead of using this method to subscribe
	// to a source, use one of the eventbus.Subscribe functions to receive a channel of events.
	Subscribe(ctx context.Context, subscriber Subscriber[T], onUnsubscribe func()) UnsubscribeFunc

	// Subscribers returns the current number of subscribers (used for testing)
	Subscribers() int
}

Source is a source of events.

func NewSource

func NewSource[T any]() Source[T]

NewSource returns a new Source implementation for the specified event type T.

type Subscriber

type Subscriber[T any] interface {
	// Channel will be returned to Subscribe calls to receive events
	Channel() <-chan T

	// Receive will be called when an event is available
	Receive(event T)

	// Close will be called when the subscriber is unsubscribed
	Close()
}

Subscriber can be notified of events of type T. Instead of using this interface directly, use one of the eventbus.Subscribe functions to receive a channel of events.
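For readers who do need to implement the interface, a channel-backed subscriber might look roughly like the sketch below. The interface is redeclared locally so the example compiles on its own; chanSubscriber and newChanSubscriber are hypothetical names, and dropping events when the buffer is full is a simplification chosen for this example rather than the package's behavior.

```go
package main

import "sync"

// Subscriber mirrors the interface documented above; it is redeclared here
// only so the sketch is self-contained.
type Subscriber[T any] interface {
	Channel() <-chan T
	Receive(event T)
	Close()
}

// chanSubscriber is a hypothetical Subscriber backed by a buffered channel.
type chanSubscriber[T any] struct {
	mu     sync.Mutex
	ch     chan T
	closed bool
}

// newChanSubscriber creates a subscriber whose channel buffers up to size events.
func newChanSubscriber[T any](size int) *chanSubscriber[T] {
	return &chanSubscriber[T]{ch: make(chan T, size)}
}

// Channel returns the receive-only view of the underlying channel.
func (s *chanSubscriber[T]) Channel() <-chan T { return s.ch }

// Receive queues the event without blocking. Events arriving after Close,
// or while the buffer is full, are dropped in this sketch.
func (s *chanSubscriber[T]) Receive(event T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	select {
	case s.ch <- event:
	default:
		// Buffer full: drop the event rather than block the sender.
	}
}

// Close closes the channel exactly once; later calls are no-ops.
func (s *chanSubscriber[T]) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

// Compile-time check that chanSubscriber satisfies Subscriber.
var _ Subscriber[int] = (*chanSubscriber[int])(nil)
```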

type SubscriptionFilter

type SubscriptionFilter[T, R any] func(event T) (result R, accept bool)

SubscriptionFilter can filter on events and map from an event to another type. It can also ignore events. If accept is false, the result is ignored and not sent to subscribers.
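As an illustration of the filter-and-map contract, the sketch below accepts only even integers and maps them to strings. The type is redeclared locally so the example compiles on its own; evenToString and applyFilter are hypothetical helpers, and applyFilter runs over a slice purely to show which events would be passed on.

```go
package main

import "strconv"

// SubscriptionFilter mirrors the type documented above; it is redeclared
// here only so the sketch is self-contained.
type SubscriptionFilter[T, R any] func(event T) (result R, accept bool)

// evenToString is a hypothetical filter: it accepts even integers and maps
// them to their decimal string form; odd integers are rejected.
func evenToString(event int) (string, bool) {
	if event%2 != 0 {
		return "", false
	}
	return strconv.Itoa(event), true
}

// applyFilter is a hypothetical helper that runs a filter over a slice of
// events and keeps only the accepted results.
func applyFilter[T, R any](filter SubscriptionFilter[T, R], events []T) []R {
	var out []R
	for _, ev := range events {
		if r, ok := filter(ev); ok {
			out = append(out, r)
		}
	}
	return out
}
```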

type SubscriptionKey

type SubscriptionKey[K comparable] func(ctx context.Context) K

SubscriptionKey is a function that returns a route key for a specified context. Subscribers will only receive events where RoutingKey returns this key.

type SubscriptionMerger

type SubscriptionMerger[T any] func(into, single T) bool

SubscriptionMerger merges the second event into the first and returns true if the merge was successful. It should return false if a merge was not possible, in which case the two individual events are preserved and dispatched separately.
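Because the merger receives both events by value, T usually needs to be a pointer or other reference type for the merge to be visible to the caller. The following is a minimal sketch under that assumption, with the type redeclared locally so it compiles on its own. The batch type, mergeSameTopic, and mergeAll are hypothetical, and mergeAll ignores the maxLatency and maxEventsToMerge limits that RelayWithMerge accepts.

```go
package main

// SubscriptionMerger mirrors the type documented above; it is redeclared
// here only so the sketch is self-contained.
type SubscriptionMerger[T any] func(into, single T) bool

// batch is a hypothetical event type grouping items under a topic.
type batch struct {
	Topic string
	Items []string
}

// mergeSameTopic appends single's items to into when both share a topic.
// It returns false, leaving both events untouched, when they cannot be merged.
func mergeSameTopic(into, single *batch) bool {
	if into == nil || single == nil || into.Topic != single.Topic {
		return false
	}
	into.Items = append(into.Items, single.Items...)
	return true
}

// mergeAll is a hypothetical helper that tries to merge each event into the
// previous output event and keeps it as a separate event when the merger
// returns false.
func mergeAll[T any](merge SubscriptionMerger[T], events []T) []T {
	var out []T
	for _, ev := range events {
		if n := len(out); n > 0 && merge(out[n-1], ev) {
			continue
		}
		out = append(out, ev)
	}
	return out
}
```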

type SubscriptionOption

type SubscriptionOption[T any] func(*subscriptionOptions[T])

SubscriptionOption is used to provide options when creating a new subscription to a Source.

func WithChannel

func WithChannel[T any](channel chan T) SubscriptionOption[T]

WithChannel allows a subscriber to specify the channel that will receive events. Because the subscriber creates the channel, it also controls the channel's buffer size.
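The option follows the functional options pattern. The sketch below shows how such an option could be applied; it is not the package's code. The fields of the unexported subscriptionOptions struct are not documented, so the struct here is a hypothetical stand-in with a single channel field, and withChannel and resolveChannel are illustrative names.

```go
package main

// subscriptionOptions is a hypothetical stand-in for the package's
// unexported options struct; its real fields are not documented.
type subscriptionOptions[T any] struct {
	channel chan T
}

// SubscriptionOption mirrors the documented option type.
type SubscriptionOption[T any] func(*subscriptionOptions[T])

// withChannel is a local sketch of an option that records a caller-supplied channel.
func withChannel[T any](channel chan T) SubscriptionOption[T] {
	return func(o *subscriptionOptions[T]) {
		o.channel = channel
	}
}

// resolveChannel applies the options in order and falls back to an
// unbuffered channel when none was supplied (an assumption of this sketch).
func resolveChannel[T any](options ...SubscriptionOption[T]) chan T {
	o := &subscriptionOptions[T]{}
	for _, opt := range options {
		opt(o)
	}
	if o.channel == nil {
		o.channel = make(chan T)
	}
	return o.channel
}
```

A caller that expects bursts of events would pass a channel created with a larger capacity, for example make(chan T, 64), so that senders are less likely to block.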

func WithUnboundedChannel

func WithUnboundedChannel[T any](interval time.Duration) SubscriptionOption[T]

WithUnboundedChannel specifies that util.UnboundedChan should be used for the channel. This will allow an unbounded number of events to be received before being dispatched to subscribers.

func WithUnsubscribeHook

func WithUnsubscribeHook[T any](unsubscribeHook func()) SubscriptionOption[T]

WithUnsubscribeHook specifies a function to be called after the subscriber is unsubscribed.

type UnsubscribeFunc

type UnsubscribeFunc func()

UnsubscribeFunc is a function that allows a subscriber to unsubscribe.

func Subscribe

func Subscribe[T any](ctx context.Context, bus Source[T], options ...SubscriptionOption[T]) (<-chan T, UnsubscribeFunc)

Subscribe subscribes to events on the bus and returns a channel to receive events and an unsubscribe function. It automatically unsubscribes when the context is done.

func SubscribeWithFilter

func SubscribeWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], options ...SubscriptionOption[R]) (<-chan R, UnsubscribeFunc)

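SubscribeWithFilter subscribes to events on the source and returns a channel of filtered results and an unsubscribe function. Each event is passed through the filter; events for which accept is false are dropped, and the rest are delivered as values of type R.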

Directories

Path Synopsis
Package broadcast contains the Broadcast interface and a simple local implementation.
