postgresql

package
v1.33.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2025 License: MIT Imports: 27 Imported by: 0

README

PostgreSQL Output Plugin

This plugin writes metrics to a PostgreSQL (or compatible) server managing the schema and automatically updating missing columns.

⭐ Telegraf v1.24.0 🏷️ datastore 💻 all

Global configuration options

In addition to the plugin-specific configuration settings, plugins support additional global and plugin configuration settings. These settings are used to modify metrics, tags, and field or create aliases and configure ordering, etc. See the CONFIGURATION.md for more details.

Startup error behavior options

In addition to the plugin-specific and global configuration settings the plugin supports options for specifying the behavior when experiencing startup errors using the startup_error_behavior setting. Available values are:

  • error: Telegraf with stop and exit in case of startup errors. This is the default behavior.
  • ignore: Telegraf will ignore startup errors for this plugin and disables it but continues processing for all other plugins.
  • retry: Telegraf will try to startup the plugin in every gather or write cycle in case of startup errors. The plugin is disabled until the startup succeeds.

Secret-store support

This plugin supports secrets from secret-stores for the connection option. See the secret-store documentation for more details on how to use them.

Configuration

# Publishes metrics to a postgresql database
[[outputs.postgresql]]
  ## Specify connection address via the standard libpq connection string:
  ##   host=... user=... password=... sslmode=... dbname=...
  ## Or a URL:
  ##   postgres://[user[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
  ## See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  ##
  ## All connection parameters are optional. Environment vars are also supported.
  ## e.g. PGPASSWORD, PGHOST, PGUSER, PGDATABASE
  ## All supported vars can be found here:
  ##  https://www.postgresql.org/docs/current/libpq-envars.html
  ##
  ## Non-standard parameters:
  ##   pool_max_conns (default: 1) - Maximum size of connection pool for parallel (per-batch per-table) inserts.
  ##   pool_min_conns (default: 0) - Minimum size of connection pool.
  ##   pool_max_conn_lifetime (default: 0s) - Maximum connection age before closing.
  ##   pool_max_conn_idle_time (default: 0s) - Maximum idle time of a connection before closing.
  ##   pool_health_check_period (default: 0s) - Duration between health checks on idle connections.
  # connection = ""

  ## Postgres schema to use.
  # schema = "public"

  ## Store tags as foreign keys in the metrics table. Default is false.
  # tags_as_foreign_keys = false

  ## Suffix to append to table name (measurement name) for the foreign tag table.
  # tag_table_suffix = "_tag"

  ## Deny inserting metrics if the foreign tag can't be inserted.
  # foreign_tag_constraint = false

  ## Store all tags as a JSONB object in a single 'tags' column.
  # tags_as_jsonb = false

  ## Store all fields as a JSONB object in a single 'fields' column.
  # fields_as_jsonb = false

  ## Name of the timestamp column
  ## NOTE: Some tools (e.g. Grafana) require the default name so be careful!
  # timestamp_column_name = "time"

  ## Type of the timestamp column
  ## Currently, "timestamp without time zone" and "timestamp with time zone"
  ## are supported
  # timestamp_column_type = "timestamp without time zone"

  ## Templated statements to execute when creating a new table.
  # create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }})''',
  # ]

  ## Templated statements to execute when adding columns to a table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped. Points containing fields for which there is no
  ## column will have the field omitted.
  # add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## Templated statements to execute when creating a new tag table.
  # tag_table_create_templates = [
  #   '''CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))''',
  # ]

  ## Templated statements to execute when adding columns to a tag table.
  ## Set to an empty list to disable. Points containing tags for which there is
  ## no column will be skipped.
  # tag_table_add_column_templates = [
  #   '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
  # ]

  ## The postgres data type to use for storing unsigned 64-bit integer values
  ## (Postgres does not have a native unsigned 64-bit integer type).
  ## The value can be one of:
  ##   numeric - Uses the PostgreSQL "numeric" data type.
  ##   uint8 - Requires pguint extension (https://github.com/petere/pguint)
  # uint64_type = "numeric"

  ## When using pool_max_conns > 1, and a temporary error occurs, the query is
  ## retried with an incremental backoff. This controls the maximum duration.
  # retry_max_backoff = "15s"

  ## Approximate number of tag IDs to store in in-memory cache (when using
  ## tags_as_foreign_keys). This is an optimization to skip inserting known
  ## tag IDs. Each entry consumes approximately 34 bytes of memory.
  # tag_cache_size = 100000

  ## Cut column names at the given length to not exceed PostgreSQL's
  ## 'identifier length' limit (default: no limit)
  ## (see https://www.postgresql.org/docs/current/limits.html)
  ## Be careful to not create duplicate column names!
  # column_name_length_limit = 0

  ## Enable & set the log level for the Postgres driver.
  # log_level = "warn" # trace, debug, info, warn, error, none
Concurrency

By default the postgresql plugin does not utilize any concurrency. However it can for increased throughput. When concurrency is off, telegraf core handles things like retrying on failure, buffering, etc. When concurrency is used, these aspects have to be handled by the plugin.

To enable concurrent writes to the database, set the pool_max_conns connection parameter to a value >1. When enabled, incoming batches will be split by measurement/table name. In addition, if a batch comes in and the previous batch has not completed, concurrency will be used for the new batch as well.

If all connections are utilized and the pool is exhausted, further incoming batches will be buffered within telegraf core.

Foreign tags

When using tags_as_foreign_keys, tags will be written to a separate table with a tag_id column used for joins. Each series (unique combination of tag values) gets its own entry in the tags table, and a unique tag_id.

Data types

By default the postgresql plugin maps Influx data types to the following PostgreSQL types:

Influx PostgreSQL
float double precision
integer bigint
uinteger numeric*
string text
boolean boolean
unix timestamp timestamp

It is important to note that uinteger (unsigned 64-bit integer) is mapped to the numeric PostgreSQL data type. The numeric data type is an arbitrary precision decimal data type that is less efficient than bigint. This is necessary as the range of values for the Influx uinteger data type can exceed bigint, and thus cause errors when inserting data.

pguint

As a solution to the uinteger/numeric data type problem, there is a PostgreSQL extension that offers unsigned 64-bit integer support: https://github.com/petere/pguint.

If this extension is installed, you can enable the unsigned_integers config parameter which will cause the plugin to use the uint8 datatype instead of numeric.

Templating

The postgresql plugin uses templates for the schema modification SQL statements. This allows for complete control of the schema by the user.

Documentation on how to write templates can be found sqltemplate docs

Long Column Names

Postgres imposes a limit on the length of column identifiers, which can be found in the official docs. By default Telegraf does not enforce this limit as this limit can be modified on the server side. Furthermore, cutting off column names could lead to collisions if the columns are only different after the cut-off.

[!WARNING] Make sure you will not cause column name collisions when setting column_name_length_limit! If in doubt, explicitly shorten the field and tag names using e.g. the regexp processor.

Samples
TimescaleDB
tags_as_foreign_keys = true
create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .columns }})''',
    '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
]
Multi-node
tags_as_foreign_keys = true
create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .columns }})''',
    '''SELECT create_distributed_hypertable({{ .table|quoteLiteral }}, 'time', partitioning_column => 'tag_id', number_partitions => (SELECT count(*) FROM timescaledb_information.data_nodes)::integer, replication_factor => 2, chunk_time_interval => INTERVAL '7d')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
]
Tag table with view

This example enables tags_as_foreign_keys, but creates a postgres view to automatically join the metric & tag tables. The metric & tag tables are stored in a "telegraf" schema, with the view in the "public" schema.

tags_as_foreign_keys = true
schema = "telegraf"
create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .columns }})''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
    '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
    '''DROP VIEW IF EXISTS {{ .table.WithSchema "public" }}''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
tag_table_add_column_templates = [
    '''ALTER TABLE {{.table}} ADD COLUMN IF NOT EXISTS {{.columns|join ", ADD COLUMN IF NOT EXISTS "}}''',
    '''DROP VIEW IF EXISTS {{ .metricTable.WithSchema "public" }}''',
    '''CREATE VIEW {{ .metricTable.WithSchema "public" }} AS SELECT time, {{ (.allColumns.Tags.Concat .metricTable.Columns.Fields).Identifiers | join "," }} FROM {{ .metricTable }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
Immutable data table

Some PostgreSQL-compatible databases don't allow modification of table schema after initial creation. This example works around the limitation by creating a new table and then using a view to join them together.

tags_as_foreign_keys = true
schema = 'telegraf'
create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
    '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
    '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '14d')''',
    '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }}''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
    '''ALTER TABLE {{ .table }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash).WithSchema "" }}''',
    '''ALTER VIEW {{ .table.WithSuffix "_data" }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash "_data").WithSchema "" }}''',
    '''DROP VIEW {{ .table.WithSchema "public" }}''',

    '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
    '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '7d')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
    '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '14d')''',
    '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }} UNION ALL SELECT {{ (.allColumns.Union .table.Columns).Selectors | join "," }} FROM {{ .table.WithSuffix "_" .table.Columns.Hash "_data" }}''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
tag_table_add_column_templates = [
    '''ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}''',
    '''DROP VIEW {{ .metricTable.WithSchema "public" }}''',
    '''CREATE VIEW {{ .metricTable.WithSchema "public" }} AS SELECT time, {{ (.allColumns.Tags.Concat .metricTable.Columns.Fields).Identifiers | join "," }} FROM {{ .metricTable.WithSuffix "_data" }} t, {{ .table }} tt WHERE t.tag_id = tt.tag_id''',
]
Index

Create an index on time and tag columns for faster querying of data.

create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .columns }})''',
    '''CREATE INDEX ON {{ .table }} USING btree({{ .columns.Keys.Identifiers | join "," }})'''
  ]

Error handling

When the plugin encounters an error writing to the database, it attempts to determine whether the error is temporary or permanent. An error is considered temporary if it's possible that retrying the write will succeed. Some examples of temporary errors are things like connection interruption, deadlocks, etc. Permanent errors are things like invalid data type, insufficient permissions, etc.

When an error is determined to be temporary, the plugin will retry the write with an incremental backoff.

When an error is determined to be permanent, the plugin will discard the sub-batch. The "sub-batch" is the portion of the input batch that is being written to the same table.

Documentation

Index

Constants

View Source
const (
	PgBool                     = "boolean"
	PgSmallInt                 = "smallint"
	PgInteger                  = "integer"
	PgBigInt                   = "bigint"
	PgReal                     = "real"
	PgDoublePrecision          = "double precision"
	PgNumeric                  = "numeric"
	PgText                     = "text"
	PgTimestampWithTimeZone    = "timestamp with time zone"
	PgTimestampWithoutTimeZone = "timestamp without time zone"
	PgSerial                   = "serial"
	PgJSONb                    = "jsonb"
)

Constants for naming PostgreSQL data types both in their short and long versions.

View Source
const (
	PgUint8 = "uint8"
)

Types from pguint

Variables

This section is empty.

Functions

func NewTableSources

func NewTableSources(p *Postgresql, metrics []telegraf.Metric) map[string]*TableSource

Types

type Postgresql

type Postgresql struct {
	Connection                 config.Secret           `toml:"connection"`
	Schema                     string                  `toml:"schema"`
	TagsAsForeignKeys          bool                    `toml:"tags_as_foreign_keys"`
	TagTableSuffix             string                  `toml:"tag_table_suffix"`
	ForeignTagConstraint       bool                    `toml:"foreign_tag_constraint"`
	TagsAsJsonb                bool                    `toml:"tags_as_jsonb"`
	FieldsAsJsonb              bool                    `toml:"fields_as_jsonb"`
	TimestampColumnName        string                  `toml:"timestamp_column_name"`
	TimestampColumnType        string                  `toml:"timestamp_column_type"`
	CreateTemplates            []*sqltemplate.Template `toml:"create_templates"`
	AddColumnTemplates         []*sqltemplate.Template `toml:"add_column_templates"`
	TagTableCreateTemplates    []*sqltemplate.Template `toml:"tag_table_create_templates"`
	TagTableAddColumnTemplates []*sqltemplate.Template `toml:"tag_table_add_column_templates"`
	Uint64Type                 string                  `toml:"uint64_type"`
	RetryMaxBackoff            config.Duration         `toml:"retry_max_backoff"`
	TagCacheSize               int                     `toml:"tag_cache_size"`
	ColumnNameLenLimit         int                     `toml:"column_name_length_limit"`
	LogLevel                   string                  `toml:"log_level"`
	Logger                     telegraf.Logger         `toml:"-"`
	// contains filtered or unexported fields
}

func (*Postgresql) Close

func (p *Postgresql) Close() error

Close closes the connection(s) to the database.

func (*Postgresql) Connect

func (p *Postgresql) Connect() error

Connect establishes a connection to the target database and prepares the cache

func (*Postgresql) Init

func (p *Postgresql) Init() error

func (*Postgresql) SampleConfig

func (p *Postgresql) SampleConfig() string

func (*Postgresql) Write

func (p *Postgresql) Write(metrics []telegraf.Metric) error

type TableManager

type TableManager struct {
	*Postgresql
	// contains filtered or unexported fields
}

func NewTableManager

func NewTableManager(postgresql *Postgresql) *TableManager

NewTableManager returns an instance of the tables.Manager interface that can handle checking and updating the state of tables in the PG database.

func (*TableManager) ClearTableCache

func (tm *TableManager) ClearTableCache()

ClearTableCache clear the table structure cache.

func (*TableManager) EnsureStructure

func (tm *TableManager) EnsureStructure(
	ctx context.Context,
	db dbh,
	tbl *tableState,
	columns []utils.Column,
	createTemplates, addColumnsTemplates []*sqltemplate.Template,
	metricsTable, tagsTable *tableState,
) ([]utils.Column, error)

EnsureStructure ensures that the table identified by tableName contains the provided columns.

createTemplates and addColumnTemplates are the templates which are executed in the event of table create or alter (respectively). metricsTableName and tagsTableName are passed to the templates.

If the table cannot be modified, the returned column list is the columns which are missing from the table. This includes when an error is returned.

func (*TableManager) MatchSource

func (tm *TableManager) MatchSource(ctx context.Context, db dbh, rowSource *TableSource) error

MatchSource scans through the metrics, determining what columns are needed for inserting, and ensuring the DB schema matches.

If the schema does not match, and schema updates are disabled: If a field missing from the DB, the field is omitted. If a tag is missing from the DB, the metric is dropped.

type TableSource

type TableSource struct {
	// contains filtered or unexported fields
}

TableSource satisfies pgx.CopyFromSource

func NewTableSource

func NewTableSource(postgresql *Postgresql, name string) *TableSource

func (*TableSource) AddMetric

func (tsrc *TableSource) AddMetric(metric telegraf.Metric)

func (*TableSource) ColumnNames

func (tsrc *TableSource) ColumnNames() []string

func (*TableSource) DropColumn

func (tsrc *TableSource) DropColumn(col utils.Column) error

DropColumn drops the specified column. If column is a tag column, any metrics containing the tag will be skipped. If column is a field column, any metrics containing the field will have it omitted.

func (*TableSource) Err

func (tsrc *TableSource) Err() error

func (*TableSource) FieldColumns

func (tsrc *TableSource) FieldColumns() []utils.Column

FieldColumns returns the superset of all fields of all metrics.

func (*TableSource) MetricTableColumns

func (tsrc *TableSource) MetricTableColumns() []utils.Column

MetricTableColumns returns the full column list, including time, tag id or tags, and fields.

func (*TableSource) Name

func (tsrc *TableSource) Name() string

func (*TableSource) Next

func (tsrc *TableSource) Next() bool

func (*TableSource) Reset

func (tsrc *TableSource) Reset()

func (*TableSource) TagColumns

func (tsrc *TableSource) TagColumns() []utils.Column

TagColumns returns the superset of all tags of all metrics.

func (*TableSource) TagTableColumns

func (tsrc *TableSource) TagTableColumns() []utils.Column

func (*TableSource) Values

func (tsrc *TableSource) Values() ([]interface{}, error)

type TagTableSource

type TagTableSource struct {
	*TableSource
	// contains filtered or unexported fields
}

func NewTagTableSource

func NewTagTableSource(tsrc *TableSource) *TagTableSource

func (*TagTableSource) ColumnNames

func (ttsrc *TagTableSource) ColumnNames() []string

func (*TagTableSource) Err

func (ttsrc *TagTableSource) Err() error

func (*TagTableSource) Name

func (ttsrc *TagTableSource) Name() string

func (*TagTableSource) Next

func (ttsrc *TagTableSource) Next() bool

func (*TagTableSource) Reset

func (ttsrc *TagTableSource) Reset()

func (*TagTableSource) UpdateCache

func (ttsrc *TagTableSource) UpdateCache()

func (*TagTableSource) Values

func (ttsrc *TagTableSource) Values() ([]interface{}, error)

type Uint8

type Uint8 struct {
	Int    uint64
	Status pgtype.Status
}

func (*Uint8) AssignTo

func (u *Uint8) AssignTo(dst interface{}) error

func (*Uint8) DecodeBinary

func (u *Uint8) DecodeBinary(_, src []byte) error

func (*Uint8) DecodeText

func (u *Uint8) DecodeText(_, src []byte) error

func (*Uint8) EncodeBinary

func (u *Uint8) EncodeBinary(_, buf []byte) ([]byte, error)

func (*Uint8) EncodeText

func (u *Uint8) EncodeText(_, buf []byte) ([]byte, error)

func (*Uint8) Get

func (u *Uint8) Get() interface{}

func (*Uint8) MarshalJSON

func (u *Uint8) MarshalJSON() ([]byte, error)

func (*Uint8) Scan

func (u *Uint8) Scan(src interface{}) error

Scan implements the database/sql Scanner interface.

func (*Uint8) Set

func (u *Uint8) Set(src interface{}) error

func (*Uint8) UnmarshalJSON

func (u *Uint8) UnmarshalJSON(b []byte) error

func (*Uint8) Value

func (u *Uint8) Value() (driver.Value, error)

Value implements the database/sql/driver Valuer interface.

Directories

Path Synopsis
Package sqltemplate
Package sqltemplate

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL