Documentation ¶
Index ¶
- Constants
- func ConsumeInitialEvent[T any](ctx context.Context, w Watcher[T]) (T, error)
- func NewNamespaceMapperWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType, ...) watcher.StringsWatcher
- func NewNamespaceWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType, ...) watcher.StringsWatcher
- type Applier
- type BaseWatcher
- type Mapper
- type MultiWatcher
- func NewMultiNotifyWatcher(ctx context.Context, watchers ...Watcher[struct{}]) (*MultiWatcher[struct{}], error)
- func NewMultiStringsWatcher(ctx context.Context, watchers ...Watcher[[]string]) (*MultiWatcher[[]string], error)
- func NewMultiWatcher[T any](ctx context.Context, applier Applier[T], watchers ...Watcher[T]) (*MultiWatcher[T], error)
- type NamespaceQuery
- type NamespaceWatcher
- type Query
- type StringsNotifyWatcher
- type ValueWatcher
- func NewNamespaceNotifyMapperWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType, ...) *ValueWatcher
- func NewNamespaceNotifyWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType) *ValueWatcher
- func NewValueMapperWatcher(base *BaseWatcher, namespace, changeValue string, ...) *ValueWatcher
- func NewValueWatcher(base *BaseWatcher, namespace, changeValue string, ...) *ValueWatcher
- type Watcher
Constants ¶
const (
	// ErrWorkerNotStopping is returned when a worker is not stopping. We can't
	// do anything about it, so we just return the error.
	ErrWorkerNotStopping = errors.ConstError("worker not stopping")

	// ErrWorkerStopped is returned when a worker has stopped. We can't do
	// anything about it, so we just return the error.
	ErrWorkerStopped = errors.ConstError("worker stopped")
)
const ErrSubscriptionClosed = errors.ConstError("watcher subscription closed")
Variables ¶
This section is empty.
Functions ¶
func ConsumeInitialEvent ¶
func ConsumeInitialEvent[T any](ctx context.Context, w Watcher[T]) (T, error)
ConsumeInitialEvent checks whether the first set of changes is available and, if so, returns it. Otherwise it kills the worker, waits for the worker's error, and returns that error.
func NewNamespaceMapperWatcher ¶
func NewNamespaceMapperWatcher(
	base *BaseWatcher,
	namespace string,
	changeMask changestream.ChangeType,
	initialQuery NamespaceQuery,
	mapper Mapper,
) watcher.StringsWatcher
NewNamespaceMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur. Values from the namespace that change are processed by the input mapper, and based on the mapper's logic a subset of them (or none) may be emitted.
func NewNamespaceWatcher ¶
func NewNamespaceWatcher(
	base *BaseWatcher,
	namespace string,
	changeMask changestream.ChangeType,
	initialQuery NamespaceQuery,
) watcher.StringsWatcher
NewNamespaceWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur. It emits the values from the namespace that changed.
Types ¶
type Applier ¶
type Applier[T any] func(T, T) T
Applier is a function that applies a change to a value.
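As an illustration of the `func(T, T) T` shape, here is a minimal sketch of two appliers, one for `[]string` payloads and one for `struct{}` notifications. The names `appendStrings`, `collapseNotify`, and `fold` are hypothetical and not part of the package; how the package itself uses an Applier is not shown in this documentation.

```go
package main

import "fmt"

// Applier mirrors the documented signature: it combines two values into one.
type Applier[T any] func(T, T) T

// appendStrings is a hypothetical applier for []string payloads. It
// concatenates the accumulated values with the incoming ones into a
// fresh slice, so neither input is modified.
func appendStrings(acc, next []string) []string {
	out := make([]string, 0, len(acc)+len(next))
	out = append(out, acc...)
	return append(out, next...)
}

// collapseNotify is a hypothetical applier for struct{} payloads. Any
// number of notifications collapses into a single one.
func collapseNotify(_, _ struct{}) struct{} {
	return struct{}{}
}

// fold applies the applier across a sequence of values, left to right.
func fold[T any](apply Applier[T], initial T, values ...T) T {
	acc := initial
	for _, v := range values {
		acc = apply(acc, v)
	}
	return acc
}

func main() {
	merged := fold[[]string](appendStrings, nil, []string{"a", "b"}, []string{"c"})
	fmt.Println(merged) // [a b c]

	_ = fold[struct{}](collapseNotify, struct{}{}, struct{}{}, struct{}{})
}
```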
type BaseWatcher ¶
type BaseWatcher struct {
// contains filtered or unexported fields
}
BaseWatcher encapsulates members common to all EventQueue-based watchers. It has no functionality by itself, and is intended to be embedded in other more specific watchers.
func NewBaseWatcher ¶
func NewBaseWatcher(watchableDB changestream.WatchableDB, logger logger.Logger) *BaseWatcher
NewBaseWatcher returns a BaseWatcher constructed from the arguments.
func (*BaseWatcher) Kill ¶
func (w *BaseWatcher) Kill()
Kill (worker.Worker) kills the watcher via its tomb.
func (*BaseWatcher) Wait ¶
func (w *BaseWatcher) Wait() error
Wait (worker.Worker) waits for the watcher's tomb to die, and returns the error with which it was killed.
type Mapper ¶
type Mapper func(context.Context, database.TxnRunner, []changestream.ChangeEvent) ([]changestream.ChangeEvent, error)
Mapper is a function that maps a slice of change events to another slice of change events. This allows events to be modified or dropped if necessary. When zero events are returned, no change is emitted. The inverse is also possible: a mapper can add synthetic events to the stream.
func FilterEvents ¶
func FilterEvents(filter func(changestream.ChangeEvent) bool) Mapper
FilterEvents drops events that do not match the filter.
type MultiWatcher ¶
type MultiWatcher[T any] struct {
	// contains filtered or unexported fields
}
MultiWatcher implements Watcher, combining multiple Watchers.
func NewMultiNotifyWatcher ¶
func NewMultiNotifyWatcher(ctx context.Context, watchers ...Watcher[struct{}]) (*MultiWatcher[struct{}], error)
NewMultiNotifyWatcher creates a NotifyWatcher that combines each of the NotifyWatchers passed in. Each watcher's initial event is consumed, and a single initial event is sent.
func NewMultiStringsWatcher ¶
func NewMultiStringsWatcher(ctx context.Context, watchers ...Watcher[[]string]) (*MultiWatcher[[]string], error)
NewMultiStringsWatcher creates a strings watcher (Watcher[[]string]) that combines each of the (strings) watchers passed in. Each watcher's initial event is consumed, and a single initial event is sent.
func NewMultiWatcher ¶
func NewMultiWatcher[T any](ctx context.Context, applier Applier[T], watchers ...Watcher[T]) (*MultiWatcher[T], error)
NewMultiWatcher creates a watcher (Watcher[T]) that combines each of the watchers passed in. Each watcher's initial event is consumed, and a single initial event is sent. Subsequent events are not coalesced.
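The combining behaviour can be approximated with plain channels. The sketch below reads one initial event from every source, merges them into a single initial event, and then forwards later events one by one. It assumes the applier is what merges the initial events, which this documentation does not state; `combine` and `appendAll` are hypothetical names, and the worker lifecycle (Kill, Wait, errors) is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// appendAll is a hypothetical applier that concatenates two string slices
// into a fresh slice.
func appendAll(acc, next []string) []string {
	return append(append([]string(nil), acc...), next...)
}

// combine consumes the initial event of every source, merges them with
// apply, sends the merged value first, and then forwards each subsequent
// event unchanged (no coalescing).
func combine[T any](apply func(T, T) T, sources ...<-chan T) <-chan T {
	var initial T
	for _, src := range sources {
		// Blocks until this source has delivered its initial event.
		initial = apply(initial, <-src)
	}

	out := make(chan T)
	go func() {
		defer close(out)
		out <- initial

		var wg sync.WaitGroup
		for _, src := range sources {
			wg.Add(1)
			go func(src <-chan T) {
				defer wg.Done()
				for v := range src {
					out <- v
				}
			}(src)
		}
		wg.Wait()
	}()
	return out
}

func main() {
	a := make(chan []string, 2)
	b := make(chan []string, 1)
	a <- []string{"a"} // initial event of a
	a <- []string{"c"} // later event of a
	b <- []string{"b"} // initial event of b
	close(a)
	close(b)

	for v := range combine[[]string](appendAll, a, b) {
		fmt.Println(v)
	}
}
```

With these inputs the first value received is the merged initial event and the second is the later event from `a`, delivered on its own.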
func (*MultiWatcher[T]) Changes ¶
func (w *MultiWatcher[T]) Changes() <-chan T
func (*MultiWatcher[T]) Err ¶
func (w *MultiWatcher[T]) Err() error
func (*MultiWatcher[T]) Kill ¶
func (w *MultiWatcher[T]) Kill()
func (*MultiWatcher[T]) Stop ¶
func (w *MultiWatcher[T]) Stop() error
func (*MultiWatcher[T]) Wait ¶
func (w *MultiWatcher[T]) Wait() error
type NamespaceQuery ¶
NamespaceQuery is a function that returns the initial state of a namespace watcher.
func EmptyInitialNamespaceChanges ¶
func EmptyInitialNamespaceChanges() NamespaceQuery
EmptyInitialNamespaceChanges returns a query that returns no initial changes.
func InitialNamespaceChanges ¶
func InitialNamespaceChanges(selectAll string) NamespaceQuery
InitialNamespaceChanges retrieves the current state of the world from the database, as it concerns this watcher.
type NamespaceWatcher ¶
type NamespaceWatcher struct {
	*BaseWatcher
	// contains filtered or unexported fields
}
NamespaceWatcher watches for changes in a namespace. Any time events matching the change mask occur in the namespace, the values associated with the events are emitted.
func (*NamespaceWatcher) Changes ¶
func (w *NamespaceWatcher) Changes() <-chan []string
Changes returns the channel on which the keys for changed rows are sent to downstream consumers.
type StringsNotifyWatcher ¶
type StringsNotifyWatcher struct {
// contains filtered or unexported fields
}
StringsNotifyWatcher wraps a Watcher[[]string] and provides a Watcher[struct{}] interface.
func NewStringsNotifyWatcher ¶
func NewStringsNotifyWatcher(watcher Watcher[[]string]) (*StringsNotifyWatcher, error)
NewStringsNotifyWatcher creates a new StringsNotifyWatcher.
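The adaptation from `[]string` events to bare notifications can be sketched with channels alone. This is an illustrative approximation, not the package's code: `notifyFromStrings` and its `done` channel are hypothetical, and the real type also implements Kill, Stop, Wait, and Err.

```go
package main

import "fmt"

// notifyFromStrings forwards one struct{} notification for every []string
// event and discards the payload. The output channel is closed when the
// input closes or when done is closed.
func notifyFromStrings(in <-chan []string, done <-chan struct{}) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case _, ok := <-in:
				if !ok {
					return
				}
				// Deliver the notification unless we are told to stop.
				select {
				case out <- struct{}{}:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

func main() {
	in := make(chan []string, 2)
	in <- []string{"a"}
	in <- []string{"b", "c"}
	close(in)

	count := 0
	for range notifyFromStrings(in, make(chan struct{})) {
		count++
	}
	fmt.Println(count) // 2
}
```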
func (*StringsNotifyWatcher) Changes ¶
func (w *StringsNotifyWatcher) Changes() <-chan struct{}
func (*StringsNotifyWatcher) Err ¶
func (w *StringsNotifyWatcher) Err() error
func (*StringsNotifyWatcher) Kill ¶
func (w *StringsNotifyWatcher) Kill()
func (*StringsNotifyWatcher) Stop ¶
func (w *StringsNotifyWatcher) Stop() error
func (*StringsNotifyWatcher) Wait ¶
func (w *StringsNotifyWatcher) Wait() error
type ValueWatcher ¶
type ValueWatcher struct {
	*BaseWatcher
	// contains filtered or unexported fields
}
ValueWatcher watches for events associated with a single value from a namespace. Any time the identified change value has an associated event, a notification is emitted.
func NewNamespaceNotifyMapperWatcher ¶
func NewNamespaceNotifyMapperWatcher(
	base *BaseWatcher,
	namespace string,
	changeMask changestream.ChangeType,
	mapper Mapper,
) *ValueWatcher
NewNamespaceNotifyMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur.
func NewNamespaceNotifyWatcher ¶
func NewNamespaceNotifyWatcher(base *BaseWatcher, namespace string, changeMask changestream.ChangeType) *ValueWatcher
NewNamespaceNotifyWatcher returns a new watcher that receives changes from the input base watcher's db/queue when changes in the namespace occur.
func NewValueMapperWatcher ¶
func NewValueMapperWatcher(
	base *BaseWatcher,
	namespace, changeValue string,
	changeMask changestream.ChangeType,
	mapper Mapper,
) *ValueWatcher
NewValueMapperWatcher returns a new watcher that receives changes from the input base watcher's db/queue when mapper accepts the change-log events for a specific changeValue from the input namespace.
func NewValueWatcher ¶
func NewValueWatcher(
	base *BaseWatcher,
	namespace, changeValue string,
	changeMask changestream.ChangeType,
) *ValueWatcher
NewValueWatcher returns a new watcher that receives changes from the input base watcher's db/queue when change-log events occur for a specific changeValue from the input namespace.
func (*ValueWatcher) Changes ¶
func (w *ValueWatcher) Changes() <-chan struct{}
Changes returns the channel on which notifications are sent when the watched database row changes.