ingest

package
v0.0.0-...-ddb0f58
Published: Dec 17, 2024 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Constants

const (

	// ReaderMetricsPrefix is the reader metrics prefix used by the ingest storage.
	ReaderMetricsPrefix = "cortex_ingest_storage_reader"
)

Variables

var (
	ErrMissingKafkaAddress               = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic                 = errors.New("the Kafka topic has not been configured")
	ErrInvalidWriteClients               = errors.New("the configured number of write clients is invalid (must be greater than 0)")
	ErrInvalidConsumePosition            = errors.New("the configured consume position is invalid")
	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
	ErrInconsistentConsumerLagAtStartup  = fmt.Errorf("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
	ErrInvalidMaxConsumerLagAtStartup    = fmt.Errorf("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag")
	ErrInconsistentSASLCredentials       = fmt.Errorf("the SASL username and password must be both configured to enable SASL authentication")
	ErrInvalidIngestionConcurrencyMax    = errors.New("ingest-storage.kafka.ingestion-concurrency-max must either be set to 0 or to a value greater than 0")
	ErrInvalidIngestionConcurrencyParams = errors.New("ingest-storage.kafka.ingestion-concurrency-queue-capacity, ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample, ingest-storage.kafka.ingestion-concurrency-batch-size and ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard must be greater than 0")
	ErrInvalidAutoCreateTopicParams      = errors.New("ingest-storage.kafka.auto-create-topic-default-partitions must be -1 or greater than 0 when ingest-storage.kafka.auto-create-topic-enabled=true")
)
var (
	// ErrWriteRequestDataItemTooLarge is the error returned when a split WriteRequest failed to be written to
	// a partition because it's larger than the max allowed record size. The size reported here is always
	// maxProducerRecordDataBytesLimit, regardless of the configured max record data size, because ingestion
	// fails only if the item is bigger than this upper limit.
	ErrWriteRequestDataItemTooLarge = errors.New(globalerror.DistributorMaxWriteRequestDataItemSize.Message(
		fmt.Sprintf("the write request contains a timeseries or metadata item which is larger than the maximum allowed size of %d bytes", maxProducerRecordDataBytesLimit)))
)

Functions

func CreateTopic

func CreateTopic(cfg KafkaConfig, logger log.Logger) error

CreateTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned. If the topic already exists, then the function logs a message and returns nil.
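
A minimal usage sketch, assuming the package import path github.com/grafana/mimir/pkg/storage/ingest and that log.Logger refers to go-kit's logger; the broker address and topic name are placeholders:

package main

import (
	"flag"
	"os"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	// Registering flags on a throwaway FlagSet fills in the default values.
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder broker address
	cfg.Topic = "mimir-ingest"     // placeholder topic name

	logger := log.NewLogfmtLogger(os.Stderr)

	// Returns nil if the topic already exists.
	if err := ingest.CreateTopic(cfg, logger); err != nil {
		logger.Log("msg", "failed to create topic", "err", err)
		os.Exit(1)
	}
}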

func IngesterPartitionID

func IngesterPartitionID(ingesterID string) (int32, error)

IngesterPartitionID returns the partition ID owned by the given ingester.
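
A minimal sketch under the same import-path assumption as above; the ingester ID format shown (a zonal name with a numeric suffix) is an assumption about the naming convention the function expects:

package main

import (
	"fmt"
	"log"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	// Assumption: the ingester ID carries a numeric suffix identifying the owned partition.
	partitionID, err := ingest.IngesterPartitionID("ingester-zone-a-1")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("partition owned by ingester-zone-a-1:", partitionID)
}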

func NewKafkaReaderClient

func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error)

NewKafkaReaderClient returns the kgo.Client that should be used by the Reader.

func NewKafkaReaderClientMetrics

func NewKafkaReaderClientMetrics(prefix, component string, reg prometheus.Registerer) *kprom.Metrics
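
A sketch wiring the reader metrics into a reader client, under the same assumptions as the earlier sketches; the component name "partition-reader" is a placeholder:

package main

import (
	"flag"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)
	reg := prometheus.NewRegistry()

	// Reuse the exported reader metrics prefix constant.
	metrics := ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "partition-reader", reg)

	client, err := ingest.NewKafkaReaderClient(cfg, metrics, logger)
	if err != nil {
		logger.Log("msg", "failed to create reader client", "err", err)
		os.Exit(1)
	}
	defer client.Close()
}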

func NewKafkaWriterClient

func NewKafkaWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error)

NewKafkaWriterClient returns the kgo.Client that should be used by the Writer.

The input prometheus.Registerer must be wrapped with a prefix (the registered metric names don't include one on their own).
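
For example (a sketch; the prefix and the max in-flight produce requests value of 20 are illustrative choices, not package defaults):

package main

import (
	"flag"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)

	// Wrap the registerer with a prefix, since the registered metric names don't carry one.
	reg := prometheus.WrapRegistererWithPrefix("cortex_ingest_storage_writer_", prometheus.NewRegistry())

	client, err := ingest.NewKafkaWriterClient(cfg, 20, logger, reg)
	if err != nil {
		logger.Log("msg", "failed to create writer client", "err", err)
		os.Exit(1)
	}
	defer client.Close()
}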

Types

type Config

type Config struct {
	Enabled     bool            `yaml:"enabled"`
	KafkaConfig KafkaConfig     `yaml:"kafka"`
	Migration   MigrationConfig `yaml:"migration"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

Validate the config.
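
A sketch of the usual flag-driven lifecycle, under the same import-path assumption as the earlier sketches: register flags, parse, then validate (the flag set name is a placeholder and the registered flag names are not spelled out here):

package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.Config

	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(os.Args[1:]); err != nil {
		os.Exit(1)
	}

	// Validate once all overrides (flags, YAML) have been applied.
	if err := cfg.Validate(); err != nil {
		fmt.Fprintln(os.Stderr, "invalid ingest storage config:", err)
		os.Exit(1)
	}
}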

type GetPartitionIDsFunc

type GetPartitionIDsFunc func(ctx context.Context) ([]int32, error)

type KafkaConfig

type KafkaConfig struct {
	Address      string        `yaml:"address"`
	Topic        string        `yaml:"topic"`
	ClientID     string        `yaml:"client_id"`
	DialTimeout  time.Duration `yaml:"dial_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`
	WriteClients int           `yaml:"write_clients"`

	SASLUsername string         `yaml:"sasl_username"`
	SASLPassword flagext.Secret `yaml:"sasl_password"`

	ConsumerGroup                     string        `yaml:"consumer_group"`
	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

	LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"`
	LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`

	ConsumeFromPositionAtStartup  string        `yaml:"consume_from_position_at_startup"`
	ConsumeFromTimestampAtStartup int64         `yaml:"consume_from_timestamp_at_startup"`
	TargetConsumerLagAtStartup    time.Duration `yaml:"target_consumer_lag_at_startup"`
	MaxConsumerLagAtStartup       time.Duration `yaml:"max_consumer_lag_at_startup"`

	AutoCreateTopicEnabled           bool `yaml:"auto_create_topic_enabled"`
	AutoCreateTopicDefaultPartitions int  `yaml:"auto_create_topic_default_partitions"`

	ProducerMaxRecordSizeBytes int   `yaml:"producer_max_record_size_bytes"`
	ProducerMaxBufferedBytes   int64 `yaml:"producer_max_buffered_bytes"`

	WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"`

	// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
	FallbackClientErrorSampleRate int64 `yaml:"-"`

	FetchConcurrencyMax               int  `yaml:"fetch_concurrency_max"`
	UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
	MaxBufferedBytes                  int  `yaml:"max_buffered_bytes"`

	IngestionConcurrencyMax       int `yaml:"ingestion_concurrency_max"`
	IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"`

	// IngestionConcurrencyQueueCapacity controls how many batches can be enqueued for flushing series to the TSDB HEAD.
	// We don't want to push batches in parallel; instead we want to prepare the next batches while the current one is being flushed, so a small buffer is enough.
	// For example, if we flush 1 batch/sec, then preparing 2 batches/sec doesn't make us faster.
	// If there's high variability in the time to flush or in the time to batch, this buffer might need to be increased, as long as the order of batches is preserved. For now, we stick to a capacity of 5.
	IngestionConcurrencyQueueCapacity int `yaml:"ingestion_concurrency_queue_capacity"`

	// IngestionConcurrencyTargetFlushesPerShard is the number of flushes we want to target per shard.
	// There is some overhead in the parallelization: with too few flushes per shard, the cost of splitting up the work outweighs the benefit of parallelization.
	// The default of 80 was devised experimentally to keep memory and CPU usage low during ongoing consumption, while keeping replay speed high during cold replay.
	IngestionConcurrencyTargetFlushesPerShard int `yaml:"ingestion_concurrency_target_flushes_per_shard"`

	// IngestionConcurrencyEstimatedBytesPerSample is the estimated number of bytes per sample.
	// Our data indicates that the average sample size is somewhere between ~250 and ~500 bytes. We'll use 500 bytes as a conservative estimate.
	IngestionConcurrencyEstimatedBytesPerSample int `yaml:"ingestion_concurrency_estimated_bytes_per_sample"`
	// contains filtered or unexported fields
}

KafkaConfig holds the generic config for the Kafka backend.

func (*KafkaConfig) GetConsumerGroup

func (cfg *KafkaConfig) GetConsumerGroup(instanceID string, partitionID int32) string

GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.

func (*KafkaConfig) RegisterFlags

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)

func (*KafkaConfig) RegisterFlagsWithPrefix

func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*KafkaConfig) Validate

func (cfg *KafkaConfig) Validate() error

type KafkaLogger

type KafkaLogger struct {
	// contains filtered or unexported fields
}

func NewKafkaLogger

func NewKafkaLogger(logger log.Logger) *KafkaLogger

func (*KafkaLogger) Level

func (l *KafkaLogger) Level() kgo.LogLevel

func (*KafkaLogger) Log

func (l *KafkaLogger) Log(lev kgo.LogLevel, msg string, keyvals ...any)

type KafkaProducer

type KafkaProducer struct {
	*kgo.Client
	// contains filtered or unexported fields
}

KafkaProducer is a kgo.Client wrapper exposing some higher level features and metrics useful for producers.

func NewKafkaProducer

func NewKafkaProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *KafkaProducer

NewKafkaProducer returns a new KafkaProducer.

The input prometheus.Registerer must be wrapped with a prefix (the registered metric names don't include one on their own).

func (*KafkaProducer) Close

func (c *KafkaProducer) Close()

func (*KafkaProducer) ProduceSync

func (c *KafkaProducer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults

ProduceSync produces records to Kafka and returns once all records have been successfully committed, or an error occurs.

This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered, if the configured limit is reached.
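
A sketch producing a single record through a KafkaProducer, under the same assumptions as the earlier sketches; the 1 GiB buffered-bytes limit and the record payload are placeholders:

package main

import (
	"context"
	"flag"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)
	reg := prometheus.WrapRegistererWithPrefix("cortex_ingest_storage_writer_", prometheus.NewRegistry())

	client, err := ingest.NewKafkaWriterClient(cfg, 20, logger, reg)
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}

	// The producer registers its own metrics, so it also expects a prefixed registerer.
	producer := ingest.NewKafkaProducer(client, 1<<30 /* placeholder buffered-bytes limit */, reg)
	defer producer.Close()

	records := []*kgo.Record{{Topic: cfg.Topic, Value: []byte("payload")}}

	// Blocks until all records are committed, or returns the first error
	// (kgo.ErrMaxBuffered if the buffered-bytes limit would be exceeded).
	results := producer.ProduceSync(context.Background(), records)
	if err := results.FirstErr(); err != nil {
		logger.Log("msg", "produce failed", "err", err)
	}
}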

type MigrationConfig

type MigrationConfig struct {
	DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}

MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.

func (*MigrationConfig) RegisterFlags

func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)

func (*MigrationConfig) RegisterFlagsWithPrefix

func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type PartitionReader

type PartitionReader struct {
	services.Service
	// contains filtered or unexported fields
}

func NewPartitionReaderForPusher

func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)
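
A sketch wiring a PartitionReader to a Pusher and running it as a dskit service, under the same assumptions as the earlier sketches; the no-op pusher, partition ID 0, and instance ID are placeholders:

package main

import (
	"context"
	"flag"
	"os"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/grafana/mimir/pkg/storage/ingest"
)

// noopPusher is a placeholder Pusher implementation that discards write requests.
type noopPusher struct{}

func (noopPusher) PushToStorage(_ context.Context, _ *mimirpb.WriteRequest) error { return nil }

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)

	reader, err := ingest.NewPartitionReaderForPusher(cfg, 0, "ingester-zone-a-0", noopPusher{}, logger, prometheus.NewRegistry())
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}

	// PartitionReader embeds services.Service, so it is started and stopped like any dskit service.
	ctx := context.Background()
	if err := services.StartAndAwaitRunning(ctx, reader); err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}
	defer services.StopAndAwaitTerminated(ctx, reader)
}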

func (*PartitionReader) BufferedBytes

func (r *PartitionReader) BufferedBytes() int64

func (*PartitionReader) BufferedRecords

func (r *PartitionReader) BufferedRecords() int64

func (*PartitionReader) EstimatedBytesPerRecord

func (r *PartitionReader) EstimatedBytesPerRecord() int64

func (*PartitionReader) PollFetches

func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context)

func (*PartitionReader) Stop

func (r *PartitionReader) Stop()

Stop implements the fetcher interface.

func (*PartitionReader) WaitReadConsistencyUntilLastProducedOffset

func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error)

WaitReadConsistencyUntilLastProducedOffset waits until all data produced up until now has been consumed by the reader.

func (*PartitionReader) WaitReadConsistencyUntilOffset

func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error)

WaitReadConsistencyUntilOffset waits until all data up until the input offset has been consumed by the reader.

type Pusher

type Pusher interface {
	PushToStorage(context.Context, *mimirpb.WriteRequest) error
}

type PusherCloser

type PusherCloser interface {
	// PushToStorage pushes the write request to the storage.
	PushToStorage(context.Context, *mimirpb.WriteRequest) error
	// Close tells the PusherCloser that no more records are coming and it should flush any remaining records.
	Close() []error
}

type StrongReadConsistencyInstrumentation

type StrongReadConsistencyInstrumentation[T any] struct {
	// contains filtered or unexported fields
}

func NewStrongReadConsistencyInstrumentation

func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T]

func (*StrongReadConsistencyInstrumentation[T]) Observe

func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error)
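
A sketch wrapping an offset-returning operation, under the same import-path assumption as the earlier sketches; the component name and callback are placeholders, and the interpretation of withOffset in the comment is an assumption based on the parameter name:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	instr := ingest.NewStrongReadConsistencyInstrumentation[int64]("partition-reader", prometheus.NewRegistry())

	// Observe runs the callback and records its outcome. Passing withOffset=false is
	// assumed here to mean the caller didn't request consistency up to a specific offset.
	offset, err := instr.Observe(false, func() (int64, error) {
		return 42, nil // placeholder for real work, e.g. waiting until an offset is consumed
	})
	fmt.Println(offset, err)
}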

type TopicOffsetsReader

type TopicOffsetsReader struct {
	// contains filtered or unexported fields
}

TopicOffsetsReader is responsible for reading the offsets of partitions in a topic.

func NewTopicOffsetsReader

func NewTopicOffsetsReader(client *kgo.Client, topic string, getPartitionIDs GetPartitionIDsFunc, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader

func NewTopicOffsetsReaderForAllPartitions

func NewTopicOffsetsReaderForAllPartitions(client *kgo.Client, topic string, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader

NewTopicOffsetsReaderForAllPartitions returns a TopicOffsetsReader instance that fetches the offsets for all existing partitions in a topic. The list of partitions is refreshed each time FetchLastProducedOffset() is called, so a TopicOffsetsReader created by this function adds extra latency to refresh the partitions on each call.

func (TopicOffsetsReader) CachedOffset

func (r TopicOffsetsReader) CachedOffset() (map[int32]int64, error)

CachedOffset returns the last result of fetching the offset. This is likely outdated, but it's useful to get a directionally correct value quickly.

func (*TopicOffsetsReader) FetchLastProducedOffset

func (p *TopicOffsetsReader) FetchLastProducedOffset(ctx context.Context) (map[int32]int64, error)

FetchLastProducedOffset fetches and returns the last produced offset for each requested partition in the topic. The offset is -1 if a partition has been created but no record has been produced yet.
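
A sketch fetching the last produced offsets for all partitions, under the same assumptions as the earlier sketches; the one-second poll interval is a placeholder:

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)
	reg := prometheus.NewRegistry()

	client, err := ingest.NewKafkaReaderClient(cfg, ingest.NewKafkaReaderClientMetrics(ingest.ReaderMetricsPrefix, "offsets-reader", reg), logger)
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}
	defer client.Close()

	offsetsReader := ingest.NewTopicOffsetsReaderForAllPartitions(client, cfg.Topic, time.Second, reg, logger)

	// -1 means the partition exists but no record has been produced yet.
	offsets, err := offsetsReader.FetchLastProducedOffset(context.Background())
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}
	for partition, offset := range offsets {
		fmt.Printf("partition %d: last produced offset %d\n", partition, offset)
	}
}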

func (TopicOffsetsReader) WaitNextFetchLastProducedOffset

func (r TopicOffsetsReader) WaitNextFetchLastProducedOffset(ctx context.Context) (map[int32]int64, error)

WaitNextFetchLastProducedOffset returns the result of the *next* "last produced offset" request that will be issued.

The "last produced offset" is the offset of the last message written to the partition (starting from 0), or -1 if no message has been written yet.

type Writer

type Writer struct {
	services.Service
	// contains filtered or unexported fields
}

Writer is responsible for writing incoming data to the ingest storage.

func NewWriter

func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer

func (*Writer) WriteSync

func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string, req *mimirpb.WriteRequest) error

WriteSync writes the input data to the ingest storage. The function blocks until the data has been successfully committed, or an error occurs.
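
A sketch, under the same assumptions as the earlier sketches; the partition ID, tenant ID, and the empty WriteRequest are placeholders:

package main

import (
	"context"
	"flag"
	"os"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/mimirpb"
	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ContinueOnError))
	cfg.Address = "localhost:9092" // placeholder
	cfg.Topic = "mimir-ingest"     // placeholder

	logger := log.NewLogfmtLogger(os.Stderr)
	writer := ingest.NewWriter(cfg, logger, prometheus.NewRegistry())

	// Writer embeds services.Service, so it must be started before use.
	ctx := context.Background()
	if err := services.StartAndAwaitRunning(ctx, writer); err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}
	defer services.StopAndAwaitTerminated(ctx, writer)

	// Blocks until the request is committed to the partition or an error occurs.
	req := &mimirpb.WriteRequest{} // placeholder: a real request carries timeseries/metadata
	if err := writer.WriteSync(ctx, 0, "tenant-1", req); err != nil {
		logger.Log("msg", "write failed", "err", err)
	}
}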
