compactor

package
v1.17.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2024 License: Apache-2.0 Imports: 47 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
	BlockVisitMarkerFile = "visit-mark.json"
	// VisitMarkerVersion1 is the current supported version of visit-mark file.
	VisitMarkerVersion1 = 1
)

Variables

View Source
var (
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
	ErrorNotBlockVisitMarker       = errors.New("file is not block visit marker")
)
View Source
var (
	RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return compact.NewDefaultGrouper(
			logger,
			bkt,
			cfg.AcceptMalformedIndex,
			true,
			reg,
			blocksMarkedForDeletion,
			garbageCollectedBlocks,
			blocksMarkedForNoCompaction,
			metadata.NoneFunc,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency)
	}

	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
		return NewShuffleShardingGrouper(
			ctx,
			logger,
			bkt,
			cfg.AcceptMalformedIndex,
			true,
			reg,
			blocksMarkedForDeletion,
			blocksMarkedForNoCompaction,
			garbageCollectedBlocks,
			remainingPlannedCompactions,
			metadata.NoneFunc,
			cfg,
			ring,
			ringLifecycle.Addr,
			ringLifecycle.ID,
			limits,
			userID,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency,
			cfg.CompactionConcurrency,
			cfg.BlockVisitMarkerTimeout,
			blockVisitMarkerReadFailed,
			blockVisitMarkerWriteFailed,
			noCompactionMarkFilter.NoCompactMarkedBlocks)
	}

	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
		}

		return compactor, plannerFactory, nil
	}

	ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {

			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
		}
		return compactor, plannerFactory, nil
	}
)

Functions

func IsBlockVisitMarker added in v1.16.0

func IsBlockVisitMarker(path string) bool

func IsNotBlockVisitMarkerError added in v1.16.0

func IsNotBlockVisitMarkerError(err error) bool

func UpdateBlockVisitMarker added in v1.14.0

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error

Types

type BlockVisitMarker added in v1.14.0

type BlockVisitMarker struct {
	CompactorID string `json:"compactorID"`
	// VisitTime is a unix timestamp of when the block was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}

func ReadBlockVisitMarker added in v1.14.0

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)

type BlocksCleaner added in v1.2.0

type BlocksCleaner struct {
	services.Service
	// contains filtered or unexported fields
}

func NewBlocksCleaner added in v1.2.0

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner

type BlocksCleanerConfig added in v1.2.0

type BlocksCleanerConfig struct {
	DeletionDelay                      time.Duration
	CleanupInterval                    time.Duration
	CleanupConcurrency                 int
	BlockDeletionMarksMigrationEnabled bool          // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
	TenantCleanupDelay                 time.Duration // Delay before removing tenant deletion mark and "debug".
}

type BlocksCompactorFactory added in v1.8.0

type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (compact.Compactor, PlannerFactory, error)

BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.

type BlocksGrouperFactory added in v1.8.0

type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	reg prometheus.Registerer,
	blocksMarkedForDeletion prometheus.Counter,
	blocksMarkedForNoCompact prometheus.Counter,
	garbageCollectedBlocks prometheus.Counter,
	remainingPlannedCompactions prometheus.Gauge,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	ring *ring.Ring,
	ringLifecycler *ring.Lifecycler,
	limit Limits,
	userID string,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Grouper

BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.

type Compactor

type Compactor struct {
	services.Service

	// Metrics.
	CompactorStartDurationSeconds prometheus.Gauge
	// contains filtered or unexported fields
}

Compactor is a multi-tenant TSDB blocks compactor based on Thanos.

func NewCompactor

func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error)

NewCompactor makes a new Compactor.

func (*Compactor) RingHandler added in v0.7.0

func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)

type Config

type Config struct {
	BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
	BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
	MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
	ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
	DataDir                               string                   `yaml:"data_dir"`
	CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
	CompactionRetries                     int                      `yaml:"compaction_retries"`
	CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
	CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
	CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
	DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
	TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
	SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
	BlockFilesConcurrency                 int                      `yaml:"block_files_concurrency"`
	BlocksFetchConcurrency                int                      `yaml:"blocks_fetch_concurrency"`

	// Whether the migration of block deletion marks to the global markers location is enabled.
	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

	// Compactors sharding.
	ShardingEnabled  bool       `yaml:"sharding_enabled"`
	ShardingStrategy string     `yaml:"sharding_strategy"`
	ShardingRing     RingConfig `yaml:"sharding_ring"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

	// Block visit marker file config
	BlockVisitMarkerTimeout            time.Duration `yaml:"block_visit_marker_timeout"`
	BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

	AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
	CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
	// contains filtered or unexported fields
}

Config holds the Compactor config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the Compactor flags.

func (*Config) Validate added in v1.6.0

func (cfg *Config) Validate(limits validation.Limits) error

type ConfigProvider added in v1.8.0

type ConfigProvider interface {
	bucket.TenantConfigProvider
	CompactorBlocksRetentionPeriod(user string) time.Duration
}

ConfigProvider defines the per-tenant config provider for the Compactor.

type LabelRemoverFilter added in v1.2.0

type LabelRemoverFilter struct {
	// contains filtered or unexported fields
}

func NewLabelRemoverFilter added in v1.2.0

func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter

NewLabelRemoverFilter creates a LabelRemoverFilter.

func (*LabelRemoverFilter) Filter added in v1.2.0

Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.

type Limits added in v1.13.0

type Limits interface {
	CompactorTenantShardSize(userID string) int
}

Limits defines limits used by the Compactor.

type PlannerFactory added in v1.13.0

type PlannerFactory func(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	cfg Config,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
	ringLifecycle *ring.Lifecycler,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) compact.Planner

type RingConfig added in v0.7.0

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	TokensFilePath         string   `yaml:"tokens_file_path"`
	UnregisterOnShutdown   bool     `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort int `yaml:"-"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

	ObservePeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.

func (*RingConfig) RegisterFlags added in v0.7.0

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*RingConfig) ToLifecyclerConfig added in v0.7.0

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.

type ShuffleShardingGrouper added in v1.13.0

type ShuffleShardingGrouper struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingGrouper added in v1.13.0

func NewShuffleShardingGrouper(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.InstrumentedBucket,
	acceptMalformedIndex bool,
	enableVerticalCompaction bool,
	reg prometheus.Registerer,
	blocksMarkedForDeletion prometheus.Counter,
	blocksMarkedForNoCompact prometheus.Counter,
	garbageCollectedBlocks prometheus.Counter,
	remainingPlannedCompactions prometheus.Gauge,
	hashFunc metadata.HashFunc,
	compactorCfg Config,
	ring ring.ReadRing,
	ringLifecyclerAddr string,
	ringLifecyclerID string,
	limits Limits,
	userID string,
	blockFilesConcurrency int,
	blocksFetchConcurrency int,
	compactionConcurrency int,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
) *ShuffleShardingGrouper

func (*ShuffleShardingGrouper) Groups added in v1.13.0

func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error)

Groups function modified from https://github.com/cortexproject/cortex/pull/2616

type ShuffleShardingPlanner added in v1.13.0

type ShuffleShardingPlanner struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingPlanner added in v1.13.0

func NewShuffleShardingPlanner(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ranges []int64,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ringLifecyclerID string,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerFileUpdateInterval time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) *ShuffleShardingPlanner

func (*ShuffleShardingPlanner) Plan added in v1.13.0

func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL