changestream

package
Version: v0.0.0-...-a81527e

Warning: This package is not in the latest version of its module.
Published: Feb 27, 2025 License: AGPL-3.0 Imports: 3 Imported by: 29

Documentation

Constants

const All = create | update | delete

All is the mask that covers every type of change that can be represented.

const Changed = create | update

Changed is the mask for changes to the underlying type, covering both a row being created and a row being updated. No distinction is made between the two types of change, only that the underlying type has changed.

const Deleted = delete

Deleted is the mask for deletions of the underlying type, covering a row being deleted from the database.

Variables

var (
	// DefaultNumTermWatermarks is the default number of terms (watermarks) to
	// keep before removing the oldest one.
	DefaultNumTermWatermarks = 10
)

Functions

func NewTxnRunnerFactory

func NewTxnRunnerFactory(f WatchableDBFactory) database.TxnRunnerFactory

NewTxnRunnerFactory returns a TxnRunnerFactory for the input changestream.WatchableDB factory function. This ensures that we never pass the ability to access the change-stream into a state object. State objects should only be concerned with persistence and retrieval. Watchers are the concern of the service layer.
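The narrowing described above can be sketched as follows. This is a minimal illustration and not the package's implementation: `TxnRunner`, `EventSource`, `TxnRunnerFactory`, `narrow`, `fakeDB`, and the two demo factories are simplified, hypothetical stand-ins, since the real `database` package types are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// TxnRunner is a simplified stand-in for database.TxnRunner (assumption).
type TxnRunner interface {
	Run(fn func() error) error
}

// EventSource is a simplified stand-in for the change-stream side (assumption).
type EventSource interface {
	Subscribe() error
}

// WatchableDB combines both abilities, as the documented interface does.
type WatchableDB interface {
	TxnRunner
	EventSource
}

type (
	WatchableDBFactory = func() (WatchableDB, error)
	TxnRunnerFactory   = func() (TxnRunner, error)
)

// narrow wraps a WatchableDBFactory so that callers are only handed the
// TxnRunner half of the database, never the subscription half.
func narrow(f WatchableDBFactory) TxnRunnerFactory {
	return func() (TxnRunner, error) {
		db, err := f()
		if err != nil {
			return nil, err
		}
		return db, nil
	}
}

// fakeDB is a hypothetical in-memory implementation used for the demo.
type fakeDB struct{ runs int }

func (d *fakeDB) Run(fn func() error) error { d.runs++; return fn() }
func (d *fakeDB) Subscribe() error          { return nil }

var errUnavailable = errors.New("database unavailable")

func workingFactory(db *fakeDB) WatchableDBFactory {
	return func() (WatchableDB, error) { return db, nil }
}

func failingFactory() (WatchableDB, error) { return nil, errUnavailable }

func main() {
	db := &fakeDB{}
	runner, err := narrow(workingFactory(db))()
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	_ = runner.Run(func() error { return nil })
	fmt.Println("transactions run:", db.runs)

	_, err = narrow(failingFactory)()
	fmt.Println("factory error:", err)
}
```

The state layer in this sketch receives a value whose static type offers only `Run`, so subscribing is not part of the interface it is handed.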

Types

type ChangeEvent

type ChangeEvent interface {
	// Type returns the type of change (create, update, delete).
	Type() ChangeType
	// Namespace returns the namespace of the change. This is normally the
	// table name.
	Namespace() string
	// Changed returns the changed value of the event. Logically this is
	// either the primary key of the row that was changed or the field
	// that was changed.
	Changed() string
}

ChangeEvent represents a new change delivered via the changestream.
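As an illustration of the interface, a small value type can satisfy it. The sketch below redeclares `ChangeType` and `ChangeEvent` locally so that it compiles on its own; `rowEvent`, `describe`, and the flag values are hypothetical and are not part of the package.

```go
package main

import "fmt"

// ChangeType and its flags are local stand-ins; the real values are not
// shown in this document.
type ChangeType int

const (
	create ChangeType = 1 << iota
	update
	remove // stands in for the package's delete flag
)

// ChangeEvent mirrors the documented interface.
type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

// rowEvent is a hypothetical implementation describing one changed row.
type rowEvent struct {
	changeType ChangeType
	namespace  string
	changed    string
}

func (e rowEvent) Type() ChangeType  { return e.changeType }
func (e rowEvent) Namespace() string { return e.namespace }
func (e rowEvent) Changed() string   { return e.changed }

// Compile-time check that rowEvent satisfies ChangeEvent.
var _ ChangeEvent = rowEvent{}

// describe formats an event as "namespace/changed (type N)".
func describe(e ChangeEvent) string {
	return fmt.Sprintf("%s/%s (type %d)", e.Namespace(), e.Changed(), e.Type())
}

func main() {
	e := rowEvent{changeType: update, namespace: "machine", changed: "0"}
	fmt.Println(describe(e))
}
```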

type ChangeType

type ChangeType int

ChangeType represents the type of change. The changes are bit flags so that they can be combined.
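To show how bit flags combine into masks such as `All`, `Changed`, and `Deleted`, here is a minimal sketch. The flag values and the `matches` helper are assumptions made for illustration; in particular `remove` stands in for the package's unexported `delete` flag.

```go
package main

import "fmt"

// ChangeType mirrors the documented type: an int used as a set of bit flags.
type ChangeType int

// Hypothetical flag values, one bit per kind of change.
const (
	create ChangeType = 1 << iota
	update
	remove // stands in for the package's delete flag
)

// Masks built the same way as the documented constants.
const (
	All     = create | update | remove
	Changed = create | update
	Deleted = remove
)

// matches reports whether the change type t is selected by mask.
func matches(mask, t ChangeType) bool {
	return mask&t != 0
}

func main() {
	fmt.Println("Changed selects update:", matches(Changed, update))
	fmt.Println("Changed selects remove:", matches(Changed, remove))
	fmt.Println("All selects remove:", matches(All, remove))
}
```

Because each flag occupies its own bit, a caller can test membership with a single bitwise AND, and masks can be combined with a bitwise OR.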

type EventSource

type EventSource interface {
	// Subscribe returns a subscription that can receive events from
	// a change stream according to the input subscription options.
	Subscribe(opts ...SubscriptionOption) (Subscription, error)
}

EventSource describes the ability to subscribe to a subset of events from a change stream.

type Subscription

type Subscription interface {
	// Changes returns the channel that the subscription will receive events on.
	Changes() <-chan []ChangeEvent

	// Unsubscribe removes the subscription from the event queue.
	Unsubscribe()

	// Done provides a way to know from the consumer side if the underlying
	// subscription has been terminated. This is useful to know if the
	// event queue has been killed.
	Done() <-chan struct{}
}

Subscription describes the ability to receive events from the event queue and unsubscribe from the queue.
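A consumer of a Subscription would typically select over `Changes` and `Done`. The sketch below is one possible shape for such a loop. `consume`, `fakeSub`, `newFakeSub`, and `event` are hypothetical, `ChangeEvent` is trimmed to the methods the example needs, and the fake closes its changes channel to end the loop, which is an assumption made for the demo rather than documented behaviour.

```go
package main

import "fmt"

// ChangeEvent is trimmed to the methods this sketch uses.
type ChangeEvent interface {
	Namespace() string
	Changed() string
}

// Subscription mirrors the documented interface.
type Subscription interface {
	Changes() <-chan []ChangeEvent
	Unsubscribe()
	Done() <-chan struct{}
}

// consume collects the changed values from every batch until the
// subscription terminates, then unsubscribes.
func consume(sub Subscription) []string {
	defer sub.Unsubscribe()
	var seen []string
	for {
		select {
		case <-sub.Done():
			return seen
		case batch, ok := <-sub.Changes():
			if !ok {
				return seen
			}
			for _, e := range batch {
				seen = append(seen, e.Changed())
			}
		}
	}
}

// event and fakeSub are hypothetical helpers for the demo.
type event struct{ ns, changed string }

func (e event) Namespace() string { return e.ns }
func (e event) Changed() string   { return e.changed }

type fakeSub struct {
	changes      chan []ChangeEvent
	done         chan struct{}
	unsubscribed bool
}

func (s *fakeSub) Changes() <-chan []ChangeEvent { return s.changes }
func (s *fakeSub) Unsubscribe()                  { s.unsubscribed = true }
func (s *fakeSub) Done() <-chan struct{}         { return s.done }

// newFakeSub preloads the batches and closes the channel so consume ends.
func newFakeSub(batches ...[]ChangeEvent) *fakeSub {
	ch := make(chan []ChangeEvent, len(batches))
	for _, b := range batches {
		ch <- b
	}
	close(ch)
	return &fakeSub{changes: ch, done: make(chan struct{})}
}

func main() {
	sub := newFakeSub(
		[]ChangeEvent{event{"machine", "0"}, event{"machine", "1"}},
		[]ChangeEvent{event{"unit", "a/0"}},
	)
	fmt.Println(consume(sub), sub.unsubscribed)
}
```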

type SubscriptionOption

type SubscriptionOption struct {
	// contains filtered or unexported fields
}

SubscriptionOption is an option that can be used to create a subscription.

func FilteredNamespace

func FilteredNamespace(namespace string, changeMask ChangeType, filter func(ChangeEvent) bool) SubscriptionOption

FilteredNamespace returns a SubscriptionOption that will subscribe to the given namespace and filter the events using the given function.

func Namespace

func Namespace(namespace string, changeMask ChangeType) SubscriptionOption

Namespace returns a SubscriptionOption that will subscribe to the given namespace.

func (SubscriptionOption) ChangeMask

func (o SubscriptionOption) ChangeMask() ChangeType

ChangeMask returns the change mask that the subscription applies to events.

func (SubscriptionOption) Filter

func (o SubscriptionOption) Filter() func(ChangeEvent) bool

Filter returns the filter function that the subscription applies to events.

func (SubscriptionOption) Namespace

func (o SubscriptionOption) Namespace() string

Namespace returns the name of the type that the subscription will be tied to.
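The three accessors suggest how an event source might decide whether an event belongs to a subscription. The following sketch is an assumption about that matching logic, not the package's code: the struct fields, the always-true filter used by `Namespace`, the `matches` helper, the `event` type, and the flag values are all hypothetical.

```go
package main

import "fmt"

type ChangeType int

// Hypothetical flag values; remove stands in for the package's delete flag.
const (
	create ChangeType = 1 << iota
	update
	remove
)

const Changed = create | update

type ChangeEvent interface {
	Type() ChangeType
	Namespace() string
	Changed() string
}

// SubscriptionOption is a local stand-in with assumed unexported fields.
type SubscriptionOption struct {
	namespace  string
	changeMask ChangeType
	filter     func(ChangeEvent) bool
}

// Namespace is assumed here to accept every event in the namespace.
func Namespace(namespace string, changeMask ChangeType) SubscriptionOption {
	return FilteredNamespace(namespace, changeMask, func(ChangeEvent) bool { return true })
}

func FilteredNamespace(namespace string, changeMask ChangeType, filter func(ChangeEvent) bool) SubscriptionOption {
	return SubscriptionOption{namespace: namespace, changeMask: changeMask, filter: filter}
}

func (o SubscriptionOption) Namespace() string              { return o.namespace }
func (o SubscriptionOption) ChangeMask() ChangeType         { return o.changeMask }
func (o SubscriptionOption) Filter() func(ChangeEvent) bool { return o.filter }

// matches checks namespace, then change mask, then the filter function.
func matches(o SubscriptionOption, e ChangeEvent) bool {
	if e.Namespace() != o.Namespace() {
		return false
	}
	if e.Type()&o.ChangeMask() == 0 {
		return false
	}
	return o.Filter()(e)
}

// event is a hypothetical ChangeEvent used for the demo.
type event struct {
	changeType ChangeType
	namespace  string
	changed    string
}

func (e event) Type() ChangeType  { return e.changeType }
func (e event) Namespace() string { return e.namespace }
func (e event) Changed() string   { return e.changed }

func main() {
	opt := FilteredNamespace("machine", Changed, func(e ChangeEvent) bool {
		return e.Changed() != "0"
	})
	fmt.Println(matches(opt, event{update, "machine", "1"}))
	fmt.Println(matches(opt, event{update, "machine", "0"}))
	fmt.Println(matches(opt, event{remove, "machine", "1"}))
}
```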

type Term

type Term interface {
	// Changes returns the changes that are part of the term.
	Changes() []ChangeEvent

	// Done signals that the term has been completed. Empty signals that
	// the term was empty and no changes were processed. This is useful to
	// help determine if more changes are available to be processed.
	// Abort is used to signal that setting the empty value should be aborted
	// and the term should be considered incomplete and done.
	Done(empty bool, abort <-chan struct{})
}

Term represents a set of changes that are bounded by a coalesced set. A term is a set of changes that can be run one at a time asynchronously, which allows each change within a given term to be signaled independently of the others. Only once a change within a term has been completed is the next change processed, until all changes are exhausted.
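One way a consumer might work through a term is to handle its changes in order and then report completion through `Done`. This is a sketch under stated assumptions: `processTerm`, `fakeTerm`, and `event` are hypothetical, `ChangeEvent` is trimmed to a single method, and passing `len(changes) == 0` as the empty flag is an interpretation of the documented comment rather than confirmed behaviour.

```go
package main

import "fmt"

// ChangeEvent is trimmed to the one method this sketch uses.
type ChangeEvent interface {
	Changed() string
}

// Term mirrors the documented interface.
type Term interface {
	Changes() []ChangeEvent
	Done(empty bool, abort <-chan struct{})
}

// processTerm handles each change in order, one at a time, then signals
// that the term is complete. It returns the number of changes handled.
func processTerm(t Term, abort <-chan struct{}, handle func(ChangeEvent)) int {
	changes := t.Changes()
	for _, c := range changes {
		handle(c)
	}
	t.Done(len(changes) == 0, abort)
	return len(changes)
}

// event and fakeTerm are hypothetical helpers for the demo.
type event string

func (e event) Changed() string { return string(e) }

type fakeTerm struct {
	changes   []ChangeEvent
	doneCalls int
	empty     bool
}

func (t *fakeTerm) Changes() []ChangeEvent { return t.changes }

func (t *fakeTerm) Done(empty bool, _ <-chan struct{}) {
	t.doneCalls++
	t.empty = empty
}

func main() {
	abort := make(chan struct{})
	term := &fakeTerm{changes: []ChangeEvent{event("0"), event("1")}}
	n := processTerm(term, abort, func(e ChangeEvent) {
		fmt.Println("handled", e.Changed())
	})
	fmt.Println("changes:", n, "done calls:", term.doneCalls, "empty:", term.empty)
}
```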

type WatchableDB

type WatchableDB interface {
	database.TxnRunner
	EventSource
}

WatchableDB describes the ability to run transactions against a database and to subscribe to events emitted from that same source.

type WatchableDBFactory

type WatchableDBFactory = func() (WatchableDB, error)

WatchableDBFactory provides a function for getting a WatchableDB or an error.

func NewWatchableDBFactoryForNamespace

func NewWatchableDBFactoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory

NewWatchableDBFactoryForNamespace returns a WatchableDBFactory for the input namespaced factory function and namespace.
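Binding a namespace to a namespaced factory function can be sketched with a generic closure, as below. This is not the package's source: `WatchableDB` is reduced to a single hypothetical method, and `newFactoryForNamespace`, `registry`, and `nsDB` are illustrative names. The sketch requires Go 1.18 or later for type parameters.

```go
package main

import (
	"errors"
	"fmt"
)

// WatchableDB is reduced to one hypothetical method for this sketch.
type WatchableDB interface {
	Name() string
}

type WatchableDBFactory = func() (WatchableDB, error)

// newFactoryForNamespace captures ns so callers need no namespace argument.
func newFactoryForNamespace[T WatchableDB](f func(string) (T, error), ns string) WatchableDBFactory {
	return func() (WatchableDB, error) {
		db, err := f(ns)
		if err != nil {
			// Return an untyped nil so callers can compare against nil.
			return nil, err
		}
		return db, nil
	}
}

// nsDB and registry are hypothetical helpers for the demo. registry has the
// same method shape as the documented WatchableDBGetter.
type nsDB struct{ ns string }

func (d *nsDB) Name() string { return d.ns }

var errNotFound = errors.New("namespace not found")

type registry struct{ dbs map[string]*nsDB }

func (r registry) GetWatchableDB(ns string) (WatchableDB, error) {
	db, ok := r.dbs[ns]
	if !ok {
		return nil, errNotFound
	}
	return db, nil
}

func main() {
	reg := registry{dbs: map[string]*nsDB{"controller": {ns: "controller"}}}

	factory := newFactoryForNamespace(reg.GetWatchableDB, "controller")
	db, err := factory()
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("namespace:", db.Name())

	_, err = newFactoryForNamespace(reg.GetWatchableDB, "missing")()
	fmt.Println("error:", err)
}
```

Passing a getter's method value as `f` keeps the namespace lookup in one place while handing consumers a plain zero-argument factory.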

type WatchableDBGetter

type WatchableDBGetter interface {
	GetWatchableDB(string) (WatchableDB, error)
}

WatchableDBGetter describes the ability to get a WatchableDB for a particular namespace.
