Documentation
¶
Overview ¶
Package kafka provides encoding and decoding functionality for Loki's Kafka integration.
Index ¶
Constants ¶
const (
// ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
ProducerBatchMaxBytes = 16_000_000
)
Variables ¶
var ( ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) )
Functions ¶
func Encode ¶
func Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error)
Encode converts a logproto.Stream into one or more Kafka records. It handles splitting large streams into multiple records if necessary.
The encoding process works as follows: 1. If the stream size is smaller than maxSize, it's encoded into a single record. 2. For larger streams, it splits the entries into multiple batches, each under maxSize. 3. The data is wrapped in a Kafka record with the tenant ID as the key.
The format of each record is: - Key: Tenant ID (used for routing, not for partitioning) - Value: Protobuf serialized logproto.Stream - Partition: As specified in the partitionID parameter
Parameters: - partitionID: The Kafka partition ID for the record - tenantID: The tenant ID for the stream - stream: The logproto.Stream to be encoded - maxSize: The maximum size of each Kafka record
Types ¶
type Config ¶
type Config struct { Address string `yaml:"address"` Topic string `yaml:"topic"` ClientID string `yaml:"client_id"` DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` SASLUsername string `yaml:"sasl_username"` SASLPassword flagext.Secret `yaml:"sasl_password"` ConsumerGroup string `yaml:"consumer_group"` ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` }
Config holds the generic config for the Kafka backend.
func (*Config) GetConsumerGroup ¶
GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.
func (*Config) RegisterFlags ¶
func (*Config) RegisterFlagsWithPrefix ¶
type Decoder ¶
type Decoder struct {
// contains filtered or unexported fields
}
Decoder is responsible for decoding Kafka record data back into logproto.Stream format. It caches parsed labels for efficiency.
func NewDecoder ¶
func (*Decoder) Decode ¶
Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels. The decoding process works as follows: 1. Unmarshal the data into a logproto.Stream. 2. Parse and cache the labels for efficiency in future decodes.
Returns the decoded logproto.Stream, parsed labels, and any error encountered.