Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrMissingKafkaAddress               = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic                 = errors.New("the Kafka topic has not been configured")
	ErrInvalidWriteClients               = errors.New("the configured number of write clients is invalid (must be greater than 0)")
	ErrInvalidConsumePosition            = errors.New("the configured consume position is invalid")
	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
)
var (
	// ErrWriteRequestDataItemTooLarge is the error returned when a split WriteRequest failed to be written to
	// a partition because it's larger than the max allowed record size. The size reported here is always
	// maxProducerRecordDataBytesLimit, regardless of the configured max record data size, because the ingestion
	// fails only if bigger than the upper limit.
	ErrWriteRequestDataItemTooLarge = errors.New(globalerror.DistributorMaxWriteRequestDataItemSize.Message(
		fmt.Sprintf("the write request contains a timeseries or metadata item which is larger than the maximum allowed size of %d bytes", maxProducerRecordDataBytesLimit)))
)
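A minimal sketch of handling this error from a caller's side. The import path and the writeRequest helper are assumptions for illustration, and the sketch assumes the write path returns (or wraps) the sentinel error so errors.Is can match it:

package main

import (
	"context"
	"errors"
	"log"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

// writeRequest is a hypothetical stand-in for whichever component
// performs the write (e.g. the distributor's ingest-storage path).
func writeRequest(ctx context.Context) error { return nil }

func main() {
	if err := writeRequest(context.Background()); err != nil {
		// Assumption: the sentinel error is returned possibly wrapped,
		// so errors.Is can detect it.
		if errors.Is(err, ingest.ErrWriteRequestDataItemTooLarge) {
			log.Println("item exceeds the hard record size limit; retrying without shrinking it will not help")
		}
	}
}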
Functions ¶
func IngesterPartitionID ¶
IngesterPartitionID returns the partition ID owned by the given ingester.
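The signature isn't shown in this section; the following is a hedged sketch that assumes it takes the ingester instance ID and derives the partition ID from the instance's numeric suffix:

package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

func main() {
	// Assumed signature: IngesterPartitionID(ingesterID string) (int32, error).
	partitionID, err := ingest.IngesterPartitionID("ingester-zone-a-1")
	if err != nil {
		fmt.Println("cannot derive partition:", err)
		return
	}
	fmt.Println("partition:", partitionID) // presumably 1, from the instance suffix
}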
Types ¶
type Config ¶
type Config struct {
	Enabled     bool            `yaml:"enabled"`
	KafkaConfig KafkaConfig     `yaml:"kafka"`
	Migration   MigrationConfig `yaml:"migration"`
}
func (*Config) RegisterFlags ¶
func (cfg *Config) RegisterFlags(f *flag.FlagSet)
type KafkaConfig ¶
type KafkaConfig struct {
	Address                           string        `yaml:"address"`
	Topic                             string        `yaml:"topic"`
	ClientID                          string        `yaml:"client_id"`
	DialTimeout                       time.Duration `yaml:"dial_timeout"`
	WriteTimeout                      time.Duration `yaml:"write_timeout"`
	WriteClients                      int           `yaml:"write_clients"`
	ConsumerGroup                     string        `yaml:"consumer_group"`
	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`
	LastProducedOffsetPollInterval    time.Duration `yaml:"last_produced_offset_poll_interval"`
	LastProducedOffsetRetryTimeout    time.Duration `yaml:"last_produced_offset_retry_timeout"`
	ConsumeFromPositionAtStartup      string        `yaml:"consume_from_position_at_startup"`
	ConsumeFromTimestampAtStartup     int64         `yaml:"consume_from_timestamp_at_startup"`
	MaxConsumerLagAtStartup           time.Duration `yaml:"max_consumer_lag_at_startup"`
	AutoCreateTopicEnabled            bool          `yaml:"auto_create_topic_enabled"`
	AutoCreateTopicDefaultPartitions  int           `yaml:"auto_create_topic_default_partitions"`
	ProducerMaxRecordSizeBytes        int           `yaml:"producer_max_record_size_bytes"`

	// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
	FallbackClientErrorSampleRate int64 `yaml:"-"`
}
KafkaConfig holds the generic config for the Kafka backend.
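A minimal wiring sketch: defaults come from RegisterFlags, and Validate enforces the constraints behind the error variables listed above. The import path and the exact flag names are assumptions here:

package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

func main() {
	var cfg ingest.KafkaConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	// Assumed flag names; only address and topic are overridden,
	// everything else keeps the registered defaults.
	_ = fs.Parse([]string{
		"-ingest-storage.kafka.address=kafka:9092",
		"-ingest-storage.kafka.topic=mimir-ingest",
	})

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid Kafka config:", err) // e.g. ErrMissingKafkaAddress
	}
}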
func (*KafkaConfig) GetConsumerGroup ¶
func (cfg *KafkaConfig) GetConsumerGroup(instanceID string, partitionID int32) string
GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.
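A brief usage sketch with illustrative values (import path assumed):

package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

func main() {
	cfg := ingest.KafkaConfig{ConsumerGroup: "mimir-ingest"} // illustrative value
	// The group used for this instance/partition pair; when ConsumerGroup
	// is unset, a per-instance default is presumably derived instead.
	fmt.Println(cfg.GetConsumerGroup("ingester-zone-a-0", 3))
}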
func (*KafkaConfig) RegisterFlags ¶
func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)
func (*KafkaConfig) RegisterFlagsWithPrefix ¶
func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
func (*KafkaConfig) Validate ¶
func (cfg *KafkaConfig) Validate() error
type MigrationConfig ¶
type MigrationConfig struct {
	DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}
MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.
func (*MigrationConfig) RegisterFlags ¶
func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)
func (*MigrationConfig) RegisterFlagsWithPrefix ¶
func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type PartitionReader ¶
func NewPartitionReaderForPusher ¶
func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)
func (*PartitionReader) WaitReadConsistency ¶
func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr error)
WaitReadConsistency waits until all data produced up until now has been consumed by the reader.
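A hedged sketch of a strong read-consistency check against a constructed reader. The import path is assumed, and the Pusher value is declared but left nil purely so the sketch compiles; a real caller must supply a working Pusher implementation:

package main

import (
	"context"
	"log"
	"time"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

func main() {
	var (
		kafkaCfg ingest.KafkaConfig // assume populated and validated
		pusher   ingest.Pusher      // assume a real implementation; nil here for the sketch
	)

	reader, err := ingest.NewPartitionReaderForPusher(kafkaCfg, 0, "ingester-zone-a-0", pusher, gokitlog.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		log.Fatal(err)
	}

	// Bound the wait: if consumption hasn't caught up with everything
	// produced so far within 5s, give up with the context's error.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := reader.WaitReadConsistency(ctx); err != nil {
		log.Println("read consistency not reached:", err)
	}
}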
type Writer ¶
Writer is responsible for writing incoming data to the ingest storage.
func NewWriter ¶
func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer
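A minimal construction sketch (import path assumed; the writer's produce methods and service lifecycle aren't listed in this section, so only construction is shown):

package main

import (
	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path
)

func main() {
	var kafkaCfg ingest.KafkaConfig // assume populated and validated
	writer := ingest.NewWriter(kafkaCfg, gokitlog.NewNopLogger(), prometheus.NewRegistry())
	_ = writer // writing entry points are not part of this section
}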