changestream

package
v0.0.0-...-e9a5067

This package is not in the latest version of its module.

Published: Dec 20, 2024 | License: AGPL-3.0 | Imports: 3 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// DefaultNumTermWatermarks is the default number of terms (watermarks) to
	// keep before removing the oldest one.
	DefaultNumTermWatermarks = 10
)

Functions

func NewTxnRunnerFactory

func NewTxnRunnerFactory(f WatchableDBFactory) database.TxnRunnerFactory

NewTxnRunnerFactory returns a TxnRunnerFactory for the input changestream.WatchableDB factory function. This ensures that we never pass the ability to access the change-stream into a state object. State objects should only be concerned with persistence and retrieval. Watchers are the concern of the service layer.

Types

type ChangeEvent

type ChangeEvent interface {
	// Type returns the type of change (create, update, delete).
	Type() ChangeType
	// Namespace returns the namespace of the change. This is normally the
	// table name.
	Namespace() string
	// Changed returns the value that changed. Logically, this is either the
	// primary key of the row that was changed or the field within that row
	// that was changed.
	Changed() string
}

ChangeEvent represents a new change delivered via the change stream.

type ChangeType

type ChangeType int

ChangeType represents the type of change. The changes are bit flags so that they can be combined.

const (
	// Create represents a new row in the database.
	Create ChangeType = 1 << iota
	// Update represents an update to an existing row in the database.
	Update
	// Delete represents a row that has been deleted from the database.
	Delete
	// All represents any change to the namespace of interest.
	All = Create | Update | Delete
)
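
Because the values are bit flags, a mask can be built with bitwise OR and tested with bitwise AND. The sketch below repeats the declarations above so that it compiles on its own; matches is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// ChangeType and its constants mirror the declarations shown above.
type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
	All = Create | Update | Delete
)

// matches reports whether the change type t is selected by mask.
// It is a hypothetical helper used only for this example.
func matches(mask, t ChangeType) bool {
	return mask&t != 0
}

func main() {
	mask := Create | Delete
	fmt.Println(matches(mask, Create)) // true
	fmt.Println(matches(mask, Update)) // false
	fmt.Println(matches(All, Update))  // true
	fmt.Println(int(All))              // 7
}
```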

type EventSource

type EventSource interface {
	// Subscribe returns a subscription that can receive events from
	// a change stream according to the input subscription options.
	Subscribe(opts ...SubscriptionOption) (Subscription, error)
}

EventSource describes the ability to subscribe to a subset of events from a change stream.

type Subscription

type Subscription interface {
	// Changes returns the channel that the subscription will receive events on.
	Changes() <-chan []ChangeEvent

	// Unsubscribe removes the subscription from the event queue.
	Unsubscribe()

	// Done provides a way to know from the consumer side if the underlying
	// subscription has been terminated. This is useful to know if the
	// event queue has been killed.
	Done() <-chan struct{}
}

Subscription describes the ability to receive events from the event queue and unsubscribe from the queue.
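
A consumer of a Subscription would typically select on both Changes and Done. The following is a minimal sketch, not the package's own consumer: the interfaces are repeated locally so the example compiles alone, and event, fakeSub, consume, and runDemo are hypothetical names for an in-memory stand-in.

```go
package main

import "fmt"

type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
)

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

type Subscription interface {
	Changes() <-chan []ChangeEvent
	Unsubscribe()
	Done() <-chan struct{}
}

// event is a hypothetical ChangeEvent implementation.
type event struct {
	typ         ChangeType
	ns, changed string
}

func (e event) Type() ChangeType  { return e.typ }
func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

// fakeSub is a hypothetical in-memory Subscription.
type fakeSub struct {
	changes chan []ChangeEvent
	done    chan struct{}
}

func (s *fakeSub) Changes() <-chan []ChangeEvent { return s.changes }
func (s *fakeSub) Unsubscribe()                  {}
func (s *fakeSub) Done() <-chan struct{}         { return s.done }

// consume reads batches until the subscription is terminated and returns
// the changed values it saw, in order.
func consume(sub Subscription) []string {
	defer sub.Unsubscribe()
	var seen []string
	for {
		select {
		case <-sub.Done():
			return seen
		case batch := <-sub.Changes():
			for _, e := range batch {
				seen = append(seen, e.Changed())
			}
		}
	}
}

// runDemo feeds two batches through an unbuffered channel, then terminates.
func runDemo() []string {
	sub := &fakeSub{changes: make(chan []ChangeEvent), done: make(chan struct{})}
	go func() {
		sub.changes <- []ChangeEvent{event{Create, "model", "uuid-1"}, event{Update, "model", "uuid-2"}}
		sub.changes <- []ChangeEvent{event{Delete, "model", "uuid-1"}}
		close(sub.done)
	}()
	return consume(sub)
}

func main() {
	fmt.Println(runDemo()) // [uuid-1 uuid-2 uuid-1]
}
```

Watching Done alongside Changes lets the consumer stop when the underlying event queue has been killed rather than blocking forever on a channel that will never deliver again.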

type SubscriptionOption

type SubscriptionOption struct {
	// contains filtered or unexported fields
}

SubscriptionOption is an option that can be used to create a subscription.

func FilteredNamespace

func FilteredNamespace(namespace string, changeMask ChangeType, filter func(ChangeEvent) bool) SubscriptionOption

FilteredNamespace returns a SubscriptionOption that will subscribe to the given namespace for the given change mask and filter the events using the given function.

func Namespace

func Namespace(namespace string, changeMask ChangeType) SubscriptionOption

Namespace returns a SubscriptionOption that will subscribe to the given namespace for the given change mask.

func (SubscriptionOption) ChangeMask

func (o SubscriptionOption) ChangeMask() ChangeType

ChangeMask returns the mask of change types that the subscription is for.

func (SubscriptionOption) Filter

func (o SubscriptionOption) Filter() func(ChangeEvent) bool

Filter returns the filter function that the subscription applies to events.

func (SubscriptionOption) Namespace

func (o SubscriptionOption) Namespace() string

Namespace returns the name of the type that the subscription will be tied to.
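
To illustrate how the three accessors might be combined, here is a sketch of an event source deciding whether an event should be delivered for a given option. Everything below is a local mirror written for the example: the unexported field names, the always-true filter used by Namespace, and the accepts helper are assumptions, not the package's implementation.

```go
package main

import "fmt"

type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
	All = Create | Update | Delete
)

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

// SubscriptionOption mirrors the documented type; field names are assumed.
type SubscriptionOption struct {
	namespace  string
	changeMask ChangeType
	filter     func(ChangeEvent) bool
}

// Namespace is assumed here to install a filter that accepts every event.
func Namespace(namespace string, changeMask ChangeType) SubscriptionOption {
	return FilteredNamespace(namespace, changeMask, func(ChangeEvent) bool { return true })
}

func FilteredNamespace(namespace string, changeMask ChangeType, filter func(ChangeEvent) bool) SubscriptionOption {
	return SubscriptionOption{namespace: namespace, changeMask: changeMask, filter: filter}
}

func (o SubscriptionOption) Namespace() string              { return o.namespace }
func (o SubscriptionOption) ChangeMask() ChangeType         { return o.changeMask }
func (o SubscriptionOption) Filter() func(ChangeEvent) bool { return o.filter }

// event is a hypothetical ChangeEvent implementation.
type event struct {
	typ         ChangeType
	ns, changed string
}

func (e event) Type() ChangeType  { return e.typ }
func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

// accepts (hypothetical) checks namespace, then mask, then the filter.
func accepts(opt SubscriptionOption, e ChangeEvent) bool {
	if opt.Namespace() != e.Namespace() {
		return false
	}
	if opt.ChangeMask()&e.Type() == 0 {
		return false
	}
	f := opt.Filter()
	return f == nil || f(e)
}

func main() {
	opt := FilteredNamespace("unit", Create|Update, func(e ChangeEvent) bool {
		return e.Changed() == "app/0"
	})
	fmt.Println(accepts(opt, event{Create, "unit", "app/0"}))                  // true
	fmt.Println(accepts(opt, event{Delete, "unit", "app/0"}))                  // false: masked out
	fmt.Println(accepts(opt, event{Update, "unit", "app/1"}))                  // false: filtered out
	fmt.Println(accepts(Namespace("unit", All), event{Delete, "unit", "x"})) // true
}
```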

type Term

type Term interface {
	// Changes returns the changes that are part of the term.
	Changes() []ChangeEvent

	// Done signals that the term has been completed. Empty signals that
	// the term was empty and no changes were processed. This is useful to
	// help determine if more changes are available to be processed.
	// Abort is used to signal that setting the empty value should be aborted
	// and the term should be considered incomplete and done.
	Done(empty bool, abort <-chan struct{})
}

Term represents a set of changes that are bounded by a coalesced set. A term is a set of changes that can be run one at a time asynchronously, which allows each change within a given term to be signaled independently of the others. Only once a change within a term has been completed is the next change processed, until all changes are exhausted.
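
The Done contract can be sketched with an in-memory term. This is a hedged illustration only: fakeTerm, processTerm, and runTerm are hypothetical, and the way fakeTerm reports the empty value over a channel (giving up if abort fires first) is one reading of the interface comments, not the package's implementation.

```go
package main

import "fmt"

type ChangeType int

const (
	Create ChangeType = 1 << iota
	Update
	Delete
)

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

type Term interface {
	Changes() []ChangeEvent
	Done(empty bool, abort <-chan struct{})
}

// event is a hypothetical ChangeEvent implementation.
type event struct {
	typ         ChangeType
	ns, changed string
}

func (e event) Type() ChangeType  { return e.typ }
func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

// fakeTerm is a hypothetical Term that reports the empty flag on a channel.
type fakeTerm struct {
	changes []ChangeEvent
	empty   chan bool
}

func (t *fakeTerm) Changes() []ChangeEvent { return t.changes }

// Done publishes the empty flag unless abort fires first.
func (t *fakeTerm) Done(empty bool, abort <-chan struct{}) {
	select {
	case t.empty <- empty:
	case <-abort:
	}
}

// processTerm handles each change in order, then completes the term,
// marking it empty when there was nothing to process.
func processTerm(term Term, abort <-chan struct{}, handle func(ChangeEvent)) int {
	changes := term.Changes()
	for _, c := range changes {
		handle(c)
	}
	term.Done(len(changes) == 0, abort)
	return len(changes)
}

// runTerm processes one term and returns the count and the reported flag.
func runTerm(changes []ChangeEvent) (int, bool) {
	term := &fakeTerm{changes: changes, empty: make(chan bool, 1)}
	abort := make(chan struct{})
	n := processTerm(term, abort, func(ChangeEvent) {})
	return n, <-term.empty
}

func main() {
	n, empty := runTerm([]ChangeEvent{event{Create, "model", "a"}, event{Update, "model", "a"}})
	fmt.Println(n, empty) // 2 false
	n, empty = runTerm(nil)
	fmt.Println(n, empty) // 0 true
}
```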

type WatchableDB

type WatchableDB interface {
	database.TxnRunner
	EventSource
}

WatchableDB describes the ability to run transactions against a database and to subscribe to events emitted from that same source.

type WatchableDBFactory

type WatchableDBFactory = func() (WatchableDB, error)

WatchableDBFactory provides a function for getting a WatchableDB or an error.

func NewWatchableDBFactoryForNamespace

func NewWatchableDBFactoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory

NewWatchableDBFactoryForNamespace returns a WatchableDBFactory for the input namespaced factory function and namespace.
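
One plausible way to bind a namespace to a namespaced factory is a closure, sketched below. The code is an assumption-laden illustration rather than the package source: factoryForNamespace, fakeDB, and openDB are hypothetical, TxnRunner is a one-method stand-in for database.TxnRunner, and Subscription and SubscriptionOption are placeholders.

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder types so that the sketch compiles on its own.
type Subscription interface{}
type SubscriptionOption struct{}

// TxnRunner is a hypothetical stand-in for database.TxnRunner.
type TxnRunner interface {
	Run(fn func() error) error
}

type EventSource interface {
	Subscribe(opts ...SubscriptionOption) (Subscription, error)
}

type WatchableDB interface {
	TxnRunner
	EventSource
}

type WatchableDBFactory = func() (WatchableDB, error)

// factoryForNamespace (hypothetical) captures ns so that callers of the
// returned factory do not need to know which namespace they are using.
func factoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory {
	return func() (WatchableDB, error) {
		return f(ns)
	}
}

// fakeDB is an in-memory stand-in that remembers its namespace.
type fakeDB struct{ ns string }

func (d *fakeDB) Run(fn func() error) error { return fn() }

func (d *fakeDB) Subscribe(opts ...SubscriptionOption) (Subscription, error) {
	return nil, nil
}

// openDB is a hypothetical namespaced getter returning a concrete type.
func openDB(ns string) (*fakeDB, error) {
	if ns == "" {
		return nil, errors.New("empty namespace")
	}
	return &fakeDB{ns: ns}, nil
}

func main() {
	factory := factoryForNamespace(openDB, "controller")
	db, err := factory()
	if err != nil {
		panic(err)
	}
	fmt.Println(db.(*fakeDB).ns) // controller
}
```

The type parameter lets the namespaced function return a concrete type (here *fakeDB) while the resulting factory still returns the WatchableDB interface.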

type WatchableDBGetter

type WatchableDBGetter interface {
	GetWatchableDB(string) (WatchableDB, error)
}

WatchableDBGetter describes the ability to get a WatchableDB for a particular namespace.
