eventsource

package
v0.0.0-...-c5633df
Warning

This package is not in the latest version of its module.

Published: Dec 20, 2024 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Constants

const (
	// ErrWorkerNotStopping is returned when a worker is not stopping. We can't
	// do anything about it, so we just return the error.
	ErrWorkerNotStopping = errors.ConstError("worker not stopping")

	// ErrWorkerStopped is returned when a worker has stopped. We can't do
	// anything about it, so we just return the error.
	ErrWorkerStopped = errors.ConstError("worker stopped")
)
const ErrSubscriptionClosed = errors.ConstError("watcher subscription closed")
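
Callers typically compare against sentinel errors like these with errors.Is. The sketch below is a minimal illustration under stated assumptions: constError is a local stand-in for errors.ConstError (a string-backed error type), and read and isClosed are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// constError is a local stand-in for a string-backed constant error type.
type constError string

func (e constError) Error() string { return string(e) }

// errSubscriptionClosed mirrors the message of ErrSubscriptionClosed.
const errSubscriptionClosed = constError("watcher subscription closed")

// read is a hypothetical helper: it returns the next change, or wraps the
// sentinel error when the channel has been closed.
func read(ch <-chan []string) ([]string, error) {
	v, ok := <-ch
	if !ok {
		return nil, fmt.Errorf("reading changes: %w", errSubscriptionClosed)
	}
	return v, nil
}

// isClosed reports whether err is, or wraps, the sentinel error.
func isClosed(err error) bool {
	return errors.Is(err, errSubscriptionClosed)
}

func main() {
	ch := make(chan []string)
	close(ch)
	_, err := read(ch)
	fmt.Println(isClosed(err), err)
}
```

Because the sentinel is a comparable constant, errors.Is matches it even after wrapping with %w.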

Variables

This section is empty.

Functions

func ConsumeInitialEvent

func ConsumeInitialEvent[T any](ctx context.Context, w Watcher[T]) (T, error)

ConsumeInitialEvent checks whether the first set of changes is available and, if so, returns it. Otherwise, it kills the worker, waits for the worker's error, and returns that error.
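
The behaviour described above can be sketched roughly as follows. This is not the package's implementation: Watcher here is a local stand-in for the interface documented on this page, and stubWatcher, consumeInitial, and firstEvent are hypothetical names. The fallback error returned when Wait reports nil is also an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Watcher is a local stand-in for the package's Watcher[T] interface.
type Watcher[T any] interface {
	Kill()
	Wait() error
	Changes() <-chan T
}

// stubWatcher is a hypothetical in-memory watcher used only in this sketch.
type stubWatcher[T any] struct {
	ch  chan T
	err error
}

func (s *stubWatcher[T]) Kill()             {}
func (s *stubWatcher[T]) Wait() error       { return s.err }
func (s *stubWatcher[T]) Changes() <-chan T { return s.ch }

// consumeInitial returns the first change if one arrives. If the context is
// cancelled or the channel is closed first, it kills the watcher and returns
// an error instead.
func consumeInitial[T any](ctx context.Context, w Watcher[T]) (T, error) {
	var zero T
	select {
	case <-ctx.Done():
		w.Kill()
		return zero, ctx.Err()
	case v, ok := <-w.Changes():
		if ok {
			return v, nil
		}
		w.Kill()
		if err := w.Wait(); err != nil {
			return zero, err
		}
		// Assumption: report a generic error when the worker died cleanly.
		return zero, errors.New("watcher closed before the initial event")
	}
}

// firstEvent is a convenience wrapper using a background context.
func firstEvent(w Watcher[[]string]) ([]string, error) {
	return consumeInitial[[]string](context.Background(), w)
}

func main() {
	w := &stubWatcher[[]string]{ch: make(chan []string, 1)}
	w.ch <- []string{"machine-0"}
	initial, err := firstEvent(w)
	fmt.Println(initial, err)
}
```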

func NewNamespaceMapperWatcher

func NewNamespaceMapperWatcher(
	base *BaseWatcher, namespace string, changeMask changestream.ChangeType, initialQuery NamespaceQuery, mapper Mapper,
) watcher.StringsWatcher

NewNamespaceMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur. Values from the namespace that change are processed by the input mapper, and based on the mapper's logic a subset of them (or none) may be emitted.

func NewNamespaceWatcher

func NewNamespaceWatcher(
	base *BaseWatcher, namespace string, changeMask changestream.ChangeType, initialQuery NamespaceQuery,
) watcher.StringsWatcher

NewNamespaceWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur. It emits the values from the namespace that changed.

Types

type Applier

type Applier[T any] func(T, T) T

Applier is a function that applies a change to a value.
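
As an illustration, two appliers that fit this signature are sketched below. The argument order (accumulated value first, incoming change second) is an assumption, and notifyApplier, appendApplier, and fold are hypothetical names that are not part of the package.

```go
package main

import "fmt"

// Applier mirrors the documented type: it combines two values into one.
type Applier[T any] func(T, T) T

// notifyApplier merges two notifications into a single notification.
func notifyApplier(_, _ struct{}) struct{} { return struct{}{} }

// appendApplier concatenates the incoming strings onto the accumulated ones
// without modifying either input slice.
func appendApplier(acc, next []string) []string {
	out := make([]string, 0, len(acc)+len(next))
	out = append(out, acc...)
	return append(out, next...)
}

// fold applies the applier across values, starting from the zero value of T.
func fold[T any](apply Applier[T], values ...T) T {
	var acc T
	for _, v := range values {
		acc = apply(acc, v)
	}
	return acc
}

func main() {
	merged := fold[[]string](appendApplier, []string{"a"}, []string{"b", "c"})
	fmt.Println(merged)
	fmt.Println(fold[struct{}](notifyApplier, struct{}{}, struct{}{}))
}
```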

type BaseWatcher

type BaseWatcher struct {
	// contains filtered or unexported fields
}

BaseWatcher encapsulates members common to all EventQueue-based watchers. It has no functionality by itself, and is intended to be embedded in other more specific watchers.

func NewBaseWatcher

func NewBaseWatcher(watchableDB changestream.WatchableDB, logger logger.Logger) *BaseWatcher

NewBaseWatcher returns a BaseWatcher constructed from the arguments.

func (*BaseWatcher) Kill

func (w *BaseWatcher) Kill()

Kill (worker.Worker) kills the watcher via its tomb.

func (*BaseWatcher) Wait

func (w *BaseWatcher) Wait() error

Wait (worker.Worker) waits for the watcher's tomb to die, and returns the error with which it was killed.

type Mapper

Mapper is a function that maps a slice of change events to another slice of change events. This allows events to be modified or dropped if necessary. When zero events are returned, no change is emitted. The inverse is also possible: a mapper may add synthetic events to the stream.

func FilterEvents

func FilterEvents(filter func(changestream.ChangeEvent) bool) Mapper

FilterEvents drops events that do not match the filter.
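
The filtering idea can be sketched with simplified stand-in types. Here event stands in for changestream.ChangeEvent and mapper for Mapper, whose exact signature is not shown in this listing; both are hypothetical, as is filterEvents.

```go
package main

import "fmt"

// event is a hypothetical stand-in for changestream.ChangeEvent.
type event struct {
	namespace string
	changed   string
}

// mapper is a simplified stand-in for Mapper.
type mapper func([]event) ([]event, error)

// filterEvents returns a mapper that keeps only the events for which keep
// returns true. An empty result means nothing would be emitted.
func filterEvents(keep func(event) bool) mapper {
	return func(in []event) ([]event, error) {
		var out []event
		for _, e := range in {
			if keep(e) {
				out = append(out, e)
			}
		}
		return out, nil
	}
}

func main() {
	onlyMachines := filterEvents(func(e event) bool {
		return e.namespace == "machine"
	})
	out, err := onlyMachines([]event{
		{namespace: "machine", changed: "0"},
		{namespace: "unit", changed: "app/0"},
		{namespace: "machine", changed: "1"},
	})
	fmt.Println(out, err)
}
```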

type MultiWatcher

type MultiWatcher[T any] struct {
	// contains filtered or unexported fields
}

MultiWatcher implements Watcher, combining multiple Watchers.

func NewMultiNotifyWatcher

func NewMultiNotifyWatcher(ctx context.Context, watchers ...Watcher[struct{}]) (*MultiWatcher[struct{}], error)

NewMultiNotifyWatcher creates a NotifyWatcher that combines each of the NotifyWatchers passed in. Each watcher's initial event is consumed, and a single initial event is sent.

func NewMultiStringsWatcher

func NewMultiStringsWatcher(ctx context.Context, watchers ...Watcher[[]string]) (*MultiWatcher[[]string], error)

NewMultiStringsWatcher creates a strings watcher (Watcher[[]string]) that combines each of the (strings) watchers passed in. Each watcher's initial event is consumed, and a single initial event is sent.

func NewMultiWatcher

func NewMultiWatcher[T any](ctx context.Context, applier Applier[T], watchers ...Watcher[T]) (*MultiWatcher[T], error)

NewMultiWatcher creates a watcher (Watcher[T]) that combines each of the watchers passed in. Each watcher's initial event is consumed, and a single initial event is sent. Subsequent events are not coalesced.
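
The combining behaviour can be sketched over plain channels. This is an illustration, not the package's implementation: merge, concat, and collect are hypothetical names, the use of the applier to fold the initial events is an assumption, and context cancellation and error handling are omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// merge consumes one initial value from every source, folds them with apply,
// emits the folded value once, and then forwards later values unchanged.
// The returned channel is closed once every source has been closed.
func merge[T any](apply func(T, T) T, sources ...<-chan T) <-chan T {
	var initial T
	for _, src := range sources {
		// Blocks until each source has produced its initial event.
		initial = apply(initial, <-src)
	}

	out := make(chan T)
	go func() {
		defer close(out)
		out <- initial // the single combined initial event

		var wg sync.WaitGroup
		for _, src := range sources {
			wg.Add(1)
			go func(src <-chan T) {
				defer wg.Done()
				for v := range src {
					out <- v // later events are forwarded as they arrive
				}
			}(src)
		}
		wg.Wait()
	}()
	return out
}

// concat joins two string slices into a new slice.
func concat(a, b []string) []string {
	return append(append([]string{}, a...), b...)
}

// collect runs merge over two pre-filled sources and gathers the output.
func collect() [][]string {
	a := make(chan []string, 2)
	b := make(chan []string, 1)
	a <- []string{"a0"}
	b <- []string{"b0"}
	a <- []string{"a1"}
	close(a)
	close(b)

	var got [][]string
	for v := range merge[[]string](concat, a, b) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect())
}
```

In this sketch the two initial events are delivered together as one value, while the later event from the first source is delivered on its own.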

func (*MultiWatcher[T]) Changes

func (w *MultiWatcher[T]) Changes() <-chan T

func (*MultiWatcher[T]) Err

func (w *MultiWatcher[T]) Err() error

func (*MultiWatcher[T]) Kill

func (w *MultiWatcher[T]) Kill()

func (*MultiWatcher[T]) Stop

func (w *MultiWatcher[T]) Stop() error

func (*MultiWatcher[T]) Wait

func (w *MultiWatcher[T]) Wait() error

type NamespaceQuery

type NamespaceQuery Query[[]string]

NamespaceQuery is a function that returns the initial state of a namespace watcher.

func EmptyInitialNamespaceChanges

func EmptyInitialNamespaceChanges() NamespaceQuery

EmptyInitialNamespaceChanges returns a query that returns no initial changes.

func InitialNamespaceChanges

func InitialNamespaceChanges(selectAll string) NamespaceQuery

InitialNamespaceChanges retrieves the current state of the world from the database, as it concerns this watcher.

type NamespaceWatcher

type NamespaceWatcher struct {
	*BaseWatcher
	// contains filtered or unexported fields
}

NamespaceWatcher watches for changes in a namespace. Any time events matching the change mask occur in the namespace, the values associated with the events are emitted.

func (*NamespaceWatcher) Changes

func (w *NamespaceWatcher) Changes() <-chan []string

Changes returns the channel on which the keys for changed rows are sent to downstream consumers.

type Query

type Query[T any] func(context.Context, database.TxnRunner) (T, error)

Query is a function that returns the initial state of a watcher.

type StringsNotifyWatcher

type StringsNotifyWatcher struct {
	// contains filtered or unexported fields
}

StringsNotifyWatcher wraps a Watcher[[]string] and provides a Watcher[struct{}] interface.

func NewStringsNotifyWatcher

func NewStringsNotifyWatcher(watcher Watcher[[]string]) (*StringsNotifyWatcher, error)

NewStringsNotifyWatcher creates a new StringsNotifyWatcher.
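
The adaptation from a strings channel to a notification channel can be sketched as below. It assumes that every []string event maps to exactly one notification and that the payload is discarded; notifyFrom and countNotifications are hypothetical names that are not part of the package.

```go
package main

import "fmt"

// notifyFrom converts a stream of []string changes into bare notifications.
// The output channel is closed when the input channel is closed.
func notifyFrom(in <-chan []string) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		defer close(out)
		for range in {
			out <- struct{}{}
		}
	}()
	return out
}

// countNotifications feeds the given events through notifyFrom and counts
// how many notifications come out.
func countNotifications(events ...[]string) int {
	in := make(chan []string, len(events))
	for _, e := range events {
		in <- e
	}
	close(in)

	n := 0
	for range notifyFrom(in) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countNotifications([]string{"a"}, []string{"b", "c"}))
}
```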

func (*StringsNotifyWatcher) Changes

func (w *StringsNotifyWatcher) Changes() <-chan struct{}

func (*StringsNotifyWatcher) Err

func (w *StringsNotifyWatcher) Err() error

func (*StringsNotifyWatcher) Kill

func (w *StringsNotifyWatcher) Kill()

func (*StringsNotifyWatcher) Stop

func (w *StringsNotifyWatcher) Stop() error

func (*StringsNotifyWatcher) Wait

func (w *StringsNotifyWatcher) Wait() error

type ValueWatcher

type ValueWatcher struct {
	*BaseWatcher
	// contains filtered or unexported fields
}

ValueWatcher watches for events associated with a single value from a namespace. Any time the identified change value has an associated event, a notification is emitted.

func NewNamespaceNotifyMapperWatcher

func NewNamespaceNotifyMapperWatcher(
	base *BaseWatcher, namespace string, changeMask changestream.ChangeType, mapper Mapper,
) *ValueWatcher

NewNamespaceNotifyMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur.

func NewNamespaceNotifyWatcher

func NewNamespaceNotifyWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType) *ValueWatcher

NewNamespaceNotifyWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur.

func NewValueMapperWatcher

func NewValueMapperWatcher(
	base *BaseWatcher, namespace, changeValue string, changeMask changestream.ChangeType, mapper Mapper,
) *ValueWatcher

NewValueMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when mapper accepts the change-log events for a specific changeValue from the input namespace.

func NewValueWatcher

func NewValueWatcher(
	base *BaseWatcher, namespace, changeValue string, changeMask changestream.ChangeType,
) *ValueWatcher

NewValueWatcher returns a new watcher that receives changes from the input base watcher's db/queue when change-log events occur for a specific changeValue from the input namespace.

func (*ValueWatcher) Changes

func (w *ValueWatcher) Changes() <-chan struct{}

Changes returns the channel on which notifications are sent when the watched database row changes.

type Watcher

type Watcher[T any] interface {
	worker.Worker
	Changes() <-chan T
}

Watcher defines a generic watcher over a set of changes.
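
A minimal type satisfying this interface might look like the sketch below. Worker is a local stand-in for worker.Worker with the Kill and Wait methods shown on this page; tickWatcher, newTickWatcher, and drainAndStop are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker is a local stand-in for worker.Worker.
type Worker interface {
	Kill()
	Wait() error
}

// Watcher mirrors the documented interface.
type Watcher[T any] interface {
	Worker
	Changes() <-chan T
}

// tickWatcher is a hypothetical notify watcher with one buffered initial event.
type tickWatcher struct {
	changes chan struct{}
	done    chan struct{}
	once    sync.Once
}

func newTickWatcher() *tickWatcher {
	w := &tickWatcher{
		changes: make(chan struct{}, 1),
		done:    make(chan struct{}),
	}
	w.changes <- struct{}{} // initial event
	return w
}

// Kill stops the watcher; it is safe to call more than once.
func (w *tickWatcher) Kill() { w.once.Do(func() { close(w.done) }) }

// Wait blocks until the watcher has been killed.
func (w *tickWatcher) Wait() error { <-w.done; return nil }

// Changes returns the notification channel.
func (w *tickWatcher) Changes() <-chan struct{} { return w.changes }

// Compile-time check that tickWatcher satisfies Watcher[struct{}].
var _ Watcher[struct{}] = (*tickWatcher)(nil)

// drainAndStop reads every pending notification, then kills the watcher and
// waits for it to finish.
func drainAndStop(w Watcher[struct{}]) (int, error) {
	n := 0
	for {
		select {
		case <-w.Changes():
			n++
		default:
			w.Kill()
			return n, w.Wait()
		}
	}
}

func main() {
	n, err := drainAndStop(newTickWatcher())
	fmt.Println(n, err)
}
```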
