kafka

package
v3.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package kafka provides encoding and decoding functionality for Loki's Kafka integration.

Index

Constants

View Source
const (

	// ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
	ProducerBatchMaxBytes = 16_000_000
)

Variables

View Source
var (
	ErrMissingKafkaAddress                 = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic                   = errors.New("the Kafka topic has not been configured")
	ErrInconsistentConsumerLagAtStartup    = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
	ErrInvalidMaxConsumerLagAtStartup      = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
	ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
	ErrInvalidProducerMaxRecordSizeBytes   = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
)

Functions

func Encode

func Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error)

Encode converts a logproto.Stream into one or more Kafka records. It handles splitting large streams into multiple records if necessary.

The encoding process works as follows: 1. If the stream size is smaller than maxSize, it's encoded into a single record. 2. For larger streams, it splits the entries into multiple batches, each under maxSize. 3. The data is wrapped in a Kafka record with the tenant ID as the key.

The format of each record is: - Key: Tenant ID (used for routing, not for partitioning) - Value: Protobuf serialized logproto.Stream - Partition: As specified in the partitionID parameter

Parameters: - partitionID: The Kafka partition ID for the record - tenantID: The tenant ID for the stream - stream: The logproto.Stream to be encoded - maxSize: The maximum size of each Kafka record

Types

type Config

type Config struct {
	Address      string        `yaml:"address"`
	Topic        string        `yaml:"topic"`
	ClientID     string        `yaml:"client_id"`
	DialTimeout  time.Duration `yaml:"dial_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`

	SASLUsername string         `yaml:"sasl_username"`
	SASLPassword flagext.Secret `yaml:"sasl_password"`

	ConsumerGroup                     string        `yaml:"consumer_group"`
	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

	LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`

	AutoCreateTopicEnabled           bool `yaml:"auto_create_topic_enabled"`
	AutoCreateTopicDefaultPartitions int  `yaml:"auto_create_topic_default_partitions"`

	ProducerMaxRecordSizeBytes int   `yaml:"producer_max_record_size_bytes"`
	ProducerMaxBufferedBytes   int64 `yaml:"producer_max_buffered_bytes"`

	TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
	MaxConsumerLagAtStartup    time.Duration `yaml:"max_consumer_lag_at_startup"`
}

Config holds the generic config for the Kafka backend.

func (*Config) GetConsumerGroup

func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string

GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder is responsible for decoding Kafka record data back into logproto.Stream format. It caches parsed labels for efficiency.

func NewDecoder

func NewDecoder() (*Decoder, error)

func (*Decoder) Decode

func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error)

Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels. The decoding process works as follows: 1. Unmarshal the data into a logproto.Stream. 2. Parse and cache the labels for efficiency in future decodes.

Returns the decoded logproto.Stream, parsed labels, and any error encountered.

func (*Decoder) DecodeWithoutLabels

func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error)

DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL