Documentation ¶
Overview ¶
Package eventbus provides a simple event bus implementation.
Index ¶
- func Relay[T any](ctx context.Context, source Source[T], destination Receiver[T], ...)
- func RelayWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], ...)
- func RelayWithMerge[T any](ctx context.Context, source Source[T], merge SubscriptionMerger[T], ...)
- type Receiver
- type RoutingKey
- type RoutingSource
- type Source
- type Subscriber
- type SubscriptionFilter
- type SubscriptionKey
- type SubscriptionMerger
- type SubscriptionOption
- type UnsubscribeFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Relay ¶
func Relay[T any](ctx context.Context, source Source[T], destination Receiver[T], options ...SubscriptionOption[T])
Relay will relay from source to destination. It runs a separate goroutine, consuming events from the source and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.
func RelayWithFilter ¶
func RelayWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], destination Receiver[R], options ...SubscriptionOption[R])
RelayWithFilter will relay from source to destination with the specified filter. It runs a separate goroutine, consuming events from the source, running the filter, and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.
func RelayWithMerge ¶
func RelayWithMerge[T any](ctx context.Context, source Source[T], merge SubscriptionMerger[T], destination Receiver[T], maxLatency time.Duration, maxEventsToMerge int, options ...SubscriptionOption[T])
RelayWithMerge will relay from source to destination, merging events before sending them to the destination. This can be used when there are lots of small individual events that can be more efficiently processed as a few larger events.
Types ¶
type Receiver ¶
type Receiver[T any] interface { // Send the event to this receiver Send(ctx context.Context, event T) }
Receiver receives events of type T
type RoutingKey ¶
type RoutingKey[T any, K comparable] func(event T) K
RoutingKey is a function that returns a route key for a specified event. Events will only be dispatched to subscribers that subscribe with a Context where SubscriptionKey returns this key.
type RoutingSource ¶
type RoutingSource[K, T any] interface { Source[T] // Subscribers returns the current number of routes Routes() int // HasRoute returns true if there is a route for the specified key HasRoute(K) bool }
RoutingSource is a special Source that can route events to subscribers based on the Context used to subscribe.
func NewRoutingSource ¶
func NewRoutingSource[K comparable, T any](routingKey RoutingKey[T, K], subscriptionKey SubscriptionKey[K]) RoutingSource[K, T]
NewRoutingSource creates a new source that routes events using the specified RoutingKey and SubscriptionKey functions.
type Source ¶
type Source[T any] interface { // Send the event to this receiver Send(ctx context.Context, event T) // Subscribe adds a subscriber to the source and automatically unsubscribes when the context is done. If the // context is nil, the unsubscribe function must be called to unsubscribe. Instead of using this method to subscribe // to a source, use one of the eventbus.Subscribe functions to receive a channel of events. Subscribe(ctx context.Context, subscriber Subscriber[T], onUnsubscribe func()) UnsubscribeFunc // Subscribers returns the current number of subscribers (used for testing) Subscribers() int }
Source is a source of events.
type Subscriber ¶
type Subscriber[T any] interface { // Channel will be returned to Subscribe calls to receive events Channel() <-chan T // Receive will be called when an event is available Receive(event T) // Close will be called when the subscriber is unsubscribed Close() }
Subscriber can be notified of events of type T. Instead of using this interface directly, use one of the eventbus.Subscribe functions to receive a channel of events.
type SubscriptionFilter ¶
SubscriptionFilter can filter on events and map from an event to another type. It can also ignore events. If accept is false, the result is ignored and not sent to subscribers.
type SubscriptionKey ¶
type SubscriptionKey[K comparable] func(ctx context.Context) K
SubscriptionKey is a function that returns a route key for a specified context. Subscribers will only receive events where RoutingKey returns this key.
type SubscriptionMerger ¶
SubscriptionMerger merges the second event into the first and returns true if the merge was successful. It should return false if a merge was not possible and the two individual events will be preserved and dispatched separately.
type SubscriptionOption ¶
type SubscriptionOption[T any] func(*subscriptionOptions[T])
SubscriptionOption is used to provide options when creating a new subscription to a Source.
func WithChannel ¶
func WithChannel[T any](channel chan T) SubscriptionOption[T]
WithChannel allows a subscriber to specify the channel that will receive events. This allows the subscriber to control the size.
func WithUnboundedChannel ¶
func WithUnboundedChannel[T any](interval time.Duration) SubscriptionOption[T]
WithUnboundedChannel specifies that util.UnboundedChan should be used for the channel. This will allow an unbounded number of events to be received before being dispatched to subscribers.
func WithUnsubscribeHook ¶
func WithUnsubscribeHook[T any](unsubscribeHook func()) SubscriptionOption[T]
WithUnsubscribeHook specifies a function to be called after unsubscribe
type UnsubscribeFunc ¶
type UnsubscribeFunc func()
UnsubscribeFunc is a function that allows a subscriber to unsubscribe
func Subscribe ¶
func Subscribe[T any](ctx context.Context, bus Source[T], options ...SubscriptionOption[T]) (<-chan T, UnsubscribeFunc)
Subscribe subscribes to events on the bus and returns a channel to receive events and an unsubscribe function. It automatically unsubscribes when the context is done.
func SubscribeWithFilter ¶
func SubscribeWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], options ...SubscriptionOption[R]) (<-chan R, UnsubscribeFunc)
SubscribeWithFilter TODO