kafka

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

README

Kafka Input Plugin

The kafka input plugin reads from Kafka and passes each message to the configured parser. This plugin requires a parser.

Each reader uses its own commit queue into which each fetched message is placed. Every commit_interval, the fetch process is paused and the queue is scanned for a contiguous sequence of uncommitted, ready messages, from oldest to newest. The largest offset found in that sequence from the beginning of the queue will be committed.

A message is marked as ready to be committed when its event tracker hook is called.

If the commit queue is full, fetching is suspended until at least one message is committed.

[!TIP]
This plugin may write its own metrics

Configuration

[[inputs]]
  [inputs.kafka]
    # if true, plugin client writes its own metrics
    enable_metrics = false

    # list of kafka cluster nodes
    brokers = [ "localhost:9092" ]

    # unique identifier that the transport communicates to brokers when it sends requests
    client_id = "neptunus.kafka.input"

    # topics to consume
    # topic name will be set as an event routing key
    topics = [ "topic_one", "topic_two" ]

    # if configured, an event id will be set by data from path
    # expected format - "type:path"
    id_from = "field:path.to.id"

    # determines from whence the consumer group should begin consuming
    # when it finds a partition without a committed offset, "first" or "last"
    start_offset = "last"

    # consumer group identifier
    group_id = "neptunus.kafka.input"

    # amount of time the consumer group will be saved by the broker
    group_ttl = "24h"

    # consumer group balancing strategy
    # "range", "round-robin" or "rack-affinity"
    group_balancer = "range"

    # rack where this consumer is running
    # required for the rack-affinity group balancer
    rack = "rack-01"

    # maximum amount of time a dial will wait for a connect to complete
    dial_timeout = "5s"

    # amount of time that may pass without a heartbeat before the coordinator
    # considers the consumer dead and initiates a rebalance
    session_timeout = "30s"

    # amount of time the coordinator will wait for members to join as part of a rebalance
    rebalance_timeout = "30s"

    # frequency at which the reader sends the consumer group heartbeat update
    heartbeat_interval = "3s"

    # amount of time to wait to fetch a message from a batch of Kafka messages
    read_batch_timeout = "3s"

    # amount of time to wait for new data to come when fetching batches of messages from kafka
    wait_batch_timeout = "3s"

    # maximum batch size that the consumer will accept
    max_batch_size = "1MiB"

    # maximum length of internal uncommitted messages queue
    max_uncommitted = 100

    # interval between commit queue scans
    commit_interval = "300ms"

    ## TLS configuration
    # if true, TLS client will be used
    tls_enable = false
    # trusted root certificates for server
    tls_ca_file = "/etc/neptunus/ca.pem"
    # used for TLS client certificate authentication
    tls_key_file = "/etc/neptunus/key.pem"
    tls_cert_file = "/etc/neptunus/cert.pem"
    # minimum TLS version, not limited by default
    tls_min_version = "TLS12"
    # send the specified TLS server name via SNI
    tls_server_name = "example.svc.local"
    # use TLS but skip chain & host verification
    tls_insecure_skip_verify = false

    # SASL settings
    [inputs.kafka.sasl]
      # SASL mechanism, "none", "plain", "scram-sha-256" or "scram-sha-512"
      mechanism = "scram-sha-512"

      # user credentials
      username = ""
      password = ""

    # a "label name -> header" map
    # if message header exists, it will be saved as configured label
    [inputs.kafka.labelheaders]
      length = "Content-Length"
      encoding = "Content-Type"

    [inputs.kafka.parser]
      type = "json"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Kafka

type Kafka struct {
	*core.BaseInput      `mapstructure:"-"`
	EnableMetrics        bool              `mapstructure:"enable_metrics"`
	Brokers              []string          `mapstructure:"brokers"`
	ClientId             string            `mapstructure:"client_id"`
	GroupId              string            `mapstructure:"group_id"`
	GroupTTL             time.Duration     `mapstructure:"group_ttl"`
	GroupBalancer        string            `mapstructure:"group_balancer"`
	Rack                 string            `mapstructure:"rack"`
	Topics               []string          `mapstructure:"topics"`
	DialTimeout          time.Duration     `mapstructure:"dial_timeout"`
	SessionTimeout       time.Duration     `mapstructure:"session_timeout"`
	RebalanceTimeout     time.Duration     `mapstructure:"rebalance_timeout"`
	HeartbeatInterval    time.Duration     `mapstructure:"heartbeat_interval"`
	ReadBatchTimeout     time.Duration     `mapstructure:"read_batch_timeout"`
	WaitBatchTimeout     time.Duration     `mapstructure:"wait_batch_timeout"`
	StartOffset          string            `mapstructure:"start_offset"`
	MaxBatchSize         datasize.Size     `mapstructure:"max_batch_size"`
	MaxUncommitted       int               `mapstructure:"max_uncommitted"`
	CommitInterval       time.Duration     `mapstructure:"commit_interval"`
	SASL                 SASL              `mapstructure:"sasl"`
	LabelHeaders         map[string]string `mapstructure:"labelheaders"`
	*ider.Ider           `mapstructure:",squash"`
	*tls.TLSClientConfig `mapstructure:",squash"`
	// contains filtered or unexported fields
}

func (*Kafka) Close

func (i *Kafka) Close() error

func (*Kafka) Init

func (i *Kafka) Init() (err error)

func (*Kafka) Run

func (i *Kafka) Run()

func (*Kafka) SetChannels

func (i *Kafka) SetChannels(out chan<- *core.Event)

func (*Kafka) SetParser

func (i *Kafka) SetParser(p core.Parser)

type SASL

type SASL struct {
	Mechanism string `mapstructure:"mechanism"`
	Username  string `mapstructure:"username"`
	Password  string `mapstructure:"password"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL