Documentation ¶
Overview ¶
SPDX-License-Identifier: AGPL-3.0-only
Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/compactor.go
Provenance-includes-license: Apache-2.0
Provenance-includes-copyright: The Cortex Authors.

SPDX-License-Identifier: AGPL-3.0-only
Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/job.go
Provenance-includes-license: Apache-2.0
Provenance-includes-copyright: The Cortex Authors.

SPDX-License-Identifier: AGPL-3.0-only
Provenance-includes-location: https://github.com/grafana/mimir/blob/main/pkg/compactor/split_merge_compactor.go
Provenance-includes-license: Apache-2.0
Provenance-includes-copyright: The Cortex Authors.
Index ¶
- Constants
- Variables
- func DefaultGroupKey(meta block.Meta) string
- type BlockCompactor
- type BlocksCleaner
- type BlocksCleanerConfig
- type BlocksCompactorFactory
- type BlocksGrouperFactory
- type BlocksPlannerFactory
- type BucketCompactor
- type BucketCompactorMetrics
- type Compactor
- type CompactorMetrics
- type Config
- type ConfigProvider
- type DeduplicateFilter
- type DurationList
- type Grouper
- type Job
- func (job *Job) AppendMeta(meta *block.Meta) error
- func (job *Job) IDs() (ids []ulid.ULID)
- func (job *Job) Key() string
- func (job *Job) Labels() labels.Labels
- func (job *Job) MaxTime() int64
- func (job *Job) Metas() []*block.Meta
- func (job *Job) MinCompactionLevel() int
- func (job *Job) MinTime() int64
- func (job *Job) Resolution() int64
- func (job *Job) ShardingKey() string
- func (job *Job) SplitStageSize() uint32
- func (job *Job) SplittingShards() uint32
- func (job *Job) String() string
- func (job *Job) UseSplitting() bool
- func (job *Job) UserID() string
- type JobsOrderFunc
- type LabelRemoverFilter
- type MultitenantCompactor
- type NoCompactionMarkFilter
- type Planner
- type RingConfig
- type ShardAwareDeduplicateFilter
- type SplitAndMergeGrouper
- type SplitAndMergePlanner
- type Syncer
Constants ¶
const (
	CompactionSplitByFingerprint         = "fingerprint"
	CompactionSplitByStacktracePartition = "stacktracePartition"
)
const (
	CompactionOrderOldestFirst = "smallest-range-oldest-blocks-first"
	CompactionOrderNewestFirst = "newest-blocks-first"
)
Variables ¶
var CompactionOrders = []string{CompactionOrderOldestFirst, CompactionOrderNewestFirst}
var CompactionSplitBys = []string{CompactionSplitByFingerprint, CompactionSplitByStacktracePartition}
var (
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
)
Functions ¶
func DefaultGroupKey ¶
func DefaultGroupKey(meta block.Meta) string
DefaultGroupKey returns a unique identifier for the group the block belongs to, based on the DefaultGrouper logic. It considers the downsampling resolution and the block's labels.
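As a hedged usage sketch (assuming this package is imported as compactor and that block metas come from its block package), the key can be used to bucket blocks that are candidates for compacting together:

// Sketch: bucket block metas by their default group key, so each bucket
// holds blocks sharing a downsampling resolution and label set. "metas"
// is assumed to be a map[ulid.ULID]*block.Meta obtained elsewhere.
groups := make(map[string][]*block.Meta)
for _, m := range metas {
	key := compactor.DefaultGroupKey(*m)
	groups[key] = append(groups[key], m)
}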
Types ¶
type BlockCompactor ¶
type BlockCompactor struct {
// contains filtered or unexported fields
}
type BlocksCleaner ¶
func NewBlocksCleaner ¶
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner
type BlocksCleanerConfig ¶
type BlocksCompactorFactory ¶
type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	cfgProvider ConfigProvider,
	userID string,
	logger log.Logger,
	metrics *CompactorMetrics,
) (Compactor, error)
BlocksCompactorFactory builds and returns the compactor to use to compact a tenant's blocks.
type BlocksGrouperFactory ¶
type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	cfgProvider ConfigProvider,
	userID string,
	logger log.Logger,
	reg prometheus.Registerer,
) Grouper
BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
type BlocksPlannerFactory ¶ added in v1.3.0
BlocksPlannerFactory builds and returns the planner to use to compact a tenant's blocks. A customisation sketch follows.
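For illustration, a minimal sketch of the customisation hook, assuming this package is imported as compactor: a downstream project installs its own grouper by setting Config.BlocksGrouperFactory (here simply re-wiring the exported SplitAndMergeGrouper from per-tenant settings); BlocksCompactorFactory and BlocksPlannerFactory work the same way.

cfg := compactor.Config{}
cfg.BlocksGrouperFactory = func(ctx context.Context, c compactor.Config, p compactor.ConfigProvider,
	userID string, logger log.Logger, reg prometheus.Registerer) compactor.Grouper {
	// Build the stock split-and-merge grouper from per-tenant limits.
	return compactor.NewSplitAndMergeGrouper(
		userID,
		c.BlockRanges.ToMilliseconds(),
		uint32(p.CompactorSplitAndMergeShards(userID)),
		uint32(p.CompactorSplitAndMergeStageSize(userID)),
		uint32(p.CompactorSplitGroups(userID)),
		logger,
	)
}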
type BucketCompactor ¶
type BucketCompactor struct {
// contains filtered or unexported fields
}
BucketCompactor compacts blocks in a bucket.
func NewBucketCompactor ¶
func NewBucketCompactor(
	logger log.Logger,
	sy *Syncer,
	grouper Grouper,
	planner Planner,
	comp Compactor,
	compactDir string,
	bkt objstore.Bucket,
	concurrency int,
	ownJob ownCompactionJobFunc,
	sortJobs JobsOrderFunc,
	waitPeriod time.Duration,
	blockSyncConcurrency int,
	metrics *BucketCompactorMetrics,
) (*BucketCompactor, error)
NewBucketCompactor creates a new bucket compactor.
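A hedged wiring sketch follows. The ownJob parameter has the unexported type ownCompactionJobFunc; based on the Mimir provenance, func(*Job) (bool, error) is assumed as its underlying signature, but that is not documented on this page. syncer, grouper, planner, comp, bkt, and metrics are assumed to be constructed elsewhere.

bc, err := compactor.NewBucketCompactor(
	logger,
	syncer,            // *compactor.Syncer, e.g. from NewMetaSyncer
	grouper,           // compactor.Grouper
	planner,           // compactor.Planner
	comp,              // compactor.Compactor
	"/data/compactor", // compactDir: local scratch directory (illustrative)
	bkt,               // objstore.Bucket
	4,                 // concurrency: number of jobs compacted at once
	func(job *compactor.Job) (bool, error) { return true, nil }, // own every job (assumed signature)
	compactor.GetJobsOrderFunction(compactor.CompactionOrderOldestFirst),
	10*time.Minute,    // waitPeriod before first-level blocks are compacted
	8,                 // blockSyncConcurrency
	metrics,           // *compactor.BucketCompactorMetrics
)
if err != nil {
	return err
}
_ = bc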
type BucketCompactorMetrics ¶
type BucketCompactorMetrics struct {
// contains filtered or unexported fields
}
BucketCompactorMetrics holds the metrics tracked by BucketCompactor.
func NewBucketCompactorMetrics ¶
func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg prometheus.Registerer) *BucketCompactorMetrics
NewBucketCompactorMetrics makes a new BucketCompactorMetrics.
type Compactor ¶
type Compactor interface {
	// CompactWithSplitting merges and splits the source blocks into shardCount compacted blocks,
	// and returns a slice of the resulting block IDs.
	// If a compacted block contains no series, its block ID is not returned.
	CompactWithSplitting(ctx context.Context, dst string, dirs []string, shardCount, stageSize uint64) (result []ulid.ULID, _ error)
}
Compactor provides compaction against an underlying storage of profiling data.
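As a hedged sketch of the contract (not taken from this module), a minimal implementation that satisfies the interface; assumed imports are context and github.com/oklog/ulid:

// noopCompactor satisfies Compactor but performs no work. A real
// implementation would merge the blocks under dirs and write up to
// shardCount output blocks into dst.
type noopCompactor struct{}

func (noopCompactor) CompactWithSplitting(ctx context.Context, dst string, dirs []string, shardCount, stageSize uint64) ([]ulid.ULID, error) {
	return nil, nil // no series compacted, so no block IDs are returned
}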
type CompactorMetrics ¶
type CompactorMetrics struct {
	Ran               *prometheus.CounterVec
	InProgress        *prometheus.GaugeVec
	OverlappingBlocks prometheus.Counter
	Duration          *prometheus.HistogramVec
	Size              *prometheus.HistogramVec
	Samples           *prometheus.HistogramVec
	Range             *prometheus.HistogramVec
	Split             *prometheus.HistogramVec
}
type Config ¶
type Config struct {
	BlockRanges           DurationList  `yaml:"block_ranges" category:"advanced"`
	BlockSyncConcurrency  int           `yaml:"block_sync_concurrency" category:"advanced"`
	MetaSyncConcurrency   int           `yaml:"meta_sync_concurrency" category:"advanced"`
	DataDir               string        `yaml:"data_dir"`
	CompactionInterval    time.Duration `yaml:"compaction_interval" category:"advanced"`
	CompactionRetries     int           `yaml:"compaction_retries" category:"advanced"`
	CompactionConcurrency int           `yaml:"compaction_concurrency" category:"advanced"`
	CompactionWaitPeriod  time.Duration `yaml:"first_level_compaction_wait_period"`
	CleanupInterval       time.Duration `yaml:"cleanup_interval" category:"advanced"`
	CleanupConcurrency    int           `yaml:"cleanup_concurrency" category:"advanced"`
	DeletionDelay         time.Duration `yaml:"deletion_delay" category:"advanced"`
	TenantCleanupDelay    time.Duration `yaml:"tenant_cleanup_delay" category:"advanced"`
	MaxCompactionTime     time.Duration `yaml:"max_compaction_time" category:"advanced"`

	NoBlocksFileCleanupEnabled bool `yaml:"no_blocks_file_cleanup_enabled" category:"experimental"`
	DownsamplerEnabled         bool `yaml:"downsampler_enabled" category:"advanced"`

	// Compactor concurrency options
	MaxOpeningBlocksConcurrency int `yaml:"max_opening_blocks_concurrency" category:"advanced"` // Number of goroutines opening blocks before compaction.

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`

	// Compactors sharding.
	ShardingRing RingConfig `yaml:"sharding_ring"`

	CompactionJobsOrder string `yaml:"compaction_jobs_order" category:"advanced"`
	CompactionSplitBy   string `yaml:"compaction_split_by" category:"advanced"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
	BlocksPlannerFactory   BlocksPlannerFactory   `yaml:"-"`
	// contains filtered or unexported fields
}
Config holds the MultitenantCompactor config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the MultitenantCompactor flags.
type ConfigProvider ¶
type ConfigProvider interface {
	objstore.TenantConfigProvider

	// CompactorBlocksRetentionPeriod returns the retention period for a given user.
	CompactorBlocksRetentionPeriod(user string) time.Duration

	// CompactorSplitAndMergeShards returns the number of shards to use when splitting blocks.
	CompactorSplitAndMergeShards(userID string) int

	// CompactorSplitAndMergeStageSize returns the number of stages split shards will be written to.
	CompactorSplitAndMergeStageSize(userID string) int

	// CompactorSplitGroups returns the number of groups that blocks used for splitting should
	// be grouped into. Different groups are then split by different jobs.
	CompactorSplitGroups(userID string) int

	// CompactorTenantShardSize returns number of compactors that this user can use. 0 = all compactors.
	CompactorTenantShardSize(userID string) int

	// CompactorPartialBlockDeletionDelay returns the partial block delay time period for a given user,
	// and whether the configured value was valid. If the value wasn't valid, the returned delay is the default one
	// and the caller is responsible to warn the Mimir operator about it.
	CompactorPartialBlockDeletionDelay(userID string) (delay time.Duration, valid bool)

	// CompactorDownsamplerEnabled returns true if the downsampler is enabled for a given user.
	CompactorDownsamplerEnabled(userId string) bool
}
ConfigProvider defines the per-tenant config provider for the MultitenantCompactor.
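A hedged sketch of a fixed-value provider, e.g. for tests or a single-tenant setup. The embedded objstore.TenantConfigProvider (assumed to come from this module's objstore package) is left to an existing implementation; calling its methods on the zero value would panic.

type staticConfigProvider struct {
	objstore.TenantConfigProvider // assumed satisfied elsewhere
}

func (staticConfigProvider) CompactorBlocksRetentionPeriod(string) time.Duration { return 30 * 24 * time.Hour }
func (staticConfigProvider) CompactorSplitAndMergeShards(string) int             { return 4 }
func (staticConfigProvider) CompactorSplitAndMergeStageSize(string) int          { return 2 }
func (staticConfigProvider) CompactorSplitGroups(string) int                     { return 1 }
func (staticConfigProvider) CompactorTenantShardSize(string) int                 { return 0 } // 0 = all compactors
func (staticConfigProvider) CompactorPartialBlockDeletionDelay(string) (time.Duration, bool) {
	return 24 * time.Hour, true
}
func (staticConfigProvider) CompactorDownsamplerEnabled(string) bool { return true }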
type DeduplicateFilter ¶
type DeduplicateFilter interface {
	block.MetadataFilter

	// DuplicateIDs returns the IDs of duplicate blocks found by the last call to the Filter method.
	DuplicateIDs() []ulid.ULID
}
type DurationList ¶
DurationList is the list of block ranges for a TSDB.
func (*DurationList) Set ¶
func (d *DurationList) Set(s string) error
Set implements the flag.Value interface
func (*DurationList) String ¶
func (d *DurationList) String() string
String implements the flag.Value interface
func (*DurationList) ToMilliseconds ¶
func (d *DurationList) ToMilliseconds() []int64
ToMilliseconds returns the duration list in milliseconds
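Because Set and String implement flag.Value, a DurationList can back a command-line flag directly. A minimal sketch, assuming the comma-separated input format of the upstream Mimir implementation and a hypothetical flag name:

var ranges compactor.DurationList
fs := flag.NewFlagSet("example", flag.ContinueOnError)
fs.Var(&ranges, "compactor.block-ranges", "List of compaction time ranges.")
if err := fs.Parse([]string{"-compactor.block-ranges=2h,12h,24h"}); err != nil {
	return err
}
fmt.Println(ranges.ToMilliseconds()) // [7200000 43200000 86400000]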
type Grouper ¶
type Grouper interface {
	// Groups returns the compaction jobs for all blocks currently known to the syncer.
	// It creates all jobs from scratch on every call.
	Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)
}
Grouper is responsible for grouping all known blocks into compaction Jobs that are safe to compact concurrently.
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job holds a compaction job, which consists of a group of blocks that should be compacted together. Not goroutine safe.
func NewJob ¶
func NewJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards, splitStageSize uint32, shardingKey string) *Job
NewJob returns a new compaction Job.
func (*Job) AppendMeta ¶
AppendMeta appends the block with the given meta to the job.
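A hedged construction sketch, assuming labels from github.com/prometheus/prometheus/model/labels and block metas fetched elsewhere; the key and sharding key values are purely illustrative:

job := compactor.NewJob(
	"tenant-1",              // userID
	"0@6043952821095826047", // key: illustrative group key
	labels.EmptyLabels(),    // lset
	0,                       // resolution
	true,                    // useSplitting
	4,                       // splitNumShards
	2,                       // splitStageSize
	"tenant-1/0",            // shardingKey: illustrative
)
for _, m := range metas {
	if err := job.AppendMeta(m); err != nil {
		return err
	}
}
fmt.Println(job.Key(), job.MinTime(), job.MaxTime(), job.IDs())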
func (*Job) Metas ¶
Metas returns the metadata for each block that is part of this job, ordered by each block's MinTime.
func (*Job) MinCompactionLevel ¶
MinCompactionLevel returns the minimum compaction level across all source blocks in this job.
func (*Job) Resolution ¶
Resolution returns the common downsampling resolution of blocks in the job.
func (*Job) ShardingKey ¶
ShardingKey returns the key used to shard this job across multiple instances.
func (*Job) SplitStageSize ¶
SplitStageSize returns the number of stages split shards will be written to.
func (*Job) SplittingShards ¶
SplittingShards returns the number of output shards to build if splitting is enabled.
func (*Job) UseSplitting ¶
UseSplitting returns whether blocks should be split into multiple shards when compacted.
type JobsOrderFunc ¶
func GetJobsOrderFunction ¶
func GetJobsOrderFunction(name string) JobsOrderFunc
GetJobsOrderFunction returns the jobs ordering function for the given name, or nil if the name doesn't refer to any known function.
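A small usage sketch; the assumption that a JobsOrderFunc is applied as jobs = sortJobs(jobs) follows the NewBucketCompactor wiring above but isn't spelled out on this page:

sortJobs := compactor.GetJobsOrderFunction(compactor.CompactionOrderNewestFirst)
if sortJobs == nil {
	return fmt.Errorf("unknown compaction jobs order")
}
jobs = sortJobs(jobs) // assumed call shape: reorders a []*compactor.Job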
type LabelRemoverFilter ¶
type LabelRemoverFilter struct {
// contains filtered or unexported fields
}
func NewLabelRemoverFilter ¶
func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter
NewLabelRemoverFilter creates a LabelRemoverFilter.
type MultitenantCompactor ¶
MultitenantCompactor is a multi-tenant TSDB blocks compactor based on Thanos.
func NewMultitenantCompactor ¶
func NewMultitenantCompactor(compactorCfg Config, bucketClient objstore.Bucket, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*MultitenantCompactor, error)
NewMultitenantCompactor makes a new MultitenantCompactor.
func (*MultitenantCompactor) RingHandler ¶
func (c *MultitenantCompactor) RingHandler(w http.ResponseWriter, req *http.Request)
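A short sketch wiring the compactor and exposing its ring status page; the /compactor/ring path is illustrative, and cfg, bucketClient, and cfgProvider are assumed to be built elsewhere:

c, err := compactor.NewMultitenantCompactor(cfg, bucketClient, cfgProvider, logger, prometheus.DefaultRegisterer)
if err != nil {
	return err
}
mux := http.NewServeMux()
mux.HandleFunc("/compactor/ring", c.RingHandler)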
type NoCompactionMarkFilter ¶
type NoCompactionMarkFilter struct {
// contains filtered or unexported fields
}
NoCompactionMarkFilter is a block.Fetcher filter that finds all blocks with no-compact marker files, and optionally removes them from synced metas.
func NewNoCompactionMarkFilter ¶
func NewNoCompactionMarkFilter(bkt objstore.BucketReader, removeNoCompactBlocks bool) *NoCompactionMarkFilter
NewNoCompactionMarkFilter creates NoCompactionMarkFilter.
func (*NoCompactionMarkFilter) Filter ¶
func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
Filter finds blocks that should not be compacted and fills f.noCompactMarkedMap. If f.removeNoCompactBlocks is true, blocks are also removed from metas. (The Thanos version of the filter doesn't do removal.)
func (*NoCompactionMarkFilter) NoCompactMarkedBlocks ¶
func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{}
NoCompactMarkedBlocks returns block ids that were marked for no compaction. It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.
type Planner ¶
type Planner interface {
	// Plan returns a list of blocks that should be compacted into a single one.
	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
	Plan(ctx context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)
}
Planner returns blocks to compact.
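As a hedged illustration of the contract (not this module's planner), a pass-through implementation that plans every block in the job and skips jobs with fewer than two blocks:

type wholeJobPlanner struct{}

func (wholeJobPlanner) Plan(ctx context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error) {
	if len(metasByMinTime) < 2 {
		return nil, nil // nothing to gain from compacting a single block
	}
	return metasByMinTime, nil
}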
type RingConfig ¶
type RingConfig struct {
	Common util.CommonRingConfig `yaml:",inline"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout" category:"advanced"`

	ObservePeriod time.Duration `yaml:"-"`
}
RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum and avoid confusing the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*RingConfig) ToBasicLifecyclerConfig ¶
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
type ShardAwareDeduplicateFilter ¶
type ShardAwareDeduplicateFilter struct {
// contains filtered or unexported fields
}
ShardAwareDeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. Not goroutine safe.
func NewShardAwareDeduplicateFilter ¶
func NewShardAwareDeduplicateFilter() *ShardAwareDeduplicateFilter
NewShardAwareDeduplicateFilter creates ShardAwareDeduplicateFilter.
func (*ShardAwareDeduplicateFilter) DuplicateIDs ¶
func (f *ShardAwareDeduplicateFilter) DuplicateIDs() []ulid.ULID
DuplicateIDs returns a slice of block IDs that were filtered out by the ShardAwareDeduplicateFilter.
func (*ShardAwareDeduplicateFilter) Filter ¶
func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
Filter removes from metas (the initial map of blocks) all blocks that are contained in other, compacted blocks. The removed blocks are the source blocks of the blocks that remain in metas after the filtering is executed.
type SplitAndMergeGrouper ¶
type SplitAndMergeGrouper struct {
// contains filtered or unexported fields
}
func NewSplitAndMergeGrouper ¶
func NewSplitAndMergeGrouper(
	userID string,
	ranges []int64,
	shardCount uint32,
	splitStageSize uint32,
	splitGroupsCount uint32,
	logger log.Logger,
) *SplitAndMergeGrouper
NewSplitAndMergeGrouper makes a new SplitAndMergeGrouper. The provided ranges must be sorted. If shardCount is 0, the splitting stage is disabled.
type SplitAndMergePlanner ¶
type SplitAndMergePlanner struct {
// contains filtered or unexported fields
}
func NewSplitAndMergePlanner ¶
func NewSplitAndMergePlanner(ranges []int64) *SplitAndMergePlanner
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
Syncer synchronizes block metas from a bucket into a local directory. It sorts them into compaction groups based on equal label sets.
func NewMetaSyncer ¶
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error)
NewMetaSyncer returns a new Syncer for the given Bucket and directory. Blocks must be at least as old as the sync delay to be considered.
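A hedged wiring sketch: the block.MetaFetcher construction isn't documented on this page, so fetcher is assumed to be built elsewhere, and the metric name is illustrative.

dedup := compactor.NewShardAwareDeduplicateFilter()
markedForDeletion := prometheus.NewCounter(prometheus.CounterOpts{
	Name: "compactor_blocks_marked_for_deletion_total", // illustrative
	Help: "Total number of blocks marked for deletion.",
})
syncer, err := compactor.NewMetaSyncer(logger, reg, bkt, fetcher, dedup, markedForDeletion)
if err != nil {
	return err
}
_ = syncer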
func (*Syncer) GarbageCollect ¶
GarbageCollect marks blocks for deletion from the bucket if their data is available as part of a block with a higher compaction level. A call to the SyncMetas function is required beforehand to populate duplicateIDs in duplicateBlocksFilter.