sinks

package
v3.0.0-...-871c1ed Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: BSD-3-Clause Imports: 21 Imported by: 0

Documentation

Overview

Package sinks provides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CmdOpts

type CmdOpts struct {
	Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	BatchingDelay         time.Duration `` /* 180-byte string literal not displayed */
	Retention             int           `` /* 135-byte string literal not displayed */
	RealDbnameField       string        `` /* 151-byte string literal not displayed */
	SystemIdentifierField string        `` /* 169-byte string literal not displayed */
}

CmdOpts specifies the storage configuration to store metrics measurements

type DbStorageSchemaType

type DbStorageSchemaType int
const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type JSONWriter

type JSONWriter struct {
	// contains filtered or unexported fields
}

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _, _ string) error

func (*JSONWriter) Write

func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error

type MeasurementMessagePostgres

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]any
}

type MultiWriter

type MultiWriter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MultiWriter ensures the simultaneous storage of data in several storages.

func NewMultiWriter

func NewMultiWriter(ctx context.Context, opts *CmdOpts, metricDefs *metrics.Metrics) (mw *MultiWriter, err error)

NewMultiWriter creates and returns new instance of MultiWriter struct.

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) SyncMetrics

func (mw *MultiWriter) SyncMetrics(dbUnique, metricName, op string) (err error)

func (*MultiWriter) WriteMeasurements

func (mw *MultiWriter) WriteMeasurements(ctx context.Context, storageCh <-chan []metrics.MeasurementEnvelope)

type PostgresWriter

type PostgresWriter struct {
	// contains filtered or unexported fields
}

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DropOldTimePartitions

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error)

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTime

func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error

EnsureMetricTime creates special partitions if Timescale used for realtime metrics

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error)

func (*PostgresWriter) GetOldTimePartitions

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error)

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementEnvelope) error

Write send the measurements to the cache channel

type PrometheusWriter

type PrometheusWriter struct {
	PrometheusNamespace string
	// contains filtered or unexported fields
}

PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

func (*PrometheusWriter) MetricStoreMessageToPromMetrics

func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric

func (*PrometheusWriter) PromAsyncCacheAddMetricData

func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []metrics.MeasurementEnvelope)

func (*PrometheusWriter) PromAsyncCacheInitIfRequired

func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)

func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny

func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(dbUnique, metricName, op string) error

func (*PrometheusWriter) Write

func (promw *PrometheusWriter) Write(msgs []metrics.MeasurementEnvelope) error

type RPCWriter

type RPCWriter struct {
	// contains filtered or unexported fields
}

RPCWriter is a sink that sends metric measurements to a remote server using the RPC protocol. Remote server should implement the Receiver interface. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.

func NewRPCWriter

func NewRPCWriter(ctx context.Context, address string) (*RPCWriter, error)

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(dbUnique string, metricName string, op string) error

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msgs []metrics.MeasurementEnvelope) error

Sends Measurement Message to RPC Sink

type SyncReq

type SyncReq struct {
	DbName     string
	MetricName string
	Operation  string
}

type Writer

type Writer interface {
	SyncMetric(dbUnique, metricName, op string) error
	Write(msgs []metrics.MeasurementEnvelope) error
}

Writer is an interface that writes metrics values

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL