compactor

package
v0.0.0-...-fe1d711 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: AGPL-3.0 Imports: 58 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CompactionOrderOldestFirst = "smallest-range-oldest-blocks-first"
	CompactionOrderNewestFirst = "newest-blocks-first"
)

Variables

Functions

func ConvertBucketIndexToMetasForCompactionJobPlanning

func ConvertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta

ConvertBucketIndexToMetasForCompactionJobPlanning converts the index into a map of block Metas, ignoring blocks marked for deletion.

Types

type BlocksCleaner

type BlocksCleaner struct {
	services.Service
	// contains filtered or unexported fields
}

func NewBlocksCleaner

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner

type BlocksCleanerConfig

type BlocksCleanerConfig struct {
	DeletionDelay                 time.Duration
	CleanupInterval               time.Duration
	CleanupConcurrency            int
	TenantCleanupDelay            time.Duration // Delay before removing tenant deletion mark and "debug".
	DeleteBlocksConcurrency       int
	GetDeletionMarkersConcurrency int
	NoBlocksFileCleanupEnabled    bool
	CompactionBlockRanges         mimir_tsdb.DurationList // Used for estimating compaction jobs.
}

type BlocksCompactorFactory

type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (Compactor, Planner, error)

BlocksCompactorFactory builds and returns the compactor and planner for compacting a tenant's blocks.

type BlocksGrouperFactory

type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	cfgProvider ConfigProvider,
	userID string,
	logger log.Logger,
	reg prometheus.Registerer,
) Grouper

BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.

type BucketCompactor

type BucketCompactor struct {
	// contains filtered or unexported fields
}

BucketCompactor compacts blocks in a bucket.

func NewBucketCompactor

func NewBucketCompactor(
	logger log.Logger,
	sy *metaSyncer,
	grouper Grouper,
	planner Planner,
	comp Compactor,
	compactDir string,
	bkt objstore.Bucket,
	concurrency int,
	skipUnhealthyBlocks bool,
	ownJob ownCompactionJobFunc,
	sortJobs JobsOrderFunc,
	waitPeriod time.Duration,
	blockSyncConcurrency int,
	metrics *BucketCompactorMetrics,
) (*BucketCompactor, error)

NewBucketCompactor creates a new bucket compactor.

func (*BucketCompactor) Compact

func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Duration) (rerr error)

Compact runs compaction over bucket. If maxCompactionTime is positive then after this time no more new compactions are started.

type BucketCompactorMetrics

type BucketCompactorMetrics struct {
	// contains filtered or unexported fields
}

BucketCompactorMetrics holds the metrics tracked by BucketCompactor.

func NewBucketCompactorMetrics

func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg prometheus.Registerer) *BucketCompactorMetrics

NewBucketCompactorMetrics makes a new BucketCompactorMetrics.

type Compactor

type Compactor interface {
	// Write persists one or more Blocks into a directory.
	// No Block is written when resulting Block has 0 samples and returns an empty slice.
	// Prometheus always returns one or no block. The interface allows returning more than one
	// block for downstream users to experiment with compactor.
	Write(dest string, b tsdb.BlockReader, mint, maxt int64, parent *tsdb.BlockMeta) ([]ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// Prometheus always returns one or no block. The interface allows returning more than one
	// block for downstream users to experiment with compactor.
	// When one resulting Block has 0 samples
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * Block is not included in the result.
	Compact(dest string, dirs []string, open []*tsdb.Block) ([]ulid.ULID, error)

	// CompactWithSplitting merges and splits the source blocks into shardCount number of compacted blocks,
	// and returns a slice of block IDs. The position of the returned block ID in the result slice corresponds to the shard index.
	// If the given compacted block has no series, the corresponding block ID will be the zero ULID value.
	CompactWithSplitting(dest string, dirs []string, open []*tsdb.Block, shardCount uint64) (result []ulid.ULID, _ error)
}

Compactor provides compaction against an underlying storage of time series data. This is similar to tsdb.Compactor just without the Plan method. TODO(bwplotka): Split the Planner from Compactor on upstream as well, so we can import it.

type Config

type Config struct {
	BlockRanges                mimir_tsdb.DurationList `yaml:"block_ranges" category:"advanced"`
	BlockSyncConcurrency       int                     `yaml:"block_sync_concurrency" category:"advanced"`
	MetaSyncConcurrency        int                     `yaml:"meta_sync_concurrency" category:"advanced"`
	DataDir                    string                  `yaml:"data_dir"`
	CompactionInterval         time.Duration           `yaml:"compaction_interval" category:"advanced"`
	CompactionRetries          int                     `yaml:"compaction_retries" category:"advanced"`
	CompactionConcurrency      int                     `yaml:"compaction_concurrency" category:"advanced"`
	CompactionWaitPeriod       time.Duration           `yaml:"first_level_compaction_wait_period"`
	CleanupInterval            time.Duration           `yaml:"cleanup_interval" category:"advanced"`
	CleanupConcurrency         int                     `yaml:"cleanup_concurrency" category:"advanced"`
	DeletionDelay              time.Duration           `yaml:"deletion_delay" category:"advanced"`
	TenantCleanupDelay         time.Duration           `yaml:"tenant_cleanup_delay" category:"advanced"`
	MaxCompactionTime          time.Duration           `yaml:"max_compaction_time" category:"advanced"`
	NoBlocksFileCleanupEnabled bool                    `yaml:"no_blocks_file_cleanup_enabled" category:"experimental"`

	// Compactor concurrency options
	MaxOpeningBlocksConcurrency         int `yaml:"max_opening_blocks_concurrency" category:"advanced"`          // Number of goroutines opening blocks before compaction.
	MaxClosingBlocksConcurrency         int `yaml:"max_closing_blocks_concurrency" category:"advanced"`          // Max number of blocks that can be closed concurrently during split compaction. Note that closing of newly compacted block uses a lot of memory for writing index.
	SymbolsFlushersConcurrency          int `yaml:"symbols_flushers_concurrency" category:"advanced"`            // Number of symbols flushers used when doing split compaction.
	MaxBlockUploadValidationConcurrency int `yaml:"max_block_upload_validation_concurrency" category:"advanced"` // Max number of uploaded blocks that can be validated concurrently.

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`

	// Compactors sharding.
	ShardingRing RingConfig `yaml:"sharding_ring"`

	CompactionJobsOrder string `yaml:"compaction_jobs_order" category:"advanced"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
	// contains filtered or unexported fields
}

Config holds the MultitenantCompactor config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the MultitenantCompactor flags.

func (*Config) Validate

func (cfg *Config) Validate(logger log.Logger) error

type ConfigProvider

type ConfigProvider interface {
	bucket.TenantConfigProvider

	// CompactorBlocksRetentionPeriod returns the retention period for a given user.
	CompactorBlocksRetentionPeriod(user string) time.Duration

	// CompactorSplitAndMergeShards returns the number of shards to use when splitting blocks.
	CompactorSplitAndMergeShards(userID string) int

	// CompactorSplitGroups returns the number of groups that blocks used for splitting should
	// be grouped into. Different groups are then split by different jobs.
	CompactorSplitGroups(userID string) int

	// CompactorTenantShardSize returns the number of compactors that this user can use. 0 = all compactors.
	CompactorTenantShardSize(userID string) int

	// CompactorPartialBlockDeletionDelay returns the partial block delay time period for a given user,
	// and whether the configured value is valid. If the value isn't valid, the returned delay is the default one
	// and the caller is responsible for warning the Mimir operator about it.
	CompactorPartialBlockDeletionDelay(userID string) (delay time.Duration, valid bool)

	// CompactorBlockUploadEnabled returns whether block upload is enabled for a given tenant.
	CompactorBlockUploadEnabled(tenantID string) bool

	// CompactorBlockUploadValidationEnabled returns whether block upload validation is enabled for a given tenant.
	CompactorBlockUploadValidationEnabled(tenantID string) bool

	// CompactorBlockUploadVerifyChunks returns whether chunk verification is enabled for a given tenant.
	CompactorBlockUploadVerifyChunks(tenantID string) bool

	// CompactorBlockUploadMaxBlockSizeBytes returns the maximum size in bytes of a block that is allowed to be uploaded or validated for a given user.
	CompactorBlockUploadMaxBlockSizeBytes(userID string) int64

	// CompactorInMemoryTenantMetaCacheSize returns the number of parsed *Meta objects that we can keep in memory for the user between compactions.
	CompactorInMemoryTenantMetaCacheSize(userID string) int
}

ConfigProvider defines the per-tenant config provider for the MultitenantCompactor.

type CriticalError

type CriticalError struct {
	// contains filtered or unexported fields
}

CriticalError is a type wrapper for block health critical errors.

func IsCriticalError

func IsCriticalError(err error) (bool, CriticalError)

IsCriticalError returns true if the base error is a CriticalError.

func (CriticalError) Error

func (e CriticalError) Error() string

type DeleteTenantStatusResponse

type DeleteTenantStatusResponse struct {
	TenantID      string `json:"tenant_id"`
	BlocksDeleted bool   `json:"blocks_deleted"`
}

type Grouper

type Grouper interface {
	// Groups returns the compaction jobs for all blocks currently known to the syncer.
	// It creates all jobs from scratch on every call.
	Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)
}

Grouper is responsible for grouping all known blocks into concurrency safe compaction Jobs.

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job holds a compaction job, which consists of a group of blocks that should be compacted together. Not goroutine safe.

func (*Job) AppendMeta

func (job *Job) AppendMeta(meta *block.Meta) error

AppendMeta appends the block with the given meta to the job.

func (*Job) IDs

func (job *Job) IDs() (ids []ulid.ULID)

IDs returns all sorted IDs of blocks in the job.

func (*Job) Key

func (job *Job) Key() string

Key returns an identifier for the job.

func (*Job) Labels

func (job *Job) Labels() labels.Labels

Labels returns the external labels for the output block(s) of this job.

func (*Job) MaxTime

func (job *Job) MaxTime() int64

MaxTime returns the max time across all job's blocks.

func (*Job) Metas

func (job *Job) Metas() []*block.Meta

Metas returns the metadata for each block that is part of this job, ordered by the block's MinTime.

func (*Job) MinCompactionLevel

func (job *Job) MinCompactionLevel() int

MinCompactionLevel returns the minimum compaction level across all source blocks in this job.

func (*Job) MinTime

func (job *Job) MinTime() int64

MinTime returns the min time across all job's blocks.

func (*Job) Resolution

func (job *Job) Resolution() int64

Resolution returns the common downsampling resolution of blocks in the job.

func (*Job) ShardingKey

func (job *Job) ShardingKey() string

ShardingKey returns the key used to shard this job across multiple instances.

func (*Job) SplittingShards

func (job *Job) SplittingShards() uint32

SplittingShards returns the number of output shards to build if splitting is enabled.

func (*Job) String

func (job *Job) String() string

func (*Job) UseSplitting

func (job *Job) UseSplitting() bool

UseSplitting returns whether blocks should be split into multiple shards when compacted.

func (*Job) UserID

func (job *Job) UserID() string

UserID returns the user/tenant to which this job belongs.

type JobsOrderFunc

type JobsOrderFunc func(jobs []*Job) []*Job

func GetJobsOrderFunction

func GetJobsOrderFunction(name string) JobsOrderFunc

GetJobsOrderFunction returns a jobs ordering function, or nil, if name doesn't refer to any function.

type LabelRemoverFilter

type LabelRemoverFilter struct {
	// contains filtered or unexported fields
}

func NewLabelRemoverFilter

func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter

NewLabelRemoverFilter creates a LabelRemoverFilter.

func (*LabelRemoverFilter) Filter

func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*block.Meta, _ block.GaugeVec) error

Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.

type MultitenantCompactor

type MultitenantCompactor struct {
	services.Service
	// contains filtered or unexported fields
}

MultitenantCompactor is a multi-tenant TSDB block compactor based on Thanos.

func NewMultitenantCompactor

func NewMultitenantCompactor(compactorCfg Config, storageCfg mimir_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*MultitenantCompactor, error)

NewMultitenantCompactor makes a new MultitenantCompactor.

func (*MultitenantCompactor) DeleteTenant

func (c *MultitenantCompactor) DeleteTenant(w http.ResponseWriter, r *http.Request)

func (*MultitenantCompactor) DeleteTenantStatus

func (c *MultitenantCompactor) DeleteTenantStatus(w http.ResponseWriter, r *http.Request)

func (*MultitenantCompactor) FinishBlockUpload

func (c *MultitenantCompactor) FinishBlockUpload(w http.ResponseWriter, r *http.Request)

FinishBlockUpload handles request for finishing block upload.

Finishing block upload performs block validation, and if all checks pass, marks block as finished by uploading meta.json file.

func (*MultitenantCompactor) GetBlockUploadStateHandler

func (c *MultitenantCompactor) GetBlockUploadStateHandler(w http.ResponseWriter, r *http.Request)

func (*MultitenantCompactor) PlannedJobsHandler

func (c *MultitenantCompactor) PlannedJobsHandler(w http.ResponseWriter, req *http.Request)

func (*MultitenantCompactor) RingHandler

func (c *MultitenantCompactor) RingHandler(w http.ResponseWriter, req *http.Request)

func (*MultitenantCompactor) StartBlockUpload

func (c *MultitenantCompactor) StartBlockUpload(w http.ResponseWriter, r *http.Request)

StartBlockUpload handles request for starting block upload.

Starting a block upload means uploading the block's meta file and verifying that the upload can go ahead. In practice, this means checking that the (complete) block isn't already in block storage and that the meta file is valid.

func (*MultitenantCompactor) TenantsHandler

func (c *MultitenantCompactor) TenantsHandler(w http.ResponseWriter, req *http.Request)

func (*MultitenantCompactor) UploadBlockFile

func (c *MultitenantCompactor) UploadBlockFile(w http.ResponseWriter, r *http.Request)

UploadBlockFile handles requests for uploading block files. It takes the mandatory query parameter "path", specifying the file's destination path.

type NoCompactionMarkFilter

type NoCompactionMarkFilter struct {
	// contains filtered or unexported fields
}

NoCompactionMarkFilter is a block.Fetcher filter that finds all blocks with no-compact marker files, and optionally removes them from synced metas.

func NewNoCompactionMarkFilter

func NewNoCompactionMarkFilter(bkt objstore.InstrumentedBucketReader) *NoCompactionMarkFilter

NewNoCompactionMarkFilter creates NoCompactionMarkFilter.

func (*NoCompactionMarkFilter) Filter

func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter finds blocks that should not be compacted, and fills f.noCompactMarkedMap. If f.removeNoCompactBlocks is true, blocks are also removed from metas. (Thanos version of the filter doesn't do removal).

func (*NoCompactionMarkFilter) NoCompactMarkedBlocks

func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{}

NoCompactMarkedBlocks returns block ids that were marked for no compaction. It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.

type OutOfOrderChunksError

type OutOfOrderChunksError struct {
	// contains filtered or unexported fields
}

OutOfOrderChunksError is a type wrapper for OOO chunk error from validating block index.

func IsOutOfOrderChunkError

func IsOutOfOrderChunkError(err error) (bool, OutOfOrderChunksError)

IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.

func (OutOfOrderChunksError) Error

func (e OutOfOrderChunksError) Error() string

type Planner

type Planner interface {
	// Plan returns a list of blocks that should be compacted into a single one.
	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
	Plan(ctx context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)
}

Planner returns blocks to compact.

type RingConfig

type RingConfig struct {
	Common util.CommonRingConfig `yaml:",inline"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout" category:"advanced"`

	ObservePeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

type ShardAwareDeduplicateFilter

type ShardAwareDeduplicateFilter struct {
	// contains filtered or unexported fields
}

ShardAwareDeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. Not goroutine safe.

func NewShardAwareDeduplicateFilter

func NewShardAwareDeduplicateFilter() *ShardAwareDeduplicateFilter

NewShardAwareDeduplicateFilter creates a ShardAwareDeduplicateFilter.

func (*ShardAwareDeduplicateFilter) DuplicateIDs

func (f *ShardAwareDeduplicateFilter) DuplicateIDs() []ulid.ULID

DuplicateIDs returns slice of block ids that are filtered out by ShardAwareDeduplicateFilter.

func (*ShardAwareDeduplicateFilter) Filter

func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter filters out from metas, the initial map of blocks, all the blocks that are contained in other, compacted, blocks. The removed blocks are source blocks of the blocks that remain in metas after the filtering is executed.

type SplitAndMergeGrouper

type SplitAndMergeGrouper struct {
	// contains filtered or unexported fields
}

func NewSplitAndMergeGrouper

func NewSplitAndMergeGrouper(
	userID string,
	ranges []int64,
	shardCount uint32,
	splitGroupsCount uint32,
	logger log.Logger,
) *SplitAndMergeGrouper

NewSplitAndMergeGrouper makes a new SplitAndMergeGrouper. The provided ranges must be sorted. If shardCount is 0, the splitting stage is disabled.

func (*SplitAndMergeGrouper) Groups

func (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)

type SplitAndMergePlanner

type SplitAndMergePlanner struct {
	// contains filtered or unexported fields
}

func NewSplitAndMergePlanner

func NewSplitAndMergePlanner(ranges []int64) *SplitAndMergePlanner

func (*SplitAndMergePlanner) Plan

func (c *SplitAndMergePlanner) Plan(_ context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)

Plan implements compact.Planner.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL