mqtt_consumer

package
v1.32.2
Published: Oct 28, 2024 License: MIT Imports: 16 Imported by: 16

README

MQTT Consumer Input Plugin

The MQTT consumer plugin reads from the specified MQTT topics and creates metrics using one of the supported input data formats.

Service Input

This plugin is a service input. Normal plugins gather metrics at the rate determined by the interval setting. Service plugins start a service that listens and waits for metrics or events to occur. Service plugins have two key differences from normal plugins:

  1. The global or plugin specific interval setting may not apply
  2. The CLI options of --test, --test-wait, and --once may not produce output for this plugin

Global configuration options

In addition to the plugin-specific configuration settings, plugins support additional global and plugin configuration settings. These settings are used to modify metrics, tags, and fields, or to create aliases and configure ordering. See the CONFIGURATION.md for more details.

Startup error behavior options

In addition to the plugin-specific and global configuration settings, the plugin supports options for specifying the behavior when experiencing startup errors using the startup_error_behavior setting. Available values are:

  • error: Telegraf will stop and exit in case of startup errors. This is the default behavior.
  • ignore: Telegraf will ignore startup errors for this plugin and disable it, but continue processing for all other plugins.
  • retry: Telegraf will try to start the plugin in every gather or write cycle in case of startup errors. The plugin is disabled until the startup succeeds.
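
As a rough illustration of the three values above, the following Go sketch simulates a plugin whose startup fails a few times. The names simulate and flakyStart are hypothetical and not part of Telegraf. The sketch also assumes, for illustration only, that a plugin becomes active in the same cycle in which a retry succeeds.

```go
package main

import (
	"errors"
	"fmt"
)

// flakyStart returns a start function that fails for the first n calls.
// Hypothetical stand-in for a plugin that cannot reach its broker at first.
func flakyStart(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("broker unreachable")
		}
		return nil
	}
}

// simulate returns how many of the given cycles the plugin was active in,
// or an error if the agent would stop. It illustrates the documented
// behavior and is not Telegraf's implementation.
func simulate(behavior string, start func() error, cycles int) (int, error) {
	err := start() // initial startup attempt
	if err != nil {
		switch behavior {
		case "ignore":
			return 0, nil // plugin disabled, other plugins keep running
		case "retry":
			// keep going and retry in every cycle below
		default: // "error" is the default
			return 0, err // agent stops and exits
		}
	}
	started := err == nil
	active := 0
	for i := 0; i < cycles; i++ {
		if !started {
			if start() != nil {
				continue // still disabled in this cycle
			}
			started = true
		}
		active++
	}
	return active, nil
}

func main() {
	for _, b := range []string{"error", "ignore", "retry"} {
		active, err := simulate(b, flakyStart(2), 5)
		fmt.Printf("%-6s active cycles=%d err=%v\n", b, active, err)
	}
}
```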

Secret-store support

This plugin supports secrets from secret-stores for the username and password options. See the secret-store documentation for more details on how to use them.

Configuration

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  # topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  # qos = 0

  ## Connection timeout for initial connection in seconds
  # connection_timeout = "30s"

  ## Interval and ping timeout for keep-alive messages
  ## The sum of those options defines when a connection loss is detected.
  ## Note: The keep-alive interval needs to be greater than or equal to one
  ## second; fractions of a second are not supported.
  # keepalive = "60s"
  # ping_timeout = "10s"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are written to
  ## outputs before they are acknowledged to the broker, so that data is not
  ## lost. This option sets the maximum number of messages to read from the
  ## broker that have not yet been written by an output.
  ##
  ## Pick this value with the agent's metric_batch_size in mind. Setting
  ## max_undelivered_messages too high can result in a constant stream of data
  ## batches to the output, while setting it too low may never flush the
  ## broker's messages.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client.  To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing. Finally, using a persistent session will use the initial
  ## connection topics and not subscribe to any new topics even after
  ## reconnecting or restarting without a change in client ID.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password to connect to the MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Client trace messages
  ## When set to true, and debug mode is enabled in the agent settings, the
  ## MQTT client's messages are included in the Telegraf logs. These messages
  ## are very noisy, but essential for debugging issues.
  # client_trace = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path,
  ## # denotes a variable length path element (can only be used once per setting)
  # [[inputs.mqtt_consumer.topic_parsing]]
  #   topic = ""
  #   measurement = ""
  #   tags = ""
  #   fields = ""
  ## Supported values are int, float, uint
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type
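
The max_undelivered_messages comment above describes a cap on messages that were read from the broker but not yet written by an output. One common way to express such a cap is a counting semaphore. The Go sketch below shows that idea only. inflightLimiter is a hypothetical helper, not the plugin's API, and it is not claimed to match how the plugin is implemented.

```go
package main

import "fmt"

// inflightLimiter is a hypothetical helper that caps the number of messages
// received from the broker but not yet written by an output.
type inflightLimiter struct {
	slots chan struct{}
}

func newInflightLimiter(max int) *inflightLimiter {
	return &inflightLimiter{slots: make(chan struct{}, max)}
}

// tryAcquire reserves a slot for a newly received message.
// It returns false when the limit has already been reached.
func (l *inflightLimiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot once an output has written the message.
// Releasing with no slot held is a no-op.
func (l *inflightLimiter) release() {
	select {
	case <-l.slots:
	default:
	}
}

// inflight reports how many messages are currently undelivered.
func (l *inflightLimiter) inflight() int { return len(l.slots) }

func main() {
	lim := newInflightLimiter(2) // stands in for max_undelivered_messages = 2
	fmt.Println(lim.tryAcquire(), lim.tryAcquire(), lim.tryAcquire())
	lim.release() // one message was written by an output
	fmt.Println(lim.tryAcquire(), lim.inflight())
}
```

With a limit this small the third message has to wait until an earlier one is delivered, which is the trade-off the configuration comment warns about when the value is set too low.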

Example Output

mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
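
The topics setting in the configuration uses the standard MQTT wildcards: + matches exactly one topic level and # matches all remaining levels. The Go sketch below shows how such a filter can be checked against a concrete topic. matchTopic is a hypothetical helper written for this illustration. In practice the broker performs the matching, and the sketch ignores special cases such as topics starting with $.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether an MQTT topic matches a subscription filter.
// "+" matches exactly one level; "#" matches the remaining levels and is
// only accepted here as the last level of the filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	filters := []string{"telegraf/host01/cpu", "telegraf/+/mem", "sensors/#"}
	topics := []string{"telegraf/host01/cpu", "telegraf/host02/mem", "sensors/CLE/v1/device5/temp"}
	for _, topic := range topics {
		for _, filter := range filters {
			if matchTopic(filter, topic) {
				fmt.Printf("%s matches %s\n", topic, filter)
			}
		}
	}
}
```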

About Topic Parsing

The MQTT topic as a whole is stored as a tag, but this can be too coarse to be useful when working with the data further down the line. Topic parsing allows the measurement name, tag values, and field values to be extracted from the MQTT topic, letting you store the information carried in the topic in a meaningful way. An _ denotes an ignored entry in the topic path. See the following example.

Topic Parsing Example
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "telegraf/+/cpu/23",
  ]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "telegraf/one/cpu/23"
    measurement = "_/_/measurement/_"
    tags = "tag/_/_/_"
    fields = "_/_/_/test"
    [inputs.mqtt_consumer.topic_parsing.types]
      test = "int"

Will result in the following metric:

cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
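
To make the mapping from templates to the metric above concrete, the following Go sketch walks a topic and the three templates segment by segment. parseTopic and the parsed type are hypothetical names used for this illustration only. This is not the plugin's TopicParser, and it does not handle the # variable-length element.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsed holds what a template-based parser would pull out of a topic.
type parsed struct {
	Measurement string
	Tags        map[string]string
	Fields      map[string]interface{}
}

// parseTopic is a hypothetical helper: each template is split on "/" and
// compared position by position with the topic; "_" entries are ignored.
func parseTopic(topic, measurement, tags, fields string, types map[string]string) (parsed, error) {
	segs := strings.Split(topic, "/")
	out := parsed{Tags: map[string]string{}, Fields: map[string]interface{}{}}

	// pick calls fn for every named segment of a template.
	pick := func(tmpl string, fn func(name, value string) error) error {
		if tmpl == "" {
			return nil
		}
		keys := strings.Split(tmpl, "/")
		if len(keys) != len(segs) {
			return fmt.Errorf("template %q does not fit topic %q", tmpl, topic)
		}
		for i, k := range keys {
			if k == "_" || k == "" {
				continue
			}
			if err := fn(k, segs[i]); err != nil {
				return err
			}
		}
		return nil
	}

	if err := pick(measurement, func(name, v string) error {
		if name == "measurement" {
			out.Measurement = v
		}
		return nil
	}); err != nil {
		return out, err
	}
	if err := pick(tags, func(name, v string) error {
		out.Tags[name] = v
		return nil
	}); err != nil {
		return out, err
	}
	err := pick(fields, func(name, v string) error {
		switch types[name] {
		case "int":
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				return err
			}
			out.Fields[name] = n
		case "uint":
			n, err := strconv.ParseUint(v, 10, 64)
			if err != nil {
				return err
			}
			out.Fields[name] = n
		case "float":
			n, err := strconv.ParseFloat(v, 64)
			if err != nil {
				return err
			}
			out.Fields[name] = n
		default:
			out.Fields[name] = v // no type given: keep the string
		}
		return nil
	})
	return out, err
}

func main() {
	p, err := parseTopic("telegraf/one/cpu/23",
		"_/_/measurement/_", "tag/_/_/_", "_/_/_/test",
		map[string]string{"test": "int"})
	fmt.Println(p.Measurement, p.Tags, p.Fields, err)
}
```

The sketch yields the measurement cpu, the tag tag=telegraf, and the integer field test=23. The value field in the metric above comes from the message payload, which the sketch does not model.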

Field Pivoting Example

You can use the pivot processor to rotate single-valued metrics into a multi-field metric. For more information, see the pivot processor documentation.

For this example these are the topics:

/sensors/CLE/v1/device5/temp
/sensors/CLE/v1/device5/rpm
/sensors/CLE/v1/device5/ph
/sensors/CLE/v1/device5/spin

And these are the metrics:

sensors,site=CLE,version=v1,device_name=device5,field=temp value=390
sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0
sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45

Using pivot in the config will rotate the metrics into a multi field metric. The config:

[[inputs.mqtt_consumer]]
    ....
    topics = ["/sensors/#"]
    [[inputs.mqtt_consumer.topic_parsing]]
        measurement = "/measurement/_/_/_/_"
        tags = "/_/site/version/device_name/field"
[[processors.pivot]]
    tag_key = "field"
    value_key = "value"

Will result in the following metric:

sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45
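
The rotation shown above can be sketched in Go as follows. The metric type and the pivot function are hypothetical and only mimic the tag_key and value_key settings. The sketch also merges metrics that share a name and the remaining tags, in order to reproduce the single combined line. How and where that merging happens in Telegraf is not described here, so treat it as an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// metric is a simplified, hypothetical stand-in for a Telegraf metric.
type metric struct {
	Name   string
	Tags   map[string]string
	Fields map[string]float64
}

// pivot removes tagKey from each metric and stores the valueKey field under
// the name given by the removed tag's value. Metrics with the same name and
// remaining tags are merged into one (an assumption of this sketch).
func pivot(in []metric, tagKey, valueKey string) []metric {
	index := map[string]int{}
	var out []metric
	for _, m := range in {
		fieldName, hasTag := m.Tags[tagKey]
		value, hasValue := m.Fields[valueKey]
		if !hasTag || !hasValue {
			out = append(out, m) // nothing to rotate, pass through
			continue
		}
		rest := map[string]string{}
		keys := []string{}
		for k, v := range m.Tags {
			if k != tagKey {
				rest[k] = v
				keys = append(keys, k)
			}
		}
		sort.Strings(keys) // stable grouping key regardless of map order
		var id strings.Builder
		id.WriteString(m.Name)
		for _, k := range keys {
			id.WriteString("," + k + "=" + rest[k])
		}
		i, ok := index[id.String()]
		if !ok {
			i = len(out)
			index[id.String()] = i
			out = append(out, metric{Name: m.Name, Tags: rest, Fields: map[string]float64{}})
		}
		out[i].Fields[fieldName] = value
	}
	return out
}

func main() {
	tags := func(field string) map[string]string {
		return map[string]string{"site": "CLE", "version": "v1", "device_name": "device5", "field": field}
	}
	in := []metric{
		{"sensors", tags("temp"), map[string]float64{"value": 390}},
		{"sensors", tags("rpm"), map[string]float64{"value": 45.0}},
		{"sensors", tags("ph"), map[string]float64{"value": 1.45}},
	}
	for _, m := range pivot(in, "field", "value") {
		fmt.Println(m.Name, m.Tags, m.Fields)
	}
}
```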

Metrics

  • All measurements are tagged with the incoming topic, e.g. topic=telegraf/host01/cpu

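  • When [[inputs.mqtt_consumer.topic_parsing]] is set, the measurement name, tags, and fields extracted from the topic are applied as shown in the Topic Parsing Example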

  • when [[inputs.internal]] is set:

    • payload_size (int): cumulative size in bytes of the payloads received from incoming messages
    • messages_received (int): number of messages received from MQTT

This will result in the following metric:

internal_mqtt_consumer,host=pop-os,version=1.24.0 messages_received=622i,payload_size=37942i 1657282270000000000

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	Connect() mqtt.Token
	SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
	AddRoute(topic string, callback mqtt.MessageHandler)
	Disconnect(quiesce uint)
	IsConnected() bool
}

type ClientFactory

type ClientFactory func(o *mqtt.ClientOptions) Client

type MQTTConsumer

type MQTTConsumer struct {
	Servers                []string             `toml:"servers"`
	Topics                 []string             `toml:"topics"`
	TopicTag               *string              `toml:"topic_tag"`
	TopicParserConfig      []TopicParsingConfig `toml:"topic_parsing"`
	Username               config.Secret        `toml:"username"`
	Password               config.Secret        `toml:"password"`
	QoS                    int                  `toml:"qos"`
	ConnectionTimeout      config.Duration      `toml:"connection_timeout"`
	KeepAliveInterval      config.Duration      `toml:"keepalive"`
	PingTimeout            config.Duration      `toml:"ping_timeout"`
	MaxUndeliveredMessages int                  `toml:"max_undelivered_messages"`
	PersistentSession      bool                 `toml:"persistent_session"`
	ClientTrace            bool                 `toml:"client_trace"`
	ClientID               string               `toml:"client_id"`
	Log                    telegraf.Logger      `toml:"-"`
	tls.ClientConfig
	// contains filtered or unexported fields
}

func New

func New(factory ClientFactory) *MQTTConsumer

func (*MQTTConsumer) Gather

func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error

func (*MQTTConsumer) Init

func (m *MQTTConsumer) Init() error

func (*MQTTConsumer) SampleConfig

func (*MQTTConsumer) SampleConfig() string

func (*MQTTConsumer) SetParser

func (m *MQTTConsumer) SetParser(parser telegraf.Parser)

func (*MQTTConsumer) Start

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error

func (*MQTTConsumer) Stop

func (m *MQTTConsumer) Stop()

type TopicParser added in v1.32.0

type TopicParser struct {
	// contains filtered or unexported fields
}

func (*TopicParser) Parse added in v1.32.0

func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error

type TopicParsingConfig added in v1.21.0

type TopicParsingConfig struct {
	Topic       string            `toml:"topic"`
	Measurement string            `toml:"measurement"`
	Tags        string            `toml:"tags"`
	Fields      string            `toml:"fields"`
	FieldTypes  map[string]string `toml:"types"`
}

func (*TopicParsingConfig) NewParser added in v1.32.0

func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error)
