kafkareceiver

v0.116.0
Published: Dec 17, 2024 | License: Apache-2.0 | Imports: 31 | Imported by: 17

README

Kafka Receiver

Status
Stability: beta (metrics, logs, traces)
Distributions: core, contrib
Code Owners: @pavolloffay, @MovieStoreGuy

Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.

Note that metrics support only the OTLP encodings; logs also support the logs-only encodings listed below.

Getting Started

The following settings are required:

  • protocol_version (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of kafka brokers
  • resolve_canonical_bootstrap_servers_only (default = false): Whether to resolve then reverse-lookup broker IPs during startup
  • topic (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from. Only one telemetry type may be used for a given topic.
  • encoding (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings:
    • otlp_proto: the payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest or ExportMetricsServiceRequest respectively.
    • otlp_json: the payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest or ExportMetricsServiceRequest respectively using JSON encoding.
    • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
    • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
    • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
    • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
    • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
    • raw: (logs only) the payload's bytes are inserted as the body of a log record.
    • text: (logs only) the payload is decoded as text and inserted as the body of a log record. By default, UTF-8 is used for decoding. Use text_<ENCODING>, such as text_utf-8 or text_shift_jis, to select a different character encoding.
    • json: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
    • azure_resource_logs: (logs only) the payload is converted from Azure Resource Logs format to OTel format.
  • group_id (default = otel-collector): The consumer group that the receiver consumes messages from
  • client_id (default = otel-collector): The consumer client ID that the receiver uses
  • initial_offset (default = latest): The initial offset to use if no offset was previously committed. Must be latest or earliest.
  • session_timeout (default = 10s): The request timeout for detecting client failures when using Kafka’s group management facilities.
  • heartbeat_interval (default = 3s): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
  • min_fetch_size (default = 1): The minimum number of message bytes to fetch in a request.
  • default_fetch_size (default = 1048576): The default number of message bytes to fetch in a request (1 MiB).
  • max_fetch_size (default = 0): The maximum number of message bytes to fetch in a request; 0 means unlimited.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use
    • sasl
      • username: The username to use.
      • password: The password to use
      • mechanism: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN)
      • aws_msk.region: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism
      • aws_msk.broker_addr: MSK Broker address in case of AWS_MSK_IAM mechanism
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file is used instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
      • disable_fast_negotiation: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to false by default.
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • autocommit
    • enable (default = true): Whether to auto-commit updated offsets back to the broker.
    • interval (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled.
  • message_marking
    • after (default = false): If true, messages are marked after the pipeline execution.
    • on_error (default = false): If false, only successfully processed messages are marked. Has no effect if after is false. Note: this can block the entire partition if processing a message returns a permanent error.
  • header_extraction
    • extract_headers (default = false): Allows the user to attach header fields to resource attributes in the OTel pipeline.
    • headers (default = []): List of headers to extract from the Kafka record. Note: matching is exact; regular expressions are not currently supported.
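
The text_<ENCODING> naming convention described for the text encoding can be illustrated with a small Go sketch. It is not the receiver's actual parsing code: the function name splitTextEncoding is hypothetical, and the assumption that a bare "text" maps to "utf-8" is taken from the default stated in the list above.

```go
package main

import "strings"

// splitTextEncoding is a hypothetical helper (not part of kafkareceiver).
// It splits an encoding name such as "text_shift_jis" into the base
// encoding ("text") and the character set ("shift_jis").
// A bare "text" yields "utf-8", the default named in the README.
// For any other encoding name, ok is false.
func splitTextEncoding(name string) (base, charset string, ok bool) {
	const prefix = "text_"
	if name == "text" {
		return "text", "utf-8", true
	}
	if !strings.HasPrefix(name, prefix) {
		return "", "", false
	}
	charset = strings.TrimPrefix(name, prefix)
	if charset == "" {
		// "text_" with nothing after the underscore names no character set.
		return "", "", false
	}
	return "text", charset, true
}
```

Calling splitTextEncoding("text_shift_jis") yields the base "text" and the character set "shift_jis"; a name such as "otlp_proto" is reported as not being a text encoding.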

Example:

receivers:
  kafka:
    protocol_version: 2.0.0

Example of connecting to Kafka using SASL and TLS:

receivers:
  kafka:
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
      tls:
        insecure: false

Example of header extraction:

receivers:
  kafka:
    topic: test
    header_extraction: 
      extract_headers: true
      headers: ["header1", "header2"]
  • If we feed the following Kafka record to the test topic with the configuration above:
{
  event: Hello,
  headers: {
    header1: value1,
    header2: value2,
  }
}

we will get a log record in the collector similar to:

{
  ...
  body: Hello,
  resource: {
    kafka.header.header1: value1,
    kafka.header.header2: value2,
  },
  ...
}
  • The Kafka record headers header1 and header2 are added as resource attributes.
  • Every matching Kafka header key is prefixed with the kafka.header string and attached to the resource attributes.
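
The header extraction behavior described above can be sketched in a few lines of Go. This is a simplified illustration, not the receiver's implementation: extractHeaders and headerPrefix are hypothetical names, and headers are modeled as a plain map of strings, whereas real Kafka headers are byte-slice key/value pairs.

```go
package main

// headerPrefix mirrors the "kafka.header" prefix described in the README;
// the trailing dot matches the attribute keys shown in the example output.
const headerPrefix = "kafka.header."

// extractHeaders is a hypothetical helper. It returns one resource
// attribute for every record header whose key exactly matches one of the
// configured names. Headers that are not configured, and configured names
// that are absent from the record, are ignored.
func extractHeaders(record map[string]string, wanted []string) map[string]string {
	attrs := make(map[string]string, len(wanted))
	for _, name := range wanted {
		if value, ok := record[name]; ok {
			attrs[headerPrefix+name] = value
		}
	}
	return attrs
}
```

With the record from the example and wanted set to header1 and header2, the result contains kafka.header.header1 and kafka.header.header2, matching the resource shown above.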

Documentation

Overview

Package kafkareceiver receives traces, metrics, and logs from Kafka.

Functions

func NewFactory

func NewFactory(options ...FactoryOption) receiver.Factory

NewFactory creates Kafka receiver factory.

Types

type AutoCommit added in v0.38.0

type AutoCommit struct {
	// Whether or not to auto-commit updated offsets back to the broker.
	// (default enabled).
	Enable bool `mapstructure:"enable"`
	// How frequently to commit updated offsets. Ineffective unless
	// auto-commit is enabled (default 1s)
	Interval time.Duration `mapstructure:"interval"`
}

type Config

type Config struct {
	// The list of kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`
	// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
	// each of the provided brokers. It will then do a PTR lookup for each
	// returned IP, and that set of names becomes the broker list. This can be
	// required in SASL environments.
	ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
	// Kafka protocol version
	ProtocolVersion string `mapstructure:"protocol_version"`
	// Session interval for the Kafka consumer
	SessionTimeout time.Duration `mapstructure:"session_timeout"`
	// Heartbeat interval for the Kafka consumer
	HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
	// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
	Topic string `mapstructure:"topic"`
	// Encoding of the messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// The consumer group that receiver will be consuming messages from (default "otel-collector")
	GroupID string `mapstructure:"group_id"`
	// The consumer client ID that receiver will use (default "otel-collector")
	ClientID string `mapstructure:"client_id"`
	// The initial offset to use if no offset was previously committed.
	// Must be `latest` or `earliest` (default "latest").
	InitialOffset string `mapstructure:"initial_offset"`

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata kafkaexporter.Metadata `mapstructure:"metadata"`

	Authentication kafka.Authentication `mapstructure:"auth"`

	// Controls the auto-commit functionality
	AutoCommit AutoCommit `mapstructure:"autocommit"`

	// Controls the way the messages are marked as consumed
	MessageMarking MessageMarking `mapstructure:"message_marking"`

	// Extract headers from kafka records
	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`

	// The minimum bytes per fetch from Kafka (default "1")
	MinFetchSize int32 `mapstructure:"min_fetch_size"`
	// The default bytes per fetch from Kafka (default "1048576")
	DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
	// The maximum bytes per fetch from Kafka (default "0", no limit)
	MaxFetchSize int32 `mapstructure:"max_fetch_size"`
}

Config defines configuration for Kafka receiver.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks the receiver configuration is valid

type FactoryOption

type FactoryOption func(factory *kafkaReceiverFactory)

FactoryOption applies changes to kafkaReceiverFactory.
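
FactoryOption follows the functional-options pattern: NewFactory accepts any number of functions, each of which mutates the factory before it is returned. The sketch below shows the pattern with stand-in types only. The factory struct, its encodings field, and withEncoding are all hypothetical, because kafkaReceiverFactory is unexported and this page does not list the package's option constructors.

```go
package main

// factory is a hypothetical stand-in for the unexported kafkaReceiverFactory.
type factory struct {
	encodings []string // illustrative field, not taken from the package
}

// option has the same shape as FactoryOption: a function that mutates
// the factory it is given.
type option func(*factory)

// withEncoding is a hypothetical option that registers one more encoding name.
func withEncoding(name string) option {
	return func(f *factory) {
		f.encodings = append(f.encodings, name)
	}
}

// newFactory builds a factory with assumed defaults, then applies each
// option in the order it was passed.
func newFactory(opts ...option) *factory {
	f := &factory{encodings: []string{"otlp_proto"}}
	for _, opt := range opts {
		opt(f)
	}
	return f
}
```

A call such as newFactory(withEncoding("custom_enc")) returns a factory whose encodings are the assumed default followed by custom_enc; calling newFactory() with no options leaves the defaults untouched.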

type HeaderExtraction added in v0.87.0

type HeaderExtraction struct {
	ExtractHeaders bool     `mapstructure:"extract_headers"`
	Headers        []string `mapstructure:"headers"`
}

type HeaderExtractor added in v0.87.0

type HeaderExtractor interface {
	// contains filtered or unexported methods
}

type LogsUnmarshaler

type LogsUnmarshaler interface {
	// Unmarshal deserializes the message body into logs.
	Unmarshal([]byte) (plog.Logs, error)
	// Encoding of the serialized messages.
	Encoding() string
}

LogsUnmarshaler deserializes the message body.

type LogsUnmarshalerWithEnc added in v0.78.0

type LogsUnmarshalerWithEnc interface {
	LogsUnmarshaler
	// WithEnc sets the character encoding (UTF-8, GBK, etc.) of the unmarshaler.
	WithEnc(string) (LogsUnmarshalerWithEnc, error)
}

type MessageMarking added in v0.38.0

type MessageMarking struct {
	// If true, the messages are marked after the pipeline execution
	After bool `mapstructure:"after"`

	// If false, only the successfully processed messages are marked, it has no impact if
	// After is set to false.
	// Note: this can block the entire partition in case a message processing returns
	// a permanent error.
	OnError bool `mapstructure:"on_error"`
}
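
The interaction between After and OnError can be summarized as a small decision function. This is a sketch derived only from the field comments above, not the receiver's code: the marking type, shouldMark, and errPipeline are hypothetical names.

```go
package main

import "errors"

// marking is a hypothetical stand-in for MessageMarking.
type marking struct {
	After   bool
	OnError bool
}

// errPipeline is an illustrative error standing in for a pipeline failure.
var errPipeline = errors.New("pipeline failed")

// shouldMark reports whether a message ends up marked as consumed, given
// the marking settings and the error (if any) returned by the pipeline.
func shouldMark(m marking, pipelineErr error) bool {
	if !m.After {
		// Marked before the pipeline runs, so the outcome is irrelevant
		// and OnError has no effect.
		return true
	}
	if pipelineErr != nil {
		// Marked after the pipeline: failed messages are marked only
		// when OnError is true.
		return m.OnError
	}
	return true
}
```

With After set to true and OnError set to false, a failing message is never marked, which is why a permanent error can block the partition as noted in the field comment.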

type MetricsUnmarshaler

type MetricsUnmarshaler interface {
	// Unmarshal deserializes the message body into metrics
	Unmarshal([]byte) (pmetric.Metrics, error)
	// Encoding of the serialized messages
	Encoding() string
}

MetricsUnmarshaler deserializes the message body

type TracesUnmarshaler

type TracesUnmarshaler interface {
	// Unmarshal deserializes the message body into traces.
	Unmarshal([]byte) (ptrace.Traces, error)
	// Encoding of the serialized messages.
	Encoding() string
}

TracesUnmarshaler deserializes the message body.

Directories

Path Synopsis
internal
