Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// DefaultNumTermWatermarks is the default number of terms (watermarks) to
	// keep before removing the oldest one.
	DefaultNumTermWatermarks = 10
)
Functions ¶
func NewTxnRunnerFactory ¶
func NewTxnRunnerFactory(f WatchableDBFactory) database.TxnRunnerFactory
NewTxnRunnerFactory returns a TxnRunnerFactory for the input changestream.WatchableDB factory function. This ensures that we never pass the ability to access the change-stream into a state object. State objects should only be concerned with persistence and retrieval. Watchers are the concern of the service layer.
Types ¶
type ChangeEvent ¶
type ChangeEvent interface {
	// Type returns the type of change (create, update, delete).
	Type() ChangeType
	// Namespace returns the namespace of the change. This is normally the
	// table name.
	Namespace() string
	// Changed returns the changed value of event. This logically can be
	// the primary key of the row that was changed or the field of the change
	// that was changed.
	Changed() string
}
ChangeEvent represents a new change set via the changestream.
type ChangeType ¶
type ChangeType int
ChangeType represents the type of change. The changes are bit flags so that they can be combined.
const (
	// Create represents a new row in the database.
	Create ChangeType = 1 << iota
	// Update represents an update to an existing row in the database.
	Update
	// Delete represents a row that has been deleted from the database.
	Delete
	// All represents any change to the namespace of interest.
	All = Create | Update | Delete
)
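Because the change types are bit flags, a mask can be built with bitwise OR and tested with bitwise AND. The following is a minimal sketch that redeclares ChangeType locally so it runs on its own; the `matches` helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// ChangeType mirrors the documented declaration so that this sketch is
// self-contained. In real code, use the type from the package instead.
type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
	All = Create | Update | Delete
)

// matches is a hypothetical helper (not part of the package). It reports
// whether the change type t is selected by mask.
func matches(mask, t ChangeType) bool {
	return mask&t != 0
}

func main() {
	// Interested in rows being created or deleted, but not updated.
	mask := Create | Delete

	fmt.Println(matches(mask, Create)) // true
	fmt.Println(matches(mask, Update)) // false
	fmt.Println(matches(All, Update))  // true
}
```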
type EventSource ¶
type EventSource interface {
	// Subscribe returns a subscription that can receive events from
	// a change stream according to the input subscription options.
	Subscribe(opts ...SubscriptionOption) (Subscription, error)
}
EventSource describes the ability to subscribe to a subset of events from a change stream.
type Subscription ¶
type Subscription interface {
	// Changes returns the channel that the subscription will receive events on.
	Changes() <-chan []ChangeEvent
	// Unsubscribe removes the subscription from the event queue.
	Unsubscribe()
	// Done provides a way to know from the consumer side if the underlying
	// subscription has been terminated. This is useful to know if the
	// event queue has been killed.
	Done() <-chan struct{}
}
Subscription describes the ability to receive events from the event queue and unsubscribe from the queue.
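A consumer would typically select on both Changes and Done so that it stops when the subscription is terminated. The sketch below assumes that pattern and redeclares the interfaces locally to stay self-contained; `event`, `fakeSub`, and `collect` are hypothetical names used only for illustration, and `fakeSub` treats Unsubscribe as closing Done, which the real implementation may not do.

```go
package main

import "fmt"

// The declarations below mirror the documented interfaces so that the
// sketch compiles on its own.
type ChangeType int

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

type Subscription interface {
	Changes() <-chan []ChangeEvent
	Unsubscribe()
	Done() <-chan struct{}
}

// event is a hypothetical ChangeEvent implementation.
type event struct {
	typ     ChangeType
	ns      string
	changed string
}

func (e event) Type() ChangeType  { return e.typ }
func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

// fakeSub is a hypothetical in-memory Subscription. As an assumption of
// this sketch, Unsubscribe closes the Done channel.
type fakeSub struct {
	changes chan []ChangeEvent
	done    chan struct{}
}

func (s *fakeSub) Changes() <-chan []ChangeEvent { return s.changes }
func (s *fakeSub) Unsubscribe()                  { close(s.done) }
func (s *fakeSub) Done() <-chan struct{}         { return s.done }

// collect reads batches until at least n events have arrived or the
// subscription is terminated, and returns the changed values seen so far.
func collect(sub Subscription, n int) []string {
	var changed []string
	for len(changed) < n {
		select {
		case batch := <-sub.Changes():
			for _, e := range batch {
				changed = append(changed, e.Changed())
			}
		case <-sub.Done():
			return changed
		}
	}
	return changed
}

func main() {
	sub := &fakeSub{
		changes: make(chan []ChangeEvent, 1),
		done:    make(chan struct{}),
	}
	sub.changes <- []ChangeEvent{
		event{1, "model", "a"},
		event{2, "model", "b"},
	}
	fmt.Println(collect(sub, 2))

	// After termination, collect returns without waiting for more events.
	sub.Unsubscribe()
	fmt.Println(len(collect(sub, 1)))
}
```

Events arrive in batches (`[]ChangeEvent`), so the loop ranges over each batch rather than handling a single event per receive.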
type SubscriptionOption ¶
type SubscriptionOption struct {
// contains filtered or unexported fields
}
SubscriptionOption is an option that can be used to create a subscription.
func FilteredNamespace ¶
func FilteredNamespace(namespace string, changeMask ChangeType, filter func(ChangeEvent) bool) SubscriptionOption
FilteredNamespace returns a SubscriptionOption that will subscribe to the given namespace and filter the events using the given function.
func Namespace ¶
func Namespace(namespace string, changeMask ChangeType) SubscriptionOption
Namespace returns a SubscriptionOption that will subscribe to the given namespace.
func (SubscriptionOption) ChangeMask ¶
func (o SubscriptionOption) ChangeMask() ChangeType
ChangeMask returns the change mask that the subscription is for.
func (SubscriptionOption) Filter ¶
func (o SubscriptionOption) Filter() func(ChangeEvent) bool
Filter returns the filter function that the subscription applies to events.
func (SubscriptionOption) Namespace ¶
func (o SubscriptionOption) Namespace() string
Namespace returns the name of the type that the subscription will be tied to.
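The three accessors suggest how an event source could decide whether an event satisfies an option: compare the namespace, test the change mask, then apply the filter. The sketch below illustrates that idea under stated assumptions. `option` is a hypothetical stand-in for SubscriptionOption (whose real fields are unexported), and `accepts` is an assumed matching rule, not the package's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// ChangeType and ChangeEvent mirror the documented declarations.
type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
	All = Create | Update | Delete
)

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

// event is a hypothetical ChangeEvent implementation.
type event struct {
	typ     ChangeType
	ns      string
	changed string
}

func (e event) Type() ChangeType  { return e.typ }
func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

// option is a hypothetical stand-in for SubscriptionOption.
type option struct {
	namespace  string
	changeMask ChangeType
	filter     func(ChangeEvent) bool
}

func (o option) Namespace() string              { return o.namespace }
func (o option) ChangeMask() ChangeType         { return o.changeMask }
func (o option) Filter() func(ChangeEvent) bool { return o.filter }

// accepts is an assumed matching rule: the namespace must be equal, the
// event type must be selected by the mask, and the filter (if any) must
// return true.
func accepts(o option, e ChangeEvent) bool {
	if e.Namespace() != o.Namespace() {
		return false
	}
	if o.ChangeMask()&e.Type() == 0 {
		return false
	}
	if f := o.Filter(); f != nil {
		return f(e)
	}
	return true
}

func main() {
	// Comparable in spirit to FilteredNamespace("machine", Create|Update, ...).
	opt := option{
		namespace:  "machine",
		changeMask: Create | Update,
		filter: func(e ChangeEvent) bool {
			return strings.HasPrefix(e.Changed(), "0/")
		},
	}

	fmt.Println(accepts(opt, event{Create, "machine", "0/lxd/1"})) // true
	fmt.Println(accepts(opt, event{Delete, "machine", "0/lxd/1"})) // false: masked out
	fmt.Println(accepts(opt, event{Update, "unit", "0/lxd/1"}))    // false: other namespace
	fmt.Println(accepts(opt, event{Update, "machine", "1"}))       // false: filtered
}
```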
type Term ¶
type Term interface {
	// Changes returns the changes that are part of the term.
	Changes() []ChangeEvent
	// Done signals that the term has been completed. Empty signals that
	// the term was empty and no changes were processed. This is useful to
	// help determine if more changes are available to be processed.
	// Abort is used to signal that setting the empty value should be aborted
	// and the term should be considered incomplete and done.
	Done(empty bool, abort <-chan struct{})
}
Term represents a set of changes that are bounded by a coalesced set. A term is a set of changes that can be run one at a time asynchronously, which allows each change within a given term to be signaled independently of the others. Only once a change within a term has been completed is the next change processed, until all changes are exhausted.
type WatchableDB ¶
type WatchableDB interface {
	database.TxnRunner
	EventSource
}
WatchableDB describes the ability to run transactions against a database and to subscribe to events emitted from that same source.
type WatchableDBFactory ¶
type WatchableDBFactory = func() (WatchableDB, error)
WatchableDBFactory provides a function for getting a WatchableDB or an error.
func NewWatchableDBFactoryForNamespace ¶
func NewWatchableDBFactoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory
NewWatchableDBFactoryForNamespace returns a WatchableDBFactory for the input namespaced factory function and namespace.
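One way to read this signature is as a closure that binds the namespace argument and widens the concrete type T to the WatchableDB interface. The sketch below shows that idea under stated assumptions: `WatchableDB` is reduced to a single hypothetical `Name` method so the example is self-contained, and `newFactoryForNamespace`, `memDB`, and `openMem` are illustrative names, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// WatchableDB is a simplified stand-in for the documented interface. The
// Name method is hypothetical and exists only to make the result observable.
type WatchableDB interface {
	Name() string
}

// WatchableDBFactory mirrors the documented type alias.
type WatchableDBFactory = func() (WatchableDB, error)

// newFactoryForNamespace is a sketch of how a namespaced factory could be
// adapted: it captures ns and converts the concrete T into a WatchableDB.
func newFactoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory {
	return func() (WatchableDB, error) {
		db, err := f(ns)
		if err != nil {
			// Return an untyped nil so callers never see a non-nil
			// interface wrapping a zero value of T.
			return nil, err
		}
		return db, nil
	}
}

// memDB is a hypothetical concrete database type.
type memDB struct{ ns string }

func (m *memDB) Name() string { return m.ns }

// openMem is a hypothetical namespaced factory function.
func openMem(ns string) (*memDB, error) {
	if ns == "" {
		return nil, errors.New("empty namespace")
	}
	return &memDB{ns: ns}, nil
}

func main() {
	// T is inferred as *memDB from the type of openMem.
	factory := newFactoryForNamespace(openMem, "controller")

	db, err := factory()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(db.Name())
}
```

The type parameter lets callers pass a factory that returns their own concrete type without writing an adapter by hand.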
type WatchableDBGetter ¶
type WatchableDBGetter interface {
GetWatchableDB(string) (WatchableDB, error)
}
WatchableDBGetter describes the ability to get a WatchableDB for a particular namespace.