Documentation ¶
Index ¶
- Constants
- Variables
- func CreateTopic(cfg KafkaConfig, logger log.Logger) error
- func IngesterPartitionID(ingesterID string) (int32, error)
- func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error)
- func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics
- func NewKafkaWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, ...) (*kgo.Client, error)
- type Config
- type GetPartitionIDsFunc
- type KafkaConfig
- type KafkaLogger
- type KafkaProducer
- type MigrationConfig
- type PartitionReader
- func (r *PartitionReader) BufferedBytes() int64
- func (r *PartitionReader) BufferedRecords() int64
- func (r *PartitionReader) EstimatedBytesPerRecord() int64
- func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context)
- func (r *PartitionReader) Stop()
- func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error)
- func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error)
- type Pusher
- type PusherCloser
- type StrongReadConsistencyInstrumentation
- type TopicOffsetsReader
- type Writer
Constants ¶
const (
// ReaderMetricsPrefix is the reader metrics prefix used by the ingest storage.
ReaderMetricsPrefix = "cortex_ingest_storage_reader"
)
Variables ¶
var ( ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") ErrInvalidWriteClients = errors.New("the configured number of write clients is invalid (must be greater than 0)") ErrInvalidConsumePosition = errors.New("the configured consume position is invalid") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) ErrInconsistentConsumerLagAtStartup = fmt.Errorf("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") ErrInvalidMaxConsumerLagAtStartup = fmt.Errorf("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") ErrInconsistentSASLCredentials = fmt.Errorf("the SASL username and password must be both configured to enable SASL authentication") ErrInvalidIngestionConcurrencyMax = errors.New("ingest-storage.kafka.ingestion-concurrency-max must either be set to 0 or to a value greater than 0") ErrInvalidIngestionConcurrencyParams = errors.New("ingest-storage.kafka.ingestion-concurrency-queue-capacity, ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample, ingest-storage.kafka.ingestion-concurrency-batch-size and ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard must be greater than 0") ErrInvalidAutoCreateTopicParams = errors.New("ingest-storage.kafka.auto-create-topic-default-partitions must be -1 or greater than 0 when ingest-storage.kafka.auto-create-topic-default-partitions=true") )
var ( // ErrWriteRequestDataItemTooLarge is the error returned when a split WriteRequest failed to be written to // a partition because it's larger than the max allowed record size. The size reported here is always // maxProducerRecordDataBytesLimit, regardless of the configured max record data size, because the ingestion // fails only if bigger than the upper limit. ErrWriteRequestDataItemTooLarge = errors.New(globalerror.DistributorMaxWriteRequestDataItemSize.Message( fmt.Sprintf("the write request contains a timeseries or metadata item which is larger that the maximum allowed size of %d bytes", maxProducerRecordDataBytesLimit))) )
Functions ¶
func CreateTopic ¶
func CreateTopic(cfg KafkaConfig, logger log.Logger) error
CreateTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned. If the topic already exists, then the function logs a message and returns nil.
func IngesterPartitionID ¶
IngesterPartitionID returns the partition ID owned by the given ingester.
func NewKafkaReaderClient ¶
func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error)
NewKafkaReaderClient returns the kgo.Client that should be used by the Reader.
func NewKafkaReaderClientMetrics ¶
func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics
func NewKafkaWriterClient ¶
func NewKafkaWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error)
NewKafkaWriterClient returns the kgo.Client that should be used by the Writer.
The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).
Types ¶
type Config ¶
type Config struct { Enabled bool `yaml:"enabled"` KafkaConfig KafkaConfig `yaml:"kafka"` Migration MigrationConfig `yaml:"migration"` }
func (*Config) RegisterFlags ¶
type KafkaConfig ¶
type KafkaConfig struct { Address string `yaml:"address"` Topic string `yaml:"topic"` ClientID string `yaml:"client_id"` DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` WriteClients int `yaml:"write_clients"` SASLUsername string `yaml:"sasl_username"` SASLPassword flagext.Secret `yaml:"sasl_password"` ConsumerGroup string `yaml:"consumer_group"` ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"` LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"` ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"` ConsumeFromTimestampAtStartup int64 `yaml:"consume_from_timestamp_at_startup"` TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"` // Used when logging unsampled client errors. Set from ingester's ErrorSampleRate. FallbackClientErrorSampleRate int64 `yaml:"-"` FetchConcurrencyMax int `yaml:"fetch_concurrency_max"` UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"` MaxBufferedBytes int `yaml:"max_buffered_bytes"` IngestionConcurrencyMax int `yaml:"ingestion_concurrency_max"` IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"` // IngestionConcurrencyQueueCapacity controls how many batches can be enqueued for flushing series to the TSDB HEAD. 
// We don't want to push any batches in parallel and instead want to prepare the next ones while the current one finishes, hence the buffer of 5. // For example, if we flush 1 batch/sec, then batching 2 batches/sec doesn't make us faster. // This is our initial assumption, and there's potential in testing with higher numbers if there's a high variability in flush times - assuming we can preserve the order of the batches. For now, we'll stick to 5. // If there's high variability in the time to flush or in the time to batch, then this buffer might need to be increased. IngestionConcurrencyQueueCapacity int `yaml:"ingestion_concurrency_queue_capacity"` // IngestionConcurrencyTargetFlushesPerShard is the number of flushes we want to target per shard. // There is some overhead in the parallelization. With fewer flushes, the overhead of splitting up the work is higher than the benefit of parallelization. // The default of 80 was devised experimentally to keep the memory and CPU usage low during ongoing consumption, while keeping replay speed high during cold replay. IngestionConcurrencyTargetFlushesPerShard int `yaml:"ingestion_concurrency_target_flushes_per_shard"` // IngestionConcurrencyEstimatedBytesPerSample is the estimated number of bytes per sample. // Our data indicates that the average sample size is somewhere between ~250 and ~500 bytes. We'll use 500 bytes as a conservative estimate. IngestionConcurrencyEstimatedBytesPerSample int `yaml:"ingestion_concurrency_estimated_bytes_per_sample"` // contains filtered or unexported fields }
KafkaConfig holds the generic config for the Kafka backend.
func (*KafkaConfig) GetConsumerGroup ¶
func (cfg *KafkaConfig) GetConsumerGroup(instanceID string, partitionID int32) string
GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.
func (*KafkaConfig) RegisterFlags ¶
func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)
func (*KafkaConfig) RegisterFlagsWithPrefix ¶
func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
func (*KafkaConfig) Validate ¶
func (cfg *KafkaConfig) Validate() error
type KafkaLogger ¶
type KafkaLogger struct {
// contains filtered or unexported fields
}
func NewKafkaLogger ¶
func NewKafkaLogger(logger log.Logger) *KafkaLogger
func (*KafkaLogger) Level ¶
func (l *KafkaLogger) Level() kgo.LogLevel
type KafkaProducer ¶
KafkaProducer is a kgo.Client wrapper exposing some higher level features and metrics useful for producers.
func NewKafkaProducer ¶
func NewKafkaProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *KafkaProducer
NewKafkaProducer returns a new KafkaProducer.
The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).
func (*KafkaProducer) Close ¶
func (c *KafkaProducer) Close()
func (*KafkaProducer) ProduceSync ¶
func (c *KafkaProducer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
ProduceSync produces records to Kafka and returns once all records have been successfully committed, or an error occurred.
This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered, if the configured limit is reached.
type MigrationConfig ¶
type MigrationConfig struct {
DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}
MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.
func (*MigrationConfig) RegisterFlags ¶
func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)
func (*MigrationConfig) RegisterFlagsWithPrefix ¶
func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type PartitionReader ¶
func NewPartitionReaderForPusher ¶
func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)
func (*PartitionReader) BufferedBytes ¶
func (r *PartitionReader) BufferedBytes() int64
func (*PartitionReader) BufferedRecords ¶
func (r *PartitionReader) BufferedRecords() int64
func (*PartitionReader) EstimatedBytesPerRecord ¶
func (r *PartitionReader) EstimatedBytesPerRecord() int64
func (*PartitionReader) PollFetches ¶
func (*PartitionReader) WaitReadConsistencyUntilLastProducedOffset ¶
func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error)
WaitReadConsistencyUntilLastProducedOffset waits until all data produced up until now has been consumed by the reader.
func (*PartitionReader) WaitReadConsistencyUntilOffset ¶
func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error)
WaitReadConsistencyUntilOffset waits until all data up until the input offset has been consumed by the reader.
type PusherCloser ¶
type StrongReadConsistencyInstrumentation ¶
type StrongReadConsistencyInstrumentation[T any] struct { // contains filtered or unexported fields }
func NewStrongReadConsistencyInstrumentation ¶
func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer, topics []string) *StrongReadConsistencyInstrumentation[T]
type TopicOffsetsReader ¶
type TopicOffsetsReader struct {
// contains filtered or unexported fields
}
TopicOffsetsReader is responsible for reading the offsets of partitions in a topic.
func NewTopicOffsetsReader ¶
func NewTopicOffsetsReader(client *kgo.Client, topic string, getPartitionIDs GetPartitionIDsFunc, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader
func NewTopicOffsetsReaderForAllPartitions ¶
func NewTopicOffsetsReaderForAllPartitions(client *kgo.Client, topic string, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader
NewTopicOffsetsReaderForAllPartitions returns a TopicOffsetsReader instance that fetches the offsets for all existing partitions in a topic. The list of partitions is refreshed each time FetchLastProducedOffset() is called, so using a TopicOffsetsReader created by this function adds extra latency on each call to refresh the partitions.
func (TopicOffsetsReader) CachedOffset ¶
func (r TopicOffsetsReader) CachedOffset() (O, error)
CachedOffset returns the last result of fetching the offset. This is likely outdated, but it's useful to get a directionally correct value quickly.
func (*TopicOffsetsReader) FetchLastProducedOffset ¶
FetchLastProducedOffset fetches and returns the last produced offset for each requested partition in the topic. The offset is -1 if a partition has been created but no record has been produced yet.
func (*TopicOffsetsReader) Topic ¶
func (p *TopicOffsetsReader) Topic() string
func (TopicOffsetsReader) WaitNextFetchLastProducedOffset ¶
WaitNextFetchLastProducedOffset returns the result of the *next* "last produced offset" request that will be issued.
The "last produced offset" is the offset of the last message written to the partition (starting from 0), or -1 if no message has been written yet.
type Writer ¶
Writer is responsible for writing incoming data to the ingest storage.
func NewWriter ¶
func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer