eventmultiplexer

package
v0.0.0-...-1592773 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSignalTimeout is the default timeout for signalling a change to a
	// subscriber.
	// Failure to consume the changes within this time will result in the
	// subscriber being unsubscribed.
	DefaultSignalTimeout = time.Second * 10
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChangeSet

type ChangeSet = []changestream.ChangeEvent

ChangeSet represents a set of changes.

type EventMultiplexer

type EventMultiplexer struct {
	// contains filtered or unexported fields
}

EventMultiplexer defines a way to receive streamed terms for changes that can be multiplexed to subscriptions. The event queue allows consumers to subscribe via callbacks to the event queue. This is a lockless implementation, all subscriptions and changes are serialized in the main loop. Dispatching is randomized to ensure that subscriptions don't depend on ordering. The subscriptions can be associated with different subscription options, which provide filtering when dispatching. Unsubscribing is provided per subscription, which is done asynchronously.

func New

func New(stream Stream, clock clock.Clock, metrics MetricsCollector, logger logger.Logger) (*EventMultiplexer, error)

New creates a new EventMultiplexer that will use the Stream for events.

func (*EventMultiplexer) Kill

func (e *EventMultiplexer) Kill()

Kill stops the event queue.

func (*EventMultiplexer) Report

func (e *EventMultiplexer) Report() map[string]any

Report returns a map of the current state of the event queue. This is used by the engine report.

func (*EventMultiplexer) Subscribe

Subscribe creates a new subscription to the event queue. Options can be provided to allow filter during the dispatching phase.

func (*EventMultiplexer) Wait

func (e *EventMultiplexer) Wait() error

Wait waits for the event queue to stop.

type MetricsCollector

type MetricsCollector interface {
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}

MetricsCollector represents the metrics methods called.

type Stream

type Stream interface {
	// Terms returns a channel for a given namespace (database) that returns
	// a set of terms. The notion of terms are a set of changes that can be
	// run one at a time asynchronously. Allowing changes within a given
	// term to be signaled of a change independently from one another.
	// Once a change within a term has been completed, only at that point
	// is another change processed, until all changes are exhausted.
	Terms() <-chan changestream.Term

	// Dying returns a channel that is closed when the stream is dying.
	Dying() <-chan struct{}
}

Stream represents a way to get change events as set of terms.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL