Documentation ¶
Index ¶
- Constants
- Variables
- func GetCleanerVisitMarkerFilePath() string
- func IsBlockVisitMarker(path string) bool
- func IsNotBlockVisitMarkerError(err error) bool
- func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, ...) error
- type BlockVisitMarker
- type BlocksCleaner
- type BlocksCleanerConfig
- type BlocksCompactorFactory
- type BlocksGrouperFactory
- type CleanerVisitMarker
- func (b *CleanerVisitMarker) GetStatus() VisitStatus
- func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string
- func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool
- func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool
- func (b *CleanerVisitMarker) String() string
- func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)
- type Compactor
- type Config
- type ConfigProvider
- type LabelRemoverFilter
- type Limits
- type PlannerFactory
- type RingConfig
- type ShuffleShardingGrouper
- type ShuffleShardingPlanner
- type VisitMarker
- type VisitMarkerManager
- func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)
- func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, ...)
- func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)
- func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error
- type VisitStatus
Constants ¶
// Block visit marker file name and format version.
const (
	// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
	BlockVisitMarkerFile = "visit-mark.json"
	// VisitMarkerVersion1 is the current supported version of visit-mark file.
	VisitMarkerVersion1 = 1
)
// Cleaner visit marker file name and format version.
const (
	// CleanerVisitMarkerName is the name of cleaner visit marker file.
	CleanerVisitMarkerName = "cleaner-visit-marker.json"
	// CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file.
	CleanerVisitMarkerVersion1 = 1
)
Variables ¶
// Sentinel errors related to block visit markers. Compare against them with
// errors.Is. The exported names are kept as-is for backward compatibility.
var (
	// ErrorBlockVisitMarkerNotFound carries the message "block visit marker not found".
	ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found")
	// ErrorUnmarshalBlockVisitMarker carries the message "unmarshal block visit marker JSON".
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
	// ErrorNotBlockVisitMarker carries the message "file is not block visit marker".
	ErrorNotBlockVisitMarker = errors.New("file is not block visit marker")
)
var ( RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouperWithMetrics( logger, bkt, cfg.AcceptMalformedIndex, true, compactorMetrics.compactions, compactorMetrics.compactionRunsStarted, compactorMetrics.compactionRunsCompleted, compactorMetrics.compactionFailures, compactorMetrics.verticalCompactions, syncerMetrics.BlocksMarkedForDeletion, syncerMetrics.GarbageCollectedBlocks, blocksMarkedForNoCompaction, metadata.NoneFunc, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency) } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { return NewShuffleShardingGrouper( ctx, logger, bkt, cfg.AcceptMalformedIndex, true, blocksMarkedForNoCompaction, metadata.NoneFunc, syncerMetrics, compactorMetrics, cfg, ring, ringLifecycle.Addr, ringLifecycle.ID, limits, userID, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency, cfg.CompactionConcurrency, cfg.BlockVisitMarkerTimeout, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, noCompactionMarkFilter.NoCompactMarkedBlocks) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) 
(compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } return compactor, plannerFactory, nil } ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } return compactor, plannerFactory, nil } )
Functions ¶
func GetCleanerVisitMarkerFilePath ¶ added in v1.18.0
func GetCleanerVisitMarkerFilePath() string
func IsBlockVisitMarker ¶ added in v1.16.0
func IsNotBlockVisitMarkerError ¶ added in v1.16.0
Types ¶
type BlockVisitMarker ¶ added in v1.14.0
// BlockVisitMarker is the JSON-serialisable record of a compactor's visit to
// a block: which compactor wrote it, when, and in which file format version.
type BlockVisitMarker struct {
	CompactorID string `json:"compactorID"`
	// VisitTime is a unix timestamp of when the block was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}
func ReadBlockVisitMarker ¶ added in v1.14.0
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)
type BlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner( cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, ringLifecyclerID string, reg prometheus.Registerer, cleanerVisitMarkerTimeout time.Duration, cleanerVisitMarkerFileUpdateInterval time.Duration, blocksMarkedForDeletion *prometheus.CounterVec, ) *BlocksCleaner
type BlocksCleanerConfig ¶ added in v1.2.0
// BlocksCleanerConfig holds the settings passed to NewBlocksCleaner.
type BlocksCleanerConfig struct {
	DeletionDelay      time.Duration
	CleanupInterval    time.Duration
	CleanupConcurrency int
	// TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
	BlockDeletionMarksMigrationEnabled bool
	// Delay before removing tenant deletion mark and "debug".
	TenantCleanupDelay time.Duration
}
type BlocksCompactorFactory ¶ added in v1.8.0
// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
// It receives the compactor Config, a logger and a Prometheus registerer, and
// returns a compact.Compactor, a PlannerFactory and an error.
type BlocksCompactorFactory func( ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer, ) (compact.Compactor, PlannerFactory, error)
BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
type BlocksGrouperFactory ¶ added in v1.8.0
// BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
// Implementations may ignore arguments they do not need; the result is a
// compact.Grouper.
type BlocksGrouperFactory func( ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompact prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycler *ring.Lifecycler, limit Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ) compact.Grouper
BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
type CleanerVisitMarker ¶ added in v1.18.0
type CleanerVisitMarker struct { CompactorID string `json:"compactorID"` Status VisitStatus `json:"status"` // VisitTime is a unix timestamp of when the partition was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file. Version int `json:"version"` }
func NewCleanerVisitMarker ¶ added in v1.18.0
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker
func (*CleanerVisitMarker) GetStatus ¶ added in v1.18.0
func (b *CleanerVisitMarker) GetStatus() VisitStatus
func (*CleanerVisitMarker) GetVisitMarkerFilePath ¶ added in v1.18.0
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string
func (*CleanerVisitMarker) IsExpired ¶ added in v1.18.0
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool
func (*CleanerVisitMarker) IsVisited ¶ added in v1.18.0
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool
func (*CleanerVisitMarker) String ¶ added in v1.18.0
func (b *CleanerVisitMarker) String() string
func (*CleanerVisitMarker) UpdateStatus ¶ added in v1.18.0
func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)
type Compactor ¶
type Compactor struct { services.Service // Metrics. CompactorStartDurationSeconds prometheus.Gauge CompactionRunsStarted prometheus.Counter CompactionRunsInterrupted prometheus.Counter CompactionRunsCompleted prometheus.Counter CompactionRunsFailed prometheus.Counter CompactionRunsLastSuccess prometheus.Gauge CompactionRunDiscoveredTenants prometheus.Gauge CompactionRunSkippedTenants prometheus.Gauge CompactionRunSucceededTenants prometheus.Gauge CompactionRunFailedTenants prometheus.Gauge CompactionRunInterval prometheus.Gauge BlocksMarkedForNoCompaction prometheus.Counter // contains filtered or unexported fields }
Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
func NewCompactor ¶
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error)
NewCompactor makes a new Compactor.
func (*Compactor) RingHandler ¶ added in v0.7.0
func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)
type Config ¶
type Config struct { BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` ConsistencyDelay time.Duration `yaml:"consistency_delay"` DataDir string `yaml:"data_dir"` CompactionInterval time.Duration `yaml:"compaction_interval"` CompactionRetries int `yaml:"compaction_retries"` CompactionConcurrency int `yaml:"compaction_concurrency"` CleanupInterval time.Duration `yaml:"cleanup_interval"` CleanupConcurrency int `yaml:"cleanup_concurrency"` DeletionDelay time.Duration `yaml:"deletion_delay"` TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"` BlockFilesConcurrency int `yaml:"block_files_concurrency"` BlocksFetchConcurrency int `yaml:"blocks_fetch_concurrency"` // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` ShardingStrategy string `yaml:"sharding_strategy"` ShardingRing RingConfig `yaml:"sharding_ring"` // Allow downstream projects to customise the blocks compactor. 
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` // Block visit marker file config BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` // Cleaner visit marker file config CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"` CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"` AcceptMalformedIndex bool `yaml:"accept_malformed_index"` CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` // contains filtered or unexported fields }
Config holds the Compactor config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the Compactor flags.
type ConfigProvider ¶ added in v1.8.0
type ConfigProvider interface { bucket.TenantConfigProvider CompactorBlocksRetentionPeriod(user string) time.Duration }
ConfigProvider defines the per-tenant config provider for the Compactor.
type LabelRemoverFilter ¶ added in v1.2.0
type LabelRemoverFilter struct {
// contains filtered or unexported fields
}
func NewLabelRemoverFilter ¶ added in v1.2.0
func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter
NewLabelRemoverFilter creates a LabelRemoverFilter.
func (*LabelRemoverFilter) Filter ¶ added in v1.2.0
func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec, _ block.GaugeVec) error
Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.
type PlannerFactory ¶ added in v1.13.0
// PlannerFactory is the signature of a function that builds a compact.Planner
// from a bucket, logger, Config, no-compaction mark filter, ring lifecycler,
// user ID, visit-marker failure counters and compactor metrics.
// A BlocksCompactorFactory returns one alongside the compactor.
type PlannerFactory func( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics, ) compact.Planner
type RingConfig ¶ added in v0.7.0
type RingConfig struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"` WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` TokensFilePath string `yaml:"tokens_file_path"` UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` // Injected internally ListenPort int `yaml:"-"` WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"` ObservePeriod time.Duration `yaml:"-"` }
RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config is used to strip the configuration down to the minimum and avoid confusing the user.
func (*RingConfig) RegisterFlags ¶ added in v0.7.0
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure the compactor ring to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶ added in v0.7.0
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig
ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.
type ShuffleShardingGrouper ¶ added in v1.13.0
type ShuffleShardingGrouper struct {
// contains filtered or unexported fields
}
func NewShuffleShardingGrouper ¶ added in v1.13.0
func NewShuffleShardingGrouper( ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucket, acceptMalformedIndex bool, enableVerticalCompaction bool, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, ringLifecyclerID string, limits Limits, userID string, blockFilesConcurrency int, blocksFetchConcurrency int, compactionConcurrency int, blockVisitMarkerTimeout time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ) *ShuffleShardingGrouper
type ShuffleShardingPlanner ¶ added in v1.13.0
type ShuffleShardingPlanner struct {
// contains filtered or unexported fields
}
func NewShuffleShardingPlanner ¶ added in v1.13.0
func NewShuffleShardingPlanner( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ringLifecyclerID string, blockVisitMarkerTimeout time.Duration, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ) *ShuffleShardingPlanner
type VisitMarker ¶ added in v1.18.0
type VisitMarker interface { GetVisitMarkerFilePath() string UpdateStatus(ownerIdentifier string, status VisitStatus) GetStatus() VisitStatus IsExpired(visitMarkerTimeout time.Duration) bool String() string }
type VisitMarkerManager ¶ added in v1.18.0
type VisitMarkerManager struct {
// contains filtered or unexported fields
}
func NewVisitMarkerManager ¶ added in v1.18.0
func NewVisitMarkerManager( bkt objstore.InstrumentedBucket, logger log.Logger, ownerIdentifier string, visitMarker VisitMarker, ) *VisitMarkerManager
func (*VisitMarkerManager) DeleteVisitMarker ¶ added in v1.18.0
func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)
func (*VisitMarkerManager) MarkWithStatus ¶ added in v1.18.0
func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)
func (*VisitMarkerManager) ReadVisitMarker ¶ added in v1.18.0
func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error
type VisitStatus ¶ added in v1.18.0
// VisitStatus is the string-typed status recorded in a visit marker.
type VisitStatus string

// Known VisitStatus values.
const (
	Pending    VisitStatus = "pending"
	InProgress VisitStatus = "inProgress"
	Completed  VisitStatus = "completed"
	Failed     VisitStatus = "failed"
)