Documentation ¶
Overview ¶
Package sinks provides functionality to store monitored data in different ways.
At the moment we provide sink connectors for
- PostgreSQL and flavours,
- Prometheus,
- plain JSON files,
- and RPC servers.
To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.
Index ¶
- type CmdOpts
- type DbStorageSchemaType
- type ExistingPartitionInfo
- type JSONWriter
- type MeasurementMessagePostgres
- type MultiWriter
- type PostgresWriter
- func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
- func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
- func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
- func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
- func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
- func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
- func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
- func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
- func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
- func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error
- func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error
- type PrometheusWriter
- func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
- func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
- func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
- func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)
- func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
- func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
- func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error
- func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error
- type RPCWriter
- type SyncReq
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CmdOpts ¶
type CmdOpts struct { Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"` BatchingDelay time.Duration `` /* 180-byte string literal not displayed */ Retention int `` /* 135-byte string literal not displayed */ RealDbnameField string `` /* 151-byte string literal not displayed */ SystemIdentifierField string `` /* 169-byte string literal not displayed */ }
CmdOpts specifies the storage configuration to store metrics measurements
type DbStorageSchemaType ¶
type DbStorageSchemaType int
const ( DbStorageSchemaPostgres DbStorageSchemaType = iota DbStorageSchemaTimescale )
type ExistingPartitionInfo ¶
type JSONWriter ¶
type JSONWriter struct {
// contains filtered or unexported fields
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.
func NewJSONWriter ¶
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (*JSONWriter) SyncMetric ¶
func (jw *JSONWriter) SyncMetric(_, _, _ string) error
func (*JSONWriter) Write ¶
func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error
type MultiWriter ¶
MultiWriter ensures the simultaneous storage of data in several storages.
func NewMultiWriter ¶
func NewMultiWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error)
NewMultiWriter creates and returns new instance of MultiWriter struct.
func (*MultiWriter) AddWriter ¶
func (mw *MultiWriter) AddWriter(w Writer)
func (*MultiWriter) SyncMetrics ¶
func (mw *MultiWriter) SyncMetrics(dbUnique, metricName, op string) (err error)
func (*MultiWriter) WriteMeasurements ¶
func (mw *MultiWriter) WriteMeasurements(ctx context.Context, storageCh <-chan []metrics.MeasurementEnvelope)
type PostgresWriter ¶
type PostgresWriter struct {
// contains filtered or unexported fields
}
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
func NewPostgresWriter ¶
func NewWriterFromPostgresConn ¶
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)
func (*PostgresWriter) AddDBUniqueMetricToListingTable ¶
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (*PostgresWriter) DropOldTimePartitions ¶
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)
func (*PostgresWriter) EnsureBuiltinMetricDummies ¶
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (*PostgresWriter) EnsureMetricDbnameTime ¶
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)
func (*PostgresWriter) EnsureMetricDummy ¶
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
func (*PostgresWriter) EnsureMetricTime ¶
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error
EnsureMetricTime creates special partitions if Timescale used for realtime metrics
func (*PostgresWriter) EnsureMetricTimescale ¶
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)
func (*PostgresWriter) GetOldTimePartitions ¶
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)
func (*PostgresWriter) ReadMetricSchemaType ¶
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (*PostgresWriter) SyncMetric ¶
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (*PostgresWriter) Write ¶
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error
Write send the measurements to the cache channel
type PrometheusWriter ¶
type PrometheusWriter struct { PrometheusNamespace string // contains filtered or unexported fields }
PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
func NewPrometheusWriter ¶
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (*PrometheusWriter) Collect ¶
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
func (*PrometheusWriter) Describe ¶
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
func (*PrometheusWriter) MetricStoreMessageToPromMetrics ¶
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
func (*PrometheusWriter) PromAsyncCacheAddMetricData ¶
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)
func (*PrometheusWriter) PromAsyncCacheInitIfRequired ¶
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny ¶
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
func (*PrometheusWriter) SyncMetric ¶
func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error
func (*PrometheusWriter) Write ¶
func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error
type RPCWriter ¶
type RPCWriter struct {
// contains filtered or unexported fields
}
RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol. Remote server should implement the Receiver interface. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.