Documentation ¶
Index ¶
- func Manifold(config ManifoldConfig) dependency.Manifold
- type Collector
- type DBGetter
- type FileNotifier
- type FileNotifyWatcher
- type ManifoldConfig
- type Metrics
- type MetricsCollectorFn
- type NamespaceCollector
- func (c *NamespaceCollector) ChangesCountObserve(val int)
- func (c *NamespaceCollector) ChangesRequestDurationObserve(val float64)
- func (c *NamespaceCollector) DispatchDurationObserve(val float64, failed bool)
- func (c *NamespaceCollector) SubscriptionsDec()
- func (c *NamespaceCollector) SubscriptionsInc()
- func (c *NamespaceCollector) WatermarkInsertsInc()
- func (c *NamespaceCollector) WatermarkRetriesInc()
- type NamespaceMetrics
- type WatchableDB
- func (w *WatchableDB) Kill()
- func (w *WatchableDB) StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
- func (w *WatchableDB) Subscribe(opts ...changestream.SubscriptionOption) (changestream.Subscription, error)
- func (w *WatchableDB) Txn(ctx context.Context, fn func(context.Context, *sqlair.TX) error) error
- func (w *WatchableDB) Wait() error
- type WatchableDBFn
- type WatchableDBWorker
- type WorkerConfig
Functions ¶
func Manifold ¶
func Manifold(config ManifoldConfig) dependency.Manifold
Manifold returns a dependency manifold that runs the changestream worker, using the resource names defined in the supplied config.
Types ¶
type Collector ¶
type Collector struct {
	// Stream metrics.
	WatermarkInserts       *prometheus.CounterVec
	WatermarkRetries       *prometheus.CounterVec
	ChangesRequestDuration *prometheus.HistogramVec
	ChangesCount           *prometheus.HistogramVec

	// EventMultiplexer metrics.
	Subscriptions    *prometheus.GaugeVec
	DispatchDuration *prometheus.HistogramVec
}
Collector defines a prometheus collector for the changestream worker.
func NewMetricsCollector ¶
func NewMetricsCollector() *Collector
NewMetricsCollector returns a new Collector.
func (*Collector) Collect ¶
func (c *Collector) Collect(ch chan<- prometheus.Metric)
Collect is part of the prometheus.Collector interface.
func (*Collector) Describe ¶
func (c *Collector) Describe(ch chan<- *prometheus.Desc)
Describe is part of the prometheus.Collector interface.
func (*Collector) ForNamespace ¶
func (c *Collector) ForNamespace(namespace string) *NamespaceCollector
type DBGetter ¶
type DBGetter = coredatabase.DBGetter
DBGetter describes the ability to supply a sql.DB reference for a particular database.
type FileNotifier ¶
type FileNotifier interface {
	// Changes returns a channel that is notified when a file is created
	// or deleted.
	Changes() (<-chan bool, error)
}
FileNotifier represents a way to watch for changes in a namespace directory.
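As an illustration of how a caller might consume this interface, the following is a minimal sketch using only the standard library. The `fakeNotifier` type and the `drain` helper are hypothetical names introduced here, and closing the channel to end the stream is an assumption of the sketch rather than documented behaviour of the real watcher.

```go
package main

import "fmt"

// FileNotifier mirrors the interface documented above.
type FileNotifier interface {
	Changes() (<-chan bool, error)
}

// fakeNotifier is a hypothetical in-memory implementation used only for
// illustration. It replays a fixed list of notifications and then closes
// the channel.
type fakeNotifier struct {
	events []bool
}

func (f fakeNotifier) Changes() (<-chan bool, error) {
	ch := make(chan bool, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch)
	return ch, nil
}

// drain reads every notification until the channel is closed and returns
// the values in the order they were received.
func drain(n FileNotifier) ([]bool, error) {
	ch, err := n.Changes()
	if err != nil {
		return nil, err
	}
	var seen []bool
	for v := range ch {
		seen = append(seen, v)
	}
	return seen, nil
}

func main() {
	seen, err := drain(fakeNotifier{events: []bool{true, false}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(seen), "notifications received")
}
```

A long-running consumer would more likely read from the channel inside a `select` alongside a shutdown signal instead of ranging until close.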
type FileNotifyWatcher ¶
type FileNotifyWatcher = filenotifywatcher.FileNotifyWatcher
FileNotifyWatcher is the interface that the worker uses to interact with the file notify watcher.
type ManifoldConfig ¶
type ManifoldConfig struct {
	AgentName            string
	DBAccessor           string
	FileNotifyWatcher    string
	Clock                clock.Clock
	Logger               logger.Logger
	NewMetricsCollector  MetricsCollectorFn
	PrometheusRegisterer prometheus.Registerer
	NewWatchableDB       WatchableDBFn
}
ManifoldConfig defines the names of the manifolds on which a Manifold will depend.
func (ManifoldConfig) Validate ¶
func (cfg ManifoldConfig) Validate() error
type Metrics ¶
type Metrics interface {
	ForNamespace(string) *NamespaceCollector
	Describe(ch chan<- *prometheus.Desc)
	Collect(ch chan<- prometheus.Metric)
}
Metrics is a wrapper for the Collector interface of the prometheus package, extended with a ForNamespace(string) method that returns a collector of changestream metrics for a given namespace.
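The namespace-scoping idea behind ForNamespace can be sketched without Prometheus. In the example below, `collector`, `namespaceCollector`, `newCollector`, and `count` are hypothetical stand-ins: a map keyed by namespace takes the place of a labelled counter vector, and the scoped view binds the label once so callers do not repeat it.

```go
package main

import (
	"fmt"
	"sync"
)

// collector is a hypothetical stand-in for Collector. It keeps one counter
// per namespace label instead of a prometheus.CounterVec.
type collector struct {
	mu               sync.Mutex
	watermarkInserts map[string]int
}

func newCollector() *collector {
	return &collector{watermarkInserts: make(map[string]int)}
}

// namespaceCollector binds a collector to a single namespace so callers do
// not have to pass the label on every call.
type namespaceCollector struct {
	c         *collector
	namespace string
}

// ForNamespace returns a view of the collector scoped to namespace.
func (c *collector) ForNamespace(namespace string) *namespaceCollector {
	return &namespaceCollector{c: c, namespace: namespace}
}

// WatermarkInsertsInc increments the counter for the bound namespace only.
func (n *namespaceCollector) WatermarkInsertsInc() {
	n.c.mu.Lock()
	defer n.c.mu.Unlock()
	n.c.watermarkInserts[n.namespace]++
}

// count reports the current counter value for a namespace.
func (c *collector) count(namespace string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.watermarkInserts[namespace]
}

func main() {
	c := newCollector()
	a := c.ForNamespace("model-a")
	a.WatermarkInsertsInc()
	a.WatermarkInsertsInc()
	c.ForNamespace("model-b").WatermarkInsertsInc()
	fmt.Println(c.count("model-a"), c.count("model-b"))
}
```

All scoped views share the same underlying collector, so a single registration of the parent would expose every namespace.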
type MetricsCollectorFn ¶
type MetricsCollectorFn = func() *Collector
MetricsCollectorFn is an alias for a function type that creates a metrics collector.
type NamespaceCollector ¶
NamespaceCollector is a prometheus collector extended with a Namespace argument used in the metric labels.
func (*NamespaceCollector) ChangesCountObserve ¶
func (c *NamespaceCollector) ChangesCountObserve(val int)
ChangesCountObserve records the number of changes returned by the changes request (see worker/changestream/stream/stream.go readChanges()).
func (*NamespaceCollector) ChangesRequestDurationObserve ¶
func (c *NamespaceCollector) ChangesRequestDurationObserve(val float64)
ChangesRequestDurationObserve records the duration of the changes request (see worker/changestream/stream/stream.go readChanges()).
func (*NamespaceCollector) DispatchDurationObserve ¶
func (c *NamespaceCollector) DispatchDurationObserve(val float64, failed bool)
DispatchDurationObserve records the duration of the events dispatch method (see worker/changestream/eventmultiplexer/eventmultiplexer.go dispatchSet()).
func (*NamespaceCollector) SubscriptionsDec ¶
func (c *NamespaceCollector) SubscriptionsDec()
SubscriptionsDec decrements the number of current subscriptions.
func (*NamespaceCollector) SubscriptionsInc ¶
func (c *NamespaceCollector) SubscriptionsInc()
SubscriptionsInc increments the number of current subscriptions.
func (*NamespaceCollector) WatermarkInsertsInc ¶
func (c *NamespaceCollector) WatermarkInsertsInc()
WatermarkInsertsInc increments the watermark inserts counter.
func (*NamespaceCollector) WatermarkRetriesInc ¶
func (c *NamespaceCollector) WatermarkRetriesInc()
WatermarkRetriesInc increments the watermark insertion retries counter.
type NamespaceMetrics ¶
type NamespaceMetrics interface {
	// Stream metrics.
	WatermarkInsertsInc()
	WatermarkRetriesInc()
	ChangesRequestDurationObserve(val float64)
	ChangesCountObserve(val int)

	// EventMultiplexer metrics.
	SubscriptionsInc()
	SubscriptionsDec()
	DispatchDurationObserve(val float64, failed bool)
}
NamespaceMetrics is a set of methods to be used in changestream to collect prometheus metrics for a given namespace.
type WatchableDB ¶
type WatchableDB struct {
// contains filtered or unexported fields
}
WatchableDB is a worker that is responsible for managing the lifecycle of both the DBStream and the EventQueue.
func (*WatchableDB) Kill ¶
func (w *WatchableDB) Kill()
Kill is part of the worker.Worker interface.
func (*WatchableDB) StdTxn ¶
func (w *WatchableDB) StdTxn(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
StdTxn manages the application of a standard library transaction within which the input function is executed. The input context can be used by the caller to cancel this process.
func (*WatchableDB) Subscribe ¶
func (w *WatchableDB) Subscribe(opts ...changestream.SubscriptionOption) (changestream.Subscription, error)
Subscribe returns a subscription for the input options. The subscription is then used to drive watchers.
func (*WatchableDB) Txn ¶
func (w *WatchableDB) Txn(ctx context.Context, fn func(context.Context, *sqlair.TX) error) error
Txn manages the application of a SQLair transaction within which the input function is executed. See https://github.com/canonical/sqlair. The input context can be used by the caller to cancel this process.
func (*WatchableDB) Wait ¶
func (w *WatchableDB) Wait() error
Wait is part of the worker.Worker interface.
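The Kill and Wait pair follows a common lifecycle contract: Kill requests shutdown without blocking, and Wait blocks until the worker has stopped and returns its error. The following is a minimal standard-library sketch of that contract. The `loopWorker` type and `newLoopWorker` constructor are hypothetical and do not reflect how WatchableDB is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker mirrors the two methods the documentation above attributes to the
// worker.Worker interface.
type Worker interface {
	Kill()
	Wait() error
}

// loopWorker is a hypothetical worker that runs a goroutine until killed.
type loopWorker struct {
	stop chan struct{}
	done chan struct{}
	once sync.Once
	err  error
}

func newLoopWorker() *loopWorker {
	w := &loopWorker{
		stop: make(chan struct{}),
		done: make(chan struct{}),
	}
	go func() {
		defer close(w.done)
		// A real worker would do its work in a select loop here.
		<-w.stop
	}()
	return w
}

// Kill asks the worker to stop without waiting for it. Calling it more than
// once is safe.
func (w *loopWorker) Kill() {
	w.once.Do(func() { close(w.stop) })
}

// Wait blocks until the goroutine has exited and returns its error, if any.
func (w *loopWorker) Wait() error {
	<-w.done
	return w.err
}

func main() {
	var w Worker = newLoopWorker()
	w.Kill()
	fmt.Println("wait returned:", w.Wait())
}
```

Separating the stop request from the wait lets a supervisor signal many workers first and then wait for each in turn.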
type WatchableDBFn ¶
type WatchableDBFn = func(string, coredatabase.TxnRunner, FileNotifier, clock.Clock, NamespaceMetrics, logger.Logger) (WatchableDBWorker, error)
WatchableDBFn is an alias for a function type that creates a WatchableDBWorker.
type WatchableDBWorker ¶
type WatchableDBWorker interface {
	worker.Worker
	changestream.WatchableDB
}
WatchableDBWorker is the interface that the worker uses to interact with the watchable database.
func NewWatchableDB ¶
func NewWatchableDB(
	tag string,
	db coredatabase.TxnRunner,
	fileNotifier FileNotifier,
	clock clock.Clock,
	metrics NamespaceMetrics,
	logger logger.Logger,
) (WatchableDBWorker, error)
NewWatchableDB creates a new WatchableDB.
type WorkerConfig ¶
type WorkerConfig struct {
	AgentTag          string
	DBGetter          DBGetter
	FileNotifyWatcher FileNotifyWatcher
	Clock             clock.Clock
	Logger            logger.Logger
	Metrics           Metrics
	NewWatchableDB    WatchableDBFn
}
WorkerConfig encapsulates the configuration options for the changestream worker.
func (*WorkerConfig) Validate ¶
func (c *WorkerConfig) Validate() error
Validate ensures that the config values are valid.