kafka_consumer

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2020 License: MIT Imports: 13 Imported by: 186

README

Kafka Consumer Input Plugin

The Kafka consumer plugin reads from Kafka and creates metrics using one of the supported input data formats.

For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin and use the old zookeeper connection method.

Configuration
[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["localhost:9092"]

  ## Topics to consume.
  topics = ["telegraf"]

  ## When set this tag will be added to all metrics with the topic as the value.
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled using the "enable_tls" option.
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  ## Name of the consumer group.
  # consumer_group = "telegraf_metrics_consumers"

  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroup added in v1.14.0

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}

type ConsumerGroupCreator added in v1.14.0

type ConsumerGroupCreator interface {
	Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)
}

type ConsumerGroupHandler added in v1.14.0

type ConsumerGroupHandler struct {
	MaxMessageLen int
	TopicTag      string
	// contains filtered or unexported fields
}

ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.

func NewConsumerGroupHandler added in v1.14.0

func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler

func (*ConsumerGroupHandler) Cleanup added in v1.14.0

Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.

func (*ConsumerGroupHandler) ConsumeClaim added in v1.14.0

ConsumeClaim is called once each claim in a goroutine and must be thread-safe. Should run until the claim is closed.

func (*ConsumerGroupHandler) Handle added in v1.14.0

Handle processes a message and if successful saves it to be acknowledged after delivery.

func (*ConsumerGroupHandler) Reserve added in v1.14.0

func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error

Reserve blocks until there is an available slot for a new message.

func (*ConsumerGroupHandler) Setup added in v1.14.0

Setup is called once when a new session is opened. It setups up the handler and begins processing delivered messages.

type KafkaConsumer added in v1.14.0

type KafkaConsumer struct {
	Brokers                []string `toml:"brokers"`
	ClientID               string   `toml:"client_id"`
	ConsumerGroup          string   `toml:"consumer_group"`
	MaxMessageLen          int      `toml:"max_message_len"`
	MaxUndeliveredMessages int      `toml:"max_undelivered_messages"`
	Offset                 string   `toml:"offset"`
	BalanceStrategy        string   `toml:"balance_strategy"`
	Topics                 []string `toml:"topics"`
	TopicTag               string   `toml:"topic_tag"`
	Version                string   `toml:"version"`
	SASLPassword           string   `toml:"sasl_password"`
	SASLUsername           string   `toml:"sasl_username"`
	SASLVersion            *int     `toml:"sasl_version"`

	EnableTLS *bool `toml:"enable_tls"`
	tls.ClientConfig

	Log telegraf.Logger `toml:"-"`

	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Description added in v1.14.0

func (k *KafkaConsumer) Description() string

func (*KafkaConsumer) Gather added in v1.14.0

func (k *KafkaConsumer) Gather(acc telegraf.Accumulator) error

func (*KafkaConsumer) Init added in v1.14.0

func (k *KafkaConsumer) Init() error

func (*KafkaConsumer) SampleConfig added in v1.14.0

func (k *KafkaConsumer) SampleConfig() string

func (*KafkaConsumer) SetParser added in v1.14.0

func (k *KafkaConsumer) SetParser(parser parsers.Parser)

func (*KafkaConsumer) Start added in v1.14.0

func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error

func (*KafkaConsumer) Stop added in v1.14.0

func (k *KafkaConsumer) Stop()

type Message added in v1.14.0

type Message struct {
	// contains filtered or unexported fields
}

Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.

type SaramaCreator added in v1.14.0

type SaramaCreator struct{}

func (*SaramaCreator) Create added in v1.14.0

func (*SaramaCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL