Documentation
Constants ¶
const (
	// DefaultSignalTimeout is the default timeout for signalling a change to a
	// subscriber.
	// Failure to consume the changes within this time will result in the
	// subscriber being unsubscribed.
	DefaultSignalTimeout = time.Second * 10
)
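The timeout behaviour described in the comment above can be illustrated with a small, self-contained sketch. It is not the package's implementation: the names `signal`, `errSignalTimeout`, and `defaultSignalTimeout` are hypothetical, changes are modelled as plain strings, and the standard library timer is used where the real package takes a `clock.Clock`.

```go
package main

import (
	"errors"
	"time"
)

// defaultSignalTimeout mirrors the documented DefaultSignalTimeout value.
const defaultSignalTimeout = 10 * time.Second

// errSignalTimeout is a hypothetical error used only in this sketch.
var errSignalTimeout = errors.New("subscriber did not consume changes in time")

// signal tries to hand a batch of changes to a subscriber channel and gives
// up once the timeout has elapsed. A caller could react to errSignalTimeout
// by unsubscribing the slow subscriber.
func signal(ch chan<- []string, changes []string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case ch <- changes:
		// The subscriber accepted the changes in time.
		return nil
	case <-timer.C:
		// Nobody received within the timeout.
		return errSignalTimeout
	}
}
```

Calling `signal` with a channel that nobody reads from returns `errSignalTimeout` after the timeout, while a channel with a waiting reader (or free buffer space) returns `nil` immediately.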
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventMultiplexer ¶
type EventMultiplexer struct {
// contains filtered or unexported fields
}
EventMultiplexer defines a way to receive streamed terms for changes that can be multiplexed to subscriptions. Consumers subscribe to the event queue via callbacks. The implementation is lockless: all subscriptions and changes are serialized in the main loop. Dispatching is randomized to ensure that subscribers don't depend on ordering. Each subscription can be associated with its own subscription options, which provide filtering when dispatching. Unsubscribing is provided per subscription and is done asynchronously.
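As a rough illustration of the "serialized in the main loop" and "randomized dispatch" ideas, the following sketch keeps all subscription state inside a single goroutine and shuffles the delivery order for every event. Everything here is an assumption made for illustration: the `mux` type, its methods, and the use of string events are hypothetical and much simpler than the real EventMultiplexer, which filters by subscription options and unsubscribes asynchronously.

```go
package main

import "math/rand"

// subscribeReq asks the loop to register a callback and reply with its id.
type subscribeReq struct {
	fn    func(event string)
	reply chan int
}

// mux is a hypothetical, simplified multiplexer. It needs no locks because
// only the loop goroutine ever touches the subscription map.
type mux struct {
	subscribeCh   chan subscribeReq
	unsubscribeCh chan int
	eventCh       chan string
	stopCh        chan struct{}
	doneCh        chan struct{}
}

func newMux() *mux {
	m := &mux{
		subscribeCh:   make(chan subscribeReq),
		unsubscribeCh: make(chan int),
		eventCh:       make(chan string),
		stopCh:        make(chan struct{}),
		doneCh:        make(chan struct{}),
	}
	go m.loop()
	return m
}

// loop serializes subscribe, unsubscribe, and dispatch requests.
func (m *mux) loop() {
	defer close(m.doneCh)

	subs := make(map[int]func(string)) // owned by this goroutine only
	nextID := 0
	for {
		select {
		case req := <-m.subscribeCh:
			subs[nextID] = req.fn
			req.reply <- nextID
			nextID++
		case id := <-m.unsubscribeCh:
			delete(subs, id)
		case ev := <-m.eventCh:
			// Shuffle the ids so that no subscriber can rely on ordering.
			ids := make([]int, 0, len(subs))
			for id := range subs {
				ids = append(ids, id)
			}
			rand.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })
			for _, id := range ids {
				subs[id](ev)
			}
		case <-m.stopCh:
			return
		}
	}
}

// Subscribe registers fn and returns an id that can be used to unsubscribe.
func (m *mux) Subscribe(fn func(string)) int {
	req := subscribeReq{fn: fn, reply: make(chan int, 1)}
	m.subscribeCh <- req
	return <-req.reply
}

// Unsubscribe removes the subscription with the given id.
func (m *mux) Unsubscribe(id int) { m.unsubscribeCh <- id }

// Publish hands an event to the loop for dispatch.
func (m *mux) Publish(ev string) { m.eventCh <- ev }

// Stop ends the loop and waits for it to exit.
func (m *mux) Stop() {
	close(m.stopCh)
	<-m.doneCh
}
```

Because every request goes through a channel into one goroutine, the subscription map never needs a mutex. The trade-off is that a slow callback blocks the loop, which is presumably one reason the real package enforces a signal timeout.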
func New ¶
func New(stream Stream, clock clock.Clock, metrics MetricsCollector, logger logger.Logger) (*EventMultiplexer, error)
New creates a new EventMultiplexer that will use the Stream for events.
func (*EventMultiplexer) Report ¶
func (e *EventMultiplexer) Report() map[string]any
Report returns a map of the current state of the event queue. This is used by the engine report.
func (*EventMultiplexer) Subscribe ¶
func (e *EventMultiplexer) Subscribe(opts ...changestream.SubscriptionOption) (changestream.Subscription, error)
Subscribe creates a new subscription to the event queue. Options can be provided to allow filtering during the dispatching phase.
func (*EventMultiplexer) Wait ¶
func (e *EventMultiplexer) Wait() error
Wait waits for the event queue to stop.
type MetricsCollector ¶
type MetricsCollector interface {
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}
MetricsCollector represents the metrics methods that the EventMultiplexer calls.
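For tests or local experiments, an in-memory collector may be enough to satisfy this interface. The sketch below copies the interface so that it compiles on its own; `countingCollector` and its fields are hypothetical, and because the unit of `val` is not stated in this documentation, the sketch simply sums the observed values.

```go
package main

import "sync"

// MetricsCollector is copied from the documented interface so that this
// sketch is self-contained.
type MetricsCollector interface {
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}

// countingCollector is a hypothetical in-memory implementation.
type countingCollector struct {
	mu            sync.Mutex
	subscriptions int     // current number of subscriptions
	dispatches    int     // number of observed dispatches
	failed        int     // number of observed dispatches marked as failed
	total         float64 // sum of observed values (unit not assumed)
}

// Compile-time check that countingCollector satisfies the interface.
var _ MetricsCollector = (*countingCollector)(nil)

func (c *countingCollector) SubscriptionsInc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subscriptions++
}

func (c *countingCollector) SubscriptionsDec() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subscriptions--
}

func (c *countingCollector) DispatchDurationObserve(val float64, failed bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dispatches++
	c.total += val
	if failed {
		c.failed++
	}
}
```

A value of type `*countingCollector` could then be passed wherever a `MetricsCollector` is expected, for example as the `metrics` argument of `New`.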
type Stream ¶
type Stream interface {
	// Terms returns a channel for a given namespace (database) that returns
	// a set of terms. The notion of terms are a set of changes that can be
	// run one at a time asynchronously. Allowing changes within a given
	// term to be signaled of a change independently from one another.
	// Once a change within a term has been completed, only at that point
	// is another change processed, until all changes are exhausted.
	Terms() <-chan changestream.Term

	// Dying returns a channel that is closed when the stream is dying.
	Dying() <-chan struct{}
}
Stream represents a way to get change events as a set of terms.
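A consumer of such a stream would typically select on both channels, reading terms until the stream reports that it is dying. The sketch below shows that pattern under stated assumptions: `Term` is a hypothetical stand-in for `changestream.Term` (whose methods are not shown in this documentation), the local `Stream` interface mirrors the documented one with that stand-in type, and `fakeStream` and `consume` exist only for illustration.

```go
package main

// Term is a hypothetical stand-in for changestream.Term; here a term is
// just a list of change names.
type Term struct {
	Changes []string
}

// Stream mirrors the documented interface, using the stand-in Term type.
type Stream interface {
	Terms() <-chan Term
	Dying() <-chan struct{}
}

// fakeStream is a hypothetical in-memory Stream for tests.
type fakeStream struct {
	terms chan Term
	dying chan struct{}
}

func (s *fakeStream) Terms() <-chan Term     { return s.terms }
func (s *fakeStream) Dying() <-chan struct{} { return s.dying }

// consume reads terms until the stream is dying and returns the total
// number of changes it has seen.
func consume(s Stream) int {
	total := 0
	for {
		select {
		case <-s.Dying():
			// The stream is shutting down, so stop reading terms.
			return total
		case term := <-s.Terms():
			total += len(term.Changes)
		}
	}
}
```

In a test, terms can be sent on the unbuffered `terms` channel while `consume` runs in another goroutine; closing `dying` afterwards makes `consume` return.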