tsdb

package
v0.0.0-...-fe1d711 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: AGPL-3.0 Imports: 31 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// DeprecatedTenantIDExternalLabel is the external label containing the tenant ID,
	// set when shipping blocks to the storage.
	//
	// Mimir no longer generates blocks with this label, however old blocks may still use this label.
	DeprecatedTenantIDExternalLabel = "__org_id__"

	// DeprecatedIngesterIDExternalLabel is the external label containing the ingester ID,
	// set when shipping blocks to the storage.
	//
	// Mimir no longer generates blocks with this label, however old blocks may still use this label.
	DeprecatedIngesterIDExternalLabel = "__ingester_id__"

	// CompactorShardIDExternalLabel is the external label used to store
	// the ID of a sharded block generated by the split-and-merge compactor. If a block hasn't
	// this label, it means the block hasn't been split.
	CompactorShardIDExternalLabel = "__compactor_shard_id__"

	// DeprecatedShardIDExternalLabel is deprecated.
	DeprecatedShardIDExternalLabel = "__shard_id__"

	// OutOfOrderExternalLabel is the external label used to mark blocks
	// containing out-of-order data.
	OutOfOrderExternalLabel = "__out_of_order__"

	// OutOfOrderExternalLabelValue is the value to be used for the OutOfOrderExternalLabel label
	OutOfOrderExternalLabelValue = "true"

	// DefaultCloseIdleTSDBInterval is how often are open TSDBs checked for being idle and closed.
	DefaultCloseIdleTSDBInterval = 5 * time.Minute

	// DeletionMarkCheckInterval is how often to check for tenant deletion mark.
	DeletionMarkCheckInterval = 1 * time.Hour

	// EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases.
	// This changed in prometheus as of https://github.com/prometheus/prometheus/commit/8ef7dfdeebf0a7491973303c7fb6b68ec5cc065b
	// which capped the max XOR and histogram chunk size to 1KiB.
	// Once block with this limit become more common this constant can be revisited.
	EstimatedMaxChunkSize = 16000

	// EstimatedSeriesP99Size is the size in bytes of a single series in the TSDB index. This includes the symbol IDs in
	// the symbols table (not the actual label strings) and the chunk refs (min time, max time, offset).
	// This is an estimation that should cover >99% of series with less than 30 labels and around 50 chunks per series.
	EstimatedSeriesP99Size = 512

	// MaxSeriesSize is an estimated worst case for a series size in the index. A valid series may still be larger than this.
	// This was calculated assuming a rate of 1 sample/sec, in a 24h block we have 744 chunks per series.
	// The worst case scenario of each meta ref is 8*3=24 bytes, so 744*24 = 17856 bytes, which is 448 bytes away form 17 KiB.
	MaxSeriesSize = 17 * 1024

	// BytesPerPostingInAPostingList is the number of bytes that each posting (series ID) takes in a
	// posting list in the index. Each posting is 4 bytes (uint32) which are the offset of the series in the index file.
	BytesPerPostingInAPostingList = 4

	// DefaultPostingOffsetInMemorySampling represents default value for --store.index-header-posting-offsets-in-mem-sampling.
	// 32 value is chosen as it's a good balance for common setups. Sampling that is not too large (too many CPU cycles) and
	// not too small (too much memory).
	DefaultPostingOffsetInMemorySampling = 32

	// DefaultPostingsForMatchersCacheMaxBytes setting puts a limit cap on the per-TSDB head/block max PostingsForMatchers() cache size in bytes.
	// This limit is *in addition* to the max size in items (default to 100 items).
	//
	// This value is most useful on Mimir clusters with 1 tenant. If the tenant runs very high cardinality
	// queries (e.g. a query touching 1M series / ingester) then with the previous default cache
	// size of 10MB we may not be able to effectively use the cache.
	//
	// A single cached posting takes about 9 bytes in the cache, on average. The default max cache size as number of items
	// is 100, so having a 100MB cache per-tenant for the TSDB Head means we can cache 100MB / 100 / 9 = 116k postings
	// per cached entry on average.
	DefaultPostingsForMatchersCacheMaxBytes = 100 * 1024 * 1024

	// DefaultPartitionerMaxGapSize is the default max size - in bytes - of a gap for which the store-gateway
	// partitioner aggregates together two bucket GET object requests.
	DefaultPartitionerMaxGapSize = uint64(512 * 1024)

	// NewBlockDiscoveryDelayMultiplier is the factor used (multiplied with BucketStoreConfig.SyncInterval) to determine
	// when querying a newly uploaded block is required.
	// For example, if BucketStoreConfig.SyncInterval is 15 minutes, then a querier will require that it can query all
	// blocks uploaded at least 45 minutes ago, and not fail queries when a store-gateway does not respond to queries for
	// newer blocks (those uploaded in the last 45 minutes).
	// This gives store-gateways time to discover and load newly created blocks.
	NewBlockDiscoveryDelayMultiplier = 3

	DefaultMaxTSDBOpeningConcurrencyOnStartup = 10
)
View Source
const (
	// IndexCacheBackendInMemory is the value for the in-memory index cache backend.
	IndexCacheBackendInMemory = "inmemory"

	// IndexCacheBackendMemcached is the value for the Memcached index cache backend.
	IndexCacheBackendMemcached = cache.BackendMemcached

	// IndexCacheBackendRedis is the value for the Redis index cache backend.
	IndexCacheBackendRedis = cache.BackendRedis

	// IndexCacheBackendDefault is the value for the default index cache backend.
	IndexCacheBackendDefault = IndexCacheBackendInMemory
)
View Source
const TenantDeletionMarkPath = "markers/tenant-deletion-mark.json"

Relative to user-specific prefix.

Variables

This section is empty.

Functions

func AllUsers

func AllUsers(_ string) (bool, error)

AllUsers returns true to each call and should be used whenever the UsersScanner should not filter out any user due to sharding.

func CreateCachingBucket

func CreateCachingBucket(chunksCache cache.Cache, chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error)

func HashBlockID

func HashBlockID(id ulid.ULID) uint32

HashBlockID returns a 32-bit hash of the block ID useful for ring-based sharding.

func ListUsers

func ListUsers(ctx context.Context, bucketClient objstore.Bucket) (users []string, err error)

ListUsers returns all user IDs found scanning the root of the bucket.

func NewIndexCache

func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (indexcache.IndexCache, error)

NewIndexCache creates a new index cache based on the input configuration.

func TenantDeletionMarkExists

func TenantDeletionMarkExists(ctx context.Context, bkt objstore.BucketReader, userID string) (bool, error)

TenantDeletionMarkExists checks for deletion mark for tenant. Errors other than "object not found" are returned.

func WriteTenantDeletionMark

func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, mark *TenantDeletionMark) error

WriteTenantDeletionMark uploads deletion mark to the tenant location in the bucket.

Types

type BlocksStorageConfig

type BlocksStorageConfig struct {
	Bucket      bucket.Config     `yaml:",inline"`
	BucketStore BucketStoreConfig `` /* 141-byte string literal not displayed */
	TSDB        TSDBConfig        `yaml:"tsdb"`
}

BlocksStorageConfig holds the config information for the blocks storage.

func (*BlocksStorageConfig) RegisterFlags

func (cfg *BlocksStorageConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the TSDB flags

func (*BlocksStorageConfig) Validate

func (cfg *BlocksStorageConfig) Validate(activeSeriesCfg activeseries.Config) error

Validate the config.

type BucketIndexConfig

type BucketIndexConfig struct {
	UpdateOnErrorInterval time.Duration `yaml:"update_on_error_interval" category:"advanced"`
	IdleTimeout           time.Duration `yaml:"idle_timeout" category:"advanced"`
	MaxStalePeriod        time.Duration `yaml:"max_stale_period" category:"advanced"`
}

func (*BucketIndexConfig) RegisterFlagsWithPrefix

func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*BucketIndexConfig) Validate

func (cfg *BucketIndexConfig) Validate() error

Validate the config.

type BucketStoreConfig

type BucketStoreConfig struct {
	SyncDir                                string              `yaml:"sync_dir"`
	SyncInterval                           time.Duration       `yaml:"sync_interval" category:"advanced"`
	MaxConcurrent                          int                 `yaml:"max_concurrent" category:"advanced"`
	MaxConcurrentQueueTimeout              time.Duration       `yaml:"max_concurrent_queue_timeout" category:"advanced"`
	TenantSyncConcurrency                  int                 `yaml:"tenant_sync_concurrency" category:"advanced"`
	BlockSyncConcurrency                   int                 `yaml:"block_sync_concurrency" category:"advanced"`
	MetaSyncConcurrency                    int                 `yaml:"meta_sync_concurrency" category:"advanced"`
	IndexCache                             IndexCacheConfig    `yaml:"index_cache"`
	ChunksCache                            ChunksCacheConfig   `yaml:"chunks_cache"`
	MetadataCache                          MetadataCacheConfig `yaml:"metadata_cache"`
	IgnoreDeletionMarksInStoreGatewayDelay time.Duration       `yaml:"ignore_deletion_mark_delay" category:"advanced"`
	IgnoreDeletionMarksWhileQueryingDelay  time.Duration       `yaml:"ignore_deletion_mark_while_querying_delay" category:"experimental"`
	BucketIndex                            BucketIndexConfig   `yaml:"bucket_index"`
	IgnoreBlocksWithin                     time.Duration       `yaml:"ignore_blocks_within" category:"advanced"`

	// Series hash cache.
	SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"`

	// Controls the partitioner, used to aggregate multiple GET object API requests.
	PartitionerMaxGapBytes uint64 `yaml:"partitioner_max_gap_bytes" category:"advanced"`

	// Controls what is the ratio of postings offsets store will hold in memory.
	// Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings.
	// It's meant for setups that want low baseline memory pressure and where less traffic is expected.
	// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
	// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
	PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

	// Controls advanced options for index-header file reading.
	IndexHeader indexheader.Config `yaml:"index_header" category:"advanced"`

	StreamingBatchSize    int     `yaml:"streaming_series_batch_size" category:"advanced"`
	SeriesFetchPreference float64 `yaml:"series_fetch_preference" category:"advanced"`
}

BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.

func (*BucketStoreConfig) RegisterFlags

func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the BucketStore flags

func (*BucketStoreConfig) Validate

func (cfg *BucketStoreConfig) Validate() error

Validate the config.

type ChunksCacheConfig

type ChunksCacheConfig struct {
	cache.BackendConfig `yaml:",inline"`

	MaxGetRangeRequests        int           `yaml:"max_get_range_requests" category:"advanced"`
	AttributesTTL              time.Duration `yaml:"attributes_ttl" category:"advanced"`
	AttributesInMemoryMaxItems int           `yaml:"attributes_in_memory_max_items" category:"advanced"`
	SubrangeTTL                time.Duration `yaml:"subrange_ttl" category:"advanced"`
}

func (*ChunksCacheConfig) RegisterFlagsWithPrefix

func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*ChunksCacheConfig) Validate

func (cfg *ChunksCacheConfig) Validate() error

type DurationList

type DurationList []time.Duration

DurationList is the block ranges for a tsdb

func (*DurationList) Set

func (d *DurationList) Set(s string) error

Set implements the flag.Value interface

func (*DurationList) String

func (d *DurationList) String() string

String implements the flag.Value interface

func (*DurationList) ToMilliseconds

func (d *DurationList) ToMilliseconds() []int64

ToMilliseconds returns the duration list in milliseconds

type InMemoryIndexCacheConfig

type InMemoryIndexCacheConfig struct {
	MaxSizeBytes uint64 `yaml:"max_size_bytes"`
}

func (*InMemoryIndexCacheConfig) RegisterFlagsWithPrefix

func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type IndexCacheConfig

type IndexCacheConfig struct {
	cache.BackendConfig `yaml:",inline"`
	InMemory            InMemoryIndexCacheConfig `yaml:"inmemory"`
}

func (*IndexCacheConfig) RegisterFlags

func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet)

func (*IndexCacheConfig) RegisterFlagsWithPrefix

func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*IndexCacheConfig) Validate

func (cfg *IndexCacheConfig) Validate() error

Validate the config.

type MetadataCacheConfig

type MetadataCacheConfig struct {
	cache.BackendConfig `yaml:",inline"`

	TenantsListTTL          time.Duration `yaml:"tenants_list_ttl" category:"advanced"`
	TenantBlocksListTTL     time.Duration `yaml:"tenant_blocks_list_ttl" category:"advanced"`
	ChunksListTTL           time.Duration `yaml:"chunks_list_ttl" category:"advanced"`
	MetafileExistsTTL       time.Duration `yaml:"metafile_exists_ttl" category:"advanced"`
	MetafileDoesntExistTTL  time.Duration `yaml:"metafile_doesnt_exist_ttl" category:"advanced"`
	MetafileContentTTL      time.Duration `yaml:"metafile_content_ttl" category:"advanced"`
	MetafileMaxSize         int           `yaml:"metafile_max_size_bytes" category:"advanced"`
	MetafileAttributesTTL   time.Duration `yaml:"metafile_attributes_ttl" category:"advanced"`
	BlockIndexAttributesTTL time.Duration `yaml:"block_index_attributes_ttl" category:"advanced"`
	BucketIndexContentTTL   time.Duration `yaml:"bucket_index_content_ttl" category:"advanced"`
	BucketIndexMaxSize      int           `yaml:"bucket_index_max_size_bytes" category:"advanced"`
}

func (*MetadataCacheConfig) RegisterFlagsWithPrefix

func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*MetadataCacheConfig) Validate

func (cfg *MetadataCacheConfig) Validate() error

type TSDBConfig

type TSDBConfig struct {
	Dir                                 string        `yaml:"dir"`
	BlockRanges                         DurationList  `yaml:"block_ranges_period" category:"experimental" doc:"hidden"`
	Retention                           time.Duration `yaml:"retention_period"`
	ShipInterval                        time.Duration `yaml:"ship_interval" category:"advanced"`
	ShipConcurrency                     int           `yaml:"ship_concurrency" category:"advanced"`
	HeadCompactionInterval              time.Duration `yaml:"head_compaction_interval" category:"advanced"`
	HeadCompactionConcurrency           int           `yaml:"head_compaction_concurrency" category:"advanced"`
	HeadCompactionIdleTimeout           time.Duration `yaml:"head_compaction_idle_timeout" category:"advanced"`
	HeadChunksWriteBufferSize           int           `yaml:"head_chunks_write_buffer_size_bytes" category:"advanced"`
	HeadChunksEndTimeVariance           float64       `yaml:"head_chunks_end_time_variance" category:"experimental"`
	StripeSize                          int           `yaml:"stripe_size" category:"advanced"`
	WALCompressionEnabled               bool          `yaml:"wal_compression_enabled" category:"advanced"`
	WALSegmentSizeBytes                 int           `yaml:"wal_segment_size_bytes" category:"advanced"`
	WALReplayConcurrency                int           `yaml:"wal_replay_concurrency" category:"advanced"`
	FlushBlocksOnShutdown               bool          `yaml:"flush_blocks_on_shutdown" category:"advanced"`
	CloseIdleTSDBTimeout                time.Duration `yaml:"close_idle_tsdb_timeout" category:"advanced"`
	MemorySnapshotOnShutdown            bool          `yaml:"memory_snapshot_on_shutdown" category:"experimental"`
	HeadChunksWriteQueueSize            int           `yaml:"head_chunks_write_queue_size" category:"advanced"`
	BiggerOutOfOrderBlocksForOldSamples bool          `yaml:"bigger_out_of_order_blocks_for_old_samples" category:"experimental"`

	// Series hash cache.
	SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"`

	// If true, user TSDBs are not closed on shutdown. Only for testing.
	// If false (default), user TSDBs are closed to make sure all resources are released and closed properly.
	KeepUserTSDBOpenOnShutdown bool `yaml:"-"`

	// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
	CloseIdleTSDBInterval time.Duration `yaml:"-"`

	// For experimental out of order metrics support.
	OutOfOrderCapacityMax int `yaml:"out_of_order_capacity_max" category:"experimental"`

	// HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head.
	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
	HeadPostingsForMatchersCacheTTL time.Duration `yaml:"head_postings_for_matchers_cache_ttl" category:"experimental"`

	// HeadPostingsForMatchersCacheMaxItems is the maximum size (in number of items) of cached postings for matchers elements in the Head.
	// It's ignored used when HeadPostingsForMatchersCacheTTL is 0.
	// Deprecated: use max bytes limit instead.
	HeadPostingsForMatchersCacheMaxItems int `yaml:"head_postings_for_matchers_cache_size" category:"deprecated"`

	// HeadPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in the Head.
	// It's ignored used when HeadPostingsForMatchersCacheTTL is 0.
	HeadPostingsForMatchersCacheMaxBytes int64 `yaml:"head_postings_for_matchers_cache_max_bytes" category:"experimental"`

	// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
	HeadPostingsForMatchersCacheForce bool `yaml:"head_postings_for_matchers_cache_force" category:"experimental"`

	// BlockPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in each compacted block.
	// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
	BlockPostingsForMatchersCacheTTL time.Duration `yaml:"block_postings_for_matchers_cache_ttl" category:"experimental"`

	// BlockPostingsForMatchersCacheMaxItems is the maximum size of cached postings for matchers elements in each compacted block.
	// It's ignored used when BlockPostingsForMatchersCacheTTL is 0.
	// Deprecated: use max bytes limit instead.
	BlockPostingsForMatchersCacheMaxItems int `yaml:"block_postings_for_matchers_cache_size" category:"deprecated"`

	// BlockPostingsForMatchersCacheMaxBytes is the maximum size (in bytes) of cached postings for matchers elements in each compacted block.
	// It's ignored used when BlockPostingsForMatchersCacheTTL is 0.
	BlockPostingsForMatchersCacheMaxBytes int64 `yaml:"block_postings_for_matchers_cache_max_bytes" category:"experimental"`

	// BlockPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls compacted blocks
	// regardless of the `concurrent` param.
	BlockPostingsForMatchersCacheForce bool `yaml:"block_postings_for_matchers_cache_force" category:"experimental"`

	EarlyHeadCompactionMinInMemorySeries                     int64 `yaml:"early_head_compaction_min_in_memory_series" category:"experimental"`
	EarlyHeadCompactionMinEstimatedSeriesReductionPercentage int   `yaml:"early_head_compaction_min_estimated_series_reduction_percentage" category:"experimental"`

	// HeadCompactionIntervalJitterEnabled is enabled by default, but allows to disable it in tests.
	HeadCompactionIntervalJitterEnabled bool `yaml:"-"`

	// HeadCompactionIntervalWhileStarting setting is hardcoded, but allowed to overwrite it in tests.
	HeadCompactionIntervalWhileStarting time.Duration `yaml:"-"`

	// TimelyHeadCompaction allows head compaction to happen when min block range can no longer be appended,
	// without requiring 1.5x the chunk range worth of data in the head.
	TimelyHeadCompaction bool `yaml:"timely_head_compaction_enabled" category:"experimental"`
}

TSDBConfig holds the config for TSDB opened in the ingesters.

func (*TSDBConfig) BlocksDir

func (cfg *TSDBConfig) BlocksDir(userID string) string

BlocksDir returns the directory path where TSDB blocks and wal should be stored by the ingester

func (*TSDBConfig) IsBlocksShippingEnabled

func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool

IsShippingEnabled returns whether blocks shipping is enabled.

func (*TSDBConfig) RegisterFlags

func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the TSDBConfig flags.

func (*TSDBConfig) Validate

func (cfg *TSDBConfig) Validate(activeSeriesCfg activeseries.Config) error

Validate the config.

func (*TSDBConfig) WALCompressionType

func (cfg *TSDBConfig) WALCompressionType() wlog.CompressionType

type TenantDeletionMark

type TenantDeletionMark struct {
	// Unix timestamp when deletion marker was created.
	DeletionTime util.UnixSeconds `json:"deletion_time"`

	// Unix timestamp when cleanup was finished.
	FinishedTime util.UnixSeconds `json:"finished_time,omitempty"`
}

func NewTenantDeletionMark

func NewTenantDeletionMark(deletionTime time.Time) *TenantDeletionMark

func ReadTenantDeletionMark

func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string, logger log.Logger) (*TenantDeletionMark, error)

ReadTenantDeletionMark returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error.

type UsersScanner

type UsersScanner struct {
	// contains filtered or unexported fields
}

func NewUsersScanner

func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) (bool, error), logger log.Logger) *UsersScanner

func (*UsersScanner) ScanUsers

func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error)

ScanUsers returns a fresh list of users found in the storage, that are not marked for deletion, and list of users marked for deletion.

If sharding is enabled, returned lists contains only the users owned by this instance.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL