amqp_consumer

package
v1.23.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2022 License: MIT Imports: 14 Imported by: 12

README

AMQP Consumer Input Plugin

This plugin provides a consumer for use with AMQP 0-9-1, a prominent implementation of this protocol being RabbitMQ.

Metrics are read from a topic exchange using the configured queue and binding_key.

Message payload should be formatted in one of the Telegraf Data Formats.

For an introduction to AMQP see:

Configuration

# AMQP consumer plugin
[[inputs.amqp_consumer]]
  ## Brokers to consume from.  If multiple brokers are specified a random broker
  ## will be selected anytime a connection is established.  This can be
  ## helpful for load balancing when not using a dedicated load balancer.
  brokers = ["amqp://localhost:5672/influxdb"]

  ## Authentication credentials for the PLAIN auth_method.
  # username = ""
  # password = ""

  ## Name of the exchange to declare.  If unset, no exchange will be declared.
  exchange = "telegraf"

  ## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
  # exchange_type = "topic"

  ## If true, exchange will be passively declared.
  # exchange_passive = false

  ## Exchange durability can be either "transient" or "durable".
  # exchange_durability = "durable"

  ## Additional exchange arguments.
  # exchange_arguments = { }
  # exchange_arguments = {"hash_property" = "timestamp"}

  ## AMQP queue name.
  queue = "telegraf"

  ## AMQP queue durability can be "transient" or "durable".
  queue_durability = "durable"

  ## If true, queue will be passively declared.
  # queue_passive = false

  ## A binding between the exchange and queue using this binding key is
  ## created.  If unset, no binding is created.
  binding_key = "#"

  ## Maximum number of messages server should give to the worker.
  # prefetch_count = 50

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Auth method. PLAIN and EXTERNAL are supported
  ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
  ## described here: https://www.rabbitmq.com/plugins.html
  # auth_method = "PLAIN"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Content encoding for message payloads, can be set to "gzip" to or
  ## "identity" to apply no encoding.
  # content_encoding = "identity"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Documentation

Index

Constants

View Source
const (
	DefaultAuthMethod = "PLAIN"

	DefaultBroker = "amqp://localhost:5672/influxdb"

	DefaultExchangeType       = "topic"
	DefaultExchangeDurability = "durable"

	DefaultQueueDurability = "durable"

	DefaultPrefetchCount = 50
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPConsumer

type AMQPConsumer struct {
	URL                    string            `toml:"url" deprecated:"1.7.0;use 'brokers' instead"`
	Brokers                []string          `toml:"brokers"`
	Username               string            `toml:"username"`
	Password               string            `toml:"password"`
	Exchange               string            `toml:"exchange"`
	ExchangeType           string            `toml:"exchange_type"`
	ExchangeDurability     string            `toml:"exchange_durability"`
	ExchangePassive        bool              `toml:"exchange_passive"`
	ExchangeArguments      map[string]string `toml:"exchange_arguments"`
	MaxUndeliveredMessages int               `toml:"max_undelivered_messages"`

	// Queue Name
	Queue           string `toml:"queue"`
	QueueDurability string `toml:"queue_durability"`
	QueuePassive    bool   `toml:"queue_passive"`

	// Binding Key
	BindingKey string `toml:"binding_key"`

	// Controls how many messages the server will try to keep on the network
	// for consumers before receiving delivery acks.
	PrefetchCount int

	// AMQP Auth method
	AuthMethod string
	tls.ClientConfig

	ContentEncoding string `toml:"content_encoding"`
	Log             telegraf.Logger
	// contains filtered or unexported fields
}

AMQPConsumer is the top level struct for this plugin

func (*AMQPConsumer) Gather

func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error

All gathering is done in the Start function

func (*AMQPConsumer) SampleConfig

func (*AMQPConsumer) SampleConfig() string

func (*AMQPConsumer) SetParser

func (a *AMQPConsumer) SetParser(parser parsers.Parser)

func (*AMQPConsumer) Start

func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error

Start satisfies the telegraf.ServiceInput interface

func (*AMQPConsumer) Stop

func (a *AMQPConsumer) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL