blockbuilder

package
v0.0.0-...-fbfa8e9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: AGPL-3.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetGroupLag

func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error)

GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.

The lag is the difference between the last produced offset (high watermark) and an offset in the "past". If the block builder committed an offset for a given partition to the consumer group at least once, then the lag is the difference between the last produced offset and the offset committed in the consumer group. Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.

Types

type BlockBuilder

type BlockBuilder struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
	limits *validation.Overrides,
) (*BlockBuilder, error)

type Config

type Config struct {
	InstanceID          string             `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	PartitionAssignment map[string][]int32 `yaml:"partition_assignment" category:"experimental"`
	DataDir             string             `yaml:"data_dir"`

	ConsumerGroup         string        `yaml:"consumer_group"`
	ConsumeInterval       time.Duration `yaml:"consume_interval"`
	ConsumeIntervalBuffer time.Duration `yaml:"consume_interval_buffer"`
	LookbackOnNoCommit    time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`

	ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`

	// Config parameters defined outside the block-builder config and are injected dynamically.
	Kafka         ingest.KafkaConfig       `yaml:"-"`
	BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

func (*Config) Validate

func (cfg *Config) Validate() error

type PartitionState

type PartitionState struct {
	// Commit is the offset of the next record we'll start consuming.
	Commit kadm.Offset
	// CommitRecordTimestamp is the timestamp of the record whose offset was committed (and not the time of commit).
	CommitRecordTimestamp time.Time
	// LastSeenOffset is the offset of the last record consumed in the commiter-cycle. It can be greater than Commit.Offset if previous cycle overconsumed.
	LastSeenOffset int64
	// LastBlockEnd is the timestamp of the block end in the commiter-cycle.
	LastBlockEnd time.Time
}

func PartitionStateFromLag

func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackMillis int64) PartitionState

type TSDBBuilder

type TSDBBuilder struct {
	// contains filtered or unexported fields
}

func NewTSDBBuilder

func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics, applyMaxGlobalSeriesPerUserBelow int) *TSDBBuilder

func (*TSDBBuilder) Close

func (b *TSDBBuilder) Close() error

Close closes all DBs and deletes their data directories. This functions is useful when block builder has faced some unrecoverable error and has to discard the block building for the current cycle.

func (*TSDBBuilder) CompactAndUpload

func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error)

CompactAndUpload compacts the blocks of all the TSDBs and uploads them. uploadBlocks is a function that uploads the blocks to the required storage. All the DBs are closed and directories cleared irrespective of success or failure of this function.

func (*TSDBBuilder) Process

func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordAlreadyProcessed bool) (_ bool, err error)

Process puts the samples in the TSDB. Some parts taken from (*Ingester).pushSamplesToAppender. It returns false if at least one sample was skipped to process later, true otherwise. true also includes the cases where the sample was not put in the TSDB because it was discarded or was already processed before. lastBlockMax: max time of the block in the previous block building cycle. blockMax: max time of the block in the current block building cycle. This blockMax is exclusive of the last sample by design in TSDB. recordAlreadyProcessed: true if the record was processed in the previous cycle. (It gets processed again if some samples did not fit in the previous cycle.)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL