eventbus

package
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Relay

func Relay[T any](ctx context.Context, source Source[T], destination Source[T], options ...SubscriptionOption[T])

Relay will relay from source to destination. It runs a separate goroutine, consuming events from the source and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.

func RelayWithFilter

func RelayWithFilter[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], destination Source[R], options ...SubscriptionOption[R])

RelayWithFilter will relay from source to destination with the specified filter. It runs a separate goroutine, consuming events from the source, running the filter, and sending events to the destination. When the supplied context is Done, the relay is automatically unsubscribed from the source and the destination will no longer receive events.

func RelayWithMerge added in v0.5.2

func RelayWithMerge[S any, T *S](ctx context.Context, source Source[T], merge SubscriptionMerger[S, T], destination Source[T], maxLatency time.Duration, maxEventsToMerge int, options ...SubscriptionOption[T])

RelayWithMerge will relay from source to destination, merging events before sending them to the destination. This can be used when there are lots of small individual events that can be more efficiently processed as a few larger events.

Types

type Source

type Source[T any] interface {
	// Send the event to all of the subscribers
	Send(event T)

	// SubscribeUntilDone adds a subscriber to the source and automatically unsubscribes when the context is done. If the
	// context is nil, the unsubscribe function must be called to unsubscribe. Instead of using this method to subscribe
	// to a source, use one of the eventbus.Subscribe functions to receive a channel of events.
	SubscribeUntilDone(ctx context.Context, subscriber Subscriber[T], onUnsubscribe func()) UnsubscribeFunc

	// Subscribers returns the current number of subscribers
	Subscribers() int
}

Source is a source of events.

func NewSource

func NewSource[T any]() Source[T]

NewSource returns a new Source implementation for the specified event type T

type Subscriber

type Subscriber[T any] interface {
	// Channel will be return to Subscribe calls to receive events
	Channel() <-chan T

	// Receive will be called when an event is available
	Receive(event T)

	// Close will be called when the subscriber is unsubscribed
	Close()
}

Subscriber can be notified of events of type T. Instead of using this interface directly, use one of the eventbus.Subscribe functions to receive a channel of events.

type SubscriptionFilter

type SubscriptionFilter[T, R any] func(event T) (result R, accept bool)

SubscriptionFilter can filter on events and map from an event to another type. It can also ignore events. If accept is false, the result is ignored and not sent to subscribers.

type SubscriptionMerger added in v0.5.2

type SubscriptionMerger[S any, T *S] func(into, single T) bool

SubscriptionMerger merges the second event into the first and returns true if the merge was successful. It should return false if a merge was not possible and the two individual events will be preserved and dispatched separately.

type SubscriptionOption added in v0.5.2

type SubscriptionOption[T any] func(*subscriptionOptions[T])

SubscriptionOption is used to provide options when creating a new subscription to a Source.

func WithChannel added in v0.5.2

func WithChannel[T any](channel chan T) SubscriptionOption[T]

WithChannel allows a subscriber to specify the channel that will receive events. This allows the subscriber to control the size.

func WithUnboundedChannel added in v0.5.2

func WithUnboundedChannel[T any](interval time.Duration) SubscriptionOption[T]

WithUnboundedChannel specifies that util.UnboundedChan should be used for the channel. This will allow an unbounded number of events to be received before being dispatched to subscribers.

func WithUnsubscribeHook added in v1.1.0

func WithUnsubscribeHook[T any](unsubscribeHook func()) SubscriptionOption[T]

WithUnsubscribeHook specifies a function to be called after unsubscribe

type UnsubscribeFunc

type UnsubscribeFunc func()

UnsubscribeFunc is a function that allows a subscriber to unsubscribe

func Subscribe

func Subscribe[T any](bus Source[T], options ...SubscriptionOption[T]) (<-chan T, UnsubscribeFunc)

Subscribe subscribes to events on the bus and returns a channel to receive events and an unsubscribe function.

func SubscribeUntilDone

func SubscribeUntilDone[T any](ctx context.Context, bus Source[T], options ...SubscriptionOption[T]) (<-chan T, UnsubscribeFunc)

SubscribeUntilDone subscribes to events on the bus and returns a channel to receive events and an unsubscribe function. It automatically unsubscribes when the context is done.

func SubscribeWithFilter

func SubscribeWithFilter[T, R any](bus Source[T], filter SubscriptionFilter[T, R], options ...SubscriptionOption[R]) (<-chan R, UnsubscribeFunc)

SubscribeWithFilter TODO

func SubscribeWithFilterUntilDone

func SubscribeWithFilterUntilDone[T, R any](ctx context.Context, source Source[T], filter SubscriptionFilter[T, R], options ...SubscriptionOption[R]) (<-chan R, UnsubscribeFunc)

SubscribeWithFilterUntilDone TODO

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL