Documentation
Index
Constants
This section is empty.
Variables
This section is empty.
Functions
func GetGroupLag
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error)
GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
The lag is the difference between the last produced offset (high watermark) and an offset in the "past". If the block builder has committed an offset for a given partition to the consumer group at least once, the lag is the difference between the last produced offset and the committed offset. Otherwise, if the block builder hasn't committed an offset for a given partition yet (e.g. the block builder is running for the first time), the lag is measured from the offset corresponding to fallbackOffsetMillis, i.e. the first offset produced at or after that timestamp.
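Ignoring the Kafka plumbing, the fallback rule above can be sketched as follows. This is a minimal illustration, not the real implementation: `partitionLag` and its inputs are hypothetical, whereas the actual function resolves committed offsets and high watermarks through the admin client.

```go
package main

import "fmt"

// partitionLag illustrates the fallback rule: if the group has a commit for
// the partition, lag is measured from that commit; otherwise it is measured
// from the start offset resolved from fallbackOffsetMillis.
func partitionLag(highWatermark, committed, fallbackStartOffset int64, hasCommit bool) int64 {
	start := fallbackStartOffset
	if hasCommit {
		start = committed
	}
	return highWatermark - start
}

func main() {
	// Partition with a previous commit at offset 90 and high watermark 100.
	fmt.Println(partitionLag(100, 90, 0, true)) // 10
	// First run: no commit; fall back to the offset at fallbackOffsetMillis.
	fmt.Println(partitionLag(100, 0, 70, false)) // 30
}
```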
Types
type BlockBuilder
func New
func New(
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
	limits *validation.Overrides,
) (*BlockBuilder, error)
type Config
type Config struct {
	InstanceID                       string             `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	PartitionAssignment              map[string][]int32 `yaml:"partition_assignment" category:"experimental"`
	DataDir                          string             `yaml:"data_dir"`
	ConsumerGroup                    string             `yaml:"consumer_group"`
	ConsumeInterval                  time.Duration      `yaml:"consume_interval"`
	ConsumeIntervalBuffer            time.Duration      `yaml:"consume_interval_buffer"`
	LookbackOnNoCommit               time.Duration      `yaml:"lookback_on_no_commit" category:"advanced"`
	ApplyMaxGlobalSeriesPerUserBelow int                `yaml:"apply_max_global_series_per_user_below" category:"experimental"`

	// Config parameters defined outside the block-builder config that are injected dynamically.
	Kafka         ingest.KafkaConfig       `yaml:"-"`
	BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
}
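Going by the yaml tags above, a configuration fragment for this component might look like the following. This is a hedged sketch: the field names come from the struct tags, but the top-level `block_builder:` key and all values are illustrative assumptions, not recommendations.

```yaml
block_builder:
  instance_id: "block-builder-0"             # defaults to the hostname
  partition_assignment:                      # experimental
    "block-builder-0": [0, 1, 2]
  data_dir: "/data/block-builder"
  consumer_group: "block-builder"
  consume_interval: 1h
  consume_interval_buffer: 15m
  lookback_on_no_commit: 12h                 # advanced
  apply_max_global_series_per_user_below: 0  # experimental
```

Note that the Kafka and BlocksStorage fields are tagged `yaml:"-"`: they are not set in this section but injected dynamically from configuration defined elsewhere.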
type PartitionState
type PartitionState struct {
	// Commit is the offset of the next record we'll start consuming.
	Commit kadm.Offset
	// CommitRecordTimestamp is the timestamp of the record whose offset was committed (not the time of the commit itself).
	CommitRecordTimestamp time.Time
	// LastSeenOffset is the offset of the last record consumed in the committer cycle. It can be greater than Commit.Offset if the previous cycle over-consumed.
	LastSeenOffset int64
	// LastBlockEnd is the timestamp of the block end in the committer cycle.
	LastBlockEnd time.Time
}
func PartitionStateFromLag
func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackMillis int64) PartitionState
type TSDBBuilder
type TSDBBuilder struct {
// contains filtered or unexported fields
}
func NewTSDBBuilder
func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics, applyMaxGlobalSeriesPerUserBelow int) *TSDBBuilder
func (*TSDBBuilder) Close
func (b *TSDBBuilder) Close() error
Close closes all DBs and deletes their data directories. This function is useful when the block builder has hit an unrecoverable error and must discard the block building for the current cycle.
func (*TSDBBuilder) CompactAndUpload
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error)
CompactAndUpload compacts the blocks of all the TSDBs and uploads them. uploadBlocks is a function that uploads the blocks to the required storage. All the DBs are closed and their directories cleared, irrespective of whether this function succeeds or fails.
func (*TSDBBuilder) Process
func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error)
Process puts the record's samples into the TSDB. Some parts are adapted from (*Ingester).pushSamplesToAppender. It returns false if at least one sample was skipped to be processed in a later cycle, and true otherwise. It also returns true when a sample was not put into the TSDB because it was discarded or had already been processed before.

lastBlockMax: max time of the block in the previous block-building cycle.

blockMax: max time of the block in the current block-building cycle. By TSDB design, blockMax is exclusive of the last sample.

recordAlreadyProcessed: true if the record was processed in a previous cycle. (A record gets processed again if some of its samples did not fit in the previous cycle.)
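A simplified model of the return-value contract can be sketched as below. This is only a sketch of the "did every sample fit" decision: the real method also handles discards, out-of-order samples, and per-tenant limits, and `allSamplesProcessed` with its `sampleTs` input are hypothetical names standing in for the record's sample timestamps.

```go
package main

import "fmt"

// allSamplesProcessed mirrors the contract described above: it reports false
// if at least one sample must be deferred to a later cycle (its timestamp is
// at or past blockMax, which is exclusive), and true otherwise. Samples
// already covered by a previous cycle's block count as processed.
func allSamplesProcessed(sampleTs []int64, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) bool {
	for _, ts := range sampleTs {
		if recordAlreadyProcessed && ts < lastBlockMax {
			continue // already made it into a previous cycle's block
		}
		if ts >= blockMax {
			return false // doesn't fit the current block; process in a later cycle
		}
	}
	return true
}

func main() {
	// All samples fall inside the current block range [lastBlockMax, blockMax).
	fmt.Println(allSamplesProcessed([]int64{10, 20}, 0, 30, false)) // true
	// One sample lands at/after blockMax and must wait for the next cycle.
	fmt.Println(allSamplesProcessed([]int64{10, 35}, 0, 30, false)) // false
}
```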