ingest

package
v0.0.0-...-ba40f9b
Published: Sep 23, 2024 License: AGPL-3.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrMissingKafkaAddress               = errors.New("the Kafka address has not been configured")
	ErrMissingKafkaTopic                 = errors.New("the Kafka topic has not been configured")
	ErrInvalidWriteClients               = errors.New("the configured number of write clients is invalid (must be greater than 0)")
	ErrInvalidConsumePosition            = errors.New("the configured consume position is invalid")
	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
	ErrInconsistentConsumerLagAtStartup  = fmt.Errorf("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
	ErrInvalidMaxConsumerLagAtStartup    = fmt.Errorf("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag")
)
var (
	// ErrWriteRequestDataItemTooLarge is the error returned when a split WriteRequest failed to be written to
	// a partition because it's larger than the max allowed record size. The size reported here is always
	// maxProducerRecordDataBytesLimit, regardless of the configured max record data size, because the ingestion
	// fails only if the item is bigger than the upper limit.
	ErrWriteRequestDataItemTooLarge = errors.New(globalerror.DistributorMaxWriteRequestDataItemSize.Message(
		fmt.Sprintf("the write request contains a timeseries or metadata item which is larger than the maximum allowed size of %d bytes", maxProducerRecordDataBytesLimit)))
)

Functions

func IngesterPartitionID

func IngesterPartitionID(ingesterID string) (int32, error)

IngesterPartitionID returns the partition ID owned by the given ingester.
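
For illustration, a minimal sketch of calling IngesterPartitionID. The import path and the assumption that the partition ID maps to the ingester's numeric suffix are not guaranteed by this page:

	package main

	import (
		"fmt"

		// The import path is an assumption based on the package name.
		"github.com/grafana/mimir/pkg/storage/ingest"
	)

	func main() {
		// "ingester-zone-a-0" is a hypothetical ingester ID.
		partitionID, err := ingest.IngesterPartitionID("ingester-zone-a-0")
		if err != nil {
			panic(err)
		}
		fmt.Println("partition:", partitionID) // expected to print 0 under the suffix-mapping assumption
	}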

func NewKafkaReaderClient

func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error)

NewKafkaReaderClient returns the kgo.Client that should be used by the Reader.

func NewKafkaReaderClientMetrics

func NewKafkaReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics

func NewKafkaWriterClient

func NewKafkaWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error)

NewKafkaWriterClient returns the kgo.Client that should be used by the Writer.

The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).
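
A hedged sketch of satisfying that requirement with prometheus.WrapRegistererWithPrefix; the prefix string, the in-flight request count, and the kafkaCfg and logger variables are illustrative assumptions:

	// Wrap the registerer so the client's unprefixed metric names get a prefix.
	reg := prometheus.WrapRegistererWithPrefix("cortex_ingest_storage_writer_", prometheus.DefaultRegisterer)

	client, err := ingest.NewKafkaWriterClient(kafkaCfg, 20, logger, reg)
	if err != nil {
		return err
	}
	defer client.Close()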

Types

type Config

type Config struct {
	Enabled     bool            `yaml:"enabled"`
	KafkaConfig KafkaConfig     `yaml:"kafka"`
	Migration   MigrationConfig `yaml:"migration"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

Validate the config.
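
A minimal sketch of registering the config's flags, overriding a few fields programmatically, and validating; the field values are examples, not defaults taken from this package:

	var cfg ingest.Config

	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs) // registers this package's flags with their default values
	if err := fs.Parse(nil); err != nil {
		return err
	}

	cfg.Enabled = true
	cfg.KafkaConfig.Address = "kafka:9092" // example broker address
	cfg.KafkaConfig.Topic = "ingest"       // example topic name

	if err := cfg.Validate(); err != nil {
		return err
	}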

type GetPartitionIDsFunc

type GetPartitionIDsFunc func(ctx context.Context) ([]int32, error)

type KafkaConfig

type KafkaConfig struct {
	Address      string        `yaml:"address"`
	Topic        string        `yaml:"topic"`
	ClientID     string        `yaml:"client_id"`
	DialTimeout  time.Duration `yaml:"dial_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`
	WriteClients int           `yaml:"write_clients"`

	ConsumerGroup                     string        `yaml:"consumer_group"`
	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

	LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"`
	LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`

	ConsumeFromPositionAtStartup  string        `yaml:"consume_from_position_at_startup"`
	ConsumeFromTimestampAtStartup int64         `yaml:"consume_from_timestamp_at_startup"`
	TargetConsumerLagAtStartup    time.Duration `yaml:"target_consumer_lag_at_startup"`
	MaxConsumerLagAtStartup       time.Duration `yaml:"max_consumer_lag_at_startup"`

	AutoCreateTopicEnabled           bool `yaml:"auto_create_topic_enabled"`
	AutoCreateTopicDefaultPartitions int  `yaml:"auto_create_topic_default_partitions"`

	ProducerMaxRecordSizeBytes int   `yaml:"producer_max_record_size_bytes"`
	ProducerMaxBufferedBytes   int64 `yaml:"producer_max_buffered_bytes"`

	WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"`

	// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
	FallbackClientErrorSampleRate int64 `yaml:"-"`
}

KafkaConfig holds the generic config for the Kafka backend.

func (*KafkaConfig) GetConsumerGroup

func (cfg *KafkaConfig) GetConsumerGroup(instanceID string, partitionID int32) string

GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID.

func (*KafkaConfig) RegisterFlags

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet)

func (*KafkaConfig) RegisterFlagsWithPrefix

func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*KafkaConfig) Validate

func (cfg *KafkaConfig) Validate() error

type KafkaLogger

type KafkaLogger struct {
	// contains filtered or unexported fields
}

func NewKafkaLogger

func NewKafkaLogger(logger log.Logger) *KafkaLogger

func (*KafkaLogger) Level

func (l *KafkaLogger) Level() kgo.LogLevel

func (*KafkaLogger) Log

func (l *KafkaLogger) Log(lev kgo.LogLevel, msg string, keyvals ...any)
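KafkaLogger adapts a go-kit logger to franz-go's kgo.Logger interface (Level and Log above), so it can be passed to a Kafka client via kgo.WithLogger. A hedged sketch; the broker address is an example:

	logger := log.NewLogfmtLogger(os.Stderr)

	client, err := kgo.NewClient(
		kgo.SeedBrokers("kafka:9092"),
		kgo.WithLogger(ingest.NewKafkaLogger(logger)),
	)
	if err != nil {
		return err
	}
	defer client.Close()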

type KafkaProducer

type KafkaProducer struct {
	*kgo.Client
	// contains filtered or unexported fields
}

KafkaProducer is a kgo.Client wrapper exposing some higher level features and metrics useful for producers.

func NewKafkaProducer

func NewKafkaProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *KafkaProducer

NewKafkaProducer returns a new KafkaProducer.

The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).

func (*KafkaProducer) Close

func (c *KafkaProducer) Close()

func (*KafkaProducer) ProduceSync

func (c *KafkaProducer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults

ProduceSync produces records to Kafka and returns once all records have been successfully committed, or an error occurs.

This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered, if the configured limit is reached.
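
A hedged sketch of the producer flow, assuming kafkaCfg is a populated KafkaConfig, reg is a prefix-wrapped prometheus.Registerer, and payload holds the record body; the in-flight request count and record key are illustrative:

	client, err := ingest.NewKafkaWriterClient(kafkaCfg, 20, logger, reg)
	if err != nil {
		return err
	}

	producer := ingest.NewKafkaProducer(client, kafkaCfg.ProducerMaxBufferedBytes, reg)
	defer producer.Close()

	results := producer.ProduceSync(ctx, []*kgo.Record{
		{Topic: kafkaCfg.Topic, Key: []byte("tenant-1"), Value: payload},
	})
	if err := results.FirstErr(); err != nil {
		if errors.Is(err, kgo.ErrMaxBuffered) {
			// The configured max buffered bytes limit was reached.
		}
		return err
	}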

type MigrationConfig

type MigrationConfig struct {
	DistributorSendToIngestersEnabled bool `yaml:"distributor_send_to_ingesters_enabled"`
}

MigrationConfig holds the configuration used to migrate Mimir to ingest storage. This config shouldn't be set for any other reason.

func (*MigrationConfig) RegisterFlags

func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet)

func (*MigrationConfig) RegisterFlagsWithPrefix

func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type PartitionReader

type PartitionReader struct {
	services.Service
	// contains filtered or unexported fields
}

func NewPartitionReaderForPusher

func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error)
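
A hedged sketch of constructing and starting a reader, assuming kafkaCfg, logger, and a pusher implementing the Pusher interface (documented below) are available; the partition ID and instance ID are examples. PartitionReader embeds services.Service, so it is started with the dskit services helpers:

	reader, err := ingest.NewPartitionReaderForPusher(kafkaCfg, 0, "ingester-zone-a-0", pusher, logger, prometheus.NewRegistry())
	if err != nil {
		return err
	}

	if err := services.StartAndAwaitRunning(ctx, reader); err != nil {
		return err
	}
	defer services.StopAndAwaitTerminated(context.Background(), reader) //nolint:errcheck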

func (*PartitionReader) WaitReadConsistencyUntilLastProducedOffset

func (r *PartitionReader) WaitReadConsistencyUntilLastProducedOffset(ctx context.Context) (returnErr error)

WaitReadConsistencyUntilLastProducedOffset waits until all data produced up until now has been consumed by the reader.

func (*PartitionReader) WaitReadConsistencyUntilOffset

func (r *PartitionReader) WaitReadConsistencyUntilOffset(ctx context.Context, offset int64) (returnErr error)

WaitReadConsistencyUntilOffset waits until all data up to the input offset has been consumed by the reader.
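
A hedged sketch of waiting for strong read consistency on a running reader; the timeout is illustrative and reader refers to the sketch above:

	waitCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if err := reader.WaitReadConsistencyUntilLastProducedOffset(waitCtx); err != nil {
		// The reader did not catch up with the last produced offset in time.
		return err
	}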

type Pusher

type Pusher interface {
	PushToStorage(context.Context, *mimirpb.WriteRequest) error
}
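
For illustration only, a hypothetical minimal Pusher implementation that logs the size of each request instead of storing it:

	// loggingPusher is a hypothetical Pusher used only for this sketch.
	type loggingPusher struct {
		logger log.Logger
	}

	func (p loggingPusher) PushToStorage(_ context.Context, req *mimirpb.WriteRequest) error {
		return p.logger.Log("msg", "received write request", "series", len(req.Timeseries))
	}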

type StrongReadConsistencyInstrumentation

type StrongReadConsistencyInstrumentation[T any] struct {
	// contains filtered or unexported fields
}

func NewStrongReadConsistencyInstrumentation

func NewStrongReadConsistencyInstrumentation[T any](component string, reg prometheus.Registerer) *StrongReadConsistencyInstrumentation[T]

func (*StrongReadConsistencyInstrumentation[T]) Observe

func (i *StrongReadConsistencyInstrumentation[T]) Observe(withOffset bool, f func() (T, error)) (_ T, returnErr error)
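
A hedged sketch of wrapping an operation with Observe; the component name and the instrumented function are illustrative, and offsetsReader refers to a TopicOffsetsReader (documented below):

	instr := ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64]("partition-reader", prometheus.NewRegistry())

	offsets, err := instr.Observe(false, func() (map[int32]int64, error) {
		return offsetsReader.FetchLastProducedOffset(ctx)
	})
	if err != nil {
		return err
	}
	_ = offsets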

type TopicOffsetsReader

type TopicOffsetsReader struct {
	// contains filtered or unexported fields
}

TopicOffsetsReader is responsible for reading the offsets of partitions in a topic.

func NewTopicOffsetsReader

func NewTopicOffsetsReader(client *kgo.Client, topic string, getPartitionIDs GetPartitionIDsFunc, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader

func NewTopicOffsetsReaderForAllPartitions

func NewTopicOffsetsReaderForAllPartitions(client *kgo.Client, topic string, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *TopicOffsetsReader

NewTopicOffsetsReaderForAllPartitions returns a TopicOffsetsReader instance that fetches the offsets for all existing partitions in a topic. The list of partitions is refreshed each time FetchLastProducedOffset() is called, so a TopicOffsetsReader created by this function adds extra latency to each call in order to refresh the partitions.

func (*TopicOffsetsReader) FetchLastProducedOffset

func (p *TopicOffsetsReader) FetchLastProducedOffset(ctx context.Context) (map[int32]int64, error)

FetchLastProducedOffset fetches and returns the last produced offset for each requested partition in the topic. The offset is -1 if a partition has been created but no record has been produced yet.

func (TopicOffsetsReader) WaitNextFetchLastProducedOffset

func (r TopicOffsetsReader) WaitNextFetchLastProducedOffset(ctx context.Context) (O, error)

WaitNextFetchLastProducedOffset returns the result of the *next* "last produced offset" request that will be issued.

The "last produced offset" is the offset of the last message written to the partition (starting from 0), or -1 if no message has been written yet.

type Writer

type Writer struct {
	services.Service
	// contains filtered or unexported fields
}

Writer is responsible for writing incoming data to the ingest storage.

func NewWriter

func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registerer) *Writer

func (*Writer) WriteSync

func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string, req *mimirpb.WriteRequest) error

WriteSync writes the input data to the ingest storage. The function blocks until the data has been successfully committed or an error occurs.
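
A hedged sketch of the write path; Writer embeds services.Service, so it is started with the dskit services helpers, and the partition ID, tenant ID, and empty WriteRequest are placeholders for illustration:

	writer := ingest.NewWriter(kafkaCfg, logger, prometheus.NewRegistry())

	if err := services.StartAndAwaitRunning(ctx, writer); err != nil {
		return err
	}
	defer services.StopAndAwaitTerminated(context.Background(), writer) //nolint:errcheck

	// A real request would carry timeseries and/or metadata; an empty one only shows the call shape.
	if err := writer.WriteSync(ctx, 0, "tenant-1", &mimirpb.WriteRequest{}); err != nil {
		return err
	}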
