Documentation ¶
Index ¶
- Constants
- Variables
- func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, ...) error
- type BlockVisitMarker
- type BlocksCleaner
- type BlocksCleanerConfig
- type BlocksCompactorFactory
- type BlocksGrouperFactory
- type Compactor
- type Config
- type ConfigProvider
- type LabelRemoverFilter
- type Limits
- type PlannerFactory
- type RingConfig
- type ShuffleShardingGrouper
- type ShuffleShardingPlanner
Constants ¶
const (
	// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
	BlockVisitMarkerFile = "visit-mark.json"
	// VisitMarkerVersion1 is the current supported version of visit-mark file.
	VisitMarkerVersion1 = 1
)
Variables ¶
var (
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
)
var (
	RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return compact.NewDefaultGrouper(
			logger,
			bkt,
			false,
			true,
			reg,
			blocksMarkedForDeletion,
			garbageCollectedBlocks,
			blocksMarkedForNoCompaction,
			metadata.NoneFunc,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency)
	}

	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return NewShuffleShardingGrouper(
			ctx,
			logger,
			bkt,
			false,
			true,
			reg,
			blocksMarkedForDeletion,
			blocksMarkedForNoCompaction,
			garbageCollectedBlocks,
			remainingPlannedCompactions,
			metadata.NoneFunc,
			cfg,
			ring,
			ringLifecycle.Addr,
			ringLifecycle.ID,
			limits,
			userID,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency,
			cfg.CompactionConcurrency,
			cfg.BlockVisitMarkerTimeout,
			blockVisitMarkerReadFailed,
			blockVisitMarkerWriteFailed,
			noCompactionMarkFilter.NoCompactMarkedBlocks)
	}

	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
		}

		return compactor, plannerFactory, nil
	}

	ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
		}

		return compactor, plannerFactory, nil
	}
)
Functions ¶
func UpdateBlockVisitMarker ¶ added in v1.14.0
func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, ...) error
Types ¶
type BlockVisitMarker ¶ added in v1.14.0
type BlockVisitMarker struct {
	CompactorID string `json:"compactorID"`
	// VisitTime is a unix timestamp of when the block was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}
func ReadBlockVisitMarker ¶ added in v1.14.0
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)
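The visit marker is plain JSON, so a marker returned by ReadBlockVisitMarker (or fetched from the bucket directly) can be decoded and inspected with the standard library. The sketch below mirrors the documented struct locally instead of importing the package, and the isFresh helper plus the exact staleness rule (last visit within a visit-marker timeout) are illustrative assumptions, not this package's API.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// blockVisitMarker mirrors compactor.BlockVisitMarker as documented above.
type blockVisitMarker struct {
	CompactorID string `json:"compactorID"`
	VisitTime   int64  `json:"visitTime"` // unix timestamp of the last visit
	Version     int    `json:"version"`
}

// isFresh is a hypothetical helper: it treats the block as still held by its
// compactor if the last visit happened within the given timeout.
func isFresh(m blockVisitMarker, timeout time.Duration, now time.Time) bool {
	return now.Sub(time.Unix(m.VisitTime, 0)) < timeout
}

func main() {
	raw := []byte(`{"compactorID":"compactor-1","visitTime":1664800000,"version":1}`)

	var m blockVisitMarker
	if err := json.Unmarshal(raw, &m); err != nil {
		// The package surfaces this failure mode as ErrorUnmarshalBlockVisitMarker.
		panic(err)
	}

	fmt.Println("visited by", m.CompactorID, "fresh:", isFresh(m, 5*time.Minute, time.Now()))
}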
type BlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner
type BlocksCleanerConfig ¶ added in v1.2.0
type BlocksCleanerConfig struct {
	DeletionDelay                      time.Duration
	CleanupInterval                    time.Duration
	CleanupConcurrency                 int
	BlockDeletionMarksMigrationEnabled bool          // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
	TenantCleanupDelay                 time.Duration // Delay before removing tenant deletion mark and "debug".
}
type BlocksCompactorFactory ¶ added in v1.8.0
type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (compact.Compactor, PlannerFactory, error)
BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
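Downstream projects hook a custom factory in through Config.BlocksCompactorFactory. The sketch below follows the shape of DefaultBlocksCompactorFactory shown under Variables; the function name myBlocksCompactorFactory is hypothetical, and the import paths (Thanos compact/downsample/objstore modules and the Cortex ring package) are assumptions that vary between Cortex versions.

package customcompactor

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/compact/downsample"

	"github.com/cortexproject/cortex/pkg/compactor"
	"github.com/cortexproject/cortex/pkg/ring"
)

// myBlocksCompactorFactory is a custom compactor.BlocksCompactorFactory.
// It builds the same leveled TSDB compactor and planner as the default
// factory; a real implementation would customise one of the two.
func myBlocksCompactorFactory(ctx context.Context, cfg compactor.Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compactor.PlannerFactory, error) {
	// Leveled compactor over the configured block ranges, as in DefaultBlocksCompactorFactory.
	c, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
	if err != nil {
		return nil, nil, err
	}

	// The planner factory must match compactor.PlannerFactory; the two trailing
	// counters (block visit marker read/write failures) are unused here.
	plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg compactor.Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _, _ prometheus.Counter) compact.Planner {
		return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
	}

	return c, plannerFactory, nil
}

The function can then be assigned to Config.BlocksCompactorFactory before the Compactor is constructed.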
type BlocksGrouperFactory ¶ added in v1.8.0
type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	reg prometheus.Registerer,
	blocksMarkedForDeletion prometheus.Counter,
	blocksMarkedForNoCompact prometheus.Counter,
	garbageCollectedBlocks prometheus.Counter,
	remainingPlannedCompactions prometheus.Gauge,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	ring *ring.Ring,
	ringLifecycler *ring.Lifecycler,
	limit Limits,
	userID string,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Grouper
BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
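The package ships two implementations of this type, DefaultBlocksGrouperFactory and ShuffleShardingGrouperFactory (see Variables above), and they are plugged in through the Config hooks. A minimal sketch, assuming the package import path github.com/cortexproject/cortex/pkg/compactor:

package customcompactor

import "github.com/cortexproject/cortex/pkg/compactor"

// useShuffleShardingFactories wires the shuffle-sharding grouper and compactor
// factories shipped with the package into a compactor Config.
func useShuffleShardingFactories(cfg *compactor.Config) {
	cfg.BlocksGrouperFactory = compactor.ShuffleShardingGrouperFactory
	cfg.BlocksCompactorFactory = compactor.ShuffleShardingBlocksCompactorFactory
}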
type Compactor ¶
Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
func NewCompactor ¶
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits Limits) (*Compactor, error)
NewCompactor makes a new Compactor.
func (*Compactor) RingHandler ¶ added in v0.7.0
func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)
type Config ¶
type Config struct {
	BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
	BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
	MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
	ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
	DataDir                               string                   `yaml:"data_dir"`
	CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
	CompactionRetries                     int                      `yaml:"compaction_retries"`
	CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
	CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
	CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
	DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
	TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
	SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
	BlockFilesConcurrency                 int                      `yaml:"block_files_concurrency"`
	BlocksFetchConcurrency                int                      `yaml:"blocks_fetch_concurrency"`

	// Whether the migration of block deletion marks to the global markers location is enabled.
	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

	// Compactors sharding.
	ShardingEnabled  bool       `yaml:"sharding_enabled"`
	ShardingStrategy string     `yaml:"sharding_strategy"`
	ShardingRing     RingConfig `yaml:"sharding_ring"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

	// Block visit marker file config
	BlockVisitMarkerTimeout            time.Duration `yaml:"block_visit_marker_timeout"`
	BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
	// contains filtered or unexported fields
}
Config holds the Compactor config.
func (*Config) RegisterFlags ¶
func (cfg *Config) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers the Compactor flags.
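A Config is normally populated by registering its flags (which also installs the defaults) and then overriding individual fields in code. A minimal sketch, assuming the package import path github.com/cortexproject/cortex/pkg/compactor and illustrative field values:

package customcompactor

import (
	"flag"
	"time"

	"github.com/cortexproject/cortex/pkg/compactor"
)

// newCompactorConfig builds a compactor Config with the package defaults and
// a few fields overridden in code.
func newCompactorConfig() (compactor.Config, error) {
	var cfg compactor.Config

	// RegisterFlags installs the default value for every flag-backed field.
	fs := flag.NewFlagSet("compactor", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(nil); err != nil {
		return cfg, err
	}

	// Override selected fields after the defaults are in place.
	cfg.DataDir = "/var/lib/cortex/compactor"
	cfg.CompactionInterval = 30 * time.Minute
	cfg.CompactionConcurrency = 2

	return cfg, nil
}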
type ConfigProvider ¶ added in v1.8.0
type ConfigProvider interface {
	bucket.TenantConfigProvider

	CompactorBlocksRetentionPeriod(user string) time.Duration
}
ConfigProvider defines the per-tenant config provider for the Compactor.
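A downstream project satisfies ConfigProvider by layering per-tenant retention on top of a bucket.TenantConfigProvider. The sketch below embeds an existing provider and returns the same retention for every tenant; the type name is hypothetical and the bucket package import path is an assumption.

package customcompactor

import (
	"time"

	"github.com/cortexproject/cortex/pkg/storage/bucket"
)

// staticConfigProvider is a hypothetical ConfigProvider that delegates the
// bucket-related methods to an embedded TenantConfigProvider and applies the
// same retention period to every tenant.
type staticConfigProvider struct {
	bucket.TenantConfigProvider

	retention time.Duration
}

// CompactorBlocksRetentionPeriod returns the retention period for the given user.
func (p staticConfigProvider) CompactorBlocksRetentionPeriod(user string) time.Duration {
	return p.retention
}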
type LabelRemoverFilter ¶ added in v1.2.0
type LabelRemoverFilter struct {
// contains filtered or unexported fields
}
func NewLabelRemoverFilter ¶ added in v1.2.0
func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter
NewLabelRemoverFilter creates a LabelRemoverFilter.
func (*LabelRemoverFilter) Filter ¶ added in v1.2.0
func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec, _ block.GaugeVec) error
Filter modifies the external labels of existing blocks, removing the given labels from the metadata of blocks that have them.
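For example, a filter built with NewLabelRemoverFilter can be run over a fetched metadata map. The sketch below builds a single synthetic meta entry in memory and strips one label; the helper names and the example label are illustrative, and the ULID/Thanos metadata import paths are assumptions that may differ per Cortex version.

package customcompactor

import (
	"context"
	"crypto/rand"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	"github.com/cortexproject/cortex/pkg/compactor"
)

// dropTenantLabel removes the given external label from every block's metadata.
func dropTenantLabel(metas map[ulid.ULID]*metadata.Meta, label string) error {
	f := compactor.NewLabelRemoverFilter([]string{label})
	// The two GaugeVec arguments are ignored by Filter, so nil is passed here.
	return f.Filter(context.Background(), metas, nil, nil)
}

// example builds one synthetic block meta and strips a label from it.
func example() error {
	id := ulid.MustNew(ulid.Now(), rand.Reader)
	metas := map[ulid.ULID]*metadata.Meta{
		id: {Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "tenant-1"}}},
	}
	return dropTenantLabel(metas, "__org_id__")
}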
type PlannerFactory ¶ added in v1.13.0
type PlannerFactory func(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	cfg Config,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
	ringLifecycle *ring.Lifecycler,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) compact.Planner
type RingConfig ¶ added in v0.7.0
type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

	ObservePeriod time.Duration `yaml:"-"`
}
RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config strips the options down to the minimum and avoids confusing the user.
func (*RingConfig) RegisterFlags ¶ added in v0.7.0
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure the compactor ring to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶ added in v0.7.0
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig
ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.
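The ring config is registered on a FlagSet just like the main Config, and the resulting LifecyclerConfig can then be handed to a lifecycler. A minimal sketch; the function name is illustrative and the compactor and ring import paths are assumptions that may differ per Cortex version.

package customcompactor

import (
	"flag"

	"github.com/cortexproject/cortex/pkg/compactor"
	"github.com/cortexproject/cortex/pkg/ring"
)

// buildLifecyclerConfig registers the compactor ring flags with their defaults
// and converts the stripped-down RingConfig into a full LifecyclerConfig.
func buildLifecyclerConfig(listenPort int) (ring.LifecyclerConfig, error) {
	var cfg compactor.RingConfig

	fs := flag.NewFlagSet("compactor-ring", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(nil); err != nil {
		return ring.LifecyclerConfig{}, err
	}

	// ListenPort is injected internally by Cortex; set it explicitly here.
	cfg.ListenPort = listenPort

	return cfg.ToLifecyclerConfig(), nil
}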
type ShuffleShardingGrouper ¶ added in v1.13.0
type ShuffleShardingGrouper struct {
// contains filtered or unexported fields
}
func NewShuffleShardingGrouper ¶ added in v1.13.0
func NewShuffleShardingGrouper(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.InstrumentedBucket,
	acceptMalformedIndex bool,
	enableVerticalCompaction bool,
	reg prometheus.Registerer,
	blocksMarkedForDeletion prometheus.Counter,
	blocksMarkedForNoCompact prometheus.Counter,
	garbageCollectedBlocks prometheus.Counter,
	remainingPlannedCompactions prometheus.Gauge,
	hashFunc metadata.HashFunc,
	compactorCfg Config,
	ring ring.ReadRing,
	ringLifecyclerAddr string,
	ringLifecyclerID string,
	limits Limits,
	userID string,
	blockFilesConcurrency int,
	blocksFetchConcurrency int,
	compactionConcurrency int,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
) *ShuffleShardingGrouper
type ShuffleShardingPlanner ¶ added in v1.13.0
type ShuffleShardingPlanner struct {
// contains filtered or unexported fields
}
func NewShuffleShardingPlanner ¶ added in v1.13.0
func NewShuffleShardingPlanner(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ranges []int64,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ringLifecyclerID string,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerFileUpdateInterval time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) *ShuffleShardingPlanner