changestream

package
v0.0.0-...-84dfa8d
Warning

This package is not in the latest version of its module.

Published: Mar 3, 2025 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Manifold

func Manifold(config ManifoldConfig) dependency.Manifold

Manifold returns a dependency manifold that runs the changestream worker, using the resource names defined in the supplied config.

Types

type Collector

type Collector struct {
	// Stream metrics.
	WatermarkInserts       *prometheus.CounterVec
	WatermarkRetries       *prometheus.CounterVec
	ChangesRequestDuration *prometheus.HistogramVec
	ChangesCount           *prometheus.HistogramVec
	// EventMultiplexer metrics.
	Subscriptions    *prometheus.GaugeVec
	DispatchDuration *prometheus.HistogramVec
}

Collector defines a Prometheus collector for the changestream worker.

func NewMetricsCollector

func NewMetricsCollector() *Collector

NewMetricsCollector returns a new Collector.

func (*Collector) Collect

func (c *Collector) Collect(ch chan<- prometheus.Metric)

Collect is part of the prometheus.Collector interface.

func (*Collector) Describe

func (c *Collector) Describe(ch chan<- *prometheus.Desc)

Describe is part of the prometheus.Collector interface.

func (*Collector) ForNamespace

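func (c *Collector) ForNamespace(namespace string) *NamespaceCollector

ForNamespace returns a NamespaceCollector that records changestream metrics for the given namespace.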

type DBGetter

type DBGetter = coredatabase.DBGetter

DBGetter describes the ability to supply a sql.DB reference for a particular database.

type FileNotifier

type FileNotifier interface {
	// Changes returns a channel that reports when a file is created or deleted.
	Changes() (<-chan bool, error)
}

FileNotifier represents a way to watch for changes in a namespace directory.

type FileNotifyWatcher

type FileNotifyWatcher = filenotifywatcher.FileNotifyWatcher

FileNotifyWatcher is the interface that the worker uses to interact with the file notify watcher.

type ManifoldConfig

type ManifoldConfig struct {
	AgentName         string
	DBAccessor        string
	FileNotifyWatcher string

	Clock                clock.Clock
	Logger               logger.Logger
	NewMetricsCollector  MetricsCollectorFn
	PrometheusRegisterer prometheus.Registerer
	NewWatchableDB       WatchableDBFn
}

ManifoldConfig defines the names of the manifolds on which a Manifold will depend.

func (ManifoldConfig) Validate

func (cfg ManifoldConfig) Validate() error

Validate ensures that the config values are valid.
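
The rules that Validate enforces are not listed on this page. The sketch below shows one plausible shape for such a check, under the assumption that every dependency name must be non-empty. The manifoldConfig stand-in (reduced to the three string fields), the validate method, and the errNotValid sentinel are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// manifoldConfig is a hypothetical stand-in that keeps only the three
// string fields of ManifoldConfig; the real struct has more fields.
type manifoldConfig struct {
	AgentName         string
	DBAccessor        string
	FileNotifyWatcher string
}

// errNotValid is a hypothetical sentinel error for this sketch.
var errNotValid = errors.New("not valid")

// validate assumes that every dependency name must be non-empty and
// reports the first field that is missing.
func (cfg manifoldConfig) validate() error {
	checks := []struct{ name, value string }{
		{"AgentName", cfg.AgentName},
		{"DBAccessor", cfg.DBAccessor},
		{"FileNotifyWatcher", cfg.FileNotifyWatcher},
	}
	for _, c := range checks {
		if c.value == "" {
			return fmt.Errorf("empty %s: %w", c.name, errNotValid)
		}
	}
	return nil
}

func main() {
	cfg := manifoldConfig{AgentName: "agent", DBAccessor: "db-accessor"}
	fmt.Println(cfg.validate())
}
```

Wrapping a sentinel with %w lets callers test for the failure class with errors.Is while still seeing which field was rejected.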

type Metrics

type Metrics interface {
	ForNamespace(string) *NamespaceCollector
	Describe(ch chan<- *prometheus.Desc)
	Collect(ch chan<- prometheus.Metric)
}

Metrics is a wrapper for the Collector interface of the prometheus package, extended with a ForNamespace(string) method that returns a collector of changestream metrics for a given namespace.

type MetricsCollectorFn

type MetricsCollectorFn = func() *Collector

MetricsCollectorFn is a function type alias that allows the creation of a metrics collector.

type NamespaceCollector

type NamespaceCollector struct {
	*Collector
	Namespace string
}

NamespaceCollector is a Prometheus collector extended with a Namespace field that is used in the metric labels.
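
The embedding pattern can be sketched without the Prometheus client library. In the example below, counterVec is a hypothetical stand-in for a labelled counter (a plain map keyed by label value), and the lower-case collector and namespaceCollector types are illustrative only; they are not the package's implementation.

```go
package main

import "fmt"

// counterVec is a hypothetical stand-in for a labelled Prometheus counter:
// one running total per label value.
type counterVec map[string]int

// collector holds the metrics shared by all namespaces.
type collector struct {
	WatermarkInserts counterVec
}

// namespaceCollector embeds the shared collector and remembers the
// namespace to use as the label value.
type namespaceCollector struct {
	*collector
	Namespace string
}

// forNamespace binds the shared collector to a single namespace.
func (c *collector) forNamespace(namespace string) *namespaceCollector {
	return &namespaceCollector{collector: c, Namespace: namespace}
}

// watermarkInsertsInc increments the counter for this namespace only.
func (c *namespaceCollector) watermarkInsertsInc() {
	c.WatermarkInserts[c.Namespace]++
}

func main() {
	shared := &collector{WatermarkInserts: counterVec{}}
	a := shared.forNamespace("model-a")
	b := shared.forNamespace("model-b")
	a.watermarkInsertsInc()
	a.watermarkInsertsInc()
	b.watermarkInsertsInc()
	fmt.Println(shared.WatermarkInserts["model-a"], shared.WatermarkInserts["model-b"])
}
```

Because each namespace view shares one underlying collector, the metrics only need to be registered once, and callers never have to pass the label value themselves. The map used here is not safe for concurrent use, unlike real Prometheus vectors.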

func (*NamespaceCollector) ChangesCountObserve

func (c *NamespaceCollector) ChangesCountObserve(val int)

ChangesCountObserve records the number of changes returned by the changes request (see worker/changestream/stream/stream.go readChanges()).

func (*NamespaceCollector) ChangesRequestDurationObserve

func (c *NamespaceCollector) ChangesRequestDurationObserve(val float64)

ChangesRequestDurationObserve records the duration of the changes request (see worker/changestream/stream/stream.go readChanges()).

func (*NamespaceCollector) DispatchDurationObserve

func (c *NamespaceCollector) DispatchDurationObserve(val float64, failed bool)

DispatchDurationObserve records the duration of the events dispatch method (see worker/changestream/eventmultiplexer/eventmultiplexer.go dispatchSet()).

func (*NamespaceCollector) SubscriptionsDec

func (c *NamespaceCollector) SubscriptionsDec()

SubscriptionsDec decrements the number of current subscriptions.

func (*NamespaceCollector) SubscriptionsInc

func (c *NamespaceCollector) SubscriptionsInc()

SubscriptionsInc increments the number of current subscriptions.

func (*NamespaceCollector) WatermarkInsertsInc

func (c *NamespaceCollector) WatermarkInsertsInc()

WatermarkInsertsInc increments the watermark inserts counter.

func (*NamespaceCollector) WatermarkRetriesInc

func (c *NamespaceCollector) WatermarkRetriesInc()

WatermarkRetriesInc increments the watermark insertion retries counter.

type NamespaceMetrics

type NamespaceMetrics interface {
	// Stream metrics.
	WatermarkInsertsInc()
	WatermarkRetriesInc()
	ChangesRequestDurationObserve(val float64)
	ChangesCountObserve(val int)
	// EventMultiplexer metrics.
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}

NamespaceMetrics is a set of methods to be used in changestream to collect prometheus metrics for a given namespace.
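
Since NamespaceMetrics is a small interface, a test can substitute a recording double for the Prometheus-backed collector. The sketch below copies the interface from above; the recordingMetrics type and its fields are hypothetical and exist only to illustrate the idea.

```go
package main

import "fmt"

// NamespaceMetrics is copied from the interface documented above.
type NamespaceMetrics interface {
	WatermarkInsertsInc()
	WatermarkRetriesInc()
	ChangesRequestDurationObserve(val float64)
	ChangesCountObserve(val int)
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}

// recordingMetrics is a hypothetical test double that stores what it is told.
type recordingMetrics struct {
	inserts, retries, subscriptions int
	changeCounts                    []int
	requestDurations                []float64
	dispatchOK, dispatchFailed      int
}

func (r *recordingMetrics) WatermarkInsertsInc() { r.inserts++ }
func (r *recordingMetrics) WatermarkRetriesInc() { r.retries++ }
func (r *recordingMetrics) SubscriptionsInc()    { r.subscriptions++ }
func (r *recordingMetrics) SubscriptionsDec()    { r.subscriptions-- }

func (r *recordingMetrics) ChangesRequestDurationObserve(val float64) {
	r.requestDurations = append(r.requestDurations, val)
}

func (r *recordingMetrics) ChangesCountObserve(val int) {
	r.changeCounts = append(r.changeCounts, val)
}

// DispatchDurationObserve tallies successful and failed dispatches separately
// and ignores the duration itself.
func (r *recordingMetrics) DispatchDurationObserve(_ float64, failed bool) {
	if failed {
		r.dispatchFailed++
		return
	}
	r.dispatchOK++
}

// Compile-time check that the double satisfies the interface.
var _ NamespaceMetrics = (*recordingMetrics)(nil)

func main() {
	m := &recordingMetrics{}
	var metrics NamespaceMetrics = m
	metrics.SubscriptionsInc()
	metrics.SubscriptionsInc()
	metrics.SubscriptionsDec()
	metrics.ChangesCountObserve(3)
	metrics.DispatchDurationObserve(0.002, true)
	fmt.Println(m.subscriptions, m.changeCounts, m.dispatchFailed)
}
```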

type WatchableDB

type WatchableDB struct {
	// contains filtered or unexported fields
}

WatchableDB is a worker that is responsible for managing the lifecycle of both the DBStream and the EventQueue.

func (*WatchableDB) Kill

func (w *WatchableDB) Kill()

Kill is part of the worker.Worker interface.

func (*WatchableDB) StdTxn

func (w *WatchableDB) StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error

StdTxn manages the application of a standard library transaction within which the input function is executed. The input context can be used by the caller to cancel this process.
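
A caller would typically close over its result variables and let StdTxn own the transaction. The sketch below shows that calling pattern against a narrow interface. The names stdTxnRunner, countRows, cancelAwareRunner, and cancelledCallFails, as well as the change_log table in the query, are assumptions made for illustration. The stand-in runner never opens a database; it only demonstrates how a cancelled context could surface as an error.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// stdTxnRunner captures only the StdTxn method documented above.
type stdTxnRunner interface {
	StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
}

// countRows runs a single query inside the transaction supplied by the
// runner. The table name "change_log" is a hypothetical example.
func countRows(ctx context.Context, r stdTxnRunner) (int, error) {
	var n int
	err := r.StdTxn(ctx, func(ctx context.Context, tx *sql.Tx) error {
		return tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM change_log").Scan(&n)
	})
	return n, err
}

// cancelAwareRunner is a hypothetical stand-in that refuses to start a
// transaction once the context is done. It never invokes fn.
type cancelAwareRunner struct{}

func (cancelAwareRunner) StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return errors.New("no database in this sketch")
}

// cancelledCallFails reports whether a call made with an already cancelled
// context returns context.Canceled.
func cancelledCallFails() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := countRows(ctx, cancelAwareRunner{})
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(cancelledCallFails())
}
```

Depending on a one-method interface rather than on *WatchableDB keeps the calling code easy to exercise without a real database.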

func (*WatchableDB) Subscribe

Subscribe returns a subscription for the input options. The subscription is then used to drive watchers.

func (*WatchableDB) Txn

func (w *WatchableDB) Txn(ctx context.Context, fn func(context.Context, *sqlair.TX) error) error

Txn manages the application of a SQLair transaction within which the input function is executed. See https://github.com/canonical/sqlair. The input context can be used by the caller to cancel this process.

func (*WatchableDB) Wait

func (w *WatchableDB) Wait() error

Wait is part of the worker.Worker interface.

type WatchableDBFn

WatchableDBFn is a function type alias that allows the creation of a WatchableDBWorker.

type WatchableDBWorker

type WatchableDBWorker interface {
	worker.Worker
	changestream.WatchableDB
}

WatchableDBWorker is the interface that the worker uses to interact with the watchable database.

func NewWatchableDB

func NewWatchableDB(
	tag string,
	db coredatabase.TxnRunner,
	fileNotifier FileNotifier,
	clock clock.Clock,
	metrics NamespaceMetrics,
	logger logger.Logger,
) (WatchableDBWorker, error)

NewWatchableDB creates a new WatchableDB.

type WorkerConfig

type WorkerConfig struct {
	AgentTag          string
	DBGetter          DBGetter
	FileNotifyWatcher FileNotifyWatcher
	Clock             clock.Clock
	Logger            logger.Logger
	Metrics           Metrics
	NewWatchableDB    WatchableDBFn
}

WorkerConfig encapsulates the configuration options for the changestream worker.

func (*WorkerConfig) Validate

func (c *WorkerConfig) Validate() error

Validate ensures that the config values are valid.
